X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Ffrontend%2Fsock.c;h=d11fc421669ee3535b6762f5a09d8d3e4227936a;hb=25ee4640ebe9532d137024cb45760869eef1eee9;hp=063b2ca3d1d6528d0a22c7304faab65408e398c2;hpb=1e227f75832c867d9174132dc7b9ffe0a4d9f96b;p=sysdb.git diff --git a/src/frontend/sock.c b/src/frontend/sock.c index 063b2ca..d11fc42 100644 --- a/src/frontend/sock.c +++ b/src/frontend/sock.c @@ -27,19 +27,25 @@ #include "sysdb.h" #include "core/error.h" +#include "core/object.h" +#include "frontend/connection.h" #include "frontend/sock.h" #include "utils/channel.h" +#include "utils/llist.h" +#include "utils/strbuf.h" #include - #include +#include #include #include #include +#include + #include #include #include @@ -48,15 +54,24 @@ #include +/* name of connection objects */ +#define CONN_FD_PREFIX "conn#" +#define CONN_FD_PLACEHOLDER "XXXXXXX" + /* * private data types */ typedef struct { - int fd; + sdb_object_t super; + + /* connection and client information */ struct sockaddr_storage client_addr; socklen_t client_addr_len; -} connection_t; + + sdb_conn_t conn; +} connection_obj_t; +#define CONN(obj) ((connection_obj_t *)(obj)) typedef struct { char *address; @@ -75,6 +90,12 @@ typedef struct { struct sdb_fe_socket { listener_t *listeners; size_t listeners_num; + + sdb_llist_t *open_connections; + + /* channel used for communication between main + * and connection handler threads */ + sdb_channel_t *chan; }; /* @@ -90,8 +111,8 @@ open_unix_sock(listener_t *listener) listener->sock_fd = socket(AF_UNIX, SOCK_STREAM, 0); if (listener->sock_fd < 0) { char buf[1024]; - sdb_log(SDB_LOG_ERR, "sock: Failed to open UNIX socket: %s", - sdb_strerror(errno, buf, sizeof(buf))); + sdb_log(SDB_LOG_ERR, "frontend: Failed to open UNIX socket %s: %s", + listener->address, sdb_strerror(errno, buf, sizeof(buf))); return -1; } @@ -103,8 +124,8 @@ open_unix_sock(listener_t *listener) status = bind(listener->sock_fd, (struct sockaddr *)&sa, sizeof(sa)); if (status) { char buf[1024]; - sdb_log(SDB_LOG_ERR, "sock: Failed to bind to UNIX socket: %s", - sdb_strerror(errno, buf, sizeof(buf))); + sdb_log(SDB_LOG_ERR, "frontend: Failed to bind to UNIX socket %s: %s", + listener->address, sdb_strerror(errno, buf, sizeof(buf))); return -1; } return 0; @@ -160,6 +181,7 @@ listener_destroy(listener_t *listener) if (listener->sock_fd >= 0) close(listener->sock_fd); + listener->sock_fd = -1; if (listener->address) free(listener->address); @@ -172,25 +194,37 @@ listener_create(sdb_fe_socket_t *sock, const char *address) int type; type = get_type(address); - if (type < 0) + if (type < 0) { + sdb_log(SDB_LOG_ERR, "frontend: Unsupported address type specified " + "in listen address '%s'", address); return NULL; + } listener = realloc(sock->listeners, sock->listeners_num * sizeof(*sock->listeners)); - if (! listener) + if (! listener) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate memory: %s", + sdb_strerror(errno, buf, sizeof(buf))); return NULL; + } + sock->listeners = listener; listener = sock->listeners + sock->listeners_num; listener->sock_fd = -1; listener->address = strdup(address); if (! listener->address) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate memory: %s", + sdb_strerror(errno, buf, sizeof(buf))); listener_destroy(listener); return NULL; } listener->type = type; if (listener_impls[type].opener(listener)) { + /* prints error */ listener_destroy(listener); return NULL; } @@ -199,6 +233,135 @@ listener_create(sdb_fe_socket_t *sock, const char *address) return listener; } /* listener_create */ +static int +listener_listen(listener_t *listener) +{ + assert(listener); + + /* try to reopen */ + if (listener->sock_fd < 0) + if (listener_impls[listener->type].opener(listener)) + return -1; + assert(listener->sock_fd >= 0); + + if (listen(listener->sock_fd, /* backlog = */ 32)) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s", + listener->address, sdb_strerror(errno, buf, sizeof(buf))); + return -1; + } + return 0; +} /* listener_listen */ + +static void +listener_close(listener_t *listener) +{ + assert(listener); + + if (listener->sock_fd < 0) + return; + + close(listener->sock_fd); + listener->sock_fd = -1; +} /* listener_close */ + +static void +socket_close(sdb_fe_socket_t *sock) +{ + size_t i; + + assert(sock); + for (i = 0; i < sock->listeners_num; ++i) + listener_close(sock->listeners + i); +} /* socket_close */ + +/* + * private data types + */ + +static int +connection_init(sdb_object_t *obj, va_list ap) +{ + connection_obj_t *conn; + int sock_fd; + int sock_fl; + + assert(obj); + conn = CONN(obj); + + sock_fd = va_arg(ap, int); + + CONN(obj)->conn.buf = sdb_strbuf_create(/* size = */ 128); + if (! CONN(obj)->conn.buf) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate a read buffer " + "for a new remote connection"); + return -1; + } + + conn->client_addr_len = sizeof(conn->client_addr); + conn->conn.fd = accept(sock_fd, (struct sockaddr *)&conn->client_addr, + &conn->client_addr_len); + + if (conn->conn.fd < 0) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to accept remote " + "connection: %s", sdb_strerror(errno, + buf, sizeof(buf))); + return -1; + } + + if (conn->client_addr.ss_family != AF_UNIX) { + sdb_log(SDB_LOG_ERR, "frontend: Accepted connection using " + "unexpected family type %d", conn->client_addr.ss_family); + return -1; + } + + sock_fl = fcntl(conn->conn.fd, F_GETFL); + if (fcntl(conn->conn.fd, F_SETFL, sock_fl | O_NONBLOCK)) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to switch connection conn#%i " + "to non-blocking mode: %s", conn->conn.fd, + sdb_strerror(errno, buf, sizeof(buf))); + return -1; + } + + sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i", + conn->conn.fd); + + /* update the object name */ + snprintf(obj->name + strlen(CONN_FD_PREFIX), + strlen(CONN_FD_PLACEHOLDER), "%i", conn->conn.fd); + return 0; +} /* connection_init */ + +static void +connection_destroy(sdb_object_t *obj) +{ + connection_obj_t *conn; + size_t len; + + assert(obj); + conn = CONN(obj); + + len = sdb_strbuf_len(conn->conn.buf); + if (len) + sdb_log(SDB_LOG_INFO, "frontend: Discarding incomplete command " + "(%zu bytes left in buffer)", len); + + sdb_log(SDB_LOG_DEBUG, "frontend: Closing connection on fd=%i", + conn->conn.fd); + close(conn->conn.fd); + conn->conn.fd = -1; + + sdb_strbuf_destroy(CONN(obj)->conn.buf); +} /* connection_destroy */ + +static sdb_type_t connection_type = { + /* size = */ sizeof(connection_obj_t), + /* init = */ connection_init, + /* destroy = */ connection_destroy, +}; + /* * connection handler functions */ @@ -206,17 +369,18 @@ listener_create(sdb_fe_socket_t *sock, const char *address) static void * connection_handler(void *data) { - sdb_channel_t *chan = data; + sdb_fe_socket_t *sock = data; - assert(chan); + assert(sock); while (42) { struct timespec timeout = { 0, 500000000 }; /* .5 seconds */ - connection_t conn; + connection_obj_t *conn; int status; errno = 0; - status = sdb_channel_select(chan, NULL, &conn, NULL, NULL, &timeout); + status = sdb_channel_select(sock->chan, /* read */ NULL, &conn, + /* write */ NULL, NULL, &timeout); if (status) { char buf[1024]; @@ -225,27 +389,93 @@ connection_handler(void *data) if (errno == EBADF) /* channel shut down */ break; - sdb_log(SDB_LOG_ERR, "sock: Failed to read from channel: %s", + sdb_log(SDB_LOG_ERR, "frontend: Failed to read from channel: %s", sdb_strerror(errno, buf, sizeof(buf))); continue; } - if (conn.fd < 0) + status = (int)sdb_connection_read(&conn->conn); + if (status <= 0) { + /* error or EOF -> close connection */ + sdb_object_deref(SDB_OBJ(conn)); continue; + } - if (conn.client_addr.ss_family != AF_UNIX) { - sdb_log(SDB_LOG_ERR, "sock: Accepted connection using unexpected " - "family type %d", conn.client_addr.ss_family); - continue; + /* return the connection to the main loop */ + if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append " + "connection %s to list of open connections", + SDB_OBJ(conn)->name); } - /* XXX */ - sdb_log(SDB_LOG_INFO, "Accepted connection on fd=%i\n", conn.fd); - close(conn.fd); + /* pass ownership back to list; or destroy in case of an error */ + sdb_object_deref(SDB_OBJ(conn)); } return NULL; } /* connection_handler */ +static int +connection_accept(sdb_fe_socket_t *sock, listener_t *listener) +{ + sdb_object_t *obj; + + /* the placeholder will be replaced with the accepted file + * descriptor when initializing the object */ + obj = sdb_object_create(CONN_FD_PREFIX CONN_FD_PLACEHOLDER, + connection_type, listener->sock_fd); + if (! obj) + return -1; + + if (sdb_llist_append(sock->open_connections, obj)) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to append " + "connection %s to list of open connections", + obj->name); + sdb_object_deref(obj); + return -1; + } + + /* hand ownership over to the list */ + sdb_object_deref(obj); + return 0; +} /* connection_accept */ + +static int +socket_handle_incoming(sdb_fe_socket_t *sock, + fd_set *ready, fd_set *exceptions) +{ + sdb_llist_iter_t *iter; + size_t i; + + for (i = 0; i < sock->listeners_num; ++i) { + listener_t *listener = sock->listeners + i; + if (FD_ISSET(listener->sock_fd, ready)) + if (connection_accept(sock, listener)) + continue; + } + + iter = sdb_llist_get_iter(sock->open_connections); + if (! iter) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " + "for open connections"); + return -1; + } + + while (sdb_llist_iter_has_next(iter)) { + sdb_object_t *obj = sdb_llist_iter_get_next(iter); + + if (FD_ISSET(CONN(obj)->conn.fd, exceptions)) + sdb_log(SDB_LOG_INFO, "Exception on fd %d", + CONN(obj)->conn.fd); + + if (FD_ISSET(CONN(obj)->conn.fd, ready)) { + sdb_llist_iter_remove_current(iter); + sdb_channel_write(sock->chan, &obj); + } + } + sdb_llist_iter_destroy(iter); + return 0; +} /* socket_handle_incoming */ + /* * public API */ @@ -258,6 +488,12 @@ sdb_fe_sock_create(void) sock = calloc(1, sizeof(*sock)); if (! sock) return NULL; + + sock->open_connections = sdb_llist_create(); + if (! sock->open_connections) { + sdb_fe_sock_destroy(sock); + return NULL; + } return sock; } /* sdb_fe_sock_create */ @@ -274,6 +510,10 @@ sdb_fe_sock_destroy(sdb_fe_socket_t *sock) } if (sock->listeners) free(sock->listeners); + sock->listeners = NULL; + + sdb_llist_destroy(sock->open_connections); + sock->open_connections = NULL; free(sock); } /* sdb_fe_sock_destroy */ @@ -294,98 +534,104 @@ sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address) int sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) { - sdb_channel_t *chan; fd_set sockets; - int max_fd = 0; + int max_listen_fd = 0; size_t i; /* XXX: make the number of threads configurable */ pthread_t handler_threads[5]; - if ((! sock) || (! sock->listeners_num) || (! loop)) + if ((! sock) || (! sock->listeners_num) || (! loop) || sock->chan) return -1; FD_ZERO(&sockets); - for (i = 0; i < sock->listeners_num; ++i) { listener_t *listener = sock->listeners + i; - if (listen(listener->sock_fd, /* backlog = */ 32)) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "sock: Failed to listen on socket %s: %s", - listener->address, sdb_strerror(errno, buf, sizeof(buf))); + if (listener_listen(listener)) { + socket_close(sock); return -1; } FD_SET(listener->sock_fd, &sockets); - if (listener->sock_fd > max_fd) - max_fd = listener->sock_fd; + if (listener->sock_fd > max_listen_fd) + max_listen_fd = listener->sock_fd; } - chan = sdb_channel_create(1024, sizeof(connection_t)); - if (! chan) + sock->chan = sdb_channel_create(1024, sizeof(connection_obj_t *)); + if (! sock->chan) { + socket_close(sock); return -1; + } memset(&handler_threads, 0, sizeof(handler_threads)); /* XXX: error handling */ for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) pthread_create(&handler_threads[i], /* attr = */ NULL, - connection_handler, /* arg = */ chan); + connection_handler, /* arg = */ sock); while (loop->do_loop) { - fd_set ready = sockets; + struct timeval timeout = { 1, 0 }; /* one second */ + sdb_llist_iter_t *iter; + + int max_fd = max_listen_fd; + fd_set ready; + fd_set exceptions; int n; - struct timeval timeout = { 1, 0 }; /* one second */ + FD_ZERO(&ready); + FD_ZERO(&exceptions); + + ready = sockets; + + iter = sdb_llist_get_iter(sock->open_connections); + if (! iter) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " + "for open connections"); + break; + } + + while (sdb_llist_iter_has_next(iter)) { + sdb_object_t *obj = sdb_llist_iter_get_next(iter); + FD_SET(CONN(obj)->conn.fd, &ready); + FD_SET(CONN(obj)->conn.fd, &exceptions); + + if (CONN(obj)->conn.fd > max_fd) + max_fd = CONN(obj)->conn.fd; + } + sdb_llist_iter_destroy(iter); errno = 0; - n = select(max_fd + 1, &ready, NULL, NULL, &timeout); + n = select(max_fd + 1, &ready, NULL, &exceptions, &timeout); if (n < 0) { char buf[1024]; if (errno == EINTR) continue; - sdb_log(SDB_LOG_ERR, "sock: Failed to monitor sockets: %s", + sdb_log(SDB_LOG_ERR, "frontend: Failed to monitor sockets: %s", sdb_strerror(errno, buf, sizeof(buf))); - return -1; + break; } - - if (! n) + else if (! n) continue; - for (i = 0; i < sock->listeners_num; ++i) { - listener_t *listener = sock->listeners + i; - - if (FD_ISSET(listener->sock_fd, &ready)) { - connection_t conn; - - memset(&conn, 0, sizeof(conn)); - conn.client_addr_len = sizeof(conn.client_addr); - - conn.fd = accept(listener->sock_fd, - (struct sockaddr *)&conn.client_addr, - &conn.client_addr_len); - - if (conn.fd < 0) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "sock: Failed to accept remote " - "connection: %s", sdb_strerror(errno, - buf, sizeof(buf))); - continue; - } - - sdb_channel_write(chan, &conn); - } - } + /* handle new and open connections */ + if (socket_handle_incoming(sock, &ready, &exceptions)) + break; } - sdb_log(SDB_LOG_INFO, "sock: Waiting for connection handler threads " + socket_close(sock); + + sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads " "to terminate"); - if (! sdb_channel_shutdown(chan)) + if (! sdb_channel_shutdown(sock->chan)) for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) pthread_join(handler_threads[i], NULL); /* else: we tried our best; let the operating system clean up */ + + sdb_channel_destroy(sock->chan); + sock->chan = NULL; return 0; } /* sdb_fe_sock_listen_and_server */