Code

socket frontend: Close listening sockets before returning.
[sysdb.git] / src / frontend / sock.c
index a389a142b94f8a1a609a02a2ae1773239dc0e624..27a1b679c2815f17932cd3845383ff7649a9d502 100644 (file)
@@ -92,6 +92,10 @@ struct sdb_fe_socket {
        size_t listeners_num;
 
        sdb_llist_t *open_connections;
+
+       /* channel used for communication between main
+        * and connection handler threads */
+       sdb_channel_t *chan;
 };
 
 /*
@@ -177,6 +181,7 @@ listener_destroy(listener_t *listener)
 
        if (listener->sock_fd >= 0)
                close(listener->sock_fd);
+       listener->sock_fd = -1;
 
        if (listener->address)
                free(listener->address);
@@ -228,6 +233,32 @@ listener_create(sdb_fe_socket_t *sock, const char *address)
        return listener;
 } /* listener_create */
 
+static int
+listener_listen(listener_t *listener)
+{
+       assert(listener);
+
+       if (listen(listener->sock_fd, /* backlog = */ 32)) {
+               char buf[1024];
+               sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s",
+                               listener->address, sdb_strerror(errno, buf, sizeof(buf)));
+               return -1;
+       }
+       return 0;
+} /* listener_listen */
+
+static void
+listener_close(listener_t *listener)
+{
+       assert(listener);
+
+       if (listener->sock_fd < 0)
+               return;
+
+       close(listener->sock_fd);
+       listener->sock_fd = -1;
+} /* listener_close */
+
 /*
  * private data types
  */
@@ -288,6 +319,7 @@ connection_destroy(sdb_object_t *obj)
        assert(obj);
        conn = &CONN(obj)->conn;
 
+       sdb_log(SDB_LOG_DEBUG, "frontend: Closing connection on fd=%i", conn->fd);
        close(conn->fd);
        conn->fd = -1;
 } /* connection_destroy */
@@ -302,12 +334,41 @@ static sdb_type_t connection_type = {
  * connection handler functions
  */
 
+/* returns negative value on error, 0 on EOF, number of packets else */
+static int
+connection_read(int fd)
+{
+       int n = 0;
+
+       while (42) {
+               int32_t cmd;
+               ssize_t status;
+
+               errno = 0;
+               status = read(fd, &cmd, sizeof(cmd));
+               if (status < 0) {
+                       if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
+                               return n + 1;
+                       return (int)status;
+               }
+               else if (! status) /* EOF */
+                       return 0;
+
+               /* XXX */
+               sdb_log(SDB_LOG_DEBUG, "frontend: read command %i from fd=%i",
+                               cmd, fd);
+               ++n;
+       }
+
+       return n + 1;
+} /* connection_read */
+
 static void *
 connection_handler(void *data)
 {
-       sdb_channel_t *chan = data;
+       sdb_fe_socket_t *sock = data;
 
-       assert(chan);
+       assert(sock);
 
        while (42) {
                struct timespec timeout = { 0, 500000000 }; /* .5 seconds */
@@ -315,7 +376,8 @@ connection_handler(void *data)
                int status;
 
                errno = 0;
-               status = sdb_channel_select(chan, NULL, &conn, NULL, NULL, &timeout);
+               status = sdb_channel_select(sock->chan, /* read */ NULL, &conn,
+                               /* write */ NULL, NULL, &timeout);
                if (status) {
                        char buf[1024];
 
@@ -329,20 +391,32 @@ connection_handler(void *data)
                        continue;
                }
 
-               /* XXX */
-               sdb_log(SDB_LOG_INFO, "frontend: Data available on connection fd=%i\n",
-                               conn->conn.fd);
+               status = connection_read(conn->conn.fd);
+               if (status <= 0) {
+                       /* error or EOF -> close connection */
+                       sdb_object_deref(SDB_OBJ(conn));
+               }
+               else {
+                       if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) {
+                               sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append "
+                                               "connection %s to list of open connections",
+                                               SDB_OBJ(conn)->name);
+                       }
+
+                       /* pass ownership back to list; or destroy in case of an error */
+                       sdb_object_deref(SDB_OBJ(conn));
+               }
        }
        return NULL;
 } /* connection_handler */
 
 static int
-accept_connection(sdb_fe_socket_t *sock, listener_t *listener)
+connection_accept(sdb_fe_socket_t *sock, listener_t *listener)
 {
        sdb_object_t *obj;
 
-       /* the X's will be replaced with the accepted file descriptor
-        * when initializing the object */
+       /* the placeholder will be replaced with the accepted file
+        * descriptor when initializing the object */
        obj = sdb_object_create(CONN_FD_PREFIX CONN_FD_PLACEHOLDER,
                        connection_type, listener->sock_fd);
        if (! obj)
@@ -359,7 +433,7 @@ accept_connection(sdb_fe_socket_t *sock, listener_t *listener)
        /* hand ownership over to the list */
        sdb_object_deref(obj);
        return 0;
-} /* accept_connection */
+} /* connection_accept */
 
 /*
  * public API
@@ -419,7 +493,6 @@ sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address)
 int
 sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
 {
-       sdb_channel_t *chan;
        fd_set sockets;
        int max_listen_fd = 0;
        size_t i;
@@ -430,32 +503,31 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
        if ((! sock) || (! sock->listeners_num) || (! loop))
                return -1;
 
+       if (sock->chan)
+               return -1;
+
        FD_ZERO(&sockets);
 
        for (i = 0; i < sock->listeners_num; ++i) {
                listener_t *listener = sock->listeners + i;
 
-               if (listen(listener->sock_fd, /* backlog = */ 32)) {
-                       char buf[1024];
-                       sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s",
-                                       listener->address, sdb_strerror(errno, buf, sizeof(buf)));
+               if (listener_listen(listener))
                        return -1;
-               }
 
                FD_SET(listener->sock_fd, &sockets);
                if (listener->sock_fd > max_listen_fd)
                        max_listen_fd = listener->sock_fd;
        }
 
-       chan = sdb_channel_create(1024, sizeof(connection_obj_t *));
-       if (! chan)
+       sock->chan = sdb_channel_create(1024, sizeof(connection_obj_t *));
+       if (! sock->chan)
                return -1;
 
        memset(&handler_threads, 0, sizeof(handler_threads));
        /* XXX: error handling */
        for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
                pthread_create(&handler_threads[i], /* attr = */ NULL,
-                               connection_handler, /* arg = */ chan);
+                               connection_handler, /* arg = */ sock);
 
        while (loop->do_loop) {
                fd_set ready;
@@ -477,7 +549,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                if (! iter) {
                        sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
                                        "for open connections");
-                       return -1;
+                       break;
                }
 
                while (sdb_llist_iter_has_next(iter)) {
@@ -500,7 +572,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
 
                        sdb_log(SDB_LOG_ERR, "frontend: Failed to monitor sockets: %s",
                                        sdb_strerror(errno, buf, sizeof(buf)));
-                       return -1;
+                       break;
                }
 
                if (! n)
@@ -509,7 +581,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                for (i = 0; i < sock->listeners_num; ++i) {
                        listener_t *listener = sock->listeners + i;
                        if (FD_ISSET(listener->sock_fd, &ready))
-                               if (accept_connection(sock, listener))
+                               if (connection_accept(sock, listener))
                                        continue;
                }
 
@@ -517,7 +589,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                if (! iter) {
                        sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
                                        "for open connections");
-                       return -1;
+                       break;
                }
 
                while (sdb_llist_iter_has_next(iter)) {
@@ -528,20 +600,25 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                                                CONN(obj)->conn.fd);
 
                        if (FD_ISSET(CONN(obj)->conn.fd, &ready)) {
-                               sdb_log(SDB_LOG_INFO, "Data on fd %d", CONN(obj)->conn.fd);
                                sdb_llist_iter_remove_current(iter);
-                               sdb_channel_write(chan, &obj);
+                               sdb_channel_write(sock->chan, &obj);
                        }
                }
                sdb_llist_iter_destroy(iter);
        }
 
+       for (i = 0; i < sock->listeners_num; ++i)
+               listener_close(sock->listeners + i);
+
        sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "
                        "to terminate");
-       if (! sdb_channel_shutdown(chan))
+       if (! sdb_channel_shutdown(sock->chan))
                for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
                        pthread_join(handler_threads[i], NULL);
        /* else: we tried our best; let the operating system clean up */
+
+       sdb_channel_destroy(sock->chan);
+       sock->chan = NULL;
        return 0;
 } /* sdb_fe_sock_listen_and_server */