X-Git-Url: https://git.tokkee.org/?p=sysdb.git;a=blobdiff_plain;f=src%2Ffrontend%2Fsock.c;h=642f03f92b0f87d25e3783bb501ecbce793ff6c3;hp=fb79f105f686295c77ef4b8a332a47aa27f2840b;hb=ad77ec3d257ac66d74e91452d098882247abdc0a;hpb=633201fd36edb21a0283c2541145a13ffea1d1b4 diff --git a/src/frontend/sock.c b/src/frontend/sock.c index fb79f10..642f03f 100644 --- a/src/frontend/sock.c +++ b/src/frontend/sock.c @@ -25,16 +25,22 @@ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#if HAVE_CONFIG_H +# include "config.h" +#endif /* HAVE_CONFIG_H */ + #include "sysdb.h" -#include "core/error.h" #include "core/object.h" +#include "frontend/connection-private.h" #include "frontend/sock.h" #include "utils/channel.h" +#include "utils/error.h" #include "utils/llist.h" +#include "utils/os.h" +#include "utils/strbuf.h" #include - #include #include @@ -43,48 +49,34 @@ #include -#include - #include #include #include #include #include -#include +#include -/* name of connection objects */ -#define CONN_FD_PREFIX "conn#" -#define CONN_FD_PLACEHOLDER "XXXXXXX" +#include /* * private data types */ -typedef struct { - int fd; - struct sockaddr_storage client_addr; - socklen_t client_addr_len; -} connection_t; - -typedef struct { - sdb_object_t super; - connection_t conn; -} connection_obj_t; -#define CONN(obj) ((connection_obj_t *)(obj)) - typedef struct { char *address; int type; int sock_fd; + int (*accept)(sdb_conn_t *); } listener_t; typedef struct { int type; const char *prefix; - int (*opener)(listener_t *); + int (*open)(listener_t *); + void (*close)(listener_t *); } fe_listener_impl_t; struct sdb_fe_socket { @@ -105,6 +97,9 @@ struct sdb_fe_socket { static int open_unix_sock(listener_t *listener) { + const char *addr; + char *addr_copy; + char *base_dir; struct sockaddr_un sa; int status; @@ -116,10 +111,40 @@ open_unix_sock(listener_t *listener) return -1; } + if (*listener->address == '/') + addr = listener->address; + else + addr = listener->address + strlen("unix:"); + memset(&sa, 0, sizeof(sa)); sa.sun_family = AF_UNIX; - strncpy(sa.sun_path, listener->address + strlen("unix:"), - sizeof(sa.sun_path)); + strncpy(sa.sun_path, addr, sizeof(sa.sun_path)); + + addr_copy = strdup(addr); + if (! addr_copy) { + char errbuf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: strdup failed: %s", + sdb_strerror(errno, errbuf, sizeof(errbuf))); + return -1; + } + base_dir = dirname(addr_copy); + + /* ensure that the directory exists */ + if (sdb_mkdir_all(base_dir, 0777)) { + char errbuf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to create directory '%s': %s", + base_dir, sdb_strerror(errno, errbuf, sizeof(errbuf))); + free(addr_copy); + return -1; + } + free(addr_copy); + + if (unlink(addr) && (errno != ENOENT)) { + char errbuf[1024]; + sdb_log(SDB_LOG_WARNING, "frontend: Failed to remove stale UNIX " + "socket %s: %s", listener->address + strlen("unix:"), + sdb_strerror(errno, errbuf, sizeof(errbuf))); + } status = bind(listener->sock_fd, (struct sockaddr *)&sa, sizeof(sa)); if (status) { @@ -131,6 +156,27 @@ open_unix_sock(listener_t *listener) return 0; } /* open_unix_sock */ +static void +close_unix_sock(listener_t *listener) +{ + const char *addr; + assert(listener); + + if (! listener->address) + return; + + if (*listener->address == '/') + addr = listener->address; + else + addr = listener->address + strlen("unix:"); + + if (listener->sock_fd >= 0) + close(listener->sock_fd); + listener->sock_fd = -1; + + unlink(addr); +} /* close_unix_sock */ + /* * private variables */ @@ -138,16 +184,49 @@ open_unix_sock(listener_t *listener) /* the enum has to be sorted the same as the implementations array * to ensure that the type may be used as index into the array */ enum { - LISTENER_UNIXSOCK = 0, + LISTENER_UNIXSOCK = 0, /* this is the default */ }; static fe_listener_impl_t listener_impls[] = { - { LISTENER_UNIXSOCK, "unix", open_unix_sock }, + { LISTENER_UNIXSOCK, "unix", open_unix_sock, close_unix_sock }, }; /* * private helper functions */ +static int +listener_listen(listener_t *listener) +{ + assert(listener); + + /* try to reopen */ + if (listener->sock_fd < 0) + if (listener_impls[listener->type].open(listener)) + return -1; + assert(listener->sock_fd >= 0); + + if (listen(listener->sock_fd, /* backlog = */ 32)) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s", + listener->address, sdb_strerror(errno, buf, sizeof(buf))); + return -1; + } + return 0; +} /* listener_listen */ + +static void +listener_close(listener_t *listener) +{ + assert(listener); + + if (listener_impls[listener->type].close) + listener_impls[listener->type].close(listener); + + if (listener->sock_fd >= 0) + close(listener->sock_fd); + listener->sock_fd = -1; +} /* listener_close */ + static int get_type(const char *address) { @@ -157,7 +236,7 @@ get_type(const char *address) sep = strchr(address, (int)':'); if (! sep) - return -1; + return listener_impls[0].type; assert(sep > address); len = (size_t)(sep - address); @@ -179,11 +258,11 @@ listener_destroy(listener_t *listener) if (! listener) return; - if (listener->sock_fd >= 0) - close(listener->sock_fd); + listener_close(listener); if (listener->address) free(listener->address); + listener->address = NULL; } /* listener_destroy */ static listener_t * @@ -200,7 +279,7 @@ listener_create(sdb_fe_socket_t *sock, const char *address) } listener = realloc(sock->listeners, - sock->listeners_num * sizeof(*sock->listeners)); + (sock->listeners_num + 1) * sizeof(*sock->listeners)); if (! listener) { char buf[1024]; sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate memory: %s", @@ -221,8 +300,9 @@ listener_create(sdb_fe_socket_t *sock, const char *address) return NULL; } listener->type = type; + listener->accept = NULL; - if (listener_impls[type].opener(listener)) { + if (listener_impls[type].open(listener)) { /* prints error */ listener_destroy(listener); return NULL; @@ -232,109 +312,34 @@ listener_create(sdb_fe_socket_t *sock, const char *address) return listener; } /* listener_create */ -/* - * private data types - */ - -static int -connection_init(sdb_object_t *obj, va_list ap) +static void +socket_clear(sdb_fe_socket_t *sock) { - connection_t *conn; - int sock_fd; - int sock_fl; - - assert(obj); - conn = &CONN(obj)->conn; - - sock_fd = va_arg(ap, int); - - conn->client_addr_len = sizeof(conn->client_addr); - conn->fd = accept(sock_fd, (struct sockaddr *)&conn->client_addr, - &conn->client_addr_len); - - if (conn->fd < 0) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to accept remote " - "connection: %s", sdb_strerror(errno, - buf, sizeof(buf))); - return -1; - } - - if (conn->client_addr.ss_family != AF_UNIX) { - sdb_log(SDB_LOG_ERR, "frontend: Accepted connection using " - "unexpected family type %d", conn->client_addr.ss_family); - return -1; - } - - sock_fl = fcntl(conn->fd, F_GETFL); - if (fcntl(conn->fd, F_SETFL, sock_fl | O_NONBLOCK)) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to switch connection conn#%i " - "to non-blocking mode: %s", conn->fd, - sdb_strerror(errno, buf, sizeof(buf))); - return -1; - } - - sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i", - conn->fd); + size_t i; - /* update the object name */ - snprintf(obj->name + strlen(CONN_FD_PREFIX), - strlen(CONN_FD_PLACEHOLDER), "%i", conn->fd); - return 0; -} /* connection_init */ + assert(sock); + for (i = 0; i < sock->listeners_num; ++i) + listener_destroy(sock->listeners + i); + if (sock->listeners) + free(sock->listeners); + sock->listeners = NULL; + sock->listeners_num = 0; +} /* socket_clear */ static void -connection_destroy(sdb_object_t *obj) +socket_close(sdb_fe_socket_t *sock) { - connection_t *conn; - - assert(obj); - conn = &CONN(obj)->conn; - - close(conn->fd); - conn->fd = -1; -} /* connection_destroy */ + size_t i; -static sdb_type_t connection_type = { - /* size = */ sizeof(connection_obj_t), - /* init = */ connection_init, - /* destroy = */ connection_destroy, -}; + assert(sock); + for (i = 0; i < sock->listeners_num; ++i) + listener_close(sock->listeners + i); +} /* socket_close */ /* * connection handler functions */ -/* returns negative value on error, 0 on EOF, number of packets else */ -static int -connection_read(int fd) -{ - int n = 0; - - while (42) { - int32_t cmd; - ssize_t status; - - errno = 0; - status = read(fd, &cmd, sizeof(cmd)); - if (status < 0) { - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) - return n + 1; - return (int)status; - } - else if (! status) /* EOF */ - return 0; - - /* XXX */ - sdb_log(SDB_LOG_DEBUG, "frontend: read command %i from fd=%i", - cmd, fd); - ++n; - } - - return n + 1; -} /* connection_read */ - static void * connection_handler(void *data) { @@ -344,7 +349,7 @@ connection_handler(void *data) while (42) { struct timespec timeout = { 0, 500000000 }; /* .5 seconds */ - connection_obj_t *conn; + sdb_conn_t *conn; int status; errno = 0; @@ -363,9 +368,22 @@ connection_handler(void *data) continue; } - /* XXX */ - sdb_log(SDB_LOG_INFO, "frontend: Data available on connection fd=%i\n", - conn->conn.fd); + status = (int)sdb_connection_handle(conn); + if (status <= 0) { + /* error or EOF -> close connection */ + sdb_object_deref(SDB_OBJ(conn)); + continue; + } + + /* return the connection to the main loop */ + if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append " + "connection %s to list of open connections", + SDB_OBJ(conn)->name); + } + + /* pass ownership back to list; or destroy in case of an error */ + sdb_object_deref(SDB_OBJ(conn)); } return NULL; } /* connection_handler */ @@ -374,26 +392,70 @@ static int connection_accept(sdb_fe_socket_t *sock, listener_t *listener) { sdb_object_t *obj; + int status; - /* the placeholder will be replaced with the accepted file - * descriptor when initializing the object */ - obj = sdb_object_create(CONN_FD_PREFIX CONN_FD_PLACEHOLDER, - connection_type, listener->sock_fd); + obj = SDB_OBJ(sdb_connection_accept(listener->sock_fd)); if (! obj) return -1; - if (sdb_llist_append(sock->open_connections, obj)) { + if (listener->accept && listener->accept(CONN(obj))) { + /* accept() is expected to log an error */ + sdb_object_deref(obj); + return -1; + } + + status = sdb_llist_append(sock->open_connections, obj); + if (status) sdb_log(SDB_LOG_ERR, "frontend: Failed to append " "connection %s to list of open connections", obj->name); - sdb_object_deref(obj); + + /* hand ownership over to the list; or destroy in case of an error */ + sdb_object_deref(obj); + return status; +} /* connection_accept */ + +static int +socket_handle_incoming(sdb_fe_socket_t *sock, + fd_set *ready, fd_set *exceptions) +{ + sdb_llist_iter_t *iter; + size_t i; + + for (i = 0; i < sock->listeners_num; ++i) { + listener_t *listener = sock->listeners + i; + if (FD_ISSET(listener->sock_fd, ready)) + if (connection_accept(sock, listener)) + continue; + } + + iter = sdb_llist_get_iter(sock->open_connections); + if (! iter) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " + "for open connections"); return -1; } - /* hand ownership over to the list */ - sdb_object_deref(obj); + while (sdb_llist_iter_has_next(iter)) { + sdb_object_t *obj = sdb_llist_iter_get_next(iter); + + if (FD_ISSET(CONN(obj)->fd, exceptions)) { + sdb_log(SDB_LOG_INFO, "Exception on fd %d", + CONN(obj)->fd); + /* close the connection */ + sdb_llist_iter_remove_current(iter); + sdb_object_deref(obj); + continue; + } + + if (FD_ISSET(CONN(obj)->fd, ready)) { + sdb_llist_iter_remove_current(iter); + sdb_channel_write(sock->chan, &obj); + } + } + sdb_llist_iter_destroy(iter); return 0; -} /* connection_accept */ +} /* socket_handle_incoming */ /* * public API @@ -419,17 +481,10 @@ sdb_fe_sock_create(void) void sdb_fe_sock_destroy(sdb_fe_socket_t *sock) { - size_t i; - if (! sock) return; - for (i = 0; i < sock->listeners_num; ++i) { - listener_destroy(sock->listeners + i); - } - if (sock->listeners) - free(sock->listeners); - sock->listeners = NULL; + socket_clear(sock); sdb_llist_destroy(sock->open_connections); sock->open_connections = NULL; @@ -450,6 +505,15 @@ sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address) return 0; } /* sdb_fe_sock_add_listener */ +void +sdb_fe_sock_clear_listeners(sdb_fe_socket_t *sock) +{ + if (! sock) + return; + + socket_clear(sock); +} /* sdb_fe_sock_clear_listeners */ + int sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) { @@ -457,21 +521,22 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) int max_listen_fd = 0; size_t i; - /* XXX: make the number of threads configurable */ - pthread_t handler_threads[5]; + pthread_t handler_threads[loop->num_threads]; + size_t num_threads; - if ((! sock) || (! sock->listeners_num) || (! loop)) + if ((! sock) || (! sock->listeners_num) || sock->chan + || (! loop) || (loop->num_threads <= 0)) return -1; - FD_ZERO(&sockets); + if (! loop->do_loop) + return 0; + FD_ZERO(&sockets); for (i = 0; i < sock->listeners_num; ++i) { listener_t *listener = sock->listeners + i; - if (listen(listener->sock_fd, /* backlog = */ 32)) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s", - listener->address, sdb_strerror(errno, buf, sizeof(buf))); + if (listener_listen(listener)) { + socket_close(sock); return -1; } @@ -480,46 +545,67 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) max_listen_fd = listener->sock_fd; } - sock->chan = sdb_channel_create(1024, sizeof(connection_obj_t *)); - if (! sock->chan) + sock->chan = sdb_channel_create(1024, sizeof(sdb_conn_t *)); + if (! sock->chan) { + socket_close(sock); return -1; + } + sdb_log(SDB_LOG_INFO, "frontend: Starting %zu connection " + "handler thread%s managing %zu listener%s", + loop->num_threads, loop->num_threads == 1 ? "" : "s", + sock->listeners_num, sock->listeners_num == 1 ? "" : "s"); + + num_threads = loop->num_threads; memset(&handler_threads, 0, sizeof(handler_threads)); - /* XXX: error handling */ - for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) - pthread_create(&handler_threads[i], /* attr = */ NULL, - connection_handler, /* arg = */ sock); + for (i = 0; i < num_threads; ++i) { + errno = 0; + if (pthread_create(&handler_threads[i], /* attr = */ NULL, + connection_handler, /* arg = */ sock)) { + char errbuf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to create " + "connection handler thread: %s", + sdb_strerror(errno, errbuf, sizeof(errbuf))); + num_threads = i; + break; + } + } - while (loop->do_loop) { + while (loop->do_loop && num_threads) { + struct timeval timeout = { 1, 0 }; /* one second */ + sdb_llist_iter_t *iter; + + int max_fd = max_listen_fd; fd_set ready; fd_set exceptions; - int max_fd; int n; - struct timeval timeout = { 1, 0 }; /* one second */ - sdb_llist_iter_t *iter; - FD_ZERO(&ready); FD_ZERO(&exceptions); ready = sockets; - max_fd = max_listen_fd; - iter = sdb_llist_get_iter(sock->open_connections); if (! iter) { sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " "for open connections"); - return -1; + break; } while (sdb_llist_iter_has_next(iter)) { sdb_object_t *obj = sdb_llist_iter_get_next(iter); - FD_SET(CONN(obj)->conn.fd, &ready); - FD_SET(CONN(obj)->conn.fd, &exceptions); - if (CONN(obj)->conn.fd > max_fd) - max_fd = CONN(obj)->conn.fd; + if (CONN(obj)->fd < 0) { + sdb_llist_iter_remove_current(iter); + sdb_object_deref(obj); + continue; + } + + FD_SET(CONN(obj)->fd, &ready); + FD_SET(CONN(obj)->fd, &exceptions); + + if (CONN(obj)->fd > max_fd) + max_fd = CONN(obj)->fd; } sdb_llist_iter_destroy(iter); @@ -533,48 +619,30 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) sdb_log(SDB_LOG_ERR, "frontend: Failed to monitor sockets: %s", sdb_strerror(errno, buf, sizeof(buf))); - return -1; + break; } - - if (! n) + else if (! n) continue; - for (i = 0; i < sock->listeners_num; ++i) { - listener_t *listener = sock->listeners + i; - if (FD_ISSET(listener->sock_fd, &ready)) - if (connection_accept(sock, listener)) - continue; - } - - iter = sdb_llist_get_iter(sock->open_connections); - if (! iter) { - sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " - "for open connections"); - return -1; - } - - while (sdb_llist_iter_has_next(iter)) { - sdb_object_t *obj = sdb_llist_iter_get_next(iter); - - if (FD_ISSET(CONN(obj)->conn.fd, &exceptions)) - sdb_log(SDB_LOG_INFO, "Exception on fd %d", - CONN(obj)->conn.fd); - - if (FD_ISSET(CONN(obj)->conn.fd, &ready)) { - sdb_log(SDB_LOG_INFO, "Data on fd %d", CONN(obj)->conn.fd); - sdb_llist_iter_remove_current(iter); - sdb_channel_write(sock->chan, &obj); - } - } - sdb_llist_iter_destroy(iter); + /* handle new and open connections */ + if (socket_handle_incoming(sock, &ready, &exceptions)) + break; } + socket_close(sock); + sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads " "to terminate"); if (! sdb_channel_shutdown(sock->chan)) - for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) + for (i = 0; i < num_threads; ++i) pthread_join(handler_threads[i], NULL); /* else: we tried our best; let the operating system clean up */ + + sdb_channel_destroy(sock->chan); + sock->chan = NULL; + + if (! num_threads) + return -1; return 0; } /* sdb_fe_sock_listen_and_server */