From 633201fd36edb21a0283c2541145a13ffea1d1b4 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Fri, 25 Oct 2013 19:03:32 +0200 Subject: [PATCH] socket frontend: Pass socket object to handler threads. This will allow them to pass back connections to the main loop. --- src/frontend/sock.c | 51 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/src/frontend/sock.c b/src/frontend/sock.c index 667b616..fb79f10 100644 --- a/src/frontend/sock.c +++ b/src/frontend/sock.c @@ -92,6 +92,10 @@ struct sdb_fe_socket { size_t listeners_num; sdb_llist_t *open_connections; + + /* channel used for communication between main + * and connection handler threads */ + sdb_channel_t *chan; }; /* @@ -302,12 +306,41 @@ static sdb_type_t connection_type = { * connection handler functions */ +/* returns negative value on error, 0 on EOF, number of packets else */ +static int +connection_read(int fd) +{ + int n = 0; + + while (42) { + int32_t cmd; + ssize_t status; + + errno = 0; + status = read(fd, &cmd, sizeof(cmd)); + if (status < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + return n + 1; + return (int)status; + } + else if (! status) /* EOF */ + return 0; + + /* XXX */ + sdb_log(SDB_LOG_DEBUG, "frontend: read command %i from fd=%i", + cmd, fd); + ++n; + } + + return n + 1; +} /* connection_read */ + static void * connection_handler(void *data) { - sdb_channel_t *chan = data; + sdb_fe_socket_t *sock = data; - assert(chan); + assert(sock); while (42) { struct timespec timeout = { 0, 500000000 }; /* .5 seconds */ @@ -315,7 +348,8 @@ connection_handler(void *data) int status; errno = 0; - status = sdb_channel_select(chan, NULL, &conn, NULL, NULL, &timeout); + status = sdb_channel_select(sock->chan, /* read */ NULL, &conn, + /* write */ NULL, NULL, &timeout); if (status) { char buf[1024]; @@ -419,7 +453,6 @@ sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address) int sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) { - sdb_channel_t *chan; fd_set sockets; int max_listen_fd = 0; size_t i; @@ -447,15 +480,15 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) max_listen_fd = listener->sock_fd; } - chan = sdb_channel_create(1024, sizeof(connection_obj_t *)); - if (! chan) + sock->chan = sdb_channel_create(1024, sizeof(connection_obj_t *)); + if (! sock->chan) return -1; memset(&handler_threads, 0, sizeof(handler_threads)); /* XXX: error handling */ for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) pthread_create(&handler_threads[i], /* attr = */ NULL, - connection_handler, /* arg = */ chan); + connection_handler, /* arg = */ sock); while (loop->do_loop) { fd_set ready; @@ -530,7 +563,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) if (FD_ISSET(CONN(obj)->conn.fd, &ready)) { sdb_log(SDB_LOG_INFO, "Data on fd %d", CONN(obj)->conn.fd); sdb_llist_iter_remove_current(iter); - sdb_channel_write(chan, &obj); + sdb_channel_write(sock->chan, &obj); } } sdb_llist_iter_destroy(iter); @@ -538,7 +571,7 @@ sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop) sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads " "to terminate"); - if (! sdb_channel_shutdown(chan)) + if (! sdb_channel_shutdown(sock->chan)) for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i) pthread_join(handler_threads[i], NULL); /* else: we tried our best; let the operating system clean up */ -- 2.30.2