Code

socket frontend: Pass socket object to handler threads.
authorSebastian Harl <sh@tokkee.org>
Fri, 25 Oct 2013 17:03:32 +0000 (19:03 +0200)
committerSebastian Harl <sh@tokkee.org>
Fri, 25 Oct 2013 17:03:32 +0000 (19:03 +0200)
This will allow them to pass back connections to the main loop.

src/frontend/sock.c

index 667b616d49105214d5a9ad7f814d2ce43b387bfb..fb79f105f686295c77ef4b8a332a47aa27f2840b 100644 (file)
@@ -92,6 +92,10 @@ struct sdb_fe_socket {
        size_t listeners_num;
 
        sdb_llist_t *open_connections;
+
+       /* channel used for communication between main
+        * and connection handler threads */
+       sdb_channel_t *chan;
 };
 
 /*
@@ -302,12 +306,41 @@ static sdb_type_t connection_type = {
  * connection handler functions
  */
 
+/* returns negative value on error, 0 on EOF, number of packets else */
+static int
+connection_read(int fd)
+{
+       int n = 0;
+
+       while (42) {
+               int32_t cmd;
+               ssize_t status;
+
+               errno = 0;
+               status = read(fd, &cmd, sizeof(cmd));
+               if (status < 0) {
+                       if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
+                               return n + 1;
+                       return (int)status;
+               }
+               else if (! status) /* EOF */
+                       return 0;
+
+               /* XXX */
+               sdb_log(SDB_LOG_DEBUG, "frontend: read command %i from fd=%i",
+                               cmd, fd);
+               ++n;
+       }
+
+       return n + 1;
+} /* connection_read */
+
 static void *
 connection_handler(void *data)
 {
-       sdb_channel_t *chan = data;
+       sdb_fe_socket_t *sock = data;
 
-       assert(chan);
+       assert(sock);
 
        while (42) {
                struct timespec timeout = { 0, 500000000 }; /* .5 seconds */
@@ -315,7 +348,8 @@ connection_handler(void *data)
                int status;
 
                errno = 0;
-               status = sdb_channel_select(chan, NULL, &conn, NULL, NULL, &timeout);
+               status = sdb_channel_select(sock->chan, /* read */ NULL, &conn,
+                               /* write */ NULL, NULL, &timeout);
                if (status) {
                        char buf[1024];
 
@@ -419,7 +453,6 @@ sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address)
 int
 sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
 {
-       sdb_channel_t *chan;
        fd_set sockets;
        int max_listen_fd = 0;
        size_t i;
@@ -447,15 +480,15 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                        max_listen_fd = listener->sock_fd;
        }
 
-       chan = sdb_channel_create(1024, sizeof(connection_obj_t *));
-       if (! chan)
+       sock->chan = sdb_channel_create(1024, sizeof(connection_obj_t *));
+       if (! sock->chan)
                return -1;
 
        memset(&handler_threads, 0, sizeof(handler_threads));
        /* XXX: error handling */
        for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
                pthread_create(&handler_threads[i], /* attr = */ NULL,
-                               connection_handler, /* arg = */ chan);
+                               connection_handler, /* arg = */ sock);
 
        while (loop->do_loop) {
                fd_set ready;
@@ -530,7 +563,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                        if (FD_ISSET(CONN(obj)->conn.fd, &ready)) {
                                sdb_log(SDB_LOG_INFO, "Data on fd %d", CONN(obj)->conn.fd);
                                sdb_llist_iter_remove_current(iter);
-                               sdb_channel_write(chan, &obj);
+                               sdb_channel_write(sock->chan, &obj);
                        }
                }
                sdb_llist_iter_destroy(iter);
@@ -538,7 +571,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
 
        sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "
                        "to terminate");
-       if (! sdb_channel_shutdown(chan))
+       if (! sdb_channel_shutdown(sock->chan))
                for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
                        pthread_join(handler_threads[i], NULL);
        /* else: we tried our best; let the operating system clean up */