Code

socket frontend: Don't use UTF-8 in comments ;-)
[sysdb.git] / src / frontend / sock.c
index 3662a2650cc41d313ea5c6e8613c59810825fd50..be69f0ade8832c9076e301da44078cafb1c3fcf0 100644 (file)
@@ -43,6 +43,8 @@
 
 #include <unistd.h>
 
+#include <fcntl.h>
+
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/select.h>
@@ -90,6 +92,10 @@ struct sdb_fe_socket {
        size_t listeners_num;
 
        sdb_llist_t *open_connections;
+
+       /* channel used for communication between main
+        * and connection handler threads */
+       sdb_channel_t *chan;
 };
 
 /*
@@ -235,6 +241,7 @@ connection_init(sdb_object_t *obj, va_list ap)
 {
        connection_t *conn;
        int sock_fd;
+       int sock_fl;
 
        assert(obj);
        conn = &CONN(obj)->conn;
@@ -259,7 +266,16 @@ connection_init(sdb_object_t *obj, va_list ap)
                return -1;
        }
 
-       sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i\n",
+       sock_fl = fcntl(conn->fd, F_GETFL);
+       if (fcntl(conn->fd, F_SETFL, sock_fl | O_NONBLOCK)) {
+               char buf[1024];
+               sdb_log(SDB_LOG_ERR, "frontend: Failed to switch connection conn#%i "
+                               "to non-blocking mode: %s", conn->fd,
+                               sdb_strerror(errno, buf, sizeof(buf)));
+               return -1;
+       }
+
+       sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i",
                        conn->fd);
 
        /* update the object name */
@@ -276,6 +292,7 @@ connection_destroy(sdb_object_t *obj)
        assert(obj);
        conn = &CONN(obj)->conn;
 
+       sdb_log(SDB_LOG_DEBUG, "frontend: Closing connection on fd=%i", conn->fd);
        close(conn->fd);
        conn->fd = -1;
 } /* connection_destroy */
@@ -290,12 +307,41 @@ static sdb_type_t connection_type = {
  * connection handler functions
  */
 
+/* returns negative value on error, 0 on EOF, number of packets else */
+static int
+connection_read(int fd)
+{
+       int n = 0;
+
+       while (42) {
+               int32_t cmd;
+               ssize_t status;
+
+               errno = 0;
+               status = read(fd, &cmd, sizeof(cmd));
+               if (status < 0) {
+                       if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
+                               return n + 1;
+                       return (int)status;
+               }
+               else if (! status) /* EOF */
+                       return 0;
+
+               /* XXX */
+               sdb_log(SDB_LOG_DEBUG, "frontend: read command %i from fd=%i",
+                               cmd, fd);
+               ++n;
+       }
+
+       return n + 1;
+} /* connection_read */
+
 static void *
 connection_handler(void *data)
 {
-       sdb_channel_t *chan = data;
+       sdb_fe_socket_t *sock = data;
 
-       assert(chan);
+       assert(sock);
 
        while (42) {
                struct timespec timeout = { 0, 500000000 }; /* .5 seconds */
@@ -303,7 +349,8 @@ connection_handler(void *data)
                int status;
 
                errno = 0;
-               status = sdb_channel_select(chan, NULL, &conn, NULL, NULL, &timeout);
+               status = sdb_channel_select(sock->chan, /* read */ NULL, &conn,
+                               /* write */ NULL, NULL, &timeout);
                if (status) {
                        char buf[1024];
 
@@ -317,20 +364,32 @@ connection_handler(void *data)
                        continue;
                }
 
-               /* XXX */
-               sdb_log(SDB_LOG_INFO, "frontend: Data available on connection fd=%i\n",
-                               conn->conn.fd);
+               status = connection_read(conn->conn.fd);
+               if (status <= 0) {
+                       /* error or EOF -> close connection */
+                       sdb_object_deref(SDB_OBJ(conn));
+               }
+               else {
+                       if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) {
+                               sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append "
+                                               "connection %s to list of open connections",
+                                               SDB_OBJ(conn)->name);
+                       }
+
+                       /* pass ownership back to list; or destroy in case of an error */
+                       sdb_object_deref(SDB_OBJ(conn));
+               }
        }
        return NULL;
 } /* connection_handler */
 
 static int
-accept_connection(sdb_fe_socket_t *sock, listener_t *listener)
+connection_accept(sdb_fe_socket_t *sock, listener_t *listener)
 {
        sdb_object_t *obj;
 
-       /* the X's will be replaced with the accepted file descriptor
-        * when initializing the object */
+       /* the placeholder will be replaced with the accepted file
+        * descriptor when initializing the object */
        obj = sdb_object_create(CONN_FD_PREFIX CONN_FD_PLACEHOLDER,
                        connection_type, listener->sock_fd);
        if (! obj)
@@ -347,7 +406,7 @@ accept_connection(sdb_fe_socket_t *sock, listener_t *listener)
        /* hand ownership over to the list */
        sdb_object_deref(obj);
        return 0;
-} /* accept_connection */
+} /* connection_accept */
 
 /*
  * public API
@@ -407,7 +466,6 @@ sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address)
 int
 sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
 {
-       sdb_channel_t *chan;
        fd_set sockets;
        int max_listen_fd = 0;
        size_t i;
@@ -435,15 +493,15 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                        max_listen_fd = listener->sock_fd;
        }
 
-       chan = sdb_channel_create(1024, sizeof(connection_obj_t *));
-       if (! chan)
+       sock->chan = sdb_channel_create(1024, sizeof(connection_obj_t *));
+       if (! sock->chan)
                return -1;
 
        memset(&handler_threads, 0, sizeof(handler_threads));
        /* XXX: error handling */
        for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
                pthread_create(&handler_threads[i], /* attr = */ NULL,
-                               connection_handler, /* arg = */ chan);
+                               connection_handler, /* arg = */ sock);
 
        while (loop->do_loop) {
                fd_set ready;
@@ -497,7 +555,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                for (i = 0; i < sock->listeners_num; ++i) {
                        listener_t *listener = sock->listeners + i;
                        if (FD_ISSET(listener->sock_fd, &ready))
-                               if (accept_connection(sock, listener))
+                               if (connection_accept(sock, listener))
                                        continue;
                }
 
@@ -516,9 +574,8 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
                                                CONN(obj)->conn.fd);
 
                        if (FD_ISSET(CONN(obj)->conn.fd, &ready)) {
-                               sdb_log(SDB_LOG_INFO, "Data on fd %d", CONN(obj)->conn.fd);
                                sdb_llist_iter_remove_current(iter);
-                               sdb_channel_write(chan, &obj);
+                               sdb_channel_write(sock->chan, &obj);
                        }
                }
                sdb_llist_iter_destroy(iter);
@@ -526,7 +583,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
 
        sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "
                        "to terminate");
-       if (! sdb_channel_shutdown(chan))
+       if (! sdb_channel_shutdown(sock->chan))
                for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
                        pthread_join(handler_threads[i], NULL);
        /* else: we tried our best; let the operating system clean up */