From 16274941b4881a5ad9f68282f5f325bc1ebaef26 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Wed, 6 Nov 2013 22:25:53 +0100 Subject: [PATCH] socket frontend: Read incoming commands into a buffer. Handle commands once all of them has been read. Give preference to reading data over handling a command in order not to block clients or exhaust operating system buffers. --- src/frontend/sock.c | 102 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 79 insertions(+), 23 deletions(-) diff --git a/src/frontend/sock.c b/src/frontend/sock.c index 53a8a86..792c0e4 100644 --- a/src/frontend/sock.c +++ b/src/frontend/sock.c @@ -32,11 +32,13 @@ #include "utils/channel.h" #include "utils/llist.h" +#include "utils/strbuf.h" #include - #include +#include + #include #include #include @@ -64,9 +66,17 @@ typedef struct { sdb_object_t super; + /* connection and client information */ int fd; struct sockaddr_storage client_addr; socklen_t client_addr_len; + + /* read buffer */ + sdb_strbuf_t *buf; + + /* state information for the currently executed command */ + uint32_t cmd; + uint32_t cmd_len; } connection_obj_t; #define CONN(obj) ((connection_obj_t *)(obj)) @@ -288,6 +298,13 @@ connection_init(sdb_object_t *obj, va_list ap) sock_fd = va_arg(ap, int); + CONN(obj)->buf = sdb_strbuf_create(/* size = */ 128); + if (! CONN(obj)->buf) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate a read buffer " + "for a new remote connection"); + return -1; + } + conn->client_addr_len = sizeof(conn->client_addr); conn->fd = accept(sock_fd, (struct sockaddr *)&conn->client_addr, &conn->client_addr_len); @@ -335,6 +352,8 @@ connection_destroy(sdb_object_t *obj) sdb_log(SDB_LOG_DEBUG, "frontend: Closing connection on fd=%i", conn->fd); close(conn->fd); conn->fd = -1; + + sdb_strbuf_destroy(CONN(obj)->buf); } /* connection_destroy */ static sdb_type_t connection_type = { @@ -347,33 +366,64 @@ static sdb_type_t connection_type = { * connection handler functions */ -/* returns negative value on error, 0 on EOF, number of packets else */ +static uint32_t +connection_get_int32(connection_obj_t *conn, size_t offset) +{ + const char *data; + uint32_t n; + + assert(conn && (sdb_strbuf_len(conn->buf) >= offset + sizeof(uint32_t))); + + data = sdb_strbuf_string(conn->buf); + memcpy(&n, data + offset, sizeof(n)); + n = ntohl(n); + return n; +} /* connection_get_int32 */ + static int -connection_read(int fd) +command_handle(connection_obj_t *conn) { - int n = 0; + assert(conn && conn->cmd && conn->cmd_len); + /* XXX */ + sdb_strbuf_skip(conn->buf, conn->cmd_len); + return 0; +} /* command_handle */ + +/* initialize the connection state information */ +static int +command_init(connection_obj_t *conn) +{ + assert(conn && (! conn->cmd) && (! conn->cmd_len)); + + conn->cmd = connection_get_int32(conn, 0); + conn->cmd_len = connection_get_int32(conn, sizeof(uint32_t)); + sdb_strbuf_skip(conn->buf, 2 * sizeof(uint32_t)); + return 0; +} /* command_init */ + +/* returns negative value on error, 0 on EOF, number of octets else */ +static ssize_t +connection_read(connection_obj_t *conn) +{ + ssize_t n = 0; while (42) { - int32_t cmd; ssize_t status; errno = 0; - status = read(fd, &cmd, sizeof(cmd)); + status = sdb_strbuf_read(conn->buf, conn->fd, 1024); if (status < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) - return n + 1; + return n; return (int)status; } else if (! status) /* EOF */ - return 0; + return n; - /* XXX */ - sdb_log(SDB_LOG_DEBUG, "frontend: read command %i from fd=%i", - cmd, fd); - ++n; + n += status; } - return n + 1; + return n; } /* connection_read */ static void * @@ -404,21 +454,27 @@ connection_handler(void *data) continue; } - status = connection_read(conn->fd); + status = (int)connection_read(conn); if (status <= 0) { /* error or EOF -> close connection */ sdb_object_deref(SDB_OBJ(conn)); + continue; } - else { - if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) { - sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append " - "connection %s to list of open connections", - SDB_OBJ(conn)->name); - } - - /* pass ownership back to list; or destroy in case of an error */ - sdb_object_deref(SDB_OBJ(conn)); + + if (conn->cmd_len && (sdb_strbuf_len(conn->buf) >= conn->cmd_len)) + command_handle(conn); + else if (sdb_strbuf_len(conn->buf) >= 2 * sizeof(int32_t)) + command_init(conn); + + /* return the connection to the main loop */ + if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) { + sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append " + "connection %s to list of open connections", + SDB_OBJ(conn)->name); } + + /* pass ownership back to list; or destroy in case of an error */ + sdb_object_deref(SDB_OBJ(conn)); } return NULL; } /* connection_handler */ -- 2.30.2