From 9457a1fd0caa6b6be4bf9c42f79de8d62989c3ab Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Sat, 9 May 2015 15:41:09 +0200 Subject: [PATCH] client: Add a thread-safe Client object on top of the Conn object. A client maintains multiple connections to the server and uses one of them exclusively for each request. --- client/client.go | 205 ++++++++++++++++++++--------------------------- client/conn.go | 159 ++++++++++++++++++++++++++++++++++++ 2 files changed, 244 insertions(+), 120 deletions(-) create mode 100644 client/conn.go diff --git a/client/client.go b/client/client.go index c42c717..3315019 100644 --- a/client/client.go +++ b/client/client.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2014 Sebastian 'tokkee' Harl +// Copyright (C) 2014-2015 Sebastian 'tokkee' Harl // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -34,6 +34,33 @@ The Connect function connects to a SysDB server as the specified user: } defer c.Close() +Then, it can issue requests to the server: + + major, minor, patch, extra, err := c.ServerVersion() + if err != nil { + // handle error + } + fmt.Printf("Connected to SysDB %d.%d.%d%s\n", major, minor, patch, extra) + +or: + + res, err := c.Call(&proto.Message{Type: proto.ConnectionServerVersion}) + if err != nil { + // handle error + } + fmt.Printf("%v\n", res) + +Each client maintains multiple connections to a SysDB server allowing to +handle multiple requests in parallel. The SysDB server is able to handle that +easily making it a cheap approach. The low-level Dial function creates a +single connection to a SysDB server allowing to perform low-level operations: + + conn, err := client.Dial("unix:/var/run/sysdbd.sock", "username") + if err != nil { + // handle error + } + defer conn.Close() + The github.com/sysdb/go/proto package provides support for handling requests and responses. Use the Send and Receive functions to communicate with the server: @@ -42,10 +69,10 @@ server: Type: proto.ConnectionQuery, Raw: []byte{"LOOKUP hosts MATCHING attribute.architecture = 'amd64';"}, } - if err := c.Send(m); err != nil { + if err := conn.Send(m); err != nil { // handle error } - m, err := c.Receive() + m, err := conn.Receive() if err != nil { // handle error } @@ -59,158 +86,96 @@ package client import ( "encoding/binary" "fmt" - "net" - "strings" + "log" + "runtime" "github.com/sysdb/go/proto" ) -// A Conn is a connection to a SysDB server instance. +// A Client is a client for SysDB. // -// Multiple goroutines may invoke methods on a Conn simultaneously but since -// the SysDB protocol requires a strict ordering of request and response -// messages, the communication with the server will usually happen -// sequentially. -type Conn struct { - c net.Conn - network, addr, user string +// A client may be used from multiple goroutines in parallel. +type Client struct { + conns chan *Conn } -func (c *Conn) connect() (err error) { - if c.c, err = net.Dial(c.network, c.addr); err != nil { - return err - } - defer func() { - if err != nil { - c.Close() - } - }() - - m := &proto.Message{ - Type: proto.ConnectionStartup, - Raw: []byte(c.user), - } - if err := c.Send(m); err != nil { - return err - } - - m, err = c.Receive() - if err != nil { - return err - } - if m.Type == proto.ConnectionError { - return fmt.Errorf("failed to startup session: %s", string(m.Raw)) - } - if m.Type != proto.ConnectionOK { - return fmt.Errorf("failed to startup session: unsupported") - } - return nil -} - -// Connect sets up a client connection to a SysDB server instance at the +// Connect creates a new client connected to a SysDB server instance at the // specified address using the specified user. // -// The address may be a UNIX domain socket, either prefixed with 'unix:' or -// specifying an absolute file-system path. -func Connect(addr, user string) (*Conn, error) { - network := "tcp" - if strings.HasPrefix(addr, "unix:") { - network = "unix" - addr = addr[len("unix:"):] - } else if addr[0] == '/' { - network = "unix" - } +// The address may be a IP address or a UNIX domain socket, either prefixed +// with 'unix:' or specifying an absolute file-system path. +func Connect(addr, user string) (*Client, error) { + c := &Client{conns: make(chan *Conn, 2*runtime.NumCPU())} - c := &Conn{network: network, addr: addr, user: user} - if err := c.connect(); err != nil { - return nil, err + for i := 0; i < cap(c.conns); i++ { + conn, err := Dial(addr, user) + if err != nil { + return nil, err + } + c.conns <- conn } return c, nil } -// Close closes the client connection. +// Close closes a client connection. It may not be further used after calling +// this function. // -// Any blocked Send or Receive operations will be unblocked and return errors. -func (c *Conn) Close() { - if c.c == nil { - return - } - c.c.Close() - c.c = nil +// The function waits for all pending operations to finish. +func (c *Client) Close() { + for i := 0; i < cap(c.conns); i++ { + conn := <-c.conns + conn.Close() + } + close(c.conns) + c.conns = nil } -// Send sends the specified raw message to the server. -// -// Send operations block until the full message could be written to the -// underlying sockets. This ensures that server and client don't get out of -// sync. -func (c *Conn) Send(m *proto.Message) error { - var err error - if c.c != nil { - err = proto.Write(c.c, m) - if err == nil { - return nil - } - c.Close() - } +// Call sends the specified request to the server and waits for its reply. It +// blocks until the full reply has been received. +func (c *Client) Call(req *proto.Message) (*proto.Message, error) { + conn := <-c.conns + defer func() { c.conns <- conn }() - // Try to reconnect. - if e := c.connect(); e == nil { - return proto.Write(c.c, m) - } else if err == nil { - err = e + err := conn.Send(req) + if err != nil { + return nil, err } - return err -} -// Receive waits for a reply from the server and returns the raw message. -// -// Receive operations block until a full message could be read from the -// underlying socket. This ensures that server and client don't get out of -// sync. -func (c *Conn) Receive() (*proto.Message, error) { - var err error - if c.c != nil { - var m *proto.Message - m, err = proto.Read(c.c) - if err == nil { - return m, err + for { + res, err := conn.Receive() + switch { + case err != nil: + return nil, err + case res.Type == proto.ConnectionError: + return nil, fmt.Errorf("request failed: %s", string(res.Raw)) + case res.Type != proto.ConnectionLog: + return res, err } - c.Close() - } - // Try to reconnect. - if e := c.connect(); e == nil { - return proto.Read(c.c) - } else if err == nil { - err = e + if len(res.Raw) > 4 { + log.Println(string(res.Raw[4:])) + } } - return nil, err } // ServerVersion queries and returns the version of the remote server. -func (c *Conn) ServerVersion() (major, minor, patch int, extra string, err error) { - m := &proto.Message{Type: proto.ConnectionServerVersion} - if err = c.Send(m); err != nil { - return 0, 0, 0, "", err - } - - m, err = c.Receive() - if err != nil || m.Type != proto.ConnectionOK { +func (c *Client) ServerVersion() (major, minor, patch int, extra string, err error) { + res, err := c.Call(&proto.Message{Type: proto.ConnectionServerVersion}) + if err != nil || res.Type != proto.ConnectionOK { if err == nil { - err = fmt.Errorf("SERVER_VERSION command failed with status %d", m.Type) + err = fmt.Errorf("SERVER_VERSION command failed with status %d", res.Type) } return 0, 0, 0, "", err } - if len(m.Raw) < 4 { + if len(res.Raw) < 4 { return 0, 0, 0, "", fmt.Errorf("SERVER_VERSION reply is too short") } - version := int(binary.BigEndian.Uint32(m.Raw[:4])) + version := int(binary.BigEndian.Uint32(res.Raw[:4])) major = version / 10000 minor = version/100 - 100*major patch = version - 10000*major - 100*minor - if len(m.Raw) > 4 { - extra = string(m.Raw[4:]) + if len(res.Raw) > 4 { + extra = string(res.Raw[4:]) } return major, minor, patch, extra, nil } diff --git a/client/conn.go b/client/conn.go new file mode 100644 index 0000000..f950168 --- /dev/null +++ b/client/conn.go @@ -0,0 +1,159 @@ +// +// Copyright (C) 2014 Sebastian 'tokkee' Harl +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +// OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package client + +import ( + "fmt" + "net" + "strings" + + "github.com/sysdb/go/proto" +) + +// A Conn is a connection to a SysDB server instance. +// +// Multiple goroutines may invoke methods on a Conn simultaneously but since +// the SysDB protocol requires a strict ordering of request and response +// messages, the communication with the server will usually happen +// sequentially. +type Conn struct { + c net.Conn + network, addr, user string +} + +func (c *Conn) dial() (err error) { + if c.c, err = net.Dial(c.network, c.addr); err != nil { + return err + } + defer func() { + if err != nil { + c.Close() + } + }() + + m := &proto.Message{ + Type: proto.ConnectionStartup, + Raw: []byte(c.user), + } + if err := c.Send(m); err != nil { + return err + } + + m, err = c.Receive() + if err != nil { + return err + } + if m.Type == proto.ConnectionError { + return fmt.Errorf("failed to startup session: %s", string(m.Raw)) + } + if m.Type != proto.ConnectionOK { + return fmt.Errorf("failed to startup session: unsupported") + } + return nil +} + +// Dial sets up a client connection to a SysDB server instance at the +// specified address using the specified user. +// +// The address may be a UNIX domain socket, either prefixed with 'unix:' or +// specifying an absolute file-system path. +func Dial(addr, user string) (*Conn, error) { + network := "tcp" + if strings.HasPrefix(addr, "unix:") { + network = "unix" + addr = addr[len("unix:"):] + } else if len(addr) > 0 && addr[0] == '/' { + network = "unix" + } + + c := &Conn{network: network, addr: addr, user: user} + if err := c.dial(); err != nil { + return nil, err + } + return c, nil +} + +// Close closes the client connection. +// +// Any blocked Send or Receive operations will be unblocked and return errors. +func (c *Conn) Close() { + if c.c == nil { + return + } + c.c.Close() + c.c = nil +} + +// Send sends the specified raw message to the server. +// +// Send operations block until the full message could be written to the +// underlying sockets. This ensures that server and client don't get out of +// sync. +func (c *Conn) Send(m *proto.Message) error { + var err error + if c.c != nil { + err = proto.Write(c.c, m) + if err == nil { + return nil + } + c.Close() + } + + // Try to reconnect. + if e := c.dial(); e == nil { + return proto.Write(c.c, m) + } else if err == nil { + err = e + } + return err +} + +// Receive waits for a reply from the server and returns the raw message. +// +// Receive operations block until a full message could be read from the +// underlying socket. This ensures that server and client don't get out of +// sync. +func (c *Conn) Receive() (*proto.Message, error) { + var err error + if c.c != nil { + var m *proto.Message + m, err = proto.Read(c.c) + if err == nil { + return m, err + } + c.Close() + } + + // Try to reconnect. + if e := c.dial(); e == nil { + return proto.Read(c.c) + } else if err == nil { + err = e + } + return nil, err +} + +// vim: set tw=78 sw=4 sw=4 noexpandtab : -- 2.30.2