Code

Merged branch 'master' of git://git.tokkee.org/sysdb.
authorSebastian Harl <sh@tokkee.org>
Fri, 20 Dec 2013 19:09:48 +0000 (20:09 +0100)
committerSebastian Harl <sh@tokkee.org>
Fri, 20 Dec 2013 19:09:48 +0000 (20:09 +0100)
src/client/sock.c
src/core/plugin.c
src/frontend/sock.c
src/include/frontend/sock.h
src/include/utils/proto.h
src/utils/proto.c
t/Makefile.am
t/frontend/sock_test.c [new file with mode: 0644]
t/libsysdb_test.c
t/libsysdb_test.h

index 66b83a70009f16921ef70f8e487ae14eabe454dd..a2417ad6d872029e8c80f51d83899b65e9026a06 100644 (file)
@@ -233,7 +233,8 @@ sdb_client_recv(sdb_client_t *client,
        while (42) {
                ssize_t status;
 
-               /* XXX: use select */
+               if (sdb_proto_select(client->fd, SDB_PROTO_SELECTIN))
+                       return -1;
 
                errno = 0;
                status = sdb_strbuf_read(buf, client->fd, req);
index 8200338a9d8fbb0804869b96f9267ed7b776e683..46976d01f7435c580e088f3aebd7836ad14ebbf4 100644 (file)
@@ -117,6 +117,9 @@ static sdb_plugin_info_t plugin_default_info = SDB_PLUGIN_INFO_INIT;
 static pthread_key_t     plugin_ctx_key;
 static _Bool             plugin_ctx_key_initialized = 0;
 
+/* a list of the plugin contexts of all registered plugins */
+static sdb_llist_t      *all_plugins = NULL;
+
 static sdb_llist_t      *config_list = NULL;
 static sdb_llist_t      *init_list = NULL;
 static sdb_llist_t      *collector_list = NULL;
@@ -257,11 +260,11 @@ static sdb_type_t ctx_type = {
 };
 
 static ctx_t *
-ctx_create(void)
+ctx_create(const char *name)
 {
        ctx_t *ctx;
 
-       ctx = CTX(sdb_object_create("plugin-context", ctx_type));
+       ctx = CTX(sdb_object_create(name, ctx_type));
        if (! ctx)
                return NULL;
 
@@ -433,7 +436,7 @@ sdb_plugin_load(const char *name, const sdb_plugin_ctx_t *plugin_ctx)
        if (ctx_get())
                sdb_log(SDB_LOG_WARNING, "core: Discarding old plugin context");
 
-       ctx = ctx_create();
+       ctx = ctx_create(real_name);
        if (! ctx) {
                sdb_log(SDB_LOG_ERR, "core: Failed to initialize plugin context");
                return -1;
@@ -471,6 +474,18 @@ sdb_plugin_load(const char *name, const sdb_plugin_ctx_t *plugin_ctx)
                                name, SDB_VERSION_DECODE(ctx->info.version),
                                SDB_VERSION_DECODE(SDB_VERSION));
 
+       if (! all_plugins) {
+               if (! (all_plugins = sdb_llist_create())) {
+                       sdb_log(SDB_LOG_ERR, "core: Failed to load plugin '%s': "
+                                       "internal error while creating linked list", name);
+                       plugin_unregister_by_name(ctx->info.plugin_name);
+                       sdb_object_deref(SDB_OBJ(ctx));
+                       return -1;
+               }
+       }
+
+       sdb_llist_append(all_plugins, SDB_OBJ(ctx));
+
        sdb_log(SDB_LOG_INFO, "core: Successfully loaded "
                        "plugin '%s' v%i (%s)\n\t%s\n\tLicense: %s",
                        INFO_GET(&ctx->info, name), ctx->info.plugin_version,
@@ -695,8 +710,12 @@ sdb_plugin_configure(const char *name, oconfig_item_t *ci)
        plugin = SDB_PLUGIN_CB(sdb_llist_search_by_name(config_list, name));
        if (! plugin) {
                /* XXX: check if any such plugin has been loaded */
-               sdb_log(SDB_LOG_ERR, "core: Plugin '%s' did not register "
-                               "a config callback.", name);
+               ctx_t *ctx = CTX(sdb_llist_search_by_name(all_plugins, name));
+               if (! ctx)
+                       sdb_log(SDB_LOG_ERR, "core: Plugin '%s' not loaded.", name);
+               else
+                       sdb_log(SDB_LOG_ERR, "core: Plugin '%s' did not register "
+                                       "a config callback.", name);
                errno = ENOENT;
                return -1;
        }
index 65884ea945bf61fe935b48fe60ab819b453b1eb8..0c4829e2efc4d3737fb9cf4a5f21cf3df5fa59b5 100644 (file)
@@ -68,6 +68,7 @@ typedef struct {
        const char *prefix;
 
        int (*opener)(listener_t *);
+       void (*closer)(listener_t *);
 } fe_listener_impl_t;
 
 struct sdb_fe_socket {
@@ -121,6 +122,20 @@ open_unix_sock(listener_t *listener)
        return 0;
 } /* open_unix_sock */
 
+static void
+close_unix_sock(listener_t *listener)
+{
+       assert(listener);
+       if (! listener->address)
+               return;
+
+       if (listener->sock_fd >= 0)
+               close(listener->sock_fd);
+       listener->sock_fd = -1;
+
+       unlink(listener->address + strlen("unix:"));
+} /* close_unix_sock */
+
 /*
  * private variables
  */
@@ -131,13 +146,46 @@ enum {
        LISTENER_UNIXSOCK = 0,
 };
 static fe_listener_impl_t listener_impls[] = {
-       { LISTENER_UNIXSOCK, "unix", open_unix_sock },
+       { LISTENER_UNIXSOCK, "unix", open_unix_sock, close_unix_sock },
 };
 
 /*
  * private helper functions
  */
 
+static int
+listener_listen(listener_t *listener)
+{
+       assert(listener);
+
+       /* try to reopen */
+       if (listener->sock_fd < 0)
+               if (listener_impls[listener->type].opener(listener))
+                       return -1;
+       assert(listener->sock_fd >= 0);
+
+       if (listen(listener->sock_fd, /* backlog = */ 32)) {
+               char buf[1024];
+               sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s",
+                               listener->address, sdb_strerror(errno, buf, sizeof(buf)));
+               return -1;
+       }
+       return 0;
+} /* listener_listen */
+
+static void
+listener_close(listener_t *listener)
+{
+       assert(listener);
+
+       if (listener_impls[listener->type].closer)
+               listener_impls[listener->type].closer(listener);
+
+       if (listener->sock_fd >= 0)
+               close(listener->sock_fd);
+       listener->sock_fd = -1;
+} /* listener_close */
+
 static int
 get_type(const char *address)
 {
@@ -169,12 +217,11 @@ listener_destroy(listener_t *listener)
        if (! listener)
                return;
 
-       if (listener->sock_fd >= 0)
-               close(listener->sock_fd);
-       listener->sock_fd = -1;
+       listener_close(listener);
 
        if (listener->address)
                free(listener->address);
+       listener->address = NULL;
 } /* listener_destroy */
 
 static listener_t *
@@ -223,38 +270,6 @@ listener_create(sdb_fe_socket_t *sock, const char *address)
        return listener;
 } /* listener_create */
 
-static int
-listener_listen(listener_t *listener)
-{
-       assert(listener);
-
-       /* try to reopen */
-       if (listener->sock_fd < 0)
-               if (listener_impls[listener->type].opener(listener))
-                       return -1;
-       assert(listener->sock_fd >= 0);
-
-       if (listen(listener->sock_fd, /* backlog = */ 32)) {
-               char buf[1024];
-               sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s",
-                               listener->address, sdb_strerror(errno, buf, sizeof(buf)));
-               return -1;
-       }
-       return 0;
-} /* listener_listen */
-
-static void
-listener_close(listener_t *listener)
-{
-       assert(listener);
-
-       if (listener->sock_fd < 0)
-               return;
-
-       close(listener->sock_fd);
-       listener->sock_fd = -1;
-} /* listener_close */
-
 static void
 socket_close(sdb_fe_socket_t *sock)
 {
@@ -437,12 +452,16 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
        int max_listen_fd = 0;
        size_t i;
 
-       /* XXX: make the number of threads configurable */
-       pthread_t handler_threads[5];
+       pthread_t handler_threads[loop->num_threads];
+       size_t num_threads;
 
-       if ((! sock) || (! sock->listeners_num) || (! loop) || sock->chan)
+       if ((! sock) || (! sock->listeners_num) || sock->chan
+                       || (! loop) || (loop->num_threads <= 0))
                return -1;
 
+       if (! loop->do_loop)
+               return 0;
+
        FD_ZERO(&sockets);
        for (i = 0; i < sock->listeners_num; ++i) {
                listener_t *listener = sock->listeners + i;
@@ -463,13 +482,27 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                return -1;
        }
 
+       sdb_log(SDB_LOG_INFO, "frontend: Starting %d connection "
+                       "handler thread%s managing %d listener%s",
+                       loop->num_threads, loop->num_threads == 1 ? "" : "s",
+                       sock->listeners_num, sock->listeners_num == 1 ? "" : "s");
+
+       num_threads = loop->num_threads;
        memset(&handler_threads, 0, sizeof(handler_threads));
-       /* XXX: error handling */
-       for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
-               pthread_create(&handler_threads[i], /* attr = */ NULL,
-                               connection_handler, /* arg = */ sock);
+       for (i = 0; i < num_threads; ++i) {
+               errno = 0;
+               if (pthread_create(&handler_threads[i], /* attr = */ NULL,
+                                       connection_handler, /* arg = */ sock)) {
+                       char errbuf[1024];
+                       sdb_log(SDB_LOG_ERR, "frontend: Failed to create "
+                                       "connection handler thread: %s",
+                                       sdb_strerror(errno, errbuf, sizeof(errbuf)));
+                       num_threads = i;
+                       break;
+               }
+       }
 
-       while (loop->do_loop) {
+       while (loop->do_loop && num_threads) {
                struct timeval timeout = { 1, 0 }; /* one second */
                sdb_llist_iter_t *iter;
 
@@ -525,12 +558,15 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
        sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "
                        "to terminate");
        if (! sdb_channel_shutdown(sock->chan))
-               for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
+               for (i = 0; i < num_threads; ++i)
                        pthread_join(handler_threads[i], NULL);
        /* else: we tried our best; let the operating system clean up */
 
        sdb_channel_destroy(sock->chan);
        sock->chan = NULL;
+
+       if (! num_threads)
+               return -1;
        return 0;
 } /* sdb_fe_sock_listen_and_server */
 
index 6ab8ccadbf9ebb4ebee101a2701e9e43d57cb296..7d1fd7a3ceed81995e790921a2258949d8d25f7a 100644 (file)
@@ -25,6 +25,8 @@
  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include <unistd.h>
+
 #ifndef SDB_FRONTEND_SOCK_H
 #define SDB_FRONTEND_SOCK_H 1
 
@@ -34,9 +36,13 @@ extern "C" {
 
 /* manage a front-end listener loop */
 typedef struct {
+       /* number of handler threads to create */
+       size_t num_threads;
+
+       /* front-end listener shuts down when this is set to false */
        _Bool do_loop;
 } sdb_fe_loop_t;
-#define SDB_FE_LOOP_INIT { 1 }
+#define SDB_FE_LOOP_INIT { 5, 1 }
 
 /*
  * sdb_fe_socket_t:
index 372e34d474b7be20dd5fc13ce6c04025748bc0a8..f3f1a871d83e48c28e96bee92fd8a40ac4e3188a 100644 (file)
 extern "C" {
 #endif
 
+enum {
+       SDB_PROTO_SELECTIN = 0,
+       SDB_PROTO_SELECTOUT,
+       SDB_PROTO_SELECTERR,
+};
+
+/*
+ * sdb_proto_select:
+ * Wait for a file-descriptor to become ready for I/O operations of the
+ * specified type. This is a simple wrapper around the select() system call.
+ * The type argument may be any of the SDB_PROTO_SELECT* constants.
+ *
+ * Returns:
+ *  - the number of file descriptors ready for I/O
+ *  - a negative value on error
+ */
+int
+sdb_proto_select(int fd, int type);
+
 ssize_t
 sdb_proto_send(int fd, size_t msg_len, const char *msg);
 
index 29783efd5aefb3db7cd09037f6f534fa4363b020..404d2a30ec5d2dabd270ab393bd9e262ec429017 100644 (file)
 #include <string.h>
 #include <unistd.h>
 
+#include <sys/select.h>
+
 /*
  * public API
  */
 
+int
+sdb_proto_select(int fd, int type)
+{
+       fd_set fds;
+       fd_set *readfds = NULL;
+       fd_set *writefds = NULL;
+       fd_set *exceptfds = NULL;
+
+       if (fd < 0) {
+               errno = EBADF;
+               return -1;
+       }
+
+       FD_ZERO(&fds);
+
+       switch (type) {
+               case SDB_PROTO_SELECTIN:
+                       readfds = &fds;
+                       break;
+               case SDB_PROTO_SELECTOUT:
+                       writefds = &fds;
+                       break;
+               case SDB_PROTO_SELECTERR:
+                       exceptfds = &fds;
+                       break;
+               default:
+                       errno = EINVAL;
+                       return -1;
+       }
+
+       FD_SET(fd, &fds);
+
+       while (42) {
+               int n;
+               errno = 0;
+               n = select(fd + 1, readfds, writefds, exceptfds, NULL);
+
+               if ((n < 0) && (errno != EINTR))
+                       return (ssize_t)n;
+               if (n > 0)
+                       break;
+       }
+       return 0;
+} /* sdb_proto_select */
+
 ssize_t
 sdb_proto_send(int fd, size_t msg_len, const char *msg)
 {
@@ -56,7 +103,8 @@ sdb_proto_send(int fd, size_t msg_len, const char *msg)
        while (len > 0) {
                ssize_t status;
 
-               /* XXX: use select() */
+               if (sdb_proto_select(fd, SDB_PROTO_SELECTOUT))
+                       return -1;
 
                errno = 0;
                status = write(fd, buf, len);
index 87140cf2016051b371baed84369e145b36d32a10..d7e40139f61a8adab8f41154ad5676c1db3eb1a3 100644 (file)
@@ -8,6 +8,7 @@ libsysdb_test_SOURCES = \
                libsysdb_test.c libsysdb_test.h \
                core/object_test.c \
                core/store_test.c \
+               frontend/sock_test.c \
                utils/channel_test.c \
                utils/dbi_test.c \
                utils/llist_test.c \
diff --git a/t/frontend/sock_test.c b/t/frontend/sock_test.c
new file mode 100644 (file)
index 0000000..55f1a0c
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * SysDB - t/frontend/sock_test.c
+ * Copyright (C) 2013 Sebastian 'tokkee' Harl <sh@tokkee.org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "frontend/sock.h"
+#include "libsysdb_test.h"
+
+#include <check.h>
+
+#include <errno.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <unistd.h>
+
+#include <pthread.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+
+/*
+ * private variables
+ */
+
+static sdb_fe_socket_t *sock;
+
+static void
+setup(void)
+{
+       sock = sdb_fe_sock_create();
+       fail_unless(sock != NULL,
+                       "sdb_fe_sock_create() = NULL; expected frontend sock object");
+} /* setup */
+
+static void
+teardown(void)
+{
+       sdb_fe_sock_destroy(sock);
+       sock = NULL;
+} /* teardown */
+
+static void
+sock_listen(char *tmp_file)
+{
+       char sock_addr[strlen("unix:") + L_tmpnam + 1];
+       char *filename;
+
+       int check;
+
+       filename = tmpnam(tmp_file);
+       fail_unless(filename != NULL,
+                       "INTERNAL ERROR: tmpnam() = NULL; expected: a string");
+
+       sprintf(sock_addr, "unix:%s", tmp_file);
+       check = sdb_fe_sock_add_listener(sock, sock_addr);
+       fail_unless(check == 0,
+                       "sdb_fe_sock_add_listener(%s) = %i; expected: 0",
+                       sock_addr, check);
+} /* conn */
+
+/*
+ * parallel testing
+ */
+
+static void *
+sock_handler(void *data)
+{
+       sdb_fe_loop_t *loop = data;
+       int check;
+
+       check = sdb_fe_sock_listen_and_serve(sock, loop);
+       fail_unless(check == 0,
+                       "sdb_fe_sock_listen_and_serve() = %i; "
+                       "expected: 0 (after adding listener)", check);
+       return NULL;
+} /* sock_handler */
+
+/*
+ * tests
+ */
+
+START_TEST(test_listen_and_serve)
+{
+       sdb_fe_loop_t loop = SDB_FE_LOOP_INIT;
+
+       char tmp_file[L_tmpnam];
+       int check;
+
+       pthread_t thr;
+
+       int sock_fd;
+       struct sockaddr_un sa;
+
+       check = sdb_fe_sock_listen_and_serve(sock, &loop);
+       fail_unless(check < 0,
+                       "sdb_fe_sock_listen_and_serve() = %i; "
+                       "expected: <0 (before adding listeners)", check);
+
+       sock_listen(tmp_file);
+
+       loop.do_loop = 1;
+       check = pthread_create(&thr, /* attr = */ NULL, sock_handler, &loop);
+       fail_unless(check == 0,
+                       "INTERNAL ERROR: pthread_create() = %i; expected: 0", check);
+
+       sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       fail_unless(sock_fd >= 0,
+                       "INTERNAL ERROR: socket() = %d; expected: >= 0", sock_fd);
+
+       sa.sun_family = AF_UNIX;
+       strncpy(sa.sun_path, tmp_file, sizeof(sa.sun_path));
+
+       /* wait for socket to become available */
+       errno = ECONNREFUSED;
+       while (errno == ECONNREFUSED) {
+               check = connect(sock_fd, (struct sockaddr *)&sa, sizeof(sa));
+               if (! check)
+                       break;
+
+               fail_unless(errno == ECONNREFUSED,
+                               "INTERNAL ERROR: connect() = %d [errno=%d]; expected: 0",
+                               check, errno);
+       }
+
+       close(sock_fd);
+
+       loop.do_loop = 0;
+       pthread_join(thr, NULL);
+
+       fail_unless(access(tmp_file, F_OK),
+                       "sdb_fe_sock_listen_and_serve() did not clean up "
+                       "socket %s", tmp_file);
+
+       /* should do nothing and not report errors */
+       check = sdb_fe_sock_listen_and_serve(sock, &loop);
+       fail_unless(check == 0,
+                       "sdb_fe_sock_listen_and_serve() = %i; "
+                       "expected: <0 (do_loop == 0)", check);
+       fail_unless(access(tmp_file, F_OK),
+                       "sdb_fe_sock_listen_and_serve() recreated socket "
+                       "(do_loop == 0)");
+}
+END_TEST
+
+Suite *
+fe_sock_suite(void)
+{
+       Suite *s = suite_create("frontend::sock");
+       TCase *tc;
+
+       tc = tcase_create("core");
+       tcase_add_checked_fixture(tc, setup, teardown);
+       tcase_add_test(tc, test_listen_and_serve);
+       suite_add_tcase(s, tc);
+
+       return s;
+} /* util_unixsock_suite */
+
+/* vim: set tw=78 sw=4 ts=4 noexpandtab : */
+
index ca2a22d01efd1c7f862dd8fedb1b6dea9a462086..110df8b08c37e3f3abf3a5e01dfadee4c83cddb0 100644 (file)
@@ -40,6 +40,7 @@ main(void)
        suite_creator_t creators[] = {
                { core_object_suite, NULL },
                { core_store_suite, NULL },
+               { fe_sock_suite, NULL },
                { util_channel_suite, NULL },
                { util_dbi_suite, NULL },
                { util_llist_suite, NULL },
index f9fddebdec64b48ec23f2473cf7c7d8b602d8e09..616a84e0a437921c9b491b9d09d952258ac9e7dc 100644 (file)
@@ -67,6 +67,10 @@ core_object_suite(void);
 Suite *
 core_store_suite(void);
 
+/* t/frontend/sock_test */
+Suite *
+fe_sock_suite(void);
+
 /* t/utils/channel_test */
 Suite *
 util_channel_suite(void);