cac49e1db1648b93cc7cd55d933383dd274103f9
1 /*
2 * SysDB - src/frontend/sock.c
3 * Copyright (C) 2013 Sebastian 'tokkee' Harl <sh@tokkee.org>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
19 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
28 #include "sysdb.h"
29 #include "core/error.h"
30 #include "core/object.h"
31 #include "frontend/connection-private.h"
32 #include "frontend/sock.h"
34 #include "utils/channel.h"
35 #include "utils/llist.h"
36 #include "utils/strbuf.h"
38 #include <assert.h>
39 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
45 #include <unistd.h>
47 #include <sys/time.h>
48 #include <sys/types.h>
49 #include <sys/select.h>
50 #include <sys/socket.h>
51 #include <sys/un.h>
53 #include <pthread.h>
55 /*
56 * private data types
57 */
59 typedef struct {
60 char *address;
61 int type;
63 int sock_fd;
64 } listener_t;
66 typedef struct {
67 int type;
68 const char *prefix;
70 int (*opener)(listener_t *);
71 } fe_listener_impl_t;
73 struct sdb_fe_socket {
74 listener_t *listeners;
75 size_t listeners_num;
77 sdb_llist_t *open_connections;
79 /* channel used for communication between main
80 * and connection handler threads */
81 sdb_channel_t *chan;
82 };
84 /*
85 * connection management functions
86 */
88 static int
89 open_unix_sock(listener_t *listener)
90 {
91 struct sockaddr_un sa;
92 int status;
94 listener->sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
95 if (listener->sock_fd < 0) {
96 char buf[1024];
97 sdb_log(SDB_LOG_ERR, "frontend: Failed to open UNIX socket %s: %s",
98 listener->address, sdb_strerror(errno, buf, sizeof(buf)));
99 return -1;
100 }
102 memset(&sa, 0, sizeof(sa));
103 sa.sun_family = AF_UNIX;
104 strncpy(sa.sun_path, listener->address + strlen("unix:"),
105 sizeof(sa.sun_path));
107 status = bind(listener->sock_fd, (struct sockaddr *)&sa, sizeof(sa));
108 if (status) {
109 char buf[1024];
110 sdb_log(SDB_LOG_ERR, "frontend: Failed to bind to UNIX socket %s: %s",
111 listener->address, sdb_strerror(errno, buf, sizeof(buf)));
112 return -1;
113 }
114 return 0;
115 } /* open_unix_sock */
117 /*
118 * private variables
119 */
121 /* the enum has to be sorted the same as the implementations array
122 * to ensure that the type may be used as index into the array */
123 enum {
124 LISTENER_UNIXSOCK = 0,
125 };
126 static fe_listener_impl_t listener_impls[] = {
127 { LISTENER_UNIXSOCK, "unix", open_unix_sock },
128 };
130 /*
131 * private helper functions
132 */
134 static int
135 get_type(const char *address)
136 {
137 char *sep;
138 size_t len;
139 size_t i;
141 sep = strchr(address, (int)':');
142 if (! sep)
143 return -1;
145 assert(sep > address);
146 len = (size_t)(sep - address);
148 for (i = 0; i < SDB_STATIC_ARRAY_LEN(listener_impls); ++i) {
149 fe_listener_impl_t *impl = listener_impls + i;
151 if (!strncmp(address, impl->prefix, len)) {
152 assert(impl->type == (int)i);
153 return impl->type;
154 }
155 }
156 return -1;
157 } /* get_type */
159 static void
160 listener_destroy(listener_t *listener)
161 {
162 if (! listener)
163 return;
165 if (listener->sock_fd >= 0)
166 close(listener->sock_fd);
167 listener->sock_fd = -1;
169 if (listener->address)
170 free(listener->address);
171 } /* listener_destroy */
173 static listener_t *
174 listener_create(sdb_fe_socket_t *sock, const char *address)
175 {
176 listener_t *listener;
177 int type;
179 type = get_type(address);
180 if (type < 0) {
181 sdb_log(SDB_LOG_ERR, "frontend: Unsupported address type specified "
182 "in listen address '%s'", address);
183 return NULL;
184 }
186 listener = realloc(sock->listeners,
187 sock->listeners_num * sizeof(*sock->listeners));
188 if (! listener) {
189 char buf[1024];
190 sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate memory: %s",
191 sdb_strerror(errno, buf, sizeof(buf)));
192 return NULL;
193 }
195 sock->listeners = listener;
196 listener = sock->listeners + sock->listeners_num;
198 listener->sock_fd = -1;
199 listener->address = strdup(address);
200 if (! listener->address) {
201 char buf[1024];
202 sdb_log(SDB_LOG_ERR, "frontend: Failed to allocate memory: %s",
203 sdb_strerror(errno, buf, sizeof(buf)));
204 listener_destroy(listener);
205 return NULL;
206 }
207 listener->type = type;
209 if (listener_impls[type].opener(listener)) {
210 /* prints error */
211 listener_destroy(listener);
212 return NULL;
213 }
215 ++sock->listeners_num;
216 return listener;
217 } /* listener_create */
219 static int
220 listener_listen(listener_t *listener)
221 {
222 assert(listener);
224 /* try to reopen */
225 if (listener->sock_fd < 0)
226 if (listener_impls[listener->type].opener(listener))
227 return -1;
228 assert(listener->sock_fd >= 0);
230 if (listen(listener->sock_fd, /* backlog = */ 32)) {
231 char buf[1024];
232 sdb_log(SDB_LOG_ERR, "frontend: Failed to listen on socket %s: %s",
233 listener->address, sdb_strerror(errno, buf, sizeof(buf)));
234 return -1;
235 }
236 return 0;
237 } /* listener_listen */
239 static void
240 listener_close(listener_t *listener)
241 {
242 assert(listener);
244 if (listener->sock_fd < 0)
245 return;
247 close(listener->sock_fd);
248 listener->sock_fd = -1;
249 } /* listener_close */
251 static void
252 socket_close(sdb_fe_socket_t *sock)
253 {
254 size_t i;
256 assert(sock);
257 for (i = 0; i < sock->listeners_num; ++i)
258 listener_close(sock->listeners + i);
259 } /* socket_close */
261 /*
262 * connection handler functions
263 */
265 static void *
266 connection_handler(void *data)
267 {
268 sdb_fe_socket_t *sock = data;
270 assert(sock);
272 while (42) {
273 struct timespec timeout = { 0, 500000000 }; /* .5 seconds */
274 sdb_conn_t *conn;
275 int status;
277 errno = 0;
278 status = sdb_channel_select(sock->chan, /* read */ NULL, &conn,
279 /* write */ NULL, NULL, &timeout);
280 if (status) {
281 char buf[1024];
283 if (errno == ETIMEDOUT)
284 continue;
285 if (errno == EBADF) /* channel shut down */
286 break;
288 sdb_log(SDB_LOG_ERR, "frontend: Failed to read from channel: %s",
289 sdb_strerror(errno, buf, sizeof(buf)));
290 continue;
291 }
293 status = (int)sdb_connection_read(conn);
294 if (status <= 0) {
295 /* error or EOF -> close connection */
296 sdb_object_deref(SDB_OBJ(conn));
297 continue;
298 }
300 /* return the connection to the main loop */
301 if (sdb_llist_append(sock->open_connections, SDB_OBJ(conn))) {
302 sdb_log(SDB_LOG_ERR, "frontend: Failed to re-append "
303 "connection %s to list of open connections",
304 SDB_OBJ(conn)->name);
305 }
307 /* pass ownership back to list; or destroy in case of an error */
308 sdb_object_deref(SDB_OBJ(conn));
309 }
310 return NULL;
311 } /* connection_handler */
313 static int
314 connection_accept(sdb_fe_socket_t *sock, listener_t *listener)
315 {
316 sdb_object_t *obj;
317 int status;
319 obj = SDB_OBJ(sdb_connection_accept(listener->sock_fd));
320 if (! obj)
321 return -1;
323 status = sdb_llist_append(sock->open_connections, obj);
324 if (status)
325 sdb_log(SDB_LOG_ERR, "frontend: Failed to append "
326 "connection %s to list of open connections",
327 obj->name);
329 /* hand ownership over to the list; or destroy in case of an error */
330 sdb_object_deref(obj);
331 return status;
332 } /* connection_accept */
334 static int
335 socket_handle_incoming(sdb_fe_socket_t *sock,
336 fd_set *ready, fd_set *exceptions)
337 {
338 sdb_llist_iter_t *iter;
339 size_t i;
341 for (i = 0; i < sock->listeners_num; ++i) {
342 listener_t *listener = sock->listeners + i;
343 if (FD_ISSET(listener->sock_fd, ready))
344 if (connection_accept(sock, listener))
345 continue;
346 }
348 iter = sdb_llist_get_iter(sock->open_connections);
349 if (! iter) {
350 sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
351 "for open connections");
352 return -1;
353 }
355 while (sdb_llist_iter_has_next(iter)) {
356 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
358 if (FD_ISSET(CONN(obj)->fd, exceptions))
359 sdb_log(SDB_LOG_INFO, "Exception on fd %d",
360 CONN(obj)->fd);
362 if (FD_ISSET(CONN(obj)->fd, ready)) {
363 sdb_llist_iter_remove_current(iter);
364 sdb_channel_write(sock->chan, &obj);
365 }
366 }
367 sdb_llist_iter_destroy(iter);
368 return 0;
369 } /* socket_handle_incoming */
371 /*
372 * public API
373 */
375 sdb_fe_socket_t *
376 sdb_fe_sock_create(void)
377 {
378 sdb_fe_socket_t *sock;
380 sock = calloc(1, sizeof(*sock));
381 if (! sock)
382 return NULL;
384 sock->open_connections = sdb_llist_create();
385 if (! sock->open_connections) {
386 sdb_fe_sock_destroy(sock);
387 return NULL;
388 }
389 return sock;
390 } /* sdb_fe_sock_create */
392 void
393 sdb_fe_sock_destroy(sdb_fe_socket_t *sock)
394 {
395 size_t i;
397 if (! sock)
398 return;
400 for (i = 0; i < sock->listeners_num; ++i) {
401 listener_destroy(sock->listeners + i);
402 }
403 if (sock->listeners)
404 free(sock->listeners);
405 sock->listeners = NULL;
407 sdb_llist_destroy(sock->open_connections);
408 sock->open_connections = NULL;
409 free(sock);
410 } /* sdb_fe_sock_destroy */
412 int
413 sdb_fe_sock_add_listener(sdb_fe_socket_t *sock, const char *address)
414 {
415 listener_t *listener;
417 if ((! sock) || (! address))
418 return -1;
420 listener = listener_create(sock, address);
421 if (! listener)
422 return -1;
423 return 0;
424 } /* sdb_fe_sock_add_listener */
426 int
427 sdb_fe_sock_listen_and_serve(sdb_fe_socket_t *sock, sdb_fe_loop_t *loop)
428 {
429 fd_set sockets;
430 int max_listen_fd = 0;
431 size_t i;
433 /* XXX: make the number of threads configurable */
434 pthread_t handler_threads[5];
436 if ((! sock) || (! sock->listeners_num) || (! loop) || sock->chan)
437 return -1;
439 FD_ZERO(&sockets);
440 for (i = 0; i < sock->listeners_num; ++i) {
441 listener_t *listener = sock->listeners + i;
443 if (listener_listen(listener)) {
444 socket_close(sock);
445 return -1;
446 }
448 FD_SET(listener->sock_fd, &sockets);
449 if (listener->sock_fd > max_listen_fd)
450 max_listen_fd = listener->sock_fd;
451 }
453 sock->chan = sdb_channel_create(1024, sizeof(sdb_conn_t *));
454 if (! sock->chan) {
455 socket_close(sock);
456 return -1;
457 }
459 memset(&handler_threads, 0, sizeof(handler_threads));
460 /* XXX: error handling */
461 for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
462 pthread_create(&handler_threads[i], /* attr = */ NULL,
463 connection_handler, /* arg = */ sock);
465 while (loop->do_loop) {
466 struct timeval timeout = { 1, 0 }; /* one second */
467 sdb_llist_iter_t *iter;
469 int max_fd = max_listen_fd;
470 fd_set ready;
471 fd_set exceptions;
472 int n;
474 FD_ZERO(&ready);
475 FD_ZERO(&exceptions);
477 ready = sockets;
479 iter = sdb_llist_get_iter(sock->open_connections);
480 if (! iter) {
481 sdb_log(SDB_LOG_ERR, "frontend: Failed to acquire iterator "
482 "for open connections");
483 break;
484 }
486 while (sdb_llist_iter_has_next(iter)) {
487 sdb_object_t *obj = sdb_llist_iter_get_next(iter);
488 FD_SET(CONN(obj)->fd, &ready);
489 FD_SET(CONN(obj)->fd, &exceptions);
491 if (CONN(obj)->fd > max_fd)
492 max_fd = CONN(obj)->fd;
493 }
494 sdb_llist_iter_destroy(iter);
496 errno = 0;
497 n = select(max_fd + 1, &ready, NULL, &exceptions, &timeout);
498 if (n < 0) {
499 char buf[1024];
501 if (errno == EINTR)
502 continue;
504 sdb_log(SDB_LOG_ERR, "frontend: Failed to monitor sockets: %s",
505 sdb_strerror(errno, buf, sizeof(buf)));
506 break;
507 }
508 else if (! n)
509 continue;
511 /* handle new and open connections */
512 if (socket_handle_incoming(sock, &ready, &exceptions))
513 break;
514 }
516 socket_close(sock);
518 sdb_log(SDB_LOG_INFO, "frontend: Waiting for connection handler threads "
519 "to terminate");
520 if (! sdb_channel_shutdown(sock->chan))
521 for (i = 0; i < SDB_STATIC_ARRAY_LEN(handler_threads); ++i)
522 pthread_join(handler_threads[i], NULL);
523 /* else: we tried our best; let the operating system clean up */
525 sdb_channel_destroy(sock->chan);
526 sock->chan = NULL;
527 return 0;
528 } /* sdb_fe_sock_listen_and_server */
530 /* vim: set tw=78 sw=4 ts=4 noexpandtab : */