Code

socket frontend: Manage open connections in listen_and_serve().
authorSebastian Harl <sh@tokkee.org>
Fri, 25 Oct 2013 15:40:57 +0000 (17:40 +0200)
committerSebastian Harl <sh@tokkee.org>
Fri, 25 Oct 2013 15:40:57 +0000 (17:40 +0200)
For that purpose, added a new object data type wrapping a connection. All open
connections are managed in a linked-link in the socket object. The main loop
now also waits data to be available on an open connection and then passed on
the connection object to the connection handler thread which is then supposed
to handle requests (not implemented yet).

src/frontend/sock.c

index f6656875e4d82a192d1da28b099366ca4913fd4b..12690d8cb83e23557306d8224c67ad1cf6c11e1d 100644 (file)
 
 #include "sysdb.h"
 #include "core/error.h"
+#include "core/object.h"
 #include "frontend/sock.h"
 
 #include "utils/channel.h"
+#include "utils/llist.h"
 
 #include <assert.h>
 
 #include <errno.h>
 
+#include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 
 
 #include <pthread.h>
 
+/* name of connection objects */
+#define CONN_FD_PREFIX "conn#"
+#define CONN_FD_PLACEHOLDER "XXXXXXX"
+
 /*
  * private data types
  */
@@ -58,6 +65,12 @@ typedef struct {
        socklen_t client_addr_len;
 } connection_t;
 
+typedef struct {
+       sdb_object_t super;
+       connection_t conn;
+} connection_obj_t;
+#define CONN(obj) ((connection_obj_t *)(obj))
+
 typedef struct {
        char *address;
        int   type;
@@ -75,6 +88,8 @@ typedef struct {
 struct sdb_fe_socket {
        listener_t *listeners;
        size_t listeners_num;
+
+       sdb_llist_t *open_connections;
 };
 
 /*
@@ -211,6 +226,66 @@ listener_create(sdb_fe_socket_t *sock, const char *address)
        return listener;
 } /* listener_create */
 
+/*
+ * private data types
+ */
+
+static int
+connection_init(sdb_object_t *obj, va_list ap)
+{
+       connection_t *conn;
+       int sock_fd;
+
+       assert(obj);
+       conn = &CONN(obj)->conn;
+
+       sock_fd = va_arg(ap, int);
+
+       conn->client_addr_len = sizeof(conn->client_addr);
+       conn->fd = accept(sock_fd, (struct sockaddr *)&conn->client_addr,
+                       &conn->client_addr_len);
+
+       if (conn->fd < 0) {
+               char buf[1024];
+               sdb_log(SDB_LOG_ERR, "frontend: Failed to accept remote "
+                               "connection: %s", sdb_strerror(errno,
+                                       buf, sizeof(buf)));
+               return -1;
+       }
+
+       if (conn->client_addr.ss_family != AF_UNIX) {
+               sdb_log(SDB_LOG_ERR, "frontend: Accepted connection using "
+                               "unexpected family type %d", conn->client_addr.ss_family);
+               return -1;
+       }
+
+       sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i\n",
+                       conn->fd);
+
+       /* update the object name */
+       snprintf(obj->name + strlen(CONN_FD_PREFIX),
+                       strlen(CONN_FD_PLACEHOLDER), "%i", conn->fd);
+       return 0;
+} /* connection_init */
+
+static void
+connection_destroy(sdb_object_t *obj)
+{
+       connection_t *conn;
+
+       assert(obj);
+       conn = &CONN(obj)->conn;
+
+       close(conn->fd);
+       conn->fd = -1;
+} /* connection_destroy */
+
+static sdb_type_t connection_type = {
+       /* size = */ sizeof(connection_obj_t),
+       /* init = */ connection_init,
+       /* destroy = */ connection_destroy,
+};
+
 /*
  * connection handler functions
  */
@@ -224,7 +299,7 @@ connection_handler(void *data)
 
        while (42) {
                struct timespec timeout = { 0, 500000000 }; /* .5 seconds */
-               connection_conn;
+               connection_obj_t *conn;
                int status;
 
                errno = 0;
@@ -242,19 +317,9 @@ connection_handler(void *data)
                        continue;
                }
 
-               if (conn.fd < 0)
-                       continue;
-
-               if (conn.client_addr.ss_family != AF_UNIX) {
-                       sdb_log(SDB_LOG_ERR, "frontend: Accepted connection using "
-                                       "unexpected family type %d", conn.client_addr.ss_family);
-                       continue;
-               }
-
                /* XXX */
-               sdb_log(SDB_LOG_INFO, "frontend: Accepted connection on fd=%i\n",
-                               conn.fd);
-               close(conn.fd);
+               sdb_log(SDB_LOG_INFO, "frontend: Data available on connection fd=%i\n",
+                               conn->conn.fd);
        }
        return NULL;
 } /* connection_handler */
@@ -271,6 +336,12 @@ sdb_fe_sock_create(void)
        sock = calloc(1, sizeof(*sock));
        if (! sock)
                return NULL;
+
+       sock->open_connections = sdb_llist_create();
+       if (! sock->open_connections) {
+               sdb_fe_sock_destroy(sock);
+               return NULL;
+       }
        return sock;
 } /* sdb_fe_sock_create */
 
@@ -287,6 +358,10 @@ sdb_fe_sock_destroy(sdb_fe_socket_t *sock)
        }
        if (sock->listeners)
                free(sock->listeners);
+       sock->listeners = NULL;
+
+       sdb_llist_destroy(sock->open_connections);
+       sock->open_connections = NULL;
        free(sock);
 } /* sdb_fe_sock_destroy */
 
@@ -309,7 +384,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
 {
        sdb_channel_t *chan;
        fd_set sockets;
-       int max_fd = 0;
+       int max_listen_fd = 0;
        size_t i;
 
        /* XXX: make the number of threads configurable */
@@ -331,11 +406,11 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                }
 
                FD_SET(listener->sock_fd, &sockets);
-               if (listener->sock_fd > max_fd)
-                       max_fd = listener->sock_fd;
+               if (listener->sock_fd > max_listen_fd)
+                       max_listen_fd = listener->sock_fd;
        }
 
-       chan = sdb_channel_create(1024, sizeof(connection_t));
+       chan = sdb_channel_create(1024, sizeof(connection_obj_t *));
        if (! chan)
                return -1;
 
@@ -346,13 +421,40 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                                connection_handler, /* arg = */ chan);
 
        while (loop->do_loop) {
-               fd_set ready = sockets;
+               fd_set ready;
+               fd_set exceptions;
+               int max_fd;
                int n;
 
                struct timeval timeout = { 1, 0 }; /* one second */
+               sdb_llist_iter_t *iter;
+
+               FD_ZERO(&ready);
+               FD_ZERO(&exceptions);
+
+               ready = sockets;
+
+               max_fd = max_listen_fd;
+
+               iter = sdb_llist_get_iter(sock->open_connections);
+               if (! iter) {
+                       sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
+                                       "for open connections");
+                       return -1;
+               }
+
+               while (sdb_llist_iter_has_next(iter)) {
+                       sdb_object_t *obj = sdb_llist_iter_get_next(iter);
+                       FD_SET(CONN(obj)->conn.fd, &ready);
+                       FD_SET(CONN(obj)->conn.fd, &exceptions);
+
+                       if (CONN(obj)->conn.fd > max_fd)
+                               max_fd = CONN(obj)->conn.fd;
+               }
+               sdb_llist_iter_destroy(iter);
 
                errno = 0;
-               n = select(max_fd + 1, &ready, NULL, NULL, &timeout);
+               n = select(max_fd + 1, &ready, NULL, &exceptions, &timeout);
                if (n < 0) {
                        char buf[1024];
 
@@ -371,26 +473,49 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                        listener_t *listener = sock->listeners + i;
 
                        if (FD_ISSET(listener->sock_fd, &ready)) {
-                               connection_t conn;
-
-                               memset(&conn, 0, sizeof(conn));
-                               conn.client_addr_len = sizeof(conn.client_addr);
+                               sdb_object_t *obj;
 
-                               conn.fd = accept(listener->sock_fd,
-                                               (struct sockaddr *)&conn.client_addr,
-                                               &conn.client_addr_len);
+                               /* the X's will be replaced with the accepted file descriptor
+                                * when initializing the object */
+                               obj = sdb_object_create(CONN_FD_PREFIX CONN_FD_PLACEHOLDER,
+                                               connection_type, listener->sock_fd);
+                               if (! obj)
+                                       continue;
 
-                               if (conn.fd < 0) {
-                                       char buf[1024];
-                                       sdb_log(SDB_LOG_ERR, "frontend: Failed to accept remote "
-                                                       "connection: %s", sdb_strerror(errno,
-                                                               buf, sizeof(buf)));
+                               if (sdb_llist_append(sock->open_connections, obj)) {
+                                       sdb_log(SDB_LOG_ERR, "frontend: Failed to append "
+                                                       "connection %s to list of open connections",
+                                                       obj->name);
+                                       sdb_object_deref(obj);
                                        continue;
                                }
 
-                               sdb_channel_write(chan, &conn);
+                               /* hand ownership over to the list */
+                               sdb_object_deref(obj);
+                       }
+               }
+
+               iter = sdb_llist_get_iter(sock->open_connections);
+               if (! iter) {
+                       sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
+                                       "for open connections");
+                       return -1;
+               }
+
+               while (sdb_llist_iter_has_next(iter)) {
+                       sdb_object_t *obj = sdb_llist_iter_get_next(iter);
+
+                       if (FD_ISSET(CONN(obj)->conn.fd, &exceptions))
+                               sdb_log(SDB_LOG_INFO, "Exception on fd %d",
+                                               CONN(obj)->conn.fd);
+
+                       if (FD_ISSET(CONN(obj)->conn.fd, &ready)) {
+                               sdb_log(SDB_LOG_INFO, "Data on fd %d", CONN(obj)->conn.fd);
+                               sdb_llist_iter_remove_current(iter);
+                               sdb_channel_write(chan, &obj);
                        }
                }
+               sdb_llist_iter_destroy(iter);
        }
 
        sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "