53a8a860d6f5a811181f0e4ba831d9e58704165f
1 /*
2 * SysDB - src/frontend/sock.c
3 * Copyright (C) 2013 Sebastian 'tokkee' Harl <sh@tokkee.org>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
28 #include "sysdb.h"
29 #include "core/error.h"
30 #include "core/object.h"
31 #include "frontend/sock.h"
33 #include "utils/channel.h"
34 #include "utils/llist.h"
36 #include <assert.h>
38 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <string.h>
44 #include <unistd.h>
46 #include <fcntl.h>
48 #include <sys/time.h>
49 #include <sys/types.h>
50 #include <sys/select.h>
51 #include <sys/socket.h>
52 #include <sys/un.h>
54 #include <pthread.h>
56 /* name of connection objects */
57 #define CONN_FD_PREFIX "conn#"
58 #define CONN_FD_PLACEHOLDER "XXXXXXX"
60 /*
61 * private data types
62 */
64 typedef struct {
65 sdb_object_t super;
67 int fd;
68 struct sockaddr_storage client_addr;
69 socklen_t client_addr_len;
70 } connection_obj_t;
71 #define CONN(obj) ((connection_obj_t *)(obj))
73 typedef struct {
74 char *address;
75 int type;
77 int sock_fd;
78 } listener_t;
80 typedef struct {
81 int type;
82 const char *prefix;
84 int (*opener)(listener_t *);
85 } fe_listener_impl_t;
87 struct sdb_fe_socket {
88 listener_t *listeners;
89 size_t listeners_num;
91 sdb_llist_t *open_connections;
93 /* channel used for communication between main
94 * and connection handler threads */
95 sdb_channel_t *chan;
96 };
98 /*
99 * connection management functions
100 */
102 static int
103 open_unix_sock(listener_t *listener)
104 {
105 struct sockaddr_un sa;
106 int status;
108 listener->sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
109 if (listener->sock_fd < 0) {
110 char buf[1024];
111 sdb_log(SDB_LOG_ERR, "frontend: Failed to open UNIX socket %s: %s",
112 listener->address, sdb_strerror(errno, buf, sizeof(buf)));
113 return -1;
114 }
116 memset(&sa, 0, sizeof(sa));
117 sa.sun_family = AF_UNIX;
118 strncpy(sa.sun_path, listener->address + strlen("unix:"),
119 sizeof(sa.sun_path));
121 status = bind(listener->sock_fd, (struct sockaddr *)&sa, sizeof(sa));
122 if (status) {
123 char buf[1024];
124 sdb_log(SDB_LOG_ERR, "frontend: Failed to bind to UNIX socket %s: %s",
125 listener->address, sdb_strerror(errno, buf, sizeof(buf)));
126 return -1;
127 }
128 return 0;
129 } /* open_unix_sock */
131 /*
132 * private variables
133 */
135 /* the enum has to be sorted the same as the implementations array
136 * to ensure that the type may be used as index into the array */
137 enum {
138 LISTENER_UNIXSOCK = 0,
139 };
140 static fe_listener_impl_t listener_impls[] = {
141 { LISTENER_UNIXSOCK, "unix", open_unix_sock },
142 };
144 /*
145 * private helper functions
146 */
148 static int
149 get_type(const char *address)
150 {
151 char *sep;
152 size_t len;
153 size_t i;
155 sep = strchr(address, (int)':');
156 if (! sep)
157 return -1;
159 assert(sep > address);
160 len = (size_t)(sep - address);
162 for (i = 0; i < SDB_STATIC_ARRAY_LEN(listener_impls); ++i) {
163 fe_listener_impl_t *impl = listener_impls + i;
165 if (!strncmp(address, impl->prefix, len)) {
166 assert(impl->type == (int)i);
167 return impl->type;
168 }
169 }
170 return -1;
171 } /* get_type */
173 static void
174 listener_destroy(listener_t *listener)
175 {
176 if (! listener)
177 return;
179 if (listener->sock_fd >= 0)
180 close(listener->sock_fd);
181 listener->sock_fd = -1;
183 if (listener->address)
184 free(listener->address);
185 } /* listener_destroy */
187 static listener_t *
188 listener_create(sdb_fe_socket_t *sock, const char *address)
189 {
190 listener_t *listener;
191 int type;
193 type = get_type(address);
194 if (type < 0) {
195 sdb_log(SDB_LOG_ERR, "frontend: Unsupported address type specified "
196 "in listen address '%s'", address);
197 return NULL;
198 }
200 listener = realloc(sock->listeners,
201 sock->listeners_num * sizeof(*sock->listeners));
202 if (! listener) {
203 char buf[1024];
204 sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate memory: %s",
205 sdb_strerror(errno, buf, sizeof(buf)));
206 return NULL;
207 }
209 sock->listeners = listener;
210 listener = sock->listeners + sock->listeners_num;
212 listener->sock_fd = -1;
213 listener->address = strdup(address);
214 if (! listener->address) {
215 char buf[1024];
216 sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate memory: %s",
217 sdb_strerror(errno, buf, sizeof(buf)));
218 listener_destroy(listener);
219 return NULL;
220 }
221 listener->type = type;
223 if (listener_impls[type].opener(listener)) {
224 /* prints error */
225 listener_destroy(listener);
226 return NULL;
227 }
229 ++sock->listeners_num;
230 return listener;
231 } /* listener_create */
233 static int
234 listener_listen(listener_t *listener)
235 {
236 assert(listener);
238 /* try to reopen */
239 if (listener->sock_fd < 0)
240 if (listener_impls[listener->type].opener(listener))
241 return -1;
242 assert(listener->sock_fd >= 0);
244 if (listen(listener->sock_fd, /* backlog = */ 32)) {
245 char buf[1024];
246 sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s",
247 listener->address, sdb_strerror(errno, buf, sizeof(buf)));
248 return -1;
249 }
250 return 0;
251 } /* listener_listen */
253 static void
254 listener_close(listener_t *listener)
255 {
256 assert(listener);
258 if (listener->sock_fd < 0)
259 return;
261 close(listener->sock_fd);
262 listener->sock_fd = -1;
263 } /* listener_close */
265 static void
266 socket_close(sdb_fe_socket_t *sock)
267 {
268 size_t i;
270 assert(sock);
271 for (i = 0; i < sock->listeners_num; ++i)
272 listener_close(sock->listeners + i);
273 } /* socket_close */
275 /*
276 * private data types
277 */
279 static int
280 connection_init(sdb_object_t *obj, va_list ap)
281 {
282 connection_obj_t *conn;
283 int sock_fd;
284 int sock_fl;
286 assert(obj);
287 conn = CONN(obj);
289 sock_fd = va_arg(ap, int);
291 conn->client_addr_len = sizeof(conn->client_addr);
292 conn->fd = accept(sock_fd, (struct sockaddr *)&conn->client_addr,
293 &conn->client_addr_len);
295 if (conn->fd < 0) {
296 char buf[1024];
297 sdb_log(SDB_LOG_ERR, "frontend: Failed to accept remote "
298 "connection: %s", sdb_strerror(errno,
299 buf, sizeof(buf)));
300 return -1;
301 }
303 if (conn->client_addr.ss_family != AF_UNIX) {
304 sdb_log(SDB_LOG_ERR, "frontend: Accepted connection using "
305 "unexpected family type %d", conn->client_addr.ss_family);
306 return -1;
307 }
309 sock_fl = fcntl(conn->fd, F_GETFL);
310 if (fcntl(conn->fd, F_SETFL, sock_fl | O_NONBLOCK)) {
311 char buf[1024];
312 sdb_log(SDB_LOG_ERR, "frontend: Failed to switch connection conn#%i "
313 "to non-blocking mode: %s", conn->fd,
314 sdb_strerror(errno, buf, sizeof(buf)));
315 return -1;
316 }
318 sdb_log(SDB_LOG_DEBUG, "frontend: Accepted connection on fd=%i",
319 conn->fd);
321 /* update the object name */
322 snprintf(obj->name + strlen(CONN_FD_PREFIX),
323 strlen(CONN_FD_PLACEHOLDER), "%i", conn->fd);
324 return 0;
325 } /* connection_init */
327 static void
328 connection_destroy(sdb_object_t *obj)
329 {
330 connection_obj_t *conn;
332 assert(obj);
333 conn = CONN(obj);
335 sdb_log(SDB_LOG_DEBUG, "frontend: Closing connection on fd=%i", conn->fd);
336 close(conn->fd);
337 conn->fd = -1;
338 } /* connection_destroy */
340 static sdb_type_t connection_type = {
341 /* size = */ sizeof(connection_obj_t),
342 /* init = */ connection_init,
343 /* destroy = */ connection_destroy,
344 };
346 /*
347 * connection handler functions
348 */
350 /* returns negative value on error, 0 on EOF, number of packets else */
351 static int
352 connection_read(int fd)
353 {
354 int n = 0;
356 while (42) {
357 int32_t cmd;
358 ssize_t status;
360 errno = 0;
361 status = read(fd, &cmd, sizeof(cmd));
362 if (status < 0) {
363 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
364 return n + 1;
365 return (int)status;
366 }
367 else if (! status) /* EOF */
368 return 0;
370 /* XXX */
371 sdb_log(SDB_LOG_DEBUG, "frontend: read command %i from fd=%i",
372 cmd, fd);
373 ++n;
374 }
376 return n + 1;
377 } /* connection_read */
379 static void *
380 connection_handler(void *data)
381 {
382 sdb_fe_socket_t *sock = data;
384 assert(sock);
386 while (42) {
387 struct timespec timeout = { 0, 500000000 }; /* .5 seconds */
388 connection_obj_t *conn;
389 int status;
391 errno = 0;
392 status = sdb_channel_select(sock->chan, /* read */ NULL, &conn,
393 /* write */ NULL, NULL, &timeout);
394 if (status) {
395 char buf[1024];
397 if (errno == ETIMEDOUT)
398 continue;
399 if (errno == EBADF) /* channel shut down */
400 break;
402 sdb_log(SDB_LOG_ERR, "frontend: Failed to read from channel: %s",
403 sdb_strerror(errno, buf, sizeof(buf)));
404 continue;
405 }
407 status = connection_read(conn->fd);
408 if (status <= 0) {
409 /* error or EOF -> close connection */
410 sdb_object_deref(SDB_OBJ(conn));
411 }
412 else {
413 if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) {
414 sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append "
415 "connection %s to list of open connections",
416 SDB_OBJ(conn)->name);
417 }
419 /* pass ownership back to list; or destroy in case of an error */
420 sdb_object_deref(SDB_OBJ(conn));
421 }
422 }
423 return NULL;
424 } /* connection_handler */
426 static int
427 connection_accept(sdb_fe_socket_t *sock, listener_t *listener)
428 {
429 sdb_object_t *obj;
431 /* the placeholder will be replaced with the accepted file
432 * descriptor when initializing the object */
433 obj = sdb_object_create(CONN_FD_PREFIX CONN_FD_PLACEHOLDER,
434 connection_type, listener->sock_fd);
435 if (! obj)
436 return -1;
438 if (sdb_llist_append(sock->open_connections, obj)) {
439 sdb_log(SDB_LOG_ERR, "frontend: Failed to append "
440 "connection %s to list of open connections",
441 obj->name);
442 sdb_object_deref(obj);
443 return -1;
444 }
446 /* hand ownership over to the list */
447 sdb_object_deref(obj);
448 return 0;
449 } /* connection_accept */
451 static int
452 socket_handle_incoming(sdb_fe_socket_t *sock,
453 fd_set *ready, fd_set *exceptions)
454 {
455 sdb_llist_iter_t *iter;
456 size_t i;
458 for (i = 0; i < sock->listeners_num; ++i) {
459 listener_t *listener = sock->listeners + i;
460 if (FD_ISSET(listener->sock_fd, ready))
461 if (connection_accept(sock, listener))
462 continue;
463 }
465 iter = sdb_llist_get_iter(sock->open_connections);
466 if (! iter) {
467 sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
468 "for open connections");
469 return -1;
470 }
472 while (sdb_llist_iter_has_next(iter)) {
473 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
475 if (FD_ISSET(CONN(obj)->fd, exceptions))
476 sdb_log(SDB_LOG_INFO, "Exception on fd %d",
477 CONN(obj)->fd);
479 if (FD_ISSET(CONN(obj)->fd, ready)) {
480 sdb_llist_iter_remove_current(iter);
481 sdb_channel_write(sock->chan, &obj);
482 }
483 }
484 sdb_llist_iter_destroy(iter);
485 return 0;
486 } /* socket_handle_incoming */
488 /*
489 * public API
490 */
492 sdb_fe_socket_t *
493 sdb_fe_sock_create(void)
494 {
495 sdb_fe_socket_t *sock;
497 sock = calloc(1, sizeof(*sock));
498 if (! sock)
499 return NULL;
501 sock->open_connections = sdb_llist_create();
502 if (! sock->open_connections) {
503 sdb_fe_sock_destroy(sock);
504 return NULL;
505 }
506 return sock;
507 } /* sdb_fe_sock_create */
509 void
510 sdb_fe_sock_destroy(sdb_fe_socket_t *sock)
511 {
512 size_t i;
514 if (! sock)
515 return;
517 for (i = 0; i < sock->listeners_num; ++i) {
518 listener_destroy(sock->listeners + i);
519 }
520 if (sock->listeners)
521 free(sock->listeners);
522 sock->listeners = NULL;
524 sdb_llist_destroy(sock->open_connections);
525 sock->open_connections = NULL;
526 free(sock);
527 } /* sdb_fe_sock_destroy */
529 int
530 sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address)
531 {
532 listener_t *listener;
534 if ((! sock) || (! address))
535 return -1;
537 listener = listener_create(sock, address);
538 if (! listener)
539 return -1;
540 return 0;
541 } /* sdb_fe_sock_add_listener */
543 int
544 sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
545 {
546 fd_set sockets;
547 int max_listen_fd = 0;
548 size_t i;
550 /* XXX: make the number of threads configurable */
551 pthread_t handler_threads[5];
553 if ((! sock) || (! sock->listeners_num) || (! loop) || sock->chan)
554 return -1;
556 FD_ZERO(&sockets);
557 for (i = 0; i < sock->listeners_num; ++i) {
558 listener_t *listener = sock->listeners + i;
560 if (listener_listen(listener)) {
561 socket_close(sock);
562 return -1;
563 }
565 FD_SET(listener->sock_fd, &sockets);
566 if (listener->sock_fd > max_listen_fd)
567 max_listen_fd = listener->sock_fd;
568 }
570 sock->chan = sdb_channel_create(1024, sizeof(connection_obj_t *));
571 if (! sock->chan) {
572 socket_close(sock);
573 return -1;
574 }
576 memset(&handler_threads, 0, sizeof(handler_threads));
577 /* XXX: error handling */
578 for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
579 pthread_create(&handler_threads[i], /* attr = */ NULL,
580 connection_handler, /* arg = */ sock);
582 while (loop->do_loop) {
583 struct timeval timeout = { 1, 0 }; /* one second */
584 sdb_llist_iter_t *iter;
586 int max_fd = max_listen_fd;
587 fd_set ready;
588 fd_set exceptions;
589 int n;
591 FD_ZERO(&ready);
592 FD_ZERO(&exceptions);
594 ready = sockets;
596 iter = sdb_llist_get_iter(sock->open_connections);
597 if (! iter) {
598 sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
599 "for open connections");
600 break;
601 }
603 while (sdb_llist_iter_has_next(iter)) {
604 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
605 FD_SET(CONN(obj)->fd, &ready);
606 FD_SET(CONN(obj)->fd, &exceptions);
608 if (CONN(obj)->fd > max_fd)
609 max_fd = CONN(obj)->fd;
610 }
611 sdb_llist_iter_destroy(iter);
613 errno = 0;
614 n = select(max_fd + 1, &ready, NULL, &exceptions, &timeout);
615 if (n < 0) {
616 char buf[1024];
618 if (errno == EINTR)
619 continue;
621 sdb_log(SDB_LOG_ERR, "frontend: Failed to monitor sockets: %s",
622 sdb_strerror(errno, buf, sizeof(buf)));
623 break;
624 }
625 else if (! n)
626 continue;
628 /* handle new and open connections */
629 if (socket_handle_incoming(sock, &ready, &exceptions))
630 break;
631 }
633 socket_close(sock);
635 sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "
636 "to terminate");
637 if (! sdb_channel_shutdown(sock->chan))
638 for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
639 pthread_join(handler_threads[i], NULL);
640 /* else: we tried our best; let the operating system clean up */
642 sdb_channel_destroy(sock->chan);
643 sock->chan = NULL;
644 return 0;
645 } /* sdb_fe_sock_listen_and_server */
647 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */