1 /*
2 * SysDB - src/frontend/sock.c
3 * Copyright (C) 2013 Sebastian 'tokkee' Harl <sh@tokkee.org>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
28 #if HAVE_CONFIG_H
29 # include "config.h"
30 #endif /* HAVE_CONFIG_H */
32 #include "sysdb.h"
33 #include "core/object.h"
34 #include "frontend/connection-private.h"
35 #include "frontend/sock.h"
37 #include "utils/channel.h"
38 #include "utils/error.h"
39 #include "utils/llist.h"
40 #include "utils/os.h"
41 #include "utils/strbuf.h"
43 #include <assert.h>
44 #include <errno.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <string.h>
50 #include <unistd.h>
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <sys/select.h>
55 #include <sys/socket.h>
56 #include <sys/un.h>
58 #include <libgen.h>
60 #include <pthread.h>
62 /*
63 * private data types
64 */
/*
 * listener_t:
 * State of a single listening endpoint managed by a frontend socket.
 */
typedef struct {
	/* listen address as passed to listener_create (heap-allocated copy) */
	char *address;
	/* listener type; used as index into the listener implementations */
	int   type;

	/* file descriptor of the listening socket; negative while closed */
	int   sock_fd;
} listener_t;
73 typedef struct {
74 int type;
75 const char *prefix;
77 int (*opener)(listener_t *);
78 void (*closer)(listener_t *);
79 } fe_listener_impl_t;
81 struct sdb_fe_socket {
82 listener_t *listeners;
83 size_t listeners_num;
85 sdb_llist_t *open_connections;
87 /* channel used for communication between main
88 * and connection handler threads */
89 sdb_channel_t *chan;
90 };
92 /*
93 * connection management functions
94 */
96 static int
97 open_unix_sock(listener_t *listener)
98 {
99 const char *addr;
100 char *addr_copy;
101 char *base_dir;
102 struct sockaddr_un sa;
103 int status;
105 listener->sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
106 if (listener->sock_fd < 0) {
107 char buf[1024];
108 sdb_log(SDB_LOG_ERR, "frontend: Failed to open UNIX socket %s: %s",
109 listener->address, sdb_strerror(errno, buf, sizeof(buf)));
110 return -1;
111 }
113 if (*listener->address == '/')
114 addr = listener->address;
115 else
116 addr = listener->address + strlen("unix:");
118 memset(&sa, 0, sizeof(sa));
119 sa.sun_family = AF_UNIX;
120 strncpy(sa.sun_path, addr, sizeof(sa.sun_path));
122 addr_copy = strdup(addr);
123 if (! addr_copy) {
124 char errbuf[1024];
125 sdb_log(SDB_LOG_ERR, "frontend: strdup failed: %s",
126 sdb_strerror(errno, errbuf, sizeof(errbuf)));
127 return -1;
128 }
129 base_dir = dirname(addr_copy);
131 /* ensure that the directory exists */
132 if (sdb_mkdir_all(base_dir, 0777)) {
133 char errbuf[1024];
134 sdb_log(SDB_LOG_ERR, "frontend: Failed to create directory '%s': %s",
135 base_dir, sdb_strerror(errno, errbuf, sizeof(errbuf)));
136 free(addr_copy);
137 return -1;
138 }
139 free(addr_copy);
141 if (unlink(addr) && (errno != ENOENT)) {
142 char errbuf[1024];
143 sdb_log(SDB_LOG_WARNING, "frontend: Failed to remove stale UNIX "
144 "socket %s: %s", listener->address + strlen("unix:"),
145 sdb_strerror(errno, errbuf, sizeof(errbuf)));
146 }
148 status = bind(listener->sock_fd, (struct sockaddr *)&sa, sizeof(sa));
149 if (status) {
150 char buf[1024];
151 sdb_log(SDB_LOG_ERR, "frontend: Failed to bind to UNIX socket %s: %s",
152 listener->address, sdb_strerror(errno, buf, sizeof(buf)));
153 return -1;
154 }
155 return 0;
156 } /* open_unix_sock */
158 static void
159 close_unix_sock(listener_t *listener)
160 {
161 const char *addr;
162 assert(listener);
164 if (! listener->address)
165 return;
167 if (*listener->address == '/')
168 addr = listener->address;
169 else
170 addr = listener->address + strlen("unix:");
172 if (listener->sock_fd >= 0)
173 close(listener->sock_fd);
174 listener->sock_fd = -1;
176 unlink(addr);
177 } /* close_unix_sock */
179 /*
180 * private variables
181 */
183 /* the enum has to be sorted the same as the implementations array
184 * to ensure that the type may be used as index into the array */
185 enum {
186 LISTENER_UNIXSOCK = 0, /* this is the default */
187 };
188 static fe_listener_impl_t listener_impls[] = {
189 { LISTENER_UNIXSOCK, "unix", open_unix_sock, close_unix_sock },
190 };
192 /*
193 * private helper functions
194 */
196 static int
197 listener_listen(listener_t *listener)
198 {
199 assert(listener);
201 /* try to reopen */
202 if (listener->sock_fd < 0)
203 if (listener_impls[listener->type].opener(listener))
204 return -1;
205 assert(listener->sock_fd >= 0);
207 if (listen(listener->sock_fd, /* backlog = */ 32)) {
208 char buf[1024];
209 sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s",
210 listener->address, sdb_strerror(errno, buf, sizeof(buf)));
211 return -1;
212 }
213 return 0;
214 } /* listener_listen */
216 static void
217 listener_close(listener_t *listener)
218 {
219 assert(listener);
221 if (listener_impls[listener->type].closer)
222 listener_impls[listener->type].closer(listener);
224 if (listener->sock_fd >= 0)
225 close(listener->sock_fd);
226 listener->sock_fd = -1;
227 } /* listener_close */
229 static int
230 get_type(const char *address)
231 {
232 char *sep;
233 size_t len;
234 size_t i;
236 sep = strchr(address, (int)':');
237 if (! sep)
238 return listener_impls[0].type;
240 assert(sep > address);
241 len = (size_t)(sep - address);
243 for (i = 0; i < SDB_STATIC_ARRAY_LEN(listener_impls); ++i) {
244 fe_listener_impl_t *impl = listener_impls + i;
246 if (!strncmp(address, impl->prefix, len)) {
247 assert(impl->type == (int)i);
248 return impl->type;
249 }
250 }
251 return -1;
252 } /* get_type */
254 static void
255 listener_destroy(listener_t *listener)
256 {
257 if (! listener)
258 return;
260 listener_close(listener);
262 if (listener->address)
263 free(listener->address);
264 listener->address = NULL;
265 } /* listener_destroy */
267 static listener_t *
268 listener_create(sdb_fe_socket_t *sock, const char *address)
269 {
270 listener_t *listener;
271 int type;
273 type = get_type(address);
274 if (type < 0) {
275 sdb_log(SDB_LOG_ERR, "frontend: Unsupported address type specified "
276 "in listen address '%s'", address);
277 return NULL;
278 }
280 listener = realloc(sock->listeners,
281 (sock->listeners_num + 1) * sizeof(*sock->listeners));
282 if (! listener) {
283 char buf[1024];
284 sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate memory: %s",
285 sdb_strerror(errno, buf, sizeof(buf)));
286 return NULL;
287 }
289 sock->listeners = listener;
290 listener = sock->listeners + sock->listeners_num;
292 listener->sock_fd = -1;
293 listener->address = strdup(address);
294 if (! listener->address) {
295 char buf[1024];
296 sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate memory: %s",
297 sdb_strerror(errno, buf, sizeof(buf)));
298 listener_destroy(listener);
299 return NULL;
300 }
301 listener->type = type;
303 if (listener_impls[type].opener(listener)) {
304 /* prints error */
305 listener_destroy(listener);
306 return NULL;
307 }
309 ++sock->listeners_num;
310 return listener;
311 } /* listener_create */
313 static void
314 socket_clear(sdb_fe_socket_t *sock)
315 {
316 size_t i;
318 assert(sock);
319 for (i = 0; i < sock->listeners_num; ++i)
320 listener_destroy(sock->listeners + i);
321 if (sock->listeners)
322 free(sock->listeners);
323 sock->listeners = NULL;
324 sock->listeners_num = 0;
325 } /* socket_clear */
327 static void
328 socket_close(sdb_fe_socket_t *sock)
329 {
330 size_t i;
332 assert(sock);
333 for (i = 0; i < sock->listeners_num; ++i)
334 listener_close(sock->listeners + i);
335 } /* socket_close */
337 /*
338 * connection handler functions
339 */
341 static void *
342 connection_handler(void *data)
343 {
344 sdb_fe_socket_t *sock = data;
346 assert(sock);
348 while (42) {
349 struct timespec timeout = { 0, 500000000 }; /* .5 seconds */
350 sdb_conn_t *conn;
351 int status;
353 errno = 0;
354 status = sdb_channel_select(sock->chan, /* read */ NULL, &conn,
355 /* write */ NULL, NULL, &timeout);
356 if (status) {
357 char buf[1024];
359 if (errno == ETIMEDOUT)
360 continue;
361 if (errno == EBADF) /* channel shut down */
362 break;
364 sdb_log(SDB_LOG_ERR, "frontend: Failed to read from channel: %s",
365 sdb_strerror(errno, buf, sizeof(buf)));
366 continue;
367 }
369 status = (int)sdb_connection_read(conn);
370 if (status <= 0) {
371 /* error or EOF -> close connection */
372 sdb_object_deref(SDB_OBJ(conn));
373 continue;
374 }
376 /* return the connection to the main loop */
377 if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) {
378 sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append "
379 "connection %s to list of open connections",
380 SDB_OBJ(conn)->name);
381 }
383 /* pass ownership back to list; or destroy in case of an error */
384 sdb_object_deref(SDB_OBJ(conn));
385 }
386 return NULL;
387 } /* connection_handler */
389 static int
390 connection_accept(sdb_fe_socket_t *sock, listener_t *listener)
391 {
392 sdb_object_t *obj;
393 int status;
395 obj = SDB_OBJ(sdb_connection_accept(listener->sock_fd));
396 if (! obj)
397 return -1;
399 status = sdb_llist_append(sock->open_connections, obj);
400 if (status)
401 sdb_log(SDB_LOG_ERR, "frontend: Failed to append "
402 "connection %s to list of open connections",
403 obj->name);
405 /* hand ownership over to the list; or destroy in case of an error */
406 sdb_object_deref(obj);
407 return status;
408 } /* connection_accept */
410 static int
411 socket_handle_incoming(sdb_fe_socket_t *sock,
412 fd_set *ready, fd_set *exceptions)
413 {
414 sdb_llist_iter_t *iter;
415 size_t i;
417 for (i = 0; i < sock->listeners_num; ++i) {
418 listener_t *listener = sock->listeners + i;
419 if (FD_ISSET(listener->sock_fd, ready))
420 if (connection_accept(sock, listener))
421 continue;
422 }
424 iter = sdb_llist_get_iter(sock->open_connections);
425 if (! iter) {
426 sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
427 "for open connections");
428 return -1;
429 }
431 while (sdb_llist_iter_has_next(iter)) {
432 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
434 if (FD_ISSET(CONN(obj)->fd, exceptions)) {
435 sdb_log(SDB_LOG_INFO, "Exception on fd %d",
436 CONN(obj)->fd);
437 /* close the connection */
438 sdb_llist_iter_remove_current(iter);
439 sdb_object_deref(obj);
440 continue;
441 }
443 if (FD_ISSET(CONN(obj)->fd, ready)) {
444 sdb_llist_iter_remove_current(iter);
445 sdb_channel_write(sock->chan, &obj);
446 }
447 }
448 sdb_llist_iter_destroy(iter);
449 return 0;
450 } /* socket_handle_incoming */
452 /*
453 * public API
454 */
456 sdb_fe_socket_t *
457 sdb_fe_sock_create(void)
458 {
459 sdb_fe_socket_t *sock;
461 sock = calloc(1, sizeof(*sock));
462 if (! sock)
463 return NULL;
465 sock->open_connections = sdb_llist_create();
466 if (! sock->open_connections) {
467 sdb_fe_sock_destroy(sock);
468 return NULL;
469 }
470 return sock;
471 } /* sdb_fe_sock_create */
473 void
474 sdb_fe_sock_destroy(sdb_fe_socket_t *sock)
475 {
476 if (! sock)
477 return;
479 socket_clear(sock);
481 sdb_llist_destroy(sock->open_connections);
482 sock->open_connections = NULL;
483 free(sock);
484 } /* sdb_fe_sock_destroy */
486 int
487 sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address)
488 {
489 listener_t *listener;
491 if ((! sock) || (! address))
492 return -1;
494 listener = listener_create(sock, address);
495 if (! listener)
496 return -1;
497 return 0;
498 } /* sdb_fe_sock_add_listener */
500 void
501 sdb_fe_sock_clear_listeners(sdb_fe_socket_t *sock)
502 {
503 if (! sock)
504 return;
506 socket_clear(sock);
507 } /* sdb_fe_sock_clear_listeners */
509 int
510 sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
511 {
512 fd_set sockets;
513 int max_listen_fd = 0;
514 size_t i;
516 pthread_t handler_threads[loop->num_threads];
517 size_t num_threads;
519 if ((! sock) || (! sock->listeners_num) || sock->chan
520 || (! loop) || (loop->num_threads <= 0))
521 return -1;
523 if (! loop->do_loop)
524 return 0;
526 FD_ZERO(&sockets);
527 for (i = 0; i < sock->listeners_num; ++i) {
528 listener_t *listener = sock->listeners + i;
530 if (listener_listen(listener)) {
531 socket_close(sock);
532 return -1;
533 }
535 FD_SET(listener->sock_fd, &sockets);
536 if (listener->sock_fd > max_listen_fd)
537 max_listen_fd = listener->sock_fd;
538 }
540 sock->chan = sdb_channel_create(1024, sizeof(sdb_conn_t *));
541 if (! sock->chan) {
542 socket_close(sock);
543 return -1;
544 }
546 sdb_log(SDB_LOG_INFO, "frontend: Starting %zu connection "
547 "handler thread%s managing %zu listener%s",
548 loop->num_threads, loop->num_threads == 1 ? "" : "s",
549 sock->listeners_num, sock->listeners_num == 1 ? "" : "s");
551 num_threads = loop->num_threads;
552 memset(&handler_threads, 0, sizeof(handler_threads));
553 for (i = 0; i < num_threads; ++i) {
554 errno = 0;
555 if (pthread_create(&handler_threads[i], /* attr = */ NULL,
556 connection_handler, /* arg = */ sock)) {
557 char errbuf[1024];
558 sdb_log(SDB_LOG_ERR, "frontend: Failed to create "
559 "connection handler thread: %s",
560 sdb_strerror(errno, errbuf, sizeof(errbuf)));
561 num_threads = i;
562 break;
563 }
564 }
566 while (loop->do_loop && num_threads) {
567 struct timeval timeout = { 1, 0 }; /* one second */
568 sdb_llist_iter_t *iter;
570 int max_fd = max_listen_fd;
571 fd_set ready;
572 fd_set exceptions;
573 int n;
575 FD_ZERO(&ready);
576 FD_ZERO(&exceptions);
578 ready = sockets;
580 iter = sdb_llist_get_iter(sock->open_connections);
581 if (! iter) {
582 sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
583 "for open connections");
584 break;
585 }
587 while (sdb_llist_iter_has_next(iter)) {
588 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
590 if (CONN(obj)->fd < 0) {
591 sdb_llist_iter_remove_current(iter);
592 sdb_object_deref(obj);
593 continue;
594 }
596 FD_SET(CONN(obj)->fd, &ready);
597 FD_SET(CONN(obj)->fd, &exceptions);
599 if (CONN(obj)->fd > max_fd)
600 max_fd = CONN(obj)->fd;
601 }
602 sdb_llist_iter_destroy(iter);
604 errno = 0;
605 n = select(max_fd + 1, &ready, NULL, &exceptions, &timeout);
606 if (n < 0) {
607 char buf[1024];
609 if (errno == EINTR)
610 continue;
612 sdb_log(SDB_LOG_ERR, "frontend: Failed to monitor sockets: %s",
613 sdb_strerror(errno, buf, sizeof(buf)));
614 break;
615 }
616 else if (! n)
617 continue;
619 /* handle new and open connections */
620 if (socket_handle_incoming(sock, &ready, &exceptions))
621 break;
622 }
624 socket_close(sock);
626 sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "
627 "to terminate");
628 if (! sdb_channel_shutdown(sock->chan))
629 for (i = 0; i < num_threads; ++i)
630 pthread_join(handler_threads[i], NULL);
631 /* else: we tried our best; let the operating system clean up */
633 sdb_channel_destroy(sock->chan);
634 sock->chan = NULL;
636 if (! num_threads)
637 return -1;
638 return 0;
639 } /* sdb_fe_sock_listen_and_server */
641 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */