Code

Merged branch 'master' of git://git.tokkee.org/sysdb.
[sysdb.git] / src / frontend / sock.c
index 4cf1d79e295a832e44d08a272156f4800fe6eb6b..0c4829e2efc4d3737fb9cf4a5f21cf3df5fa59b5 100644 (file)
@@ -68,6 +68,7 @@ typedef struct {
        const char *prefix;
 
        int (*opener)(listener_t *);
+       void (*closer)(listener_t *);
 } fe_listener_impl_t;
 
 struct sdb_fe_socket {
@@ -104,6 +105,13 @@ open_unix_sock(listener_t *listener)
        strncpy(sa.sun_path, listener->address + strlen("unix:"),
                        sizeof(sa.sun_path));
 
+       if (unlink(listener->address + strlen("unix:")) && (errno != ENOENT)) {
+               char errbuf[1024];
+               sdb_log(SDB_LOG_WARNING, "frontend: Failed to remove stale UNIX "
+                               "socket %s: %s", listener->address + strlen("unix:"),
+                               sdb_strerror(errno, errbuf, sizeof(errbuf)));
+       }
+
        status = bind(listener->sock_fd, (struct sockaddr *)&sa, sizeof(sa));
        if (status) {
                char buf[1024];
@@ -114,6 +122,20 @@ open_unix_sock(listener_t *listener)
        return 0;
 } /* open_unix_sock */
 
+static void
+close_unix_sock(listener_t *listener)
+{
+       assert(listener);
+       if (! listener->address)
+               return;
+
+       if (listener->sock_fd >= 0)
+               close(listener->sock_fd);
+       listener->sock_fd = -1;
+
+       unlink(listener->address + strlen("unix:"));
+} /* close_unix_sock */
+
 /*
  * private variables
  */
@@ -124,13 +146,46 @@ enum {
        LISTENER_UNIXSOCK = 0,
 };
 static fe_listener_impl_t listener_impls[] = {
-       { LISTENER_UNIXSOCK, "unix", open_unix_sock },
+       { LISTENER_UNIXSOCK, "unix", open_unix_sock, close_unix_sock },
 };
 
 /*
  * private helper functions
  */
 
+static int
+listener_listen(listener_t *listener)
+{
+       assert(listener);
+
+       /* try to reopen */
+       if (listener->sock_fd < 0)
+               if (listener_impls[listener->type].opener(listener))
+                       return -1;
+       assert(listener->sock_fd >= 0);
+
+       if (listen(listener->sock_fd, /* backlog = */ 32)) {
+               char buf[1024];
+               sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s",
+                               listener->address, sdb_strerror(errno, buf, sizeof(buf)));
+               return -1;
+       }
+       return 0;
+} /* listener_listen */
+
+static void
+listener_close(listener_t *listener)
+{
+       assert(listener);
+
+       if (listener_impls[listener->type].closer)
+               listener_impls[listener->type].closer(listener);
+
+       if (listener->sock_fd >= 0)
+               close(listener->sock_fd);
+       listener->sock_fd = -1;
+} /* listener_close */
+
 static int
 get_type(const char *address)
 {
@@ -162,12 +217,11 @@ listener_destroy(listener_t *listener)
        if (! listener)
                return;
 
-       if (listener->sock_fd >= 0)
-               close(listener->sock_fd);
-       listener->sock_fd = -1;
+       listener_close(listener);
 
        if (listener->address)
                free(listener->address);
+       listener->address = NULL;
 } /* listener_destroy */
 
 static listener_t *
@@ -216,38 +270,6 @@ listener_create(sdb_fe_socket_t *sock, const char *address)
        return listener;
 } /* listener_create */
 
-static int
-listener_listen(listener_t *listener)
-{
-       assert(listener);
-
-       /* try to reopen */
-       if (listener->sock_fd < 0)
-               if (listener_impls[listener->type].opener(listener))
-                       return -1;
-       assert(listener->sock_fd >= 0);
-
-       if (listen(listener->sock_fd, /* backlog = */ 32)) {
-               char buf[1024];
-               sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s",
-                               listener->address, sdb_strerror(errno, buf, sizeof(buf)));
-               return -1;
-       }
-       return 0;
-} /* listener_listen */
-
-static void
-listener_close(listener_t *listener)
-{
-       assert(listener);
-
-       if (listener->sock_fd < 0)
-               return;
-
-       close(listener->sock_fd);
-       listener->sock_fd = -1;
-} /* listener_close */
-
 static void
 socket_close(sdb_fe_socket_t *sock)
 {
@@ -430,12 +452,16 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
        int max_listen_fd = 0;
        size_t i;
 
-       /* XXX: make the number of threads configurable */
-       pthread_t handler_threads[5];
+       pthread_t handler_threads[loop->num_threads];
+       size_t num_threads;
 
-       if ((! sock) || (! sock->listeners_num) || (! loop) || sock->chan)
+       if ((! sock) || (! sock->listeners_num) || sock->chan
+                       || (! loop) || (loop->num_threads <= 0))
                return -1;
 
+       if (! loop->do_loop)
+               return 0;
+
        FD_ZERO(&sockets);
        for (i = 0; i < sock->listeners_num; ++i) {
                listener_t *listener = sock->listeners + i;
@@ -456,13 +482,27 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                return -1;
        }
 
+       sdb_log(SDB_LOG_INFO, "frontend: Starting %d connection "
+                       "handler thread%s managing %d listener%s",
+                       loop->num_threads, loop->num_threads == 1 ? "" : "s",
+                       sock->listeners_num, sock->listeners_num == 1 ? "" : "s");
+
+       num_threads = loop->num_threads;
        memset(&handler_threads, 0, sizeof(handler_threads));
-       /* XXX: error handling */
-       for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
-               pthread_create(&handler_threads[i], /* attr = */ NULL,
-                               connection_handler, /* arg = */ sock);
+       for (i = 0; i < num_threads; ++i) {
+               errno = 0;
+               if (pthread_create(&handler_threads[i], /* attr = */ NULL,
+                                       connection_handler, /* arg = */ sock)) {
+                       char errbuf[1024];
+                       sdb_log(SDB_LOG_ERR, "frontend: Failed to create "
+                                       "connection handler thread: %s",
+                                       sdb_strerror(errno, errbuf, sizeof(errbuf)));
+                       num_threads = i;
+                       break;
+               }
+       }
 
-       while (loop->do_loop) {
+       while (loop->do_loop && num_threads) {
                struct timeval timeout = { 1, 0 }; /* one second */
                sdb_llist_iter_t *iter;
 
@@ -518,12 +558,15 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
        sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "
                        "to terminate");
        if (! sdb_channel_shutdown(sock->chan))
-               for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
+               for (i = 0; i < num_threads; ++i)
                        pthread_join(handler_threads[i], NULL);
        /* else: we tried our best; let the operating system clean up */
 
        sdb_channel_destroy(sock->chan);
        sock->chan = NULL;
+
+       if (! num_threads)
+               return -1;
        return 0;
 } /* sdb_fe_sock_listen_and_server */