1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 typedef enum
105 {
106 PRIV_LOW,
107 PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
113 {
114 int fd;
115 char addr[PATH_MAX + 1];
116 int family;
117 socket_privilege privilege;
119 /* buffered IO */
120 char *rbuf;
121 off_t next_cmd;
122 off_t next_read;
124 char *wbuf;
125 ssize_t wbuf_len;
126 };
127 typedef struct listen_socket_s listen_socket_t;
129 struct cache_item_s;
130 typedef struct cache_item_s cache_item_t;
131 struct cache_item_s
132 {
133 char *file;
134 char **values;
135 int values_num;
136 time_t last_flush_time;
137 #define CI_FLAGS_IN_TREE (1<<0)
138 #define CI_FLAGS_IN_QUEUE (1<<1)
139 int flags;
140 pthread_cond_t flushed;
141 cache_item_t *prev;
142 cache_item_t *next;
143 };
145 struct callback_flush_data_s
146 {
147 time_t now;
148 time_t abs_timeout;
149 char **keys;
150 size_t keys_num;
151 };
152 typedef struct callback_flush_data_s callback_flush_data_t;
154 enum queue_side_e
155 {
156 HEAD,
157 TAIL
158 };
159 typedef enum queue_side_e queue_side_t;
161 /* max length of socket command or response */
162 #define CMD_MAX 4096
163 #define RBUF_SIZE (CMD_MAX*2)
165 /*
166 * Variables
167 */
168 static int stay_foreground = 0;
170 static listen_socket_t *listen_fds = NULL;
171 static size_t listen_fds_num = 0;
173 static int do_shutdown = 0;
175 static pthread_t queue_thread;
177 static pthread_t *connection_threads = NULL;
178 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
179 static int connection_threads_num = 0;
181 /* Cache stuff */
182 static GTree *cache_tree = NULL;
183 static cache_item_t *cache_queue_head = NULL;
184 static cache_item_t *cache_queue_tail = NULL;
185 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
186 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
188 static int config_write_interval = 300;
189 static int config_write_jitter = 0;
190 static int config_flush_interval = 3600;
191 static int config_flush_at_shutdown = 0;
192 static char *config_pid_file = NULL;
193 static char *config_base_dir = NULL;
194 static size_t _config_base_dir_len = 0;
195 static int config_write_base_only = 0;
197 static listen_socket_t **config_listen_address_list = NULL;
198 static int config_listen_address_list_len = 0;
200 static uint64_t stats_queue_length = 0;
201 static uint64_t stats_updates_received = 0;
202 static uint64_t stats_flush_received = 0;
203 static uint64_t stats_updates_written = 0;
204 static uint64_t stats_data_sets_written = 0;
205 static uint64_t stats_journal_bytes = 0;
206 static uint64_t stats_journal_rotate = 0;
207 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
209 /* Journaled updates */
210 static char *journal_cur = NULL;
211 static char *journal_old = NULL;
212 static FILE *journal_fh = NULL;
213 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
214 static int journal_write(char *cmd, char *args);
215 static void journal_done(void);
216 static void journal_rotate(void);
218 /*
219 * Functions
220 */
221 static void sig_common (const char *sig) /* {{{ */
222 {
223 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
224 do_shutdown++;
225 pthread_cond_broadcast(&cache_cond);
226 } /* }}} void sig_common */
228 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
229 {
230 sig_common("INT");
231 } /* }}} void sig_int_handler */
233 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
234 {
235 sig_common("TERM");
236 } /* }}} void sig_term_handler */
238 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
239 {
240 config_flush_at_shutdown = 1;
241 sig_common("USR1");
242 } /* }}} void sig_usr1_handler */
244 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
245 {
246 config_flush_at_shutdown = 0;
247 sig_common("USR2");
248 } /* }}} void sig_usr2_handler */
250 static void install_signal_handlers(void) /* {{{ */
251 {
252 /* These structures are static, because `sigaction' behaves weird if the are
253 * overwritten.. */
254 static struct sigaction sa_int;
255 static struct sigaction sa_term;
256 static struct sigaction sa_pipe;
257 static struct sigaction sa_usr1;
258 static struct sigaction sa_usr2;
260 /* Install signal handlers */
261 memset (&sa_int, 0, sizeof (sa_int));
262 sa_int.sa_handler = sig_int_handler;
263 sigaction (SIGINT, &sa_int, NULL);
265 memset (&sa_term, 0, sizeof (sa_term));
266 sa_term.sa_handler = sig_term_handler;
267 sigaction (SIGTERM, &sa_term, NULL);
269 memset (&sa_pipe, 0, sizeof (sa_pipe));
270 sa_pipe.sa_handler = SIG_IGN;
271 sigaction (SIGPIPE, &sa_pipe, NULL);
273 memset (&sa_pipe, 0, sizeof (sa_usr1));
274 sa_usr1.sa_handler = sig_usr1_handler;
275 sigaction (SIGUSR1, &sa_usr1, NULL);
277 memset (&sa_usr2, 0, sizeof (sa_usr2));
278 sa_usr2.sa_handler = sig_usr2_handler;
279 sigaction (SIGUSR2, &sa_usr2, NULL);
281 } /* }}} void install_signal_handlers */
283 static int open_pidfile(void) /* {{{ */
284 {
285 int fd;
286 char *file;
288 file = (config_pid_file != NULL)
289 ? config_pid_file
290 : LOCALSTATEDIR "/run/rrdcached.pid";
292 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
293 if (fd < 0)
294 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
295 file, rrd_strerror(errno));
297 return(fd);
298 } /* }}} static int open_pidfile */
300 static int write_pidfile (int fd) /* {{{ */
301 {
302 pid_t pid;
303 FILE *fh;
305 pid = getpid ();
307 fh = fdopen (fd, "w");
308 if (fh == NULL)
309 {
310 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
311 close(fd);
312 return (-1);
313 }
315 fprintf (fh, "%i\n", (int) pid);
316 fclose (fh);
318 return (0);
319 } /* }}} int write_pidfile */
321 static int remove_pidfile (void) /* {{{ */
322 {
323 char *file;
324 int status;
326 file = (config_pid_file != NULL)
327 ? config_pid_file
328 : LOCALSTATEDIR "/run/rrdcached.pid";
330 status = unlink (file);
331 if (status == 0)
332 return (0);
333 return (errno);
334 } /* }}} int remove_pidfile */
336 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
337 {
338 char *eol;
340 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
341 sock->next_read - sock->next_cmd);
343 if (eol == NULL)
344 {
345 /* no commands left, move remainder back to front of rbuf */
346 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
347 sock->next_read - sock->next_cmd);
348 sock->next_read -= sock->next_cmd;
349 sock->next_cmd = 0;
350 *len = 0;
351 return NULL;
352 }
353 else
354 {
355 char *cmd = sock->rbuf + sock->next_cmd;
356 *eol = '\0';
358 sock->next_cmd = eol - sock->rbuf + 1;
360 if (eol > sock->rbuf && *(eol-1) == '\r')
361 *(--eol) = '\0'; /* handle "\r\n" EOL */
363 *len = eol - cmd;
365 return cmd;
366 }
368 /* NOTREACHED */
369 assert(1==0);
370 }
372 /* add the characters directly to the write buffer */
373 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
374 {
375 char *new_buf;
377 assert(sock != NULL);
379 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
380 if (new_buf == NULL)
381 {
382 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
383 return -1;
384 }
386 strncpy(new_buf + sock->wbuf_len, str, len + 1);
388 sock->wbuf = new_buf;
389 sock->wbuf_len += len;
391 return 0;
392 } /* }}} static int add_to_wbuf */
394 /* add the text to the "extra" info that's sent after the status line */
395 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
396 {
397 va_list argp;
398 char buffer[CMD_MAX];
399 int len;
401 if (sock == NULL) return 0; /* journal replay mode */
403 va_start(argp, fmt);
404 #ifdef HAVE_VSNPRINTF
405 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
406 #else
407 len = vsprintf(buffer, fmt, argp);
408 #endif
409 va_end(argp);
410 if (len < 0)
411 {
412 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
413 return -1;
414 }
416 return add_to_wbuf(sock, buffer, len);
417 } /* }}} static int add_response_info */
419 static int count_lines(char *str) /* {{{ */
420 {
421 int lines = 0;
423 if (str != NULL)
424 {
425 while ((str = strchr(str, '\n')) != NULL)
426 {
427 ++lines;
428 ++str;
429 }
430 }
432 return lines;
433 } /* }}} static int count_lines */
435 /* send the response back to the user.
436 * returns 0 on success, -1 on error
437 * write buffer is always zeroed after this call */
438 static int send_response (listen_socket_t *sock, response_code rc,
439 char *fmt, ...) /* {{{ */
440 {
441 va_list argp;
442 char buffer[CMD_MAX];
443 int lines;
444 ssize_t wrote;
445 int rclen, len;
447 if (sock == NULL) return rc; /* journal replay mode */
449 if (rc == RESP_OK)
450 {
451 lines = count_lines(sock->wbuf);
452 }
453 else
454 lines = -1;
456 rclen = sprintf(buffer, "%d ", lines);
457 va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
460 #else
461 len = vsprintf(buffer+rclen, fmt, argp);
462 #endif
463 va_end(argp);
464 if (len < 0)
465 return -1;
467 len += rclen;
469 /* first write must be complete */
470 if (len != write(sock->fd, buffer, len))
471 {
472 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
473 return -1;
474 }
476 if (sock->wbuf != NULL)
477 {
478 wrote = 0;
479 while (wrote < sock->wbuf_len)
480 {
481 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
482 if (wb <= 0)
483 {
484 RRDD_LOG(LOG_INFO, "send_response: could not write results");
485 return -1;
486 }
487 wrote += wb;
488 }
489 }
491 free(sock->wbuf); sock->wbuf = NULL;
492 sock->wbuf_len = 0;
494 return 0;
495 } /* }}} */
497 static void wipe_ci_values(cache_item_t *ci, time_t when)
498 {
499 ci->values = NULL;
500 ci->values_num = 0;
502 ci->last_flush_time = when;
503 if (config_write_jitter > 0)
504 ci->last_flush_time += (random() % config_write_jitter);
505 }
507 /* remove_from_queue
508 * remove a "cache_item_t" item from the queue.
509 * must hold 'cache_lock' when calling this
510 */
511 static void remove_from_queue(cache_item_t *ci) /* {{{ */
512 {
513 if (ci == NULL) return;
515 if (ci->prev == NULL)
516 cache_queue_head = ci->next; /* reset head */
517 else
518 ci->prev->next = ci->next;
520 if (ci->next == NULL)
521 cache_queue_tail = ci->prev; /* reset the tail */
522 else
523 ci->next->prev = ci->prev;
525 ci->next = ci->prev = NULL;
526 ci->flags &= ~CI_FLAGS_IN_QUEUE;
527 } /* }}} static void remove_from_queue */
529 /*
530 * enqueue_cache_item:
531 * `cache_lock' must be acquired before calling this function!
532 */
533 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
534 queue_side_t side)
535 {
536 if (ci == NULL)
537 return (-1);
539 if (ci->values_num == 0)
540 return (0);
542 if (side == HEAD)
543 {
544 if (cache_queue_head == ci)
545 return 0;
547 /* remove from the double linked list */
548 if (ci->flags & CI_FLAGS_IN_QUEUE)
549 remove_from_queue(ci);
551 ci->prev = NULL;
552 ci->next = cache_queue_head;
553 if (ci->next != NULL)
554 ci->next->prev = ci;
555 cache_queue_head = ci;
557 if (cache_queue_tail == NULL)
558 cache_queue_tail = cache_queue_head;
559 }
560 else /* (side == TAIL) */
561 {
562 /* We don't move values back in the list.. */
563 if (ci->flags & CI_FLAGS_IN_QUEUE)
564 return (0);
566 assert (ci->next == NULL);
567 assert (ci->prev == NULL);
569 ci->prev = cache_queue_tail;
571 if (cache_queue_tail == NULL)
572 cache_queue_head = ci;
573 else
574 cache_queue_tail->next = ci;
576 cache_queue_tail = ci;
577 }
579 ci->flags |= CI_FLAGS_IN_QUEUE;
581 pthread_cond_broadcast(&cache_cond);
582 pthread_mutex_lock (&stats_lock);
583 stats_queue_length++;
584 pthread_mutex_unlock (&stats_lock);
586 return (0);
587 } /* }}} int enqueue_cache_item */
589 /*
590 * tree_callback_flush:
591 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
592 * while this is in progress.
593 */
594 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
595 gpointer data)
596 {
597 cache_item_t *ci;
598 callback_flush_data_t *cfd;
600 ci = (cache_item_t *) value;
601 cfd = (callback_flush_data_t *) data;
603 if ((ci->last_flush_time <= cfd->abs_timeout)
604 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
605 && (ci->values_num > 0))
606 {
607 enqueue_cache_item (ci, TAIL);
608 }
609 else if ((do_shutdown != 0)
610 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
611 && (ci->values_num > 0))
612 {
613 enqueue_cache_item (ci, TAIL);
614 }
615 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
616 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
617 && (ci->values_num <= 0))
618 {
619 char **temp;
621 temp = (char **) realloc (cfd->keys,
622 sizeof (char *) * (cfd->keys_num + 1));
623 if (temp == NULL)
624 {
625 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
626 return (FALSE);
627 }
628 cfd->keys = temp;
629 /* Make really sure this points to the _same_ place */
630 assert ((char *) key == ci->file);
631 cfd->keys[cfd->keys_num] = (char *) key;
632 cfd->keys_num++;
633 }
635 return (FALSE);
636 } /* }}} gboolean tree_callback_flush */
638 static int flush_old_values (int max_age)
639 {
640 callback_flush_data_t cfd;
641 size_t k;
643 memset (&cfd, 0, sizeof (cfd));
644 /* Pass the current time as user data so that we don't need to call
645 * `time' for each node. */
646 cfd.now = time (NULL);
647 cfd.keys = NULL;
648 cfd.keys_num = 0;
650 if (max_age > 0)
651 cfd.abs_timeout = cfd.now - max_age;
652 else
653 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
655 /* `tree_callback_flush' will return the keys of all values that haven't
656 * been touched in the last `config_flush_interval' seconds in `cfd'.
657 * The char*'s in this array point to the same memory as ci->file, so we
658 * don't need to free them separately. */
659 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
661 for (k = 0; k < cfd.keys_num; k++)
662 {
663 cache_item_t *ci;
665 /* This must not fail. */
666 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
667 assert (ci != NULL);
669 /* If we end up here with values available, something's seriously
670 * messed up. */
671 assert (ci->values_num == 0);
673 /* Remove the node from the tree */
674 g_tree_remove (cache_tree, cfd.keys[k]);
675 cfd.keys[k] = NULL;
677 /* Now free and clean up `ci'. */
678 free (ci->file);
679 ci->file = NULL;
680 free (ci);
681 ci = NULL;
682 } /* for (k = 0; k < cfd.keys_num; k++) */
684 if (cfd.keys != NULL)
685 {
686 free (cfd.keys);
687 cfd.keys = NULL;
688 }
690 return (0);
691 } /* int flush_old_values */
693 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
694 {
695 struct timeval now;
696 struct timespec next_flush;
697 int final_flush = 0; /* make sure we only flush once on shutdown */
699 gettimeofday (&now, NULL);
700 next_flush.tv_sec = now.tv_sec + config_flush_interval;
701 next_flush.tv_nsec = 1000 * now.tv_usec;
703 pthread_mutex_lock (&cache_lock);
704 while ((do_shutdown == 0) || (cache_queue_head != NULL))
705 {
706 cache_item_t *ci;
707 char *file;
708 char **values;
709 int values_num;
710 int status;
711 int i;
713 /* First, check if it's time to do the cache flush. */
714 gettimeofday (&now, NULL);
715 if ((now.tv_sec > next_flush.tv_sec)
716 || ((now.tv_sec == next_flush.tv_sec)
717 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
718 {
719 /* Flush all values that haven't been written in the last
720 * `config_write_interval' seconds. */
721 flush_old_values (config_write_interval);
723 /* Determine the time of the next cache flush. */
724 while (next_flush.tv_sec <= now.tv_sec)
725 next_flush.tv_sec += config_flush_interval;
727 /* unlock the cache while we rotate so we don't block incoming
728 * updates if the fsync() blocks on disk I/O */
729 pthread_mutex_unlock(&cache_lock);
730 journal_rotate();
731 pthread_mutex_lock(&cache_lock);
732 }
734 /* Now, check if there's something to store away. If not, wait until
735 * something comes in or it's time to do the cache flush. if we are
736 * shutting down, do not wait around. */
737 if (cache_queue_head == NULL && !do_shutdown)
738 {
739 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
740 if ((status != 0) && (status != ETIMEDOUT))
741 {
742 RRDD_LOG (LOG_ERR, "queue_thread_main: "
743 "pthread_cond_timedwait returned %i.", status);
744 }
745 }
747 /* We're about to shut down */
748 if (do_shutdown != 0 && !final_flush++)
749 {
750 if (config_flush_at_shutdown)
751 flush_old_values (-1); /* flush everything */
752 else
753 break;
754 }
756 /* Check if a value has arrived. This may be NULL if we timed out or there
757 * was an interrupt such as a signal. */
758 if (cache_queue_head == NULL)
759 continue;
761 ci = cache_queue_head;
763 /* copy the relevant parts */
764 file = strdup (ci->file);
765 if (file == NULL)
766 {
767 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
768 continue;
769 }
771 assert(ci->values != NULL);
772 assert(ci->values_num > 0);
774 values = ci->values;
775 values_num = ci->values_num;
777 wipe_ci_values(ci, time(NULL));
778 remove_from_queue(ci);
780 pthread_mutex_lock (&stats_lock);
781 assert (stats_queue_length > 0);
782 stats_queue_length--;
783 pthread_mutex_unlock (&stats_lock);
785 pthread_mutex_unlock (&cache_lock);
787 rrd_clear_error ();
788 status = rrd_update_r (file, NULL, values_num, (void *) values);
789 if (status != 0)
790 {
791 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
792 "rrd_update_r (%s) failed with status %i. (%s)",
793 file, status, rrd_get_error());
794 }
796 journal_write("wrote", file);
797 pthread_cond_broadcast(&ci->flushed);
799 for (i = 0; i < values_num; i++)
800 free (values[i]);
802 free(values);
803 free(file);
805 if (status == 0)
806 {
807 pthread_mutex_lock (&stats_lock);
808 stats_updates_written++;
809 stats_data_sets_written += values_num;
810 pthread_mutex_unlock (&stats_lock);
811 }
813 pthread_mutex_lock (&cache_lock);
815 /* We're about to shut down */
816 if (do_shutdown != 0 && !final_flush++)
817 {
818 if (config_flush_at_shutdown)
819 flush_old_values (-1); /* flush everything */
820 else
821 break;
822 }
823 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
824 pthread_mutex_unlock (&cache_lock);
826 if (config_flush_at_shutdown)
827 {
828 assert(cache_queue_head == NULL);
829 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
830 }
832 journal_done();
834 return (NULL);
835 } /* }}} void *queue_thread_main */
837 static int buffer_get_field (char **buffer_ret, /* {{{ */
838 size_t *buffer_size_ret, char **field_ret)
839 {
840 char *buffer;
841 size_t buffer_pos;
842 size_t buffer_size;
843 char *field;
844 size_t field_size;
845 int status;
847 buffer = *buffer_ret;
848 buffer_pos = 0;
849 buffer_size = *buffer_size_ret;
850 field = *buffer_ret;
851 field_size = 0;
853 if (buffer_size <= 0)
854 return (-1);
856 /* This is ensured by `handle_request'. */
857 assert (buffer[buffer_size - 1] == '\0');
859 status = -1;
860 while (buffer_pos < buffer_size)
861 {
862 /* Check for end-of-field or end-of-buffer */
863 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
864 {
865 field[field_size] = 0;
866 field_size++;
867 buffer_pos++;
868 status = 0;
869 break;
870 }
871 /* Handle escaped characters. */
872 else if (buffer[buffer_pos] == '\\')
873 {
874 if (buffer_pos >= (buffer_size - 1))
875 break;
876 buffer_pos++;
877 field[field_size] = buffer[buffer_pos];
878 field_size++;
879 buffer_pos++;
880 }
881 /* Normal operation */
882 else
883 {
884 field[field_size] = buffer[buffer_pos];
885 field_size++;
886 buffer_pos++;
887 }
888 } /* while (buffer_pos < buffer_size) */
890 if (status != 0)
891 return (status);
893 *buffer_ret = buffer + buffer_pos;
894 *buffer_size_ret = buffer_size - buffer_pos;
895 *field_ret = field;
897 return (0);
898 } /* }}} int buffer_get_field */
900 /* if we're restricting writes to the base directory,
901 * check whether the file falls within the dir
902 * returns 1 if OK, otherwise 0
903 */
904 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
905 {
906 assert(file != NULL);
908 if (!config_write_base_only
909 || sock == NULL /* journal replay */
910 || config_base_dir == NULL)
911 return 1;
913 if (strstr(file, "../") != NULL) goto err;
915 /* relative paths without "../" are ok */
916 if (*file != '/') return 1;
918 /* file must be of the format base + "/" + <1+ char filename> */
919 if (strlen(file) < _config_base_dir_len + 2) goto err;
920 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
921 if (*(file + _config_base_dir_len) != '/') goto err;
923 return 1;
925 err:
926 if (sock != NULL && sock->fd >= 0)
927 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
929 return 0;
930 } /* }}} static int check_file_access */
932 static int flush_file (const char *filename) /* {{{ */
933 {
934 cache_item_t *ci;
936 pthread_mutex_lock (&cache_lock);
938 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
939 if (ci == NULL)
940 {
941 pthread_mutex_unlock (&cache_lock);
942 return (ENOENT);
943 }
945 if (ci->values_num > 0)
946 {
947 /* Enqueue at head */
948 enqueue_cache_item (ci, HEAD);
949 pthread_cond_wait(&ci->flushed, &cache_lock);
950 }
952 pthread_mutex_unlock(&cache_lock);
954 return (0);
955 } /* }}} int flush_file */
957 static int handle_request_help (listen_socket_t *sock, /* {{{ */
958 char *buffer, size_t buffer_size)
959 {
960 int status;
961 char **help_text;
962 char *command;
964 char *help_help[2] =
965 {
966 "Command overview\n"
967 ,
968 "FLUSH <filename>\n"
969 "FLUSHALL\n"
970 "HELP [<command>]\n"
971 "UPDATE <filename> <values> [<values> ...]\n"
972 "STATS\n"
973 };
975 char *help_flush[2] =
976 {
977 "Help for FLUSH\n"
978 ,
979 "Usage: FLUSH <filename>\n"
980 "\n"
981 "Adds the given filename to the head of the update queue and returns\n"
982 "after is has been dequeued.\n"
983 };
985 char *help_flushall[2] =
986 {
987 "Help for FLUSHALL\n"
988 ,
989 "Usage: FLUSHALL\n"
990 "\n"
991 "Triggers writing of all pending updates. Returns immediately.\n"
992 };
994 char *help_update[2] =
995 {
996 "Help for UPDATE\n"
997 ,
998 "Usage: UPDATE <filename> <values> [<values> ...]\n"
999 "\n"
1000 "Adds the given file to the internal cache if it is not yet known and\n"
1001 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1002 "for details.\n"
1003 "\n"
1004 "Each <values> has the following form:\n"
1005 " <values> = <time>:<value>[:<value>[...]]\n"
1006 "See the rrdupdate(1) manpage for details.\n"
1007 };
1009 char *help_stats[2] =
1010 {
1011 "Help for STATS\n"
1012 ,
1013 "Usage: STATS\n"
1014 "\n"
1015 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1016 "a description of the values.\n"
1017 };
1019 status = buffer_get_field (&buffer, &buffer_size, &command);
1020 if (status != 0)
1021 help_text = help_help;
1022 else
1023 {
1024 if (strcasecmp (command, "update") == 0)
1025 help_text = help_update;
1026 else if (strcasecmp (command, "flush") == 0)
1027 help_text = help_flush;
1028 else if (strcasecmp (command, "flushall") == 0)
1029 help_text = help_flushall;
1030 else if (strcasecmp (command, "stats") == 0)
1031 help_text = help_stats;
1032 else
1033 help_text = help_help;
1034 }
1036 add_response_info(sock, help_text[1]);
1037 return send_response(sock, RESP_OK, help_text[0]);
1038 } /* }}} int handle_request_help */
1040 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1041 {
1042 uint64_t copy_queue_length;
1043 uint64_t copy_updates_received;
1044 uint64_t copy_flush_received;
1045 uint64_t copy_updates_written;
1046 uint64_t copy_data_sets_written;
1047 uint64_t copy_journal_bytes;
1048 uint64_t copy_journal_rotate;
1050 uint64_t tree_nodes_number;
1051 uint64_t tree_depth;
1053 pthread_mutex_lock (&stats_lock);
1054 copy_queue_length = stats_queue_length;
1055 copy_updates_received = stats_updates_received;
1056 copy_flush_received = stats_flush_received;
1057 copy_updates_written = stats_updates_written;
1058 copy_data_sets_written = stats_data_sets_written;
1059 copy_journal_bytes = stats_journal_bytes;
1060 copy_journal_rotate = stats_journal_rotate;
1061 pthread_mutex_unlock (&stats_lock);
1063 pthread_mutex_lock (&cache_lock);
1064 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1065 tree_depth = (uint64_t) g_tree_height (cache_tree);
1066 pthread_mutex_unlock (&cache_lock);
1068 add_response_info(sock,
1069 "QueueLength: %"PRIu64"\n", copy_queue_length);
1070 add_response_info(sock,
1071 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1072 add_response_info(sock,
1073 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1074 add_response_info(sock,
1075 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1076 add_response_info(sock,
1077 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1078 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1079 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1080 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1081 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1083 send_response(sock, RESP_OK, "Statistics follow\n");
1085 return (0);
1086 } /* }}} int handle_request_stats */
1088 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1089 char *buffer, size_t buffer_size)
1090 {
1091 char *file;
1092 int status;
1094 status = buffer_get_field (&buffer, &buffer_size, &file);
1095 if (status != 0)
1096 {
1097 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1098 }
1099 else
1100 {
1101 pthread_mutex_lock(&stats_lock);
1102 stats_flush_received++;
1103 pthread_mutex_unlock(&stats_lock);
1105 if (!check_file_access(file, sock)) return 0;
1107 status = flush_file (file);
1108 if (status == 0)
1109 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1110 else if (status == ENOENT)
1111 {
1112 /* no file in our tree; see whether it exists at all */
1113 struct stat statbuf;
1115 memset(&statbuf, 0, sizeof(statbuf));
1116 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1117 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1118 else
1119 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1120 }
1121 else if (status < 0)
1122 return send_response(sock, RESP_ERR, "Internal error.\n");
1123 else
1124 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1125 }
1127 /* NOTREACHED */
1128 assert(1==0);
1129 } /* }}} int handle_request_slurp */
1131 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1132 {
1134 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1136 pthread_mutex_lock(&cache_lock);
1137 flush_old_values(-1);
1138 pthread_mutex_unlock(&cache_lock);
1140 return send_response(sock, RESP_OK, "Started flush.\n");
1141 } /* }}} static int handle_request_flushall */
1143 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1144 char *buffer, size_t buffer_size)
1145 {
1146 char *file;
1147 int values_num = 0;
1148 int status;
1150 time_t now;
1151 cache_item_t *ci;
1153 now = time (NULL);
1155 status = buffer_get_field (&buffer, &buffer_size, &file);
1156 if (status != 0)
1157 return send_response(sock, RESP_ERR,
1158 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1160 pthread_mutex_lock(&stats_lock);
1161 stats_updates_received++;
1162 pthread_mutex_unlock(&stats_lock);
1164 if (!check_file_access(file, sock)) return 0;
1166 pthread_mutex_lock (&cache_lock);
1167 ci = g_tree_lookup (cache_tree, file);
1169 if (ci == NULL) /* {{{ */
1170 {
1171 struct stat statbuf;
1173 /* don't hold the lock while we setup; stat(2) might block */
1174 pthread_mutex_unlock(&cache_lock);
1176 memset (&statbuf, 0, sizeof (statbuf));
1177 status = stat (file, &statbuf);
1178 if (status != 0)
1179 {
1180 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1182 status = errno;
1183 if (status == ENOENT)
1184 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1185 else
1186 return send_response(sock, RESP_ERR,
1187 "stat failed with error %i.\n", status);
1188 }
1189 if (!S_ISREG (statbuf.st_mode))
1190 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1192 if (access(file, R_OK|W_OK) != 0)
1193 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1194 file, rrd_strerror(errno));
1196 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1197 if (ci == NULL)
1198 {
1199 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1201 return send_response(sock, RESP_ERR, "malloc failed.\n");
1202 }
1203 memset (ci, 0, sizeof (cache_item_t));
1205 ci->file = strdup (file);
1206 if (ci->file == NULL)
1207 {
1208 free (ci);
1209 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1211 return send_response(sock, RESP_ERR, "strdup failed.\n");
1212 }
1214 wipe_ci_values(ci, now);
1215 ci->flags = CI_FLAGS_IN_TREE;
1217 pthread_mutex_lock(&cache_lock);
1218 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1219 } /* }}} */
1220 assert (ci != NULL);
1222 while (buffer_size > 0)
1223 {
1224 char **temp;
1225 char *value;
1227 status = buffer_get_field (&buffer, &buffer_size, &value);
1228 if (status != 0)
1229 {
1230 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1231 break;
1232 }
1234 temp = (char **) realloc (ci->values,
1235 sizeof (char *) * (ci->values_num + 1));
1236 if (temp == NULL)
1237 {
1238 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1239 continue;
1240 }
1241 ci->values = temp;
1243 ci->values[ci->values_num] = strdup (value);
1244 if (ci->values[ci->values_num] == NULL)
1245 {
1246 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1247 continue;
1248 }
1249 ci->values_num++;
1251 values_num++;
1252 }
1254 if (((now - ci->last_flush_time) >= config_write_interval)
1255 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1256 && (ci->values_num > 0))
1257 {
1258 enqueue_cache_item (ci, TAIL);
1259 }
1261 pthread_mutex_unlock (&cache_lock);
1263 if (values_num < 1)
1264 return send_response(sock, RESP_ERR, "No values updated.\n");
1265 else
1266 return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1268 /* NOTREACHED */
1269 assert(1==0);
1271 } /* }}} int handle_request_update */
1273 /* we came across a "WROTE" entry during journal replay.
1274 * throw away any values that we have accumulated for this file
1275 */
1276 static int handle_request_wrote (const char *buffer) /* {{{ */
1277 {
1278 int i;
1279 cache_item_t *ci;
1280 const char *file = buffer;
1282 pthread_mutex_lock(&cache_lock);
1284 ci = g_tree_lookup(cache_tree, file);
1285 if (ci == NULL)
1286 {
1287 pthread_mutex_unlock(&cache_lock);
1288 return (0);
1289 }
1291 if (ci->values)
1292 {
1293 for (i=0; i < ci->values_num; i++)
1294 free(ci->values[i]);
1296 free(ci->values);
1297 }
1299 wipe_ci_values(ci, time(NULL));
1300 remove_from_queue(ci);
1302 pthread_mutex_unlock(&cache_lock);
1303 return (0);
1304 } /* }}} int handle_request_wrote */
1306 /* returns 1 if we have the required privilege level */
1307 static int has_privilege (listen_socket_t *sock, /* {{{ */
1308 socket_privilege priv)
1309 {
1310 if (sock == NULL) /* journal replay */
1311 return 1;
1313 if (sock->privilege >= priv)
1314 return 1;
1316 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1317 } /* }}} static int has_privilege */
1319 /* if sock==NULL, we are in journal replay mode */
1320 static int handle_request (listen_socket_t *sock, /* {{{ */
1321 char *buffer, size_t buffer_size)
1322 {
1323 char *buffer_ptr;
1324 char *command;
1325 int status;
1327 assert (buffer[buffer_size - 1] == '\0');
1329 buffer_ptr = buffer;
1330 command = NULL;
1331 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1332 if (status != 0)
1333 {
1334 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1335 return (-1);
1336 }
1338 if (strcasecmp (command, "update") == 0)
1339 {
1340 status = has_privilege(sock, PRIV_HIGH);
1341 if (status <= 0)
1342 return status;
1344 /* don't re-write updates in replay mode */
1345 if (sock != NULL)
1346 journal_write(command, buffer_ptr);
1348 return (handle_request_update (sock, buffer_ptr, buffer_size));
1349 }
1350 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1351 {
1352 /* this is only valid in replay mode */
1353 return (handle_request_wrote (buffer_ptr));
1354 }
1355 else if (strcasecmp (command, "flush") == 0)
1356 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1357 else if (strcasecmp (command, "flushall") == 0)
1358 {
1359 status = has_privilege(sock, PRIV_HIGH);
1360 if (status <= 0)
1361 return status;
1363 return (handle_request_flushall(sock));
1364 }
1365 else if (strcasecmp (command, "stats") == 0)
1366 return (handle_request_stats (sock));
1367 else if (strcasecmp (command, "help") == 0)
1368 return (handle_request_help (sock, buffer_ptr, buffer_size));
1369 else
1370 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1372 /* NOTREACHED */
1373 assert(1==0);
1374 } /* }}} int handle_request */
1376 /* MUST NOT hold journal_lock before calling this */
1377 static void journal_rotate(void) /* {{{ */
1378 {
1379 FILE *old_fh = NULL;
1381 if (journal_cur == NULL || journal_old == NULL)
1382 return;
1384 pthread_mutex_lock(&journal_lock);
1386 /* we rotate this way (rename before close) so that the we can release
1387 * the journal lock as fast as possible. Journal writes to the new
1388 * journal can proceed immediately after the new file is opened. The
1389 * fclose can then block without affecting new updates.
1390 */
1391 if (journal_fh != NULL)
1392 {
1393 old_fh = journal_fh;
1394 rename(journal_cur, journal_old);
1395 ++stats_journal_rotate;
1396 }
1398 journal_fh = fopen(journal_cur, "a");
1399 pthread_mutex_unlock(&journal_lock);
1401 if (old_fh != NULL)
1402 fclose(old_fh);
1404 if (journal_fh == NULL)
1405 {
1406 RRDD_LOG(LOG_CRIT,
1407 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1408 journal_cur, rrd_strerror(errno));
1410 RRDD_LOG(LOG_ERR,
1411 "JOURNALING DISABLED: All values will be flushed at shutdown");
1412 config_flush_at_shutdown = 1;
1413 }
1415 } /* }}} static void journal_rotate */
1417 static void journal_done(void) /* {{{ */
1418 {
1419 if (journal_cur == NULL)
1420 return;
1422 pthread_mutex_lock(&journal_lock);
1423 if (journal_fh != NULL)
1424 {
1425 fclose(journal_fh);
1426 journal_fh = NULL;
1427 }
1429 if (config_flush_at_shutdown)
1430 {
1431 RRDD_LOG(LOG_INFO, "removing journals");
1432 unlink(journal_old);
1433 unlink(journal_cur);
1434 }
1435 else
1436 {
1437 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1438 "journals will be used at next startup");
1439 }
1441 pthread_mutex_unlock(&journal_lock);
1443 } /* }}} static void journal_done */
1445 static int journal_write(char *cmd, char *args) /* {{{ */
1446 {
1447 int chars;
1449 if (journal_fh == NULL)
1450 return 0;
1452 pthread_mutex_lock(&journal_lock);
1453 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1454 pthread_mutex_unlock(&journal_lock);
1456 if (chars > 0)
1457 {
1458 pthread_mutex_lock(&stats_lock);
1459 stats_journal_bytes += chars;
1460 pthread_mutex_unlock(&stats_lock);
1461 }
1463 return chars;
1464 } /* }}} static int journal_write */
1466 static int journal_replay (const char *file) /* {{{ */
1467 {
1468 FILE *fh;
1469 int entry_cnt = 0;
1470 int fail_cnt = 0;
1471 uint64_t line = 0;
1472 char entry[CMD_MAX];
1474 if (file == NULL) return 0;
1476 fh = fopen(file, "r");
1477 if (fh == NULL)
1478 {
1479 if (errno != ENOENT)
1480 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1481 file, rrd_strerror(errno));
1482 return 0;
1483 }
1484 else
1485 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1487 while(!feof(fh))
1488 {
1489 size_t entry_len;
1491 ++line;
1492 if (fgets(entry, sizeof(entry), fh) == NULL)
1493 break;
1494 entry_len = strlen(entry);
1496 /* check \n termination in case journal writing crashed mid-line */
1497 if (entry_len == 0)
1498 continue;
1499 else if (entry[entry_len - 1] != '\n')
1500 {
1501 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1502 ++fail_cnt;
1503 continue;
1504 }
1506 entry[entry_len - 1] = '\0';
1508 if (handle_request(NULL, entry, entry_len) == 0)
1509 ++entry_cnt;
1510 else
1511 ++fail_cnt;
1512 }
1514 fclose(fh);
1516 if (entry_cnt > 0)
1517 {
1518 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1519 entry_cnt, fail_cnt);
1520 return 1;
1521 }
1522 else
1523 return 0;
1525 } /* }}} static int journal_replay */
1527 static void close_connection(listen_socket_t *sock)
1528 {
1529 close(sock->fd) ; sock->fd = -1;
1530 free(sock->rbuf); sock->rbuf = NULL;
1531 free(sock->wbuf); sock->wbuf = NULL;
1533 free(sock);
1534 }
1536 static void *connection_thread_main (void *args) /* {{{ */
1537 {
1538 pthread_t self;
1539 listen_socket_t *sock;
1540 int i;
1541 int fd;
1543 sock = (listen_socket_t *) args;
1544 fd = sock->fd;
1546 /* init read buffers */
1547 sock->next_read = sock->next_cmd = 0;
1548 sock->rbuf = malloc(RBUF_SIZE);
1549 if (sock->rbuf == NULL)
1550 {
1551 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1552 close_connection(sock);
1553 return NULL;
1554 }
1556 pthread_mutex_lock (&connection_threads_lock);
1557 {
1558 pthread_t *temp;
1560 temp = (pthread_t *) realloc (connection_threads,
1561 sizeof (pthread_t) * (connection_threads_num + 1));
1562 if (temp == NULL)
1563 {
1564 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1565 }
1566 else
1567 {
1568 connection_threads = temp;
1569 connection_threads[connection_threads_num] = pthread_self ();
1570 connection_threads_num++;
1571 }
1572 }
1573 pthread_mutex_unlock (&connection_threads_lock);
1575 while (do_shutdown == 0)
1576 {
1577 char *cmd;
1578 ssize_t cmd_len;
1579 ssize_t rbytes;
1581 struct pollfd pollfd;
1582 int status;
1584 pollfd.fd = fd;
1585 pollfd.events = POLLIN | POLLPRI;
1586 pollfd.revents = 0;
1588 status = poll (&pollfd, 1, /* timeout = */ 500);
1589 if (do_shutdown)
1590 break;
1591 else if (status == 0) /* timeout */
1592 continue;
1593 else if (status < 0) /* error */
1594 {
1595 status = errno;
1596 if (status == EINTR)
1597 continue;
1598 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1599 continue;
1600 }
1602 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1603 {
1604 close_connection(sock);
1605 break;
1606 }
1607 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1608 {
1609 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1610 "poll(2) returned something unexpected: %#04hx",
1611 pollfd.revents);
1612 close_connection(sock);
1613 break;
1614 }
1616 rbytes = read(fd, sock->rbuf + sock->next_read,
1617 RBUF_SIZE - sock->next_read);
1618 if (rbytes < 0)
1619 {
1620 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1621 break;
1622 }
1623 else if (rbytes == 0)
1624 break; /* eof */
1626 sock->next_read += rbytes;
1628 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1629 {
1630 status = handle_request (sock, cmd, cmd_len+1);
1631 if (status != 0)
1632 goto out_close;
1633 }
1634 }
1636 out_close:
1637 close_connection(sock);
1639 self = pthread_self ();
1640 /* Remove this thread from the connection threads list */
1641 pthread_mutex_lock (&connection_threads_lock);
1642 /* Find out own index in the array */
1643 for (i = 0; i < connection_threads_num; i++)
1644 if (pthread_equal (connection_threads[i], self) != 0)
1645 break;
1646 assert (i < connection_threads_num);
1648 /* Move the trailing threads forward. */
1649 if (i < (connection_threads_num - 1))
1650 {
1651 memmove (connection_threads + i,
1652 connection_threads + i + 1,
1653 sizeof (pthread_t) * (connection_threads_num - i - 1));
1654 }
1656 connection_threads_num--;
1657 pthread_mutex_unlock (&connection_threads_lock);
1659 return (NULL);
1660 } /* }}} void *connection_thread_main */
1662 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1663 {
1664 int fd;
1665 struct sockaddr_un sa;
1666 listen_socket_t *temp;
1667 int status;
1668 const char *path;
1670 path = sock->addr;
1671 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1672 path += strlen("unix:");
1674 temp = (listen_socket_t *) realloc (listen_fds,
1675 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1676 if (temp == NULL)
1677 {
1678 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1679 return (-1);
1680 }
1681 listen_fds = temp;
1682 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1684 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1685 if (fd < 0)
1686 {
1687 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1688 return (-1);
1689 }
1691 memset (&sa, 0, sizeof (sa));
1692 sa.sun_family = AF_UNIX;
1693 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1695 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1696 if (status != 0)
1697 {
1698 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1699 close (fd);
1700 unlink (path);
1701 return (-1);
1702 }
1704 status = listen (fd, /* backlog = */ 10);
1705 if (status != 0)
1706 {
1707 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1708 close (fd);
1709 unlink (path);
1710 return (-1);
1711 }
1713 listen_fds[listen_fds_num].fd = fd;
1714 listen_fds[listen_fds_num].family = PF_UNIX;
1715 strncpy(listen_fds[listen_fds_num].addr, path,
1716 sizeof (listen_fds[listen_fds_num].addr) - 1);
1717 listen_fds_num++;
1719 return (0);
1720 } /* }}} int open_listen_socket_unix */
1722 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1723 {
1724 struct addrinfo ai_hints;
1725 struct addrinfo *ai_res;
1726 struct addrinfo *ai_ptr;
1727 char addr_copy[NI_MAXHOST];
1728 char *addr;
1729 char *port;
1730 int status;
1732 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1733 addr_copy[sizeof (addr_copy) - 1] = 0;
1734 addr = addr_copy;
1736 memset (&ai_hints, 0, sizeof (ai_hints));
1737 ai_hints.ai_flags = 0;
1738 #ifdef AI_ADDRCONFIG
1739 ai_hints.ai_flags |= AI_ADDRCONFIG;
1740 #endif
1741 ai_hints.ai_family = AF_UNSPEC;
1742 ai_hints.ai_socktype = SOCK_STREAM;
1744 port = NULL;
1745 if (*addr == '[') /* IPv6+port format */
1746 {
1747 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1748 addr++;
1750 port = strchr (addr, ']');
1751 if (port == NULL)
1752 {
1753 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1754 sock->addr);
1755 return (-1);
1756 }
1757 *port = 0;
1758 port++;
1760 if (*port == ':')
1761 port++;
1762 else if (*port == 0)
1763 port = NULL;
1764 else
1765 {
1766 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1767 port);
1768 return (-1);
1769 }
1770 } /* if (*addr = ']') */
1771 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1772 {
1773 port = rindex(addr, ':');
1774 if (port != NULL)
1775 {
1776 *port = 0;
1777 port++;
1778 }
1779 }
1780 ai_res = NULL;
1781 status = getaddrinfo (addr,
1782 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1783 &ai_hints, &ai_res);
1784 if (status != 0)
1785 {
1786 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1787 "%s", addr, gai_strerror (status));
1788 return (-1);
1789 }
1791 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1792 {
1793 int fd;
1794 listen_socket_t *temp;
1795 int one = 1;
1797 temp = (listen_socket_t *) realloc (listen_fds,
1798 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1799 if (temp == NULL)
1800 {
1801 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1802 continue;
1803 }
1804 listen_fds = temp;
1805 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1807 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1808 if (fd < 0)
1809 {
1810 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1811 continue;
1812 }
1814 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1816 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1817 if (status != 0)
1818 {
1819 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1820 close (fd);
1821 continue;
1822 }
1824 status = listen (fd, /* backlog = */ 10);
1825 if (status != 0)
1826 {
1827 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1828 close (fd);
1829 return (-1);
1830 }
1832 listen_fds[listen_fds_num].fd = fd;
1833 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1834 listen_fds_num++;
1835 } /* for (ai_ptr) */
1837 return (0);
1838 } /* }}} static int open_listen_socket_network */
1840 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1841 {
1842 assert(sock != NULL);
1843 assert(sock->addr != NULL);
1845 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1846 || sock->addr[0] == '/')
1847 return (open_listen_socket_unix(sock));
1848 else
1849 return (open_listen_socket_network(sock));
1850 } /* }}} int open_listen_socket */
1852 static int close_listen_sockets (void) /* {{{ */
1853 {
1854 size_t i;
1856 for (i = 0; i < listen_fds_num; i++)
1857 {
1858 close (listen_fds[i].fd);
1860 if (listen_fds[i].family == PF_UNIX)
1861 unlink(listen_fds[i].addr);
1862 }
1864 free (listen_fds);
1865 listen_fds = NULL;
1866 listen_fds_num = 0;
1868 return (0);
1869 } /* }}} int close_listen_sockets */
1871 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1872 {
1873 struct pollfd *pollfds;
1874 int pollfds_num;
1875 int status;
1876 int i;
1878 for (i = 0; i < config_listen_address_list_len; i++)
1879 open_listen_socket (config_listen_address_list[i]);
1881 if (config_listen_address_list_len < 1)
1882 {
1883 listen_socket_t sock;
1884 memset(&sock, 0, sizeof(sock));
1885 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1886 open_listen_socket (&sock);
1887 }
1889 if (listen_fds_num < 1)
1890 {
1891 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1892 "could be opened. Sorry.");
1893 return (NULL);
1894 }
1896 pollfds_num = listen_fds_num;
1897 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1898 if (pollfds == NULL)
1899 {
1900 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1901 return (NULL);
1902 }
1903 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1905 RRDD_LOG(LOG_INFO, "listening for connections");
1907 while (do_shutdown == 0)
1908 {
1909 assert (pollfds_num == ((int) listen_fds_num));
1910 for (i = 0; i < pollfds_num; i++)
1911 {
1912 pollfds[i].fd = listen_fds[i].fd;
1913 pollfds[i].events = POLLIN | POLLPRI;
1914 pollfds[i].revents = 0;
1915 }
1917 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1918 if (do_shutdown)
1919 break;
1920 else if (status == 0) /* timeout */
1921 continue;
1922 else if (status < 0) /* error */
1923 {
1924 status = errno;
1925 if (status != EINTR)
1926 {
1927 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1928 }
1929 continue;
1930 }
1932 for (i = 0; i < pollfds_num; i++)
1933 {
1934 listen_socket_t *client_sock;
1935 struct sockaddr_storage client_sa;
1936 socklen_t client_sa_size;
1937 pthread_t tid;
1938 pthread_attr_t attr;
1940 if (pollfds[i].revents == 0)
1941 continue;
1943 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1944 {
1945 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1946 "poll(2) returned something unexpected for listen FD #%i.",
1947 pollfds[i].fd);
1948 continue;
1949 }
1951 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1952 if (client_sock == NULL)
1953 {
1954 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1955 continue;
1956 }
1957 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1959 client_sa_size = sizeof (client_sa);
1960 client_sock->fd = accept (pollfds[i].fd,
1961 (struct sockaddr *) &client_sa, &client_sa_size);
1962 if (client_sock->fd < 0)
1963 {
1964 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1965 free(client_sock);
1966 continue;
1967 }
1969 pthread_attr_init (&attr);
1970 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1972 status = pthread_create (&tid, &attr, connection_thread_main,
1973 client_sock);
1974 if (status != 0)
1975 {
1976 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1977 close_connection(client_sock);
1978 continue;
1979 }
1980 } /* for (pollfds_num) */
1981 } /* while (do_shutdown == 0) */
1983 RRDD_LOG(LOG_INFO, "starting shutdown");
1985 close_listen_sockets ();
1987 pthread_mutex_lock (&connection_threads_lock);
1988 while (connection_threads_num > 0)
1989 {
1990 pthread_t wait_for;
1992 wait_for = connection_threads[0];
1994 pthread_mutex_unlock (&connection_threads_lock);
1995 pthread_join (wait_for, /* retval = */ NULL);
1996 pthread_mutex_lock (&connection_threads_lock);
1997 }
1998 pthread_mutex_unlock (&connection_threads_lock);
2000 return (NULL);
2001 } /* }}} void *listen_thread_main */
2003 static int daemonize (void) /* {{{ */
2004 {
2005 int status;
2006 int fd;
2007 char *base_dir;
2009 fd = open_pidfile();
2010 if (fd < 0) return fd;
2012 if (!stay_foreground)
2013 {
2014 pid_t child;
2016 child = fork ();
2017 if (child < 0)
2018 {
2019 fprintf (stderr, "daemonize: fork(2) failed.\n");
2020 return (-1);
2021 }
2022 else if (child > 0)
2023 {
2024 return (1);
2025 }
2027 /* Become session leader */
2028 setsid ();
2030 /* Open the first three file descriptors to /dev/null */
2031 close (2);
2032 close (1);
2033 close (0);
2035 open ("/dev/null", O_RDWR);
2036 dup (0);
2037 dup (0);
2038 } /* if (!stay_foreground) */
2040 /* Change into the /tmp directory. */
2041 base_dir = (config_base_dir != NULL)
2042 ? config_base_dir
2043 : "/tmp";
2044 status = chdir (base_dir);
2045 if (status != 0)
2046 {
2047 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2048 return (-1);
2049 }
2051 install_signal_handlers();
2053 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2054 RRDD_LOG(LOG_INFO, "starting up");
2056 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2057 if (cache_tree == NULL)
2058 {
2059 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2060 return (-1);
2061 }
2063 status = write_pidfile (fd);
2064 return status;
2065 } /* }}} int daemonize */
2067 static int cleanup (void) /* {{{ */
2068 {
2069 do_shutdown++;
2071 pthread_cond_signal (&cache_cond);
2072 pthread_join (queue_thread, /* return = */ NULL);
2074 remove_pidfile ();
2076 RRDD_LOG(LOG_INFO, "goodbye");
2077 closelog ();
2079 return (0);
2080 } /* }}} int cleanup */
2082 static int read_options (int argc, char **argv) /* {{{ */
2083 {
2084 int option;
2085 int status = 0;
2087 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2088 {
2089 switch (option)
2090 {
2091 case 'g':
2092 stay_foreground=1;
2093 break;
2095 case 'L':
2096 case 'l':
2097 {
2098 listen_socket_t **temp;
2099 listen_socket_t *new;
2101 new = malloc(sizeof(listen_socket_t));
2102 if (new == NULL)
2103 {
2104 fprintf(stderr, "read_options: malloc failed.\n");
2105 return(2);
2106 }
2107 memset(new, 0, sizeof(listen_socket_t));
2109 temp = (listen_socket_t **) realloc (config_listen_address_list,
2110 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2111 if (temp == NULL)
2112 {
2113 fprintf (stderr, "read_options: realloc failed.\n");
2114 return (2);
2115 }
2116 config_listen_address_list = temp;
2118 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2119 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2121 temp[config_listen_address_list_len] = new;
2122 config_listen_address_list_len++;
2123 }
2124 break;
2126 case 'f':
2127 {
2128 int temp;
2130 temp = atoi (optarg);
2131 if (temp > 0)
2132 config_flush_interval = temp;
2133 else
2134 {
2135 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2136 status = 3;
2137 }
2138 }
2139 break;
2141 case 'w':
2142 {
2143 int temp;
2145 temp = atoi (optarg);
2146 if (temp > 0)
2147 config_write_interval = temp;
2148 else
2149 {
2150 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2151 status = 2;
2152 }
2153 }
2154 break;
2156 case 'z':
2157 {
2158 int temp;
2160 temp = atoi(optarg);
2161 if (temp > 0)
2162 config_write_jitter = temp;
2163 else
2164 {
2165 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2166 status = 2;
2167 }
2169 break;
2170 }
2172 case 'B':
2173 config_write_base_only = 1;
2174 break;
2176 case 'b':
2177 {
2178 size_t len;
2180 if (config_base_dir != NULL)
2181 free (config_base_dir);
2182 config_base_dir = strdup (optarg);
2183 if (config_base_dir == NULL)
2184 {
2185 fprintf (stderr, "read_options: strdup failed.\n");
2186 return (3);
2187 }
2189 len = strlen (config_base_dir);
2190 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2191 {
2192 config_base_dir[len - 1] = 0;
2193 len--;
2194 }
2196 if (len < 1)
2197 {
2198 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2199 return (4);
2200 }
2202 _config_base_dir_len = len;
2203 }
2204 break;
2206 case 'p':
2207 {
2208 if (config_pid_file != NULL)
2209 free (config_pid_file);
2210 config_pid_file = strdup (optarg);
2211 if (config_pid_file == NULL)
2212 {
2213 fprintf (stderr, "read_options: strdup failed.\n");
2214 return (3);
2215 }
2216 }
2217 break;
2219 case 'F':
2220 config_flush_at_shutdown = 1;
2221 break;
2223 case 'j':
2224 {
2225 struct stat statbuf;
2226 const char *dir = optarg;
2228 status = stat(dir, &statbuf);
2229 if (status != 0)
2230 {
2231 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2232 return 6;
2233 }
2235 if (!S_ISDIR(statbuf.st_mode)
2236 || access(dir, R_OK|W_OK|X_OK) != 0)
2237 {
2238 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2239 errno ? rrd_strerror(errno) : "");
2240 return 6;
2241 }
2243 journal_cur = malloc(PATH_MAX + 1);
2244 journal_old = malloc(PATH_MAX + 1);
2245 if (journal_cur == NULL || journal_old == NULL)
2246 {
2247 fprintf(stderr, "malloc failure for journal files\n");
2248 return 6;
2249 }
2250 else
2251 {
2252 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2253 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2254 }
2255 }
2256 break;
2258 case 'h':
2259 case '?':
2260 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2261 "\n"
2262 "Usage: rrdcached [options]\n"
2263 "\n"
2264 "Valid options are:\n"
2265 " -l <address> Socket address to listen to.\n"
2266 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2267 " -w <seconds> Interval in which to write data.\n"
2268 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2269 " -f <seconds> Interval in which to flush dead data.\n"
2270 " -p <file> Location of the PID-file.\n"
2271 " -b <dir> Base directory to change to.\n"
2272 " -B Restrict file access to paths within -b <dir>\n"
2273 " -g Do not fork and run in the foreground.\n"
2274 " -j <dir> Directory in which to create the journal files.\n"
2275 " -F Always flush all updates at shutdown\n"
2276 "\n"
2277 "For more information and a detailed description of all options "
2278 "please refer\n"
2279 "to the rrdcached(1) manual page.\n",
2280 VERSION);
2281 status = -1;
2282 break;
2283 } /* switch (option) */
2284 } /* while (getopt) */
2286 /* advise the user when values are not sane */
2287 if (config_flush_interval < 2 * config_write_interval)
2288 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2289 " 2x write interval (-w) !\n");
2290 if (config_write_jitter > config_write_interval)
2291 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2292 " write interval (-w) !\n");
2294 if (config_write_base_only && config_base_dir == NULL)
2295 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2296 " Consult the rrdcached documentation\n");
2298 if (journal_cur == NULL)
2299 config_flush_at_shutdown = 1;
2301 return (status);
2302 } /* }}} int read_options */
2304 int main (int argc, char **argv)
2305 {
2306 int status;
2308 status = read_options (argc, argv);
2309 if (status != 0)
2310 {
2311 if (status < 0)
2312 status = 0;
2313 return (status);
2314 }
2316 status = daemonize ();
2317 if (status == 1)
2318 {
2319 struct sigaction sigchld;
2321 memset (&sigchld, 0, sizeof (sigchld));
2322 sigchld.sa_handler = SIG_IGN;
2323 sigaction (SIGCHLD, &sigchld, NULL);
2325 return (0);
2326 }
2327 else if (status != 0)
2328 {
2329 fprintf (stderr, "daemonize failed, exiting.\n");
2330 return (1);
2331 }
2333 if (journal_cur != NULL)
2334 {
2335 int had_journal = 0;
2337 pthread_mutex_lock(&journal_lock);
2339 RRDD_LOG(LOG_INFO, "checking for journal files");
2341 had_journal += journal_replay(journal_old);
2342 had_journal += journal_replay(journal_cur);
2344 if (had_journal)
2345 flush_old_values(-1);
2347 pthread_mutex_unlock(&journal_lock);
2348 journal_rotate();
2350 RRDD_LOG(LOG_INFO, "journal processing complete");
2351 }
2353 /* start the queue thread */
2354 memset (&queue_thread, 0, sizeof (queue_thread));
2355 status = pthread_create (&queue_thread,
2356 NULL, /* attr */
2357 queue_thread_main,
2358 NULL); /* args */
2359 if (status != 0)
2360 {
2361 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2362 cleanup();
2363 return (1);
2364 }
2366 listen_thread_main (NULL);
2367 cleanup ();
2369 return (0);
2370 } /* int main */
2372 /*
2373 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2374 */