summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: e4d7b0f)
raw | patch | inline | side by side (parent: e4d7b0f)
author | Sebastian Harl <sh@tokkee.org> | |
Sat, 9 May 2015 13:41:09 +0000 (15:41 +0200) | ||
committer | Sebastian Harl <sh@tokkee.org> | |
Sat, 9 May 2015 13:41:09 +0000 (15:41 +0200) |
A client maintains multiple connections to the server and uses one of them
exclusively for each request.
exclusively for each request.
client/client.go | patch | blob | history | |
client/conn.go | [new file with mode: 0644] | patch | blob |
diff --git a/client/client.go b/client/client.go
index c42c7173793bacbd53a3a08eef1ddd108de97b6e..331501909ea8378104a13c189a3123332854ecc6 100644 (file)
--- a/client/client.go
+++ b/client/client.go
//
-// Copyright (C) 2014 Sebastian 'tokkee' Harl <sh@tokkee.org>
+// Copyright (C) 2014-2015 Sebastian 'tokkee' Harl <sh@tokkee.org>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
}
defer c.Close()
+Then, it can issue requests to the server:
+
+ major, minor, patch, extra, err := c.ServerVersion()
+ if err != nil {
+ // handle error
+ }
+ fmt.Printf("Connected to SysDB %d.%d.%d%s\n", major, minor, patch, extra)
+
+or:
+
+ res, err := c.Call(&proto.Message{Type: proto.ConnectionServerVersion})
+ if err != nil {
+ // handle error
+ }
+ fmt.Printf("%v\n", res)
+
+Each client maintains multiple connections to a SysDB server allowing to
+handle multiple requests in parallel. The SysDB server is able to handle that
+easily making it a cheap approach. The low-level Dial function creates a
+single connection to a SysDB server allowing to perform low-level operations:
+
+ conn, err := client.Dial("unix:/var/run/sysdbd.sock", "username")
+ if err != nil {
+ // handle error
+ }
+ defer conn.Close()
+
The github.com/sysdb/go/proto package provides support for handling requests
and responses. Use the Send and Receive functions to communicate with the
server:
Type: proto.ConnectionQuery,
Raw: []byte{"LOOKUP hosts MATCHING attribute.architecture = 'amd64';"},
}
- if err := c.Send(m); err != nil {
+ if err := conn.Send(m); err != nil {
// handle error
}
- m, err := c.Receive()
+ m, err := conn.Receive()
if err != nil {
// handle error
}
import (
"encoding/binary"
"fmt"
- "net"
- "strings"
+ "log"
+ "runtime"
"github.com/sysdb/go/proto"
)
-// A Conn is a connection to a SysDB server instance.
+// A Client is a client for SysDB.
//
-// Multiple goroutines may invoke methods on a Conn simultaneously but since
-// the SysDB protocol requires a strict ordering of request and response
-// messages, the communication with the server will usually happen
-// sequentially.
-type Conn struct {
- c net.Conn
- network, addr, user string
+// A client may be used from multiple goroutines in parallel.
+type Client struct {
+ conns chan *Conn
}
-func (c *Conn) connect() (err error) {
- if c.c, err = net.Dial(c.network, c.addr); err != nil {
- return err
- }
- defer func() {
- if err != nil {
- c.Close()
- }
- }()
-
- m := &proto.Message{
- Type: proto.ConnectionStartup,
- Raw: []byte(c.user),
- }
- if err := c.Send(m); err != nil {
- return err
- }
-
- m, err = c.Receive()
- if err != nil {
- return err
- }
- if m.Type == proto.ConnectionError {
- return fmt.Errorf("failed to startup session: %s", string(m.Raw))
- }
- if m.Type != proto.ConnectionOK {
- return fmt.Errorf("failed to startup session: unsupported")
- }
- return nil
-}
-
-// Connect sets up a client connection to a SysDB server instance at the
+// Connect creates a new client connected to a SysDB server instance at the
// specified address using the specified user.
//
-// The address may be a UNIX domain socket, either prefixed with 'unix:' or
-// specifying an absolute file-system path.
-func Connect(addr, user string) (*Conn, error) {
- network := "tcp"
- if strings.HasPrefix(addr, "unix:") {
- network = "unix"
- addr = addr[len("unix:"):]
- } else if addr[0] == '/' {
- network = "unix"
- }
+// The address may be a IP address or a UNIX domain socket, either prefixed
+// with 'unix:' or specifying an absolute file-system path.
+func Connect(addr, user string) (*Client, error) {
+ c := &Client{conns: make(chan *Conn, 2*runtime.NumCPU())}
- c := &Conn{network: network, addr: addr, user: user}
- if err := c.connect(); err != nil {
- return nil, err
+ for i := 0; i < cap(c.conns); i++ {
+ conn, err := Dial(addr, user)
+ if err != nil {
+ return nil, err
+ }
+ c.conns <- conn
}
return c, nil
}
-// Close closes the client connection.
+// Close closes a client connection. It may not be further used after calling
+// this function.
//
-// Any blocked Send or Receive operations will be unblocked and return errors.
-func (c *Conn) Close() {
- if c.c == nil {
- return
- }
- c.c.Close()
- c.c = nil
+// The function waits for all pending operations to finish.
+func (c *Client) Close() {
+ for i := 0; i < cap(c.conns); i++ {
+ conn := <-c.conns
+ conn.Close()
+ }
+ close(c.conns)
+ c.conns = nil
}
-// Send sends the specified raw message to the server.
-//
-// Send operations block until the full message could be written to the
-// underlying sockets. This ensures that server and client don't get out of
-// sync.
-func (c *Conn) Send(m *proto.Message) error {
- var err error
- if c.c != nil {
- err = proto.Write(c.c, m)
- if err == nil {
- return nil
- }
- c.Close()
- }
+// Call sends the specified request to the server and waits for its reply. It
+// blocks until the full reply has been received.
+func (c *Client) Call(req *proto.Message) (*proto.Message, error) {
+ conn := <-c.conns
+ defer func() { c.conns <- conn }()
- // Try to reconnect.
- if e := c.connect(); e == nil {
- return proto.Write(c.c, m)
- } else if err == nil {
- err = e
+ err := conn.Send(req)
+ if err != nil {
+ return nil, err
}
- return err
-}
-// Receive waits for a reply from the server and returns the raw message.
-//
-// Receive operations block until a full message could be read from the
-// underlying socket. This ensures that server and client don't get out of
-// sync.
-func (c *Conn) Receive() (*proto.Message, error) {
- var err error
- if c.c != nil {
- var m *proto.Message
- m, err = proto.Read(c.c)
- if err == nil {
- return m, err
+ for {
+ res, err := conn.Receive()
+ switch {
+ case err != nil:
+ return nil, err
+ case res.Type == proto.ConnectionError:
+ return nil, fmt.Errorf("request failed: %s", string(res.Raw))
+ case res.Type != proto.ConnectionLog:
+ return res, err
}
- c.Close()
- }
- // Try to reconnect.
- if e := c.connect(); e == nil {
- return proto.Read(c.c)
- } else if err == nil {
- err = e
+ if len(res.Raw) > 4 {
+ log.Println(string(res.Raw[4:]))
+ }
}
- return nil, err
}
// ServerVersion queries and returns the version of the remote server.
-func (c *Conn) ServerVersion() (major, minor, patch int, extra string, err error) {
- m := &proto.Message{Type: proto.ConnectionServerVersion}
- if err = c.Send(m); err != nil {
- return 0, 0, 0, "", err
- }
-
- m, err = c.Receive()
- if err != nil || m.Type != proto.ConnectionOK {
+func (c *Client) ServerVersion() (major, minor, patch int, extra string, err error) {
+ res, err := c.Call(&proto.Message{Type: proto.ConnectionServerVersion})
+ if err != nil || res.Type != proto.ConnectionOK {
if err == nil {
- err = fmt.Errorf("SERVER_VERSION command failed with status %d", m.Type)
+ err = fmt.Errorf("SERVER_VERSION command failed with status %d", res.Type)
}
return 0, 0, 0, "", err
}
- if len(m.Raw) < 4 {
+ if len(res.Raw) < 4 {
return 0, 0, 0, "", fmt.Errorf("SERVER_VERSION reply is too short")
}
- version := int(binary.BigEndian.Uint32(m.Raw[:4]))
+ version := int(binary.BigEndian.Uint32(res.Raw[:4]))
major = version / 10000
minor = version/100 - 100*major
patch = version - 10000*major - 100*minor
- if len(m.Raw) > 4 {
- extra = string(m.Raw[4:])
+ if len(res.Raw) > 4 {
+ extra = string(res.Raw[4:])
}
return major, minor, patch, extra, nil
}
diff --git a/client/conn.go b/client/conn.go
--- /dev/null
+++ b/client/conn.go
@@ -0,0 +1,159 @@
+//
+// Copyright (C) 2014 Sebastian 'tokkee' Harl <sh@tokkee.org>
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+// 1. Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// 2. Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+// TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+// OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+// ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+package client
+
+import (
+ "fmt"
+ "net"
+ "strings"
+
+ "github.com/sysdb/go/proto"
+)
+
+// A Conn is a connection to a SysDB server instance.
+//
+// Multiple goroutines may invoke methods on a Conn simultaneously but since
+// the SysDB protocol requires a strict ordering of request and response
+// messages, the communication with the server will usually happen
+// sequentially.
+type Conn struct {
+ c net.Conn
+ network, addr, user string
+}
+
+func (c *Conn) dial() (err error) {
+ if c.c, err = net.Dial(c.network, c.addr); err != nil {
+ return err
+ }
+ defer func() {
+ if err != nil {
+ c.Close()
+ }
+ }()
+
+ m := &proto.Message{
+ Type: proto.ConnectionStartup,
+ Raw: []byte(c.user),
+ }
+ if err := c.Send(m); err != nil {
+ return err
+ }
+
+ m, err = c.Receive()
+ if err != nil {
+ return err
+ }
+ if m.Type == proto.ConnectionError {
+ return fmt.Errorf("failed to startup session: %s", string(m.Raw))
+ }
+ if m.Type != proto.ConnectionOK {
+ return fmt.Errorf("failed to startup session: unsupported")
+ }
+ return nil
+}
+
+// Dial sets up a client connection to a SysDB server instance at the
+// specified address using the specified user.
+//
+// The address may be a UNIX domain socket, either prefixed with 'unix:' or
+// specifying an absolute file-system path.
+func Dial(addr, user string) (*Conn, error) {
+ network := "tcp"
+ if strings.HasPrefix(addr, "unix:") {
+ network = "unix"
+ addr = addr[len("unix:"):]
+ } else if len(addr) > 0 && addr[0] == '/' {
+ network = "unix"
+ }
+
+ c := &Conn{network: network, addr: addr, user: user}
+ if err := c.dial(); err != nil {
+ return nil, err
+ }
+ return c, nil
+}
+
+// Close closes the client connection.
+//
+// Any blocked Send or Receive operations will be unblocked and return errors.
+func (c *Conn) Close() {
+ if c.c == nil {
+ return
+ }
+ c.c.Close()
+ c.c = nil
+}
+
+// Send sends the specified raw message to the server.
+//
+// Send operations block until the full message could be written to the
+// underlying sockets. This ensures that server and client don't get out of
+// sync.
+func (c *Conn) Send(m *proto.Message) error {
+ var err error
+ if c.c != nil {
+ err = proto.Write(c.c, m)
+ if err == nil {
+ return nil
+ }
+ c.Close()
+ }
+
+ // Try to reconnect.
+ if e := c.dial(); e == nil {
+ return proto.Write(c.c, m)
+ } else if err == nil {
+ err = e
+ }
+ return err
+}
+
+// Receive waits for a reply from the server and returns the raw message.
+//
+// Receive operations block until a full message could be read from the
+// underlying socket. This ensures that server and client don't get out of
+// sync.
+func (c *Conn) Receive() (*proto.Message, error) {
+ var err error
+ if c.c != nil {
+ var m *proto.Message
+ m, err = proto.Read(c.c)
+ if err == nil {
+ return m, err
+ }
+ c.Close()
+ }
+
+ // Try to reconnect.
+ if e := c.dial(); e == nil {
+ return proto.Read(c.c)
+ } else if err == nil {
+ err = e
+ }
+ return nil, err
+}
+
+// vim: set tw=78 sw=4 sw=4 noexpandtab :