From: Sebastian Harl Date: Fri, 25 Oct 2013 15:40:57 +0000 (+0200) Subject: socket frontend: Manage open connections in listen_and_serve(). X-Git-Tag: sysdb-0.1.0~336^2~37 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=6352cfade8fc075e0af5cf7ae9fadc6611a9b476;p=sysdb.git socket frontend: Manage open connections in listen_and_serve(). For that purpose, added a new object data type wrapping a connection. All open connections are managed in a linked-link in the socket object. The main loop now also waits data to be available on an open connection and then passed on the connection object to the connection handler thread which is then supposed to handle requests (not implemented yet). --- diff --git a/src/frontend/sock.c b/src/frontend/sock.c index f665687..12690d8 100644 --- a/src/frontend/sock.c +++ b/src/frontend/sock.c @@ -27,14 +27,17 @@ #include "sysdb.h" #include "core/error.h" +#include "core/object.h" #include "frontend/sock.h" #include "utils/channel.h" +#include "utils/llist.h" #include #include +#include #include #include @@ -48,6 +51,10 @@ #include +/* name of connection objects */ +#define CONN_FD_PREFIX "conn#" +#define CONN_FD_PLACEHOLDER "XXXXXXX" + /* * private data types */ @@ -58,6 +65,12 @@ typedef struct { socklen_t client_addr_len; } connection_t; +typedef struct { + sdb_object_t super; + connection_t conn; +} connection_obj_t; +#define CONN(obj) ((connection_obj_t *)(obj)) + typedef struct { char *address; int type; @@ -75,6 +88,8 @@ typedef struct { struct sdb_fe_socket { listener_t *listeners; size_t listeners_num; + + sdb_llist_t *open_connections; }; /* @@ -211,6 +226,66 @@ listener_create(sdb_fe_socket_t *sock, const char *address) return listener; } /* listener_create */ +/* + * private data types + */ + +static int +connection_init(sdb_object_t *obj, va_list ap) +{ + connection_t *conn; + int sock_fd; + + assert(obj); + conn = &CONN(obj)->conn; + + sock_fd = va_arg(ap, int); + + conn->client_addr_len = sizeof(conn->client_addr); + conn->fd = accept(sock_fd, (struct sockaddr *)&conn->client_addr, + &conn->client_addr_len); + + if (conn->fd < 0) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to accept remote " + "connection: %s", sdb_strerror(errno, + buf, sizeof(buf))); + return -1; + } + + if (conn->client_addr.ss_family != AF_UNIX) { + sdb_log(SDB_LOG_ERR, "frontend: Accepted connection using " + "unexpected family type %d", conn->client_addr.ss_family); + return -1; + } + + sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i\n", + conn->fd); + + /* update the object name */ + snprintf(obj->name + strlen(CONN_FD_PREFIX), + strlen(CONN_FD_PLACEHOLDER), "%i", conn->fd); + return 0; +} /* connection_init */ + +static void +connection_destroy(sdb_object_t *obj) +{ + connection_t *conn; + + assert(obj); + conn = &CONN(obj)->conn; + + close(conn->fd); + conn->fd = -1; +} /* connection_destroy */ + +static sdb_type_t connection_type = { + /* size = */ sizeof(connection_obj_t), + /* init = */ connection_init, + /* destroy = */ connection_destroy, +}; + /* * connection handler functions */ @@ -224,7 +299,7 @@ connection_handler(void *data) while (42) { struct timespec timeout = { 0, 500000000 }; /* .5 seconds */ - connection_t conn; + connection_obj_t *conn; int status; errno = 0; @@ -242,19 +317,9 @@ connection_handler(void *data) continue; } - if (conn.fd < 0) - continue; - - if (conn.client_addr.ss_family != AF_UNIX) { - sdb_log(SDB_LOG_ERR, "frontend: Accepted connection using " - "unexpected family type %d", conn.client_addr.ss_family); - continue; - } - /* XXX */ - sdb_log(SDB_LOG_INFO, "frontend: Accepted connection on fd=%i\n", - conn.fd); - close(conn.fd); + sdb_log(SDB_LOG_INFO, "frontend: Data available on connection fd=%i\n", + conn->conn.fd); } return NULL; } /* connection_handler */ @@ -271,6 +336,12 @@ sdb_fe_sock_create(void) sock = calloc(1, sizeof(*sock)); if (! sock) return NULL; + + sock->open_connections = sdb_llist_create(); + if (! sock->open_connections) { + sdb_fe_sock_destroy(sock); + return NULL; + } return sock; } /* sdb_fe_sock_create */ @@ -287,6 +358,10 @@ sdb_fe_sock_destroy(sdb_fe_socket_t *sock) } if (sock->listeners) free(sock->listeners); + sock->listeners = NULL; + + sdb_llist_destroy(sock->open_connections); + sock->open_connections = NULL; free(sock); } /* sdb_fe_sock_destroy */ @@ -309,7 +384,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) { sdb_channel_t *chan; fd_set sockets; - int max_fd = 0; + int max_listen_fd = 0; size_t i; /* XXX: make the number of threads configurable */ @@ -331,11 +406,11 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) } FD_SET(listener->sock_fd, &sockets); - if (listener->sock_fd > max_fd) - max_fd = listener->sock_fd; + if (listener->sock_fd > max_listen_fd) + max_listen_fd = listener->sock_fd; } - chan = sdb_channel_create(1024, sizeof(connection_t)); + chan = sdb_channel_create(1024, sizeof(connection_obj_t *)); if (! chan) return -1; @@ -346,13 +421,40 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) connection_handler, /* arg = */ chan); while (loop->do_loop) { - fd_set ready = sockets; + fd_set ready; + fd_set exceptions; + int max_fd; int n; struct timeval timeout = { 1, 0 }; /* one second */ + sdb_llist_iter_t *iter; + + FD_ZERO(&ready); + FD_ZERO(&exceptions); + + ready = sockets; + + max_fd = max_listen_fd; + + iter = sdb_llist_get_iter(sock->open_connections); + if (! iter) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " + "for open connections"); + return -1; + } + + while (sdb_llist_iter_has_next(iter)) { + sdb_object_t *obj = sdb_llist_iter_get_next(iter); + FD_SET(CONN(obj)->conn.fd, &ready); + FD_SET(CONN(obj)->conn.fd, &exceptions); + + if (CONN(obj)->conn.fd > max_fd) + max_fd = CONN(obj)->conn.fd; + } + sdb_llist_iter_destroy(iter); errno = 0; - n = select(max_fd + 1, &ready, NULL, NULL, &timeout); + n = select(max_fd + 1, &ready, NULL, &exceptions, &timeout); if (n < 0) { char buf[1024]; @@ -371,26 +473,49 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) listener_t *listener = sock->listeners + i; if (FD_ISSET(listener->sock_fd, &ready)) { - connection_t conn; - - memset(&conn, 0, sizeof(conn)); - conn.client_addr_len = sizeof(conn.client_addr); + sdb_object_t *obj; - conn.fd = accept(listener->sock_fd, - (struct sockaddr *)&conn.client_addr, - &conn.client_addr_len); + /* the X's will be replaced with the accepted file descriptor + * when initializing the object */ + obj = sdb_object_create(CONN_FD_PREFIX CONN_FD_PLACEHOLDER, + connection_type, listener->sock_fd); + if (! obj) + continue; - if (conn.fd < 0) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to accept remote " - "connection: %s", sdb_strerror(errno, - buf, sizeof(buf))); + if (sdb_llist_append(sock->open_connections, obj)) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to append " + "connection %s to list of open connections", + obj->name); + sdb_object_deref(obj); continue; } - sdb_channel_write(chan, &conn); + /* hand ownership over to the list */ + sdb_object_deref(obj); + } + } + + iter = sdb_llist_get_iter(sock->open_connections); + if (! iter) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " + "for open connections"); + return -1; + } + + while (sdb_llist_iter_has_next(iter)) { + sdb_object_t *obj = sdb_llist_iter_get_next(iter); + + if (FD_ISSET(CONN(obj)->conn.fd, &exceptions)) + sdb_log(SDB_LOG_INFO, "Exception on fd %d", + CONN(obj)->conn.fd); + + if (FD_ISSET(CONN(obj)->conn.fd, &ready)) { + sdb_log(SDB_LOG_INFO, "Data on fd %d", CONN(obj)->conn.fd); + sdb_llist_iter_remove_current(iter); + sdb_channel_write(chan, &obj); } } + sdb_llist_iter_destroy(iter); } sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "