X-Git-Url: https://git.tokkee.org/?p=sysdb.git;a=blobdiff_plain;f=src%2Ffrontend%2Fsock.c;h=0c4829e2efc4d3737fb9cf4a5f21cf3df5fa59b5;hp=d7b2ee97324ddef9d32246f9be163ec30152f998;hb=edba65afec8c547fb6c02346eda68595ce9a5839;hpb=7062bfea6bd16380f3a20915f635aff2f5c4305d diff --git a/src/frontend/sock.c b/src/frontend/sock.c index d7b2ee9..0c4829e 100644 --- a/src/frontend/sock.c +++ b/src/frontend/sock.c @@ -26,27 +26,24 @@ */ #include "sysdb.h" -#include "core/error.h" #include "core/object.h" +#include "frontend/connection-private.h" #include "frontend/sock.h" #include "utils/channel.h" +#include "utils/error.h" #include "utils/llist.h" #include "utils/strbuf.h" #include #include -#include - #include #include #include #include -#include - #include #include #include @@ -55,31 +52,10 @@ #include -/* name of connection objects */ -#define CONN_FD_PREFIX "conn#" -#define CONN_FD_PLACEHOLDER "XXXXXXX" - /* * private data types */ -typedef struct { - sdb_object_t super; - - /* connection and client information */ - int fd; - struct sockaddr_storage client_addr; - socklen_t client_addr_len; - - /* read buffer */ - sdb_strbuf_t *buf; - - /* state information for the currently executed command */ - uint32_t cmd; - uint32_t cmd_len; -} connection_obj_t; -#define CONN(obj) ((connection_obj_t *)(obj)) - typedef struct { char *address; int type; @@ -92,6 +68,7 @@ typedef struct { const char *prefix; int (*opener)(listener_t *); + void (*closer)(listener_t *); } fe_listener_impl_t; struct sdb_fe_socket { @@ -128,6 +105,13 @@ open_unix_sock(listener_t *listener) strncpy(sa.sun_path, listener->address + strlen("unix:"), sizeof(sa.sun_path)); + if (unlink(listener->address + strlen("unix:")) && (errno != ENOENT)) { + char errbuf[1024]; + sdb_log(SDB_LOG_WARNING, "frontend: Failed to remove stale UNIX " + "socket %s: %s", listener->address + strlen("unix:"), + sdb_strerror(errno, errbuf, sizeof(errbuf))); + } + status = bind(listener->sock_fd, (struct sockaddr *)&sa, sizeof(sa)); if (status) { char buf[1024]; @@ -138,6 +122,20 @@ open_unix_sock(listener_t *listener) return 0; } /* open_unix_sock */ +static void +close_unix_sock(listener_t *listener) +{ + assert(listener); + if (! listener->address) + return; + + if (listener->sock_fd >= 0) + close(listener->sock_fd); + listener->sock_fd = -1; + + unlink(listener->address + strlen("unix:")); +} /* close_unix_sock */ + /* * private variables */ @@ -148,13 +146,46 @@ enum { LISTENER_UNIXSOCK = 0, }; static fe_listener_impl_t listener_impls[] = { - { LISTENER_UNIXSOCK, "unix", open_unix_sock }, + { LISTENER_UNIXSOCK, "unix", open_unix_sock, close_unix_sock }, }; /* * private helper functions */ +static int +listener_listen(listener_t *listener) +{ + assert(listener); + + /* try to reopen */ + if (listener->sock_fd < 0) + if (listener_impls[listener->type].opener(listener)) + return -1; + assert(listener->sock_fd >= 0); + + if (listen(listener->sock_fd, /* backlog = */ 32)) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s", + listener->address, sdb_strerror(errno, buf, sizeof(buf))); + return -1; + } + return 0; +} /* listener_listen */ + +static void +listener_close(listener_t *listener) +{ + assert(listener); + + if (listener_impls[listener->type].closer) + listener_impls[listener->type].closer(listener); + + if (listener->sock_fd >= 0) + close(listener->sock_fd); + listener->sock_fd = -1; +} /* listener_close */ + static int get_type(const char *address) { @@ -186,12 +217,11 @@ listener_destroy(listener_t *listener) if (! listener) return; - if (listener->sock_fd >= 0) - close(listener->sock_fd); - listener->sock_fd = -1; + listener_close(listener); if (listener->address) free(listener->address); + listener->address = NULL; } /* listener_destroy */ static listener_t * @@ -240,38 +270,6 @@ listener_create(sdb_fe_socket_t *sock, const char *address) return listener; } /* listener_create */ -static int -listener_listen(listener_t *listener) -{ - assert(listener); - - /* try to reopen */ - if (listener->sock_fd < 0) - if (listener_impls[listener->type].opener(listener)) - return -1; - assert(listener->sock_fd >= 0); - - if (listen(listener->sock_fd, /* backlog = */ 32)) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s", - listener->address, sdb_strerror(errno, buf, sizeof(buf))); - return -1; - } - return 0; -} /* listener_listen */ - -static void -listener_close(listener_t *listener) -{ - assert(listener); - - if (listener->sock_fd < 0) - return; - - close(listener->sock_fd); - listener->sock_fd = -1; -} /* listener_close */ - static void socket_close(sdb_fe_socket_t *sock) { @@ -282,156 +280,10 @@ socket_close(sdb_fe_socket_t *sock) listener_close(sock->listeners + i); } /* socket_close */ -/* - * private data types - */ - -static int -connection_init(sdb_object_t *obj, va_list ap) -{ - connection_obj_t *conn; - int sock_fd; - int sock_fl; - - assert(obj); - conn = CONN(obj); - - sock_fd = va_arg(ap, int); - - CONN(obj)->buf = sdb_strbuf_create(/* size = */ 128); - if (! CONN(obj)->buf) { - sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate a read buffer " - "for a new remote connection"); - return -1; - } - - conn->client_addr_len = sizeof(conn->client_addr); - conn->fd = accept(sock_fd, (struct sockaddr *)&conn->client_addr, - &conn->client_addr_len); - - if (conn->fd < 0) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to accept remote " - "connection: %s", sdb_strerror(errno, - buf, sizeof(buf))); - return -1; - } - - if (conn->client_addr.ss_family != AF_UNIX) { - sdb_log(SDB_LOG_ERR, "frontend: Accepted connection using " - "unexpected family type %d", conn->client_addr.ss_family); - return -1; - } - - sock_fl = fcntl(conn->fd, F_GETFL); - if (fcntl(conn->fd, F_SETFL, sock_fl | O_NONBLOCK)) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to switch connection conn#%i " - "to non-blocking mode: %s", conn->fd, - sdb_strerror(errno, buf, sizeof(buf))); - return -1; - } - - sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i", - conn->fd); - - /* update the object name */ - snprintf(obj->name + strlen(CONN_FD_PREFIX), - strlen(CONN_FD_PLACEHOLDER), "%i", conn->fd); - return 0; -} /* connection_init */ - -static void -connection_destroy(sdb_object_t *obj) -{ - connection_obj_t *conn; - size_t len; - - assert(obj); - conn = CONN(obj); - - len = sdb_strbuf_len(conn->buf); - if (len) - sdb_log(SDB_LOG_INFO, "frontend: Discarding incomplete command " - "(%zu bytes left in buffer)", len); - - sdb_log(SDB_LOG_DEBUG, "frontend: Closing connection on fd=%i", conn->fd); - close(conn->fd); - conn->fd = -1; - - sdb_strbuf_destroy(CONN(obj)->buf); -} /* connection_destroy */ - -static sdb_type_t connection_type = { - /* size = */ sizeof(connection_obj_t), - /* init = */ connection_init, - /* destroy = */ connection_destroy, -}; - /* * connection handler functions */ -static uint32_t -connection_get_int32(connection_obj_t *conn, size_t offset) -{ - const char *data; - uint32_t n; - - assert(conn && (sdb_strbuf_len(conn->buf) >= offset + sizeof(uint32_t))); - - data = sdb_strbuf_string(conn->buf); - memcpy(&n, data + offset, sizeof(n)); - n = ntohl(n); - return n; -} /* connection_get_int32 */ - -static int -command_handle(connection_obj_t *conn) -{ - assert(conn && conn->cmd && conn->cmd_len); - /* XXX */ - sdb_strbuf_skip(conn->buf, conn->cmd_len); - return 0; -} /* command_handle */ - -/* initialize the connection state information */ -static int -command_init(connection_obj_t *conn) -{ - assert(conn && (! conn->cmd) && (! conn->cmd_len)); - - conn->cmd = connection_get_int32(conn, 0); - conn->cmd_len = connection_get_int32(conn, sizeof(uint32_t)); - sdb_strbuf_skip(conn->buf, 2 * sizeof(uint32_t)); - return 0; -} /* command_init */ - -/* returns negative value on error, 0 on EOF, number of octets else */ -static ssize_t -connection_read(connection_obj_t *conn) -{ - ssize_t n = 0; - - while (42) { - ssize_t status; - - errno = 0; - status = sdb_strbuf_read(conn->buf, conn->fd, 1024); - if (status < 0) { - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) - return n; - return (int)status; - } - else if (! status) /* EOF */ - return n; - - n += status; - } - - return n; -} /* connection_read */ - static void * connection_handler(void *data) { @@ -441,7 +293,7 @@ connection_handler(void *data) while (42) { struct timespec timeout = { 0, 500000000 }; /* .5 seconds */ - connection_obj_t *conn; + sdb_conn_t *conn; int status; errno = 0; @@ -460,18 +312,13 @@ connection_handler(void *data) continue; } - status = (int)connection_read(conn); + status = (int)sdb_connection_read(conn); if (status <= 0) { /* error or EOF -> close connection */ sdb_object_deref(SDB_OBJ(conn)); continue; } - if (conn->cmd_len && (sdb_strbuf_len(conn->buf) >= conn->cmd_len)) - command_handle(conn); - else if (sdb_strbuf_len(conn->buf) >= 2 * sizeof(int32_t)) - command_init(conn); - /* return the connection to the main loop */ if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) { sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append " @@ -489,25 +336,21 @@ static int connection_accept(sdb_fe_socket_t *sock, listener_t *listener) { sdb_object_t *obj; + int status; - /* the placeholder will be replaced with the accepted file - * descriptor when initializing the object */ - obj = sdb_object_create(CONN_FD_PREFIX CONN_FD_PLACEHOLDER, - connection_type, listener->sock_fd); + obj = SDB_OBJ(sdb_connection_accept(listener->sock_fd)); if (! obj) return -1; - if (sdb_llist_append(sock->open_connections, obj)) { + status = sdb_llist_append(sock->open_connections, obj); + if (status) sdb_log(SDB_LOG_ERR, "frontend: Failed to append " "connection %s to list of open connections", obj->name); - sdb_object_deref(obj); - return -1; - } - /* hand ownership over to the list */ + /* hand ownership over to the list; or destroy in case of an error */ sdb_object_deref(obj); - return 0; + return status; } /* connection_accept */ static int @@ -609,12 +452,16 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) int max_listen_fd = 0; size_t i; - /* XXX: make the number of threads configurable */ - pthread_t handler_threads[5]; + pthread_t handler_threads[loop->num_threads]; + size_t num_threads; - if ((! sock) || (! sock->listeners_num) || (! loop) || sock->chan) + if ((! sock) || (! sock->listeners_num) || sock->chan + || (! loop) || (loop->num_threads <= 0)) return -1; + if (! loop->do_loop) + return 0; + FD_ZERO(&sockets); for (i = 0; i < sock->listeners_num; ++i) { listener_t *listener = sock->listeners + i; @@ -629,19 +476,33 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) max_listen_fd = listener->sock_fd; } - sock->chan = sdb_channel_create(1024, sizeof(connection_obj_t *)); + sock->chan = sdb_channel_create(1024, sizeof(sdb_conn_t *)); if (! sock->chan) { socket_close(sock); return -1; } + sdb_log(SDB_LOG_INFO, "frontend: Starting %d connection " + "handler thread%s managing %d listener%s", + loop->num_threads, loop->num_threads == 1 ? "" : "s", + sock->listeners_num, sock->listeners_num == 1 ? "" : "s"); + + num_threads = loop->num_threads; memset(&handler_threads, 0, sizeof(handler_threads)); - /* XXX: error handling */ - for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) - pthread_create(&handler_threads[i], /* attr = */ NULL, - connection_handler, /* arg = */ sock); + for (i = 0; i < num_threads; ++i) { + errno = 0; + if (pthread_create(&handler_threads[i], /* attr = */ NULL, + connection_handler, /* arg = */ sock)) { + char errbuf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to create " + "connection handler thread: %s", + sdb_strerror(errno, errbuf, sizeof(errbuf))); + num_threads = i; + break; + } + } - while (loop->do_loop) { + while (loop->do_loop && num_threads) { struct timeval timeout = { 1, 0 }; /* one second */ sdb_llist_iter_t *iter; @@ -697,12 +558,15 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads " "to terminate"); if (! sdb_channel_shutdown(sock->chan)) - for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) + for (i = 0; i < num_threads; ++i) pthread_join(handler_threads[i], NULL); /* else: we tried our best; let the operating system clean up */ sdb_channel_destroy(sock->chan); sock->chan = NULL; + + if (! num_threads) + return -1; return 0; } /* sdb_fe_sock_listen_and_server */