X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Ffrontend%2Fsock.c;h=d7b2ee97324ddef9d32246f9be163ec30152f998;hb=23760ca65713cd9b8338f09bf45ec20ff4c3ef8d;hp=3662a2650cc41d313ea5c6e8613c59810825fd50;hpb=748bf089e36d56076583a8466052505fc786dba9;p=sysdb.git diff --git a/src/frontend/sock.c b/src/frontend/sock.c index 3662a26..d7b2ee9 100644 --- a/src/frontend/sock.c +++ b/src/frontend/sock.c @@ -32,17 +32,21 @@ #include "utils/channel.h" #include "utils/llist.h" +#include "utils/strbuf.h" #include - #include +#include + #include #include #include #include +#include + #include #include #include @@ -60,14 +64,19 @@ */ typedef struct { + sdb_object_t super; + + /* connection and client information */ int fd; struct sockaddr_storage client_addr; socklen_t client_addr_len; -} connection_t; -typedef struct { - sdb_object_t super; - connection_t conn; + /* read buffer */ + sdb_strbuf_t *buf; + + /* state information for the currently executed command */ + uint32_t cmd; + uint32_t cmd_len; } connection_obj_t; #define CONN(obj) ((connection_obj_t *)(obj)) @@ -90,6 +99,10 @@ struct sdb_fe_socket { size_t listeners_num; sdb_llist_t *open_connections; + + /* channel used for communication between main + * and connection handler threads */ + sdb_channel_t *chan; }; /* @@ -175,6 +188,7 @@ listener_destroy(listener_t *listener) if (listener->sock_fd >= 0) close(listener->sock_fd); + listener->sock_fd = -1; if (listener->address) free(listener->address); @@ -226,6 +240,48 @@ listener_create(sdb_fe_socket_t *sock, const char *address) return listener; } /* listener_create */ +static int +listener_listen(listener_t *listener) +{ + assert(listener); + + /* try to reopen */ + if (listener->sock_fd < 0) + if (listener_impls[listener->type].opener(listener)) + return -1; + assert(listener->sock_fd >= 0); + + if (listen(listener->sock_fd, /* backlog = */ 32)) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s", + listener->address, sdb_strerror(errno, buf, sizeof(buf))); + return -1; + } + return 0; +} /* listener_listen */ + +static void +listener_close(listener_t *listener) +{ + assert(listener); + + if (listener->sock_fd < 0) + return; + + close(listener->sock_fd); + listener->sock_fd = -1; +} /* listener_close */ + +static void +socket_close(sdb_fe_socket_t *sock) +{ + size_t i; + + assert(sock); + for (i = 0; i < sock->listeners_num; ++i) + listener_close(sock->listeners + i); +} /* socket_close */ + /* * private data types */ @@ -233,14 +289,22 @@ listener_create(sdb_fe_socket_t *sock, const char *address) static int connection_init(sdb_object_t *obj, va_list ap) { - connection_t *conn; + connection_obj_t *conn; int sock_fd; + int sock_fl; assert(obj); - conn = &CONN(obj)->conn; + conn = CONN(obj); sock_fd = va_arg(ap, int); + CONN(obj)->buf = sdb_strbuf_create(/* size = */ 128); + if (! CONN(obj)->buf) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate a read buffer " + "for a new remote connection"); + return -1; + } + conn->client_addr_len = sizeof(conn->client_addr); conn->fd = accept(sock_fd, (struct sockaddr *)&conn->client_addr, &conn->client_addr_len); @@ -259,7 +323,16 @@ connection_init(sdb_object_t *obj, va_list ap) return -1; } - sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i\n", + sock_fl = fcntl(conn->fd, F_GETFL); + if (fcntl(conn->fd, F_SETFL, sock_fl | O_NONBLOCK)) { + char buf[1024]; + sdb_log(SDB_LOG_ERR, "frontend: Failed to switch connection conn#%i " + "to non-blocking mode: %s", conn->fd, + sdb_strerror(errno, buf, sizeof(buf))); + return -1; + } + + sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i", conn->fd); /* update the object name */ @@ -271,13 +344,22 @@ connection_init(sdb_object_t *obj, va_list ap) static void connection_destroy(sdb_object_t *obj) { - connection_t *conn; + connection_obj_t *conn; + size_t len; assert(obj); - conn = &CONN(obj)->conn; + conn = CONN(obj); + len = sdb_strbuf_len(conn->buf); + if (len) + sdb_log(SDB_LOG_INFO, "frontend: Discarding incomplete command " + "(%zu bytes left in buffer)", len); + + sdb_log(SDB_LOG_DEBUG, "frontend: Closing connection on fd=%i", conn->fd); close(conn->fd); conn->fd = -1; + + sdb_strbuf_destroy(CONN(obj)->buf); } /* connection_destroy */ static sdb_type_t connection_type = { @@ -290,12 +372,72 @@ static sdb_type_t connection_type = { * connection handler functions */ +static uint32_t +connection_get_int32(connection_obj_t *conn, size_t offset) +{ + const char *data; + uint32_t n; + + assert(conn && (sdb_strbuf_len(conn->buf) >= offset + sizeof(uint32_t))); + + data = sdb_strbuf_string(conn->buf); + memcpy(&n, data + offset, sizeof(n)); + n = ntohl(n); + return n; +} /* connection_get_int32 */ + +static int +command_handle(connection_obj_t *conn) +{ + assert(conn && conn->cmd && conn->cmd_len); + /* XXX */ + sdb_strbuf_skip(conn->buf, conn->cmd_len); + return 0; +} /* command_handle */ + +/* initialize the connection state information */ +static int +command_init(connection_obj_t *conn) +{ + assert(conn && (! conn->cmd) && (! conn->cmd_len)); + + conn->cmd = connection_get_int32(conn, 0); + conn->cmd_len = connection_get_int32(conn, sizeof(uint32_t)); + sdb_strbuf_skip(conn->buf, 2 * sizeof(uint32_t)); + return 0; +} /* command_init */ + +/* returns negative value on error, 0 on EOF, number of octets else */ +static ssize_t +connection_read(connection_obj_t *conn) +{ + ssize_t n = 0; + + while (42) { + ssize_t status; + + errno = 0; + status = sdb_strbuf_read(conn->buf, conn->fd, 1024); + if (status < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + return n; + return (int)status; + } + else if (! status) /* EOF */ + return n; + + n += status; + } + + return n; +} /* connection_read */ + static void * connection_handler(void *data) { - sdb_channel_t *chan = data; + sdb_fe_socket_t *sock = data; - assert(chan); + assert(sock); while (42) { struct timespec timeout = { 0, 500000000 }; /* .5 seconds */ @@ -303,7 +445,8 @@ connection_handler(void *data) int status; errno = 0; - status = sdb_channel_select(chan, NULL, &conn, NULL, NULL, &timeout); + status = sdb_channel_select(sock->chan, /* read */ NULL, &conn, + /* write */ NULL, NULL, &timeout); if (status) { char buf[1024]; @@ -317,20 +460,38 @@ connection_handler(void *data) continue; } - /* XXX */ - sdb_log(SDB_LOG_INFO, "frontend: Data available on connection fd=%i\n", - conn->conn.fd); + status = (int)connection_read(conn); + if (status <= 0) { + /* error or EOF -> close connection */ + sdb_object_deref(SDB_OBJ(conn)); + continue; + } + + if (conn->cmd_len && (sdb_strbuf_len(conn->buf) >= conn->cmd_len)) + command_handle(conn); + else if (sdb_strbuf_len(conn->buf) >= 2 * sizeof(int32_t)) + command_init(conn); + + /* return the connection to the main loop */ + if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append " + "connection %s to list of open connections", + SDB_OBJ(conn)->name); + } + + /* pass ownership back to list; or destroy in case of an error */ + sdb_object_deref(SDB_OBJ(conn)); } return NULL; } /* connection_handler */ static int -accept_connection(sdb_fe_socket_t *sock, listener_t *listener) +connection_accept(sdb_fe_socket_t *sock, listener_t *listener) { sdb_object_t *obj; - /* the X's will be replaced with the accepted file descriptor - * when initializing the object */ + /* the placeholder will be replaced with the accepted file + * descriptor when initializing the object */ obj = sdb_object_create(CONN_FD_PREFIX CONN_FD_PLACEHOLDER, connection_type, listener->sock_fd); if (! obj) @@ -347,7 +508,44 @@ accept_connection(sdb_fe_socket_t *sock, listener_t *listener) /* hand ownership over to the list */ sdb_object_deref(obj); return 0; -} /* accept_connection */ +} /* connection_accept */ + +static int +socket_handle_incoming(sdb_fe_socket_t *sock, + fd_set *ready, fd_set *exceptions) +{ + sdb_llist_iter_t *iter; + size_t i; + + for (i = 0; i < sock->listeners_num; ++i) { + listener_t *listener = sock->listeners + i; + if (FD_ISSET(listener->sock_fd, ready)) + if (connection_accept(sock, listener)) + continue; + } + + iter = sdb_llist_get_iter(sock->open_connections); + if (! iter) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " + "for open connections"); + return -1; + } + + while (sdb_llist_iter_has_next(iter)) { + sdb_object_t *obj = sdb_llist_iter_get_next(iter); + + if (FD_ISSET(CONN(obj)->fd, exceptions)) + sdb_log(SDB_LOG_INFO, "Exception on fd %d", + CONN(obj)->fd); + + if (FD_ISSET(CONN(obj)->fd, ready)) { + sdb_llist_iter_remove_current(iter); + sdb_channel_write(sock->chan, &obj); + } + } + sdb_llist_iter_destroy(iter); + return 0; +} /* socket_handle_incoming */ /* * public API @@ -407,7 +605,6 @@ sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address) int sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) { - sdb_channel_t *chan; fd_set sockets; int max_listen_fd = 0; size_t i; @@ -415,18 +612,15 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) /* XXX: make the number of threads configurable */ pthread_t handler_threads[5]; - if ((! sock) || (! sock->listeners_num) || (! loop)) + if ((! sock) || (! sock->listeners_num) || (! loop) || sock->chan) return -1; FD_ZERO(&sockets); - for (i = 0; i < sock->listeners_num; ++i) { listener_t *listener = sock->listeners + i; - if (listen(listener->sock_fd, /* backlog = */ 32)) { - char buf[1024]; - sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s", - listener->address, sdb_strerror(errno, buf, sizeof(buf))); + if (listener_listen(listener)) { + socket_close(sock); return -1; } @@ -435,46 +629,46 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) max_listen_fd = listener->sock_fd; } - chan = sdb_channel_create(1024, sizeof(connection_obj_t *)); - if (! chan) + sock->chan = sdb_channel_create(1024, sizeof(connection_obj_t *)); + if (! sock->chan) { + socket_close(sock); return -1; + } memset(&handler_threads, 0, sizeof(handler_threads)); /* XXX: error handling */ for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) pthread_create(&handler_threads[i], /* attr = */ NULL, - connection_handler, /* arg = */ chan); + connection_handler, /* arg = */ sock); while (loop->do_loop) { + struct timeval timeout = { 1, 0 }; /* one second */ + sdb_llist_iter_t *iter; + + int max_fd = max_listen_fd; fd_set ready; fd_set exceptions; - int max_fd; int n; - struct timeval timeout = { 1, 0 }; /* one second */ - sdb_llist_iter_t *iter; - FD_ZERO(&ready); FD_ZERO(&exceptions); ready = sockets; - max_fd = max_listen_fd; - iter = sdb_llist_get_iter(sock->open_connections); if (! iter) { sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " "for open connections"); - return -1; + break; } while (sdb_llist_iter_has_next(iter)) { sdb_object_t *obj = sdb_llist_iter_get_next(iter); - FD_SET(CONN(obj)->conn.fd, &ready); - FD_SET(CONN(obj)->conn.fd, &exceptions); + FD_SET(CONN(obj)->fd, &ready); + FD_SET(CONN(obj)->fd, &exceptions); - if (CONN(obj)->conn.fd > max_fd) - max_fd = CONN(obj)->conn.fd; + if (CONN(obj)->fd > max_fd) + max_fd = CONN(obj)->fd; } sdb_llist_iter_destroy(iter); @@ -488,48 +682,27 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) sdb_log(SDB_LOG_ERR, "frontend: Failed to monitor sockets: %s", sdb_strerror(errno, buf, sizeof(buf))); - return -1; + break; } - - if (! n) + else if (! n) continue; - for (i = 0; i < sock->listeners_num; ++i) { - listener_t *listener = sock->listeners + i; - if (FD_ISSET(listener->sock_fd, &ready)) - if (accept_connection(sock, listener)) - continue; - } - - iter = sdb_llist_get_iter(sock->open_connections); - if (! iter) { - sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator " - "for open connections"); - return -1; - } - - while (sdb_llist_iter_has_next(iter)) { - sdb_object_t *obj = sdb_llist_iter_get_next(iter); - - if (FD_ISSET(CONN(obj)->conn.fd, &exceptions)) - sdb_log(SDB_LOG_INFO, "Exception on fd %d", - CONN(obj)->conn.fd); - - if (FD_ISSET(CONN(obj)->conn.fd, &ready)) { - sdb_log(SDB_LOG_INFO, "Data on fd %d", CONN(obj)->conn.fd); - sdb_llist_iter_remove_current(iter); - sdb_channel_write(chan, &obj); - } - } - sdb_llist_iter_destroy(iter); + /* handle new and open connections */ + if (socket_handle_incoming(sock, &ready, &exceptions)) + break; } + socket_close(sock); + sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads " "to terminate"); - if (! sdb_channel_shutdown(chan)) + if (! sdb_channel_shutdown(sock->chan)) for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) pthread_join(handler_threads[i], NULL); /* else: we tried our best; let the operating system clean up */ + + sdb_channel_destroy(sock->chan); + sock->chan = NULL; return 0; } /* sdb_fe_sock_listen_and_server */