1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
110 #include <glib-2.0/glib.h>
111 /* }}} */
113 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
115 #ifndef __GNUC__
116 # define __attribute__(x) /**/
117 #endif
119 /*
120 * Types
121 */
122 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124 struct listen_socket_s
125 {
126 int fd;
127 char addr[PATH_MAX + 1];
128 int family;
130 /* state for BATCH processing */
131 time_t batch_start;
132 int batch_cmd;
134 /* buffered IO */
135 char *rbuf;
136 off_t next_cmd;
137 off_t next_read;
139 char *wbuf;
140 ssize_t wbuf_len;
142 uint32_t permissions;
143 };
144 typedef struct listen_socket_s listen_socket_t;
146 struct command_s;
147 typedef struct command_s command_t;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
150 time_t now __attribute__((unused)),\
151 char *buffer __attribute__((unused)),\
152 size_t buffer_size __attribute__((unused))
154 #define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
155 DISPATCH_PROTO
157 struct command_s {
158 char *cmd;
159 int (*handler)(HANDLER_PROTO);
161 char context; /* where we expect to see it */
162 #define CMD_CONTEXT_CLIENT (1<<0)
163 #define CMD_CONTEXT_BATCH (1<<1)
164 #define CMD_CONTEXT_JOURNAL (1<<2)
165 #define CMD_CONTEXT_ANY (0x7f)
167 char *syntax;
168 char *help;
169 };
171 struct cache_item_s;
172 typedef struct cache_item_s cache_item_t;
173 struct cache_item_s
174 {
175 char *file;
176 char **values;
177 size_t values_num;
178 time_t last_flush_time;
179 time_t last_update_stamp;
180 #define CI_FLAGS_IN_TREE (1<<0)
181 #define CI_FLAGS_IN_QUEUE (1<<1)
182 int flags;
183 pthread_cond_t flushed;
184 cache_item_t *prev;
185 cache_item_t *next;
186 };
188 struct callback_flush_data_s
189 {
190 time_t now;
191 time_t abs_timeout;
192 char **keys;
193 size_t keys_num;
194 };
195 typedef struct callback_flush_data_s callback_flush_data_t;
197 enum queue_side_e
198 {
199 HEAD,
200 TAIL
201 };
202 typedef enum queue_side_e queue_side_t;
204 /* describe a set of journal files */
205 typedef struct {
206 char **files;
207 size_t files_num;
208 } journal_set;
210 /* max length of socket command or response */
211 #define CMD_MAX 4096
212 #define RBUF_SIZE (CMD_MAX*2)
214 /*
215 * Variables
216 */
217 static int stay_foreground = 0;
218 static uid_t daemon_uid;
220 static listen_socket_t *listen_fds = NULL;
221 static size_t listen_fds_num = 0;
223 enum {
224 RUNNING, /* normal operation */
225 FLUSHING, /* flushing remaining values */
226 SHUTDOWN /* shutting down */
227 } state = RUNNING;
229 static pthread_t *queue_threads;
230 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
231 static int config_queue_threads = 4;
233 static pthread_t flush_thread;
234 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
236 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
237 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
238 static int connection_threads_num = 0;
240 /* Cache stuff */
241 static GTree *cache_tree = NULL;
242 static cache_item_t *cache_queue_head = NULL;
243 static cache_item_t *cache_queue_tail = NULL;
244 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
246 static int config_write_interval = 300;
247 static int config_write_jitter = 0;
248 static int config_flush_interval = 3600;
249 static int config_flush_at_shutdown = 0;
250 static char *config_pid_file = NULL;
251 static char *config_base_dir = NULL;
252 static size_t _config_base_dir_len = 0;
253 static int config_write_base_only = 0;
255 static listen_socket_t **config_listen_address_list = NULL;
256 static size_t config_listen_address_list_len = 0;
258 static uint64_t stats_queue_length = 0;
259 static uint64_t stats_updates_received = 0;
260 static uint64_t stats_flush_received = 0;
261 static uint64_t stats_updates_written = 0;
262 static uint64_t stats_data_sets_written = 0;
263 static uint64_t stats_journal_bytes = 0;
264 static uint64_t stats_journal_rotate = 0;
265 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
267 /* Journaled updates */
268 #define JOURNAL_BASE "rrd.journal"
269 static journal_set *journal_cur = NULL;
270 static journal_set *journal_old = NULL;
271 static char *journal_dir = NULL;
272 static FILE *journal_fh = NULL; /* current journal file handle */
273 static long journal_size = 0; /* current journal size */
274 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
275 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
276 static int journal_write(char *cmd, char *args);
277 static void journal_done(void);
278 static void journal_rotate(void);
280 /* prototypes for forward refernces */
281 static int handle_request_help (HANDLER_PROTO);
283 /*
284 * Functions
285 */
286 static void sig_common (const char *sig) /* {{{ */
287 {
288 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
289 state = FLUSHING;
290 pthread_cond_broadcast(&flush_cond);
291 pthread_cond_broadcast(&queue_cond);
292 } /* }}} void sig_common */
294 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
295 {
296 sig_common("INT");
297 } /* }}} void sig_int_handler */
299 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
300 {
301 sig_common("TERM");
302 } /* }}} void sig_term_handler */
304 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
305 {
306 config_flush_at_shutdown = 1;
307 sig_common("USR1");
308 } /* }}} void sig_usr1_handler */
310 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
311 {
312 config_flush_at_shutdown = 0;
313 sig_common("USR2");
314 } /* }}} void sig_usr2_handler */
316 static void install_signal_handlers(void) /* {{{ */
317 {
318 /* These structures are static, because `sigaction' behaves weird if the are
319 * overwritten.. */
320 static struct sigaction sa_int;
321 static struct sigaction sa_term;
322 static struct sigaction sa_pipe;
323 static struct sigaction sa_usr1;
324 static struct sigaction sa_usr2;
326 /* Install signal handlers */
327 memset (&sa_int, 0, sizeof (sa_int));
328 sa_int.sa_handler = sig_int_handler;
329 sigaction (SIGINT, &sa_int, NULL);
331 memset (&sa_term, 0, sizeof (sa_term));
332 sa_term.sa_handler = sig_term_handler;
333 sigaction (SIGTERM, &sa_term, NULL);
335 memset (&sa_pipe, 0, sizeof (sa_pipe));
336 sa_pipe.sa_handler = SIG_IGN;
337 sigaction (SIGPIPE, &sa_pipe, NULL);
339 memset (&sa_pipe, 0, sizeof (sa_usr1));
340 sa_usr1.sa_handler = sig_usr1_handler;
341 sigaction (SIGUSR1, &sa_usr1, NULL);
343 memset (&sa_usr2, 0, sizeof (sa_usr2));
344 sa_usr2.sa_handler = sig_usr2_handler;
345 sigaction (SIGUSR2, &sa_usr2, NULL);
347 } /* }}} void install_signal_handlers */
349 static int open_pidfile(char *action, int oflag) /* {{{ */
350 {
351 int fd;
352 const char *file;
353 char *file_copy, *dir;
355 file = (config_pid_file != NULL)
356 ? config_pid_file
357 : LOCALSTATEDIR "/run/rrdcached.pid";
359 /* dirname may modify its argument */
360 file_copy = strdup(file);
361 if (file_copy == NULL)
362 {
363 fprintf(stderr, "rrdcached: strdup(): %s\n",
364 rrd_strerror(errno));
365 return -1;
366 }
368 dir = dirname(file_copy);
369 if (rrd_mkdir_p(dir, 0777) != 0)
370 {
371 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
372 dir, rrd_strerror(errno));
373 return -1;
374 }
376 free(file_copy);
378 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
379 if (fd < 0)
380 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
381 action, file, rrd_strerror(errno));
383 return(fd);
384 } /* }}} static int open_pidfile */
386 /* check existing pid file to see whether a daemon is running */
387 static int check_pidfile(void)
388 {
389 int pid_fd;
390 pid_t pid;
391 char pid_str[16];
393 pid_fd = open_pidfile("open", O_RDWR);
394 if (pid_fd < 0)
395 return pid_fd;
397 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
398 return -1;
400 pid = atoi(pid_str);
401 if (pid <= 0)
402 return -1;
404 /* another running process that we can signal COULD be
405 * a competing rrdcached */
406 if (pid != getpid() && kill(pid, 0) == 0)
407 {
408 fprintf(stderr,
409 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
410 close(pid_fd);
411 return -1;
412 }
414 lseek(pid_fd, 0, SEEK_SET);
415 if (ftruncate(pid_fd, 0) == -1)
416 {
417 fprintf(stderr,
418 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
419 close(pid_fd);
420 return -1;
421 }
423 fprintf(stderr,
424 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
425 "rrdcached: starting normally.\n", pid);
427 return pid_fd;
428 } /* }}} static int check_pidfile */
430 static int write_pidfile (int fd) /* {{{ */
431 {
432 pid_t pid;
433 FILE *fh;
435 pid = getpid ();
437 fh = fdopen (fd, "w");
438 if (fh == NULL)
439 {
440 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
441 close(fd);
442 return (-1);
443 }
445 fprintf (fh, "%i\n", (int) pid);
446 fclose (fh);
448 return (0);
449 } /* }}} int write_pidfile */
451 static int remove_pidfile (void) /* {{{ */
452 {
453 char *file;
454 int status;
456 file = (config_pid_file != NULL)
457 ? config_pid_file
458 : LOCALSTATEDIR "/run/rrdcached.pid";
460 status = unlink (file);
461 if (status == 0)
462 return (0);
463 return (errno);
464 } /* }}} int remove_pidfile */
466 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
467 {
468 char *eol;
470 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
471 sock->next_read - sock->next_cmd);
473 if (eol == NULL)
474 {
475 /* no commands left, move remainder back to front of rbuf */
476 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
477 sock->next_read - sock->next_cmd);
478 sock->next_read -= sock->next_cmd;
479 sock->next_cmd = 0;
480 *len = 0;
481 return NULL;
482 }
483 else
484 {
485 char *cmd = sock->rbuf + sock->next_cmd;
486 *eol = '\0';
488 sock->next_cmd = eol - sock->rbuf + 1;
490 if (eol > sock->rbuf && *(eol-1) == '\r')
491 *(--eol) = '\0'; /* handle "\r\n" EOL */
493 *len = eol - cmd;
495 return cmd;
496 }
498 /* NOTREACHED */
499 assert(1==0);
500 } /* }}} char *next_cmd */
502 /* add the characters directly to the write buffer */
503 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
504 {
505 char *new_buf;
507 assert(sock != NULL);
509 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
510 if (new_buf == NULL)
511 {
512 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
513 return -1;
514 }
516 strncpy(new_buf + sock->wbuf_len, str, len + 1);
518 sock->wbuf = new_buf;
519 sock->wbuf_len += len;
521 return 0;
522 } /* }}} static int add_to_wbuf */
524 /* add the text to the "extra" info that's sent after the status line */
525 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
526 {
527 va_list argp;
528 char buffer[CMD_MAX];
529 int len;
531 if (sock == NULL) return 0; /* journal replay mode */
532 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
534 va_start(argp, fmt);
535 #ifdef HAVE_VSNPRINTF
536 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
537 #else
538 len = vsprintf(buffer, fmt, argp);
539 #endif
540 va_end(argp);
541 if (len < 0)
542 {
543 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
544 return -1;
545 }
547 return add_to_wbuf(sock, buffer, len);
548 } /* }}} static int add_response_info */
550 static int count_lines(char *str) /* {{{ */
551 {
552 int lines = 0;
554 if (str != NULL)
555 {
556 while ((str = strchr(str, '\n')) != NULL)
557 {
558 ++lines;
559 ++str;
560 }
561 }
563 return lines;
564 } /* }}} static int count_lines */
566 /* send the response back to the user.
567 * returns 0 on success, -1 on error
568 * write buffer is always zeroed after this call */
569 static int send_response (listen_socket_t *sock, response_code rc,
570 char *fmt, ...) /* {{{ */
571 {
572 va_list argp;
573 char buffer[CMD_MAX];
574 int lines;
575 ssize_t wrote;
576 int rclen, len;
578 if (sock == NULL) return rc; /* journal replay mode */
580 if (sock->batch_start)
581 {
582 if (rc == RESP_OK)
583 return rc; /* no response on success during BATCH */
584 lines = sock->batch_cmd;
585 }
586 else if (rc == RESP_OK)
587 lines = count_lines(sock->wbuf);
588 else
589 lines = -1;
591 rclen = sprintf(buffer, "%d ", lines);
592 va_start(argp, fmt);
593 #ifdef HAVE_VSNPRINTF
594 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
595 #else
596 len = vsprintf(buffer+rclen, fmt, argp);
597 #endif
598 va_end(argp);
599 if (len < 0)
600 return -1;
602 len += rclen;
604 /* append the result to the wbuf, don't write to the user */
605 if (sock->batch_start)
606 return add_to_wbuf(sock, buffer, len);
608 /* first write must be complete */
609 if (len != write(sock->fd, buffer, len))
610 {
611 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
612 return -1;
613 }
615 if (sock->wbuf != NULL && rc == RESP_OK)
616 {
617 wrote = 0;
618 while (wrote < sock->wbuf_len)
619 {
620 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
621 if (wb <= 0)
622 {
623 RRDD_LOG(LOG_INFO, "send_response: could not write results");
624 return -1;
625 }
626 wrote += wb;
627 }
628 }
630 free(sock->wbuf); sock->wbuf = NULL;
631 sock->wbuf_len = 0;
633 return 0;
634 } /* }}} */
636 static void wipe_ci_values(cache_item_t *ci, time_t when)
637 {
638 ci->values = NULL;
639 ci->values_num = 0;
641 ci->last_flush_time = when;
642 if (config_write_jitter > 0)
643 ci->last_flush_time += (rrd_random() % config_write_jitter);
644 }
646 /* remove_from_queue
647 * remove a "cache_item_t" item from the queue.
648 * must hold 'cache_lock' when calling this
649 */
650 static void remove_from_queue(cache_item_t *ci) /* {{{ */
651 {
652 if (ci == NULL) return;
653 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
655 if (ci->prev == NULL)
656 cache_queue_head = ci->next; /* reset head */
657 else
658 ci->prev->next = ci->next;
660 if (ci->next == NULL)
661 cache_queue_tail = ci->prev; /* reset the tail */
662 else
663 ci->next->prev = ci->prev;
665 ci->next = ci->prev = NULL;
666 ci->flags &= ~CI_FLAGS_IN_QUEUE;
668 pthread_mutex_lock (&stats_lock);
669 assert (stats_queue_length > 0);
670 stats_queue_length--;
671 pthread_mutex_unlock (&stats_lock);
673 } /* }}} static void remove_from_queue */
675 /* free the resources associated with the cache_item_t
676 * must hold cache_lock when calling this function
677 */
678 static void *free_cache_item(cache_item_t *ci) /* {{{ */
679 {
680 if (ci == NULL) return NULL;
682 remove_from_queue(ci);
684 for (size_t i=0; i < ci->values_num; i++)
685 free(ci->values[i]);
687 free (ci->values);
688 free (ci->file);
690 /* in case anyone is waiting */
691 pthread_cond_broadcast(&ci->flushed);
692 pthread_cond_destroy(&ci->flushed);
694 free (ci);
696 return NULL;
697 } /* }}} static void *free_cache_item */
699 /*
700 * enqueue_cache_item:
701 * `cache_lock' must be acquired before calling this function!
702 */
703 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
704 queue_side_t side)
705 {
706 if (ci == NULL)
707 return (-1);
709 if (ci->values_num == 0)
710 return (0);
712 if (side == HEAD)
713 {
714 if (cache_queue_head == ci)
715 return 0;
717 /* remove if further down in queue */
718 remove_from_queue(ci);
720 ci->prev = NULL;
721 ci->next = cache_queue_head;
722 if (ci->next != NULL)
723 ci->next->prev = ci;
724 cache_queue_head = ci;
726 if (cache_queue_tail == NULL)
727 cache_queue_tail = cache_queue_head;
728 }
729 else /* (side == TAIL) */
730 {
731 /* We don't move values back in the list.. */
732 if (ci->flags & CI_FLAGS_IN_QUEUE)
733 return (0);
735 assert (ci->next == NULL);
736 assert (ci->prev == NULL);
738 ci->prev = cache_queue_tail;
740 if (cache_queue_tail == NULL)
741 cache_queue_head = ci;
742 else
743 cache_queue_tail->next = ci;
745 cache_queue_tail = ci;
746 }
748 ci->flags |= CI_FLAGS_IN_QUEUE;
750 pthread_cond_signal(&queue_cond);
751 pthread_mutex_lock (&stats_lock);
752 stats_queue_length++;
753 pthread_mutex_unlock (&stats_lock);
755 return (0);
756 } /* }}} int enqueue_cache_item */
758 /*
759 * tree_callback_flush:
760 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
761 * while this is in progress.
762 */
763 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
764 gpointer data)
765 {
766 cache_item_t *ci;
767 callback_flush_data_t *cfd;
769 ci = (cache_item_t *) value;
770 cfd = (callback_flush_data_t *) data;
772 if (ci->flags & CI_FLAGS_IN_QUEUE)
773 return FALSE;
775 if (ci->values_num > 0
776 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
777 {
778 enqueue_cache_item (ci, TAIL);
779 }
780 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
781 && (ci->values_num <= 0))
782 {
783 assert ((char *) key == ci->file);
784 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
785 {
786 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
787 return (FALSE);
788 }
789 }
791 return (FALSE);
792 } /* }}} gboolean tree_callback_flush */
794 static int flush_old_values (int max_age)
795 {
796 callback_flush_data_t cfd;
797 size_t k;
799 memset (&cfd, 0, sizeof (cfd));
800 /* Pass the current time as user data so that we don't need to call
801 * `time' for each node. */
802 cfd.now = time (NULL);
803 cfd.keys = NULL;
804 cfd.keys_num = 0;
806 if (max_age > 0)
807 cfd.abs_timeout = cfd.now - max_age;
808 else
809 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
811 /* `tree_callback_flush' will return the keys of all values that haven't
812 * been touched in the last `config_flush_interval' seconds in `cfd'.
813 * The char*'s in this array point to the same memory as ci->file, so we
814 * don't need to free them separately. */
815 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
817 for (k = 0; k < cfd.keys_num; k++)
818 {
819 /* should never fail, since we have held the cache_lock
820 * the entire time */
821 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
822 }
824 if (cfd.keys != NULL)
825 {
826 free (cfd.keys);
827 cfd.keys = NULL;
828 }
830 return (0);
831 } /* int flush_old_values */
833 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
834 {
835 struct timeval now;
836 struct timespec next_flush;
837 int status;
839 gettimeofday (&now, NULL);
840 next_flush.tv_sec = now.tv_sec + config_flush_interval;
841 next_flush.tv_nsec = 1000 * now.tv_usec;
843 pthread_mutex_lock(&cache_lock);
845 while (state == RUNNING)
846 {
847 gettimeofday (&now, NULL);
848 if ((now.tv_sec > next_flush.tv_sec)
849 || ((now.tv_sec == next_flush.tv_sec)
850 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
851 {
852 RRDD_LOG(LOG_DEBUG, "flushing old values");
854 /* Determine the time of the next cache flush. */
855 next_flush.tv_sec = now.tv_sec + config_flush_interval;
857 /* Flush all values that haven't been written in the last
858 * `config_write_interval' seconds. */
859 flush_old_values (config_write_interval);
861 /* unlock the cache while we rotate so we don't block incoming
862 * updates if the fsync() blocks on disk I/O */
863 pthread_mutex_unlock(&cache_lock);
864 journal_rotate();
865 pthread_mutex_lock(&cache_lock);
866 }
868 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
869 if (status != 0 && status != ETIMEDOUT)
870 {
871 RRDD_LOG (LOG_ERR, "flush_thread_main: "
872 "pthread_cond_timedwait returned %i.", status);
873 }
874 }
876 if (config_flush_at_shutdown)
877 flush_old_values (-1); /* flush everything */
879 state = SHUTDOWN;
881 pthread_mutex_unlock(&cache_lock);
883 return NULL;
884 } /* void *flush_thread_main */
886 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
887 {
888 pthread_mutex_lock (&cache_lock);
890 while (state != SHUTDOWN
891 || (cache_queue_head != NULL && config_flush_at_shutdown))
892 {
893 cache_item_t *ci;
894 char *file;
895 char **values;
896 size_t values_num;
897 int status;
899 /* Now, check if there's something to store away. If not, wait until
900 * something comes in. */
901 if (cache_queue_head == NULL)
902 {
903 status = pthread_cond_wait (&queue_cond, &cache_lock);
904 if ((status != 0) && (status != ETIMEDOUT))
905 {
906 RRDD_LOG (LOG_ERR, "queue_thread_main: "
907 "pthread_cond_wait returned %i.", status);
908 }
909 }
911 /* Check if a value has arrived. This may be NULL if we timed out or there
912 * was an interrupt such as a signal. */
913 if (cache_queue_head == NULL)
914 continue;
916 ci = cache_queue_head;
918 /* copy the relevant parts */
919 file = strdup (ci->file);
920 if (file == NULL)
921 {
922 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
923 continue;
924 }
926 assert(ci->values != NULL);
927 assert(ci->values_num > 0);
929 values = ci->values;
930 values_num = ci->values_num;
932 wipe_ci_values(ci, time(NULL));
933 remove_from_queue(ci);
935 pthread_mutex_unlock (&cache_lock);
937 rrd_clear_error ();
938 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
939 if (status != 0)
940 {
941 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
942 "rrd_update_r (%s) failed with status %i. (%s)",
943 file, status, rrd_get_error());
944 }
946 journal_write("wrote", file);
948 /* Search again in the tree. It's possible someone issued a "FORGET"
949 * while we were writing the update values. */
950 pthread_mutex_lock(&cache_lock);
951 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
952 if (ci)
953 pthread_cond_broadcast(&ci->flushed);
954 pthread_mutex_unlock(&cache_lock);
956 if (status == 0)
957 {
958 pthread_mutex_lock (&stats_lock);
959 stats_updates_written++;
960 stats_data_sets_written += values_num;
961 pthread_mutex_unlock (&stats_lock);
962 }
964 rrd_free_ptrs((void ***) &values, &values_num);
965 free(file);
967 pthread_mutex_lock (&cache_lock);
968 }
969 pthread_mutex_unlock (&cache_lock);
971 return (NULL);
972 } /* }}} void *queue_thread_main */
974 static int buffer_get_field (char **buffer_ret, /* {{{ */
975 size_t *buffer_size_ret, char **field_ret)
976 {
977 char *buffer;
978 size_t buffer_pos;
979 size_t buffer_size;
980 char *field;
981 size_t field_size;
982 int status;
984 buffer = *buffer_ret;
985 buffer_pos = 0;
986 buffer_size = *buffer_size_ret;
987 field = *buffer_ret;
988 field_size = 0;
990 if (buffer_size <= 0)
991 return (-1);
993 /* This is ensured by `handle_request'. */
994 assert (buffer[buffer_size - 1] == '\0');
996 status = -1;
997 while (buffer_pos < buffer_size)
998 {
999 /* Check for end-of-field or end-of-buffer */
1000 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1001 {
1002 field[field_size] = 0;
1003 field_size++;
1004 buffer_pos++;
1005 status = 0;
1006 break;
1007 }
1008 /* Handle escaped characters. */
1009 else if (buffer[buffer_pos] == '\\')
1010 {
1011 if (buffer_pos >= (buffer_size - 1))
1012 break;
1013 buffer_pos++;
1014 field[field_size] = buffer[buffer_pos];
1015 field_size++;
1016 buffer_pos++;
1017 }
1018 /* Normal operation */
1019 else
1020 {
1021 field[field_size] = buffer[buffer_pos];
1022 field_size++;
1023 buffer_pos++;
1024 }
1025 } /* while (buffer_pos < buffer_size) */
1027 if (status != 0)
1028 return (status);
1030 *buffer_ret = buffer + buffer_pos;
1031 *buffer_size_ret = buffer_size - buffer_pos;
1032 *field_ret = field;
1034 return (0);
1035 } /* }}} int buffer_get_field */
1037 /* if we're restricting writes to the base directory,
1038 * check whether the file falls within the dir
1039 * returns 1 if OK, otherwise 0
1040 */
1041 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1042 {
1043 assert(file != NULL);
1045 if (!config_write_base_only
1046 || sock == NULL /* journal replay */
1047 || config_base_dir == NULL)
1048 return 1;
1050 if (strstr(file, "../") != NULL) goto err;
1052 /* relative paths without "../" are ok */
1053 if (*file != '/') return 1;
1055 /* file must be of the format base + "/" + <1+ char filename> */
1056 if (strlen(file) < _config_base_dir_len + 2) goto err;
1057 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1058 if (*(file + _config_base_dir_len) != '/') goto err;
1060 return 1;
1062 err:
1063 if (sock != NULL && sock->fd >= 0)
1064 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1066 return 0;
1067 } /* }}} static int check_file_access */
1069 /* when using a base dir, convert relative paths to absolute paths.
1070 * if necessary, modifies the "filename" pointer to point
1071 * to the new path created in "tmp". "tmp" is provided
1072 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1073 *
1074 * this allows us to optimize for the expected case (absolute path)
1075 * with a no-op.
1076 */
1077 static void get_abs_path(char **filename, char *tmp)
1078 {
1079 assert(tmp != NULL);
1080 assert(filename != NULL && *filename != NULL);
1082 if (config_base_dir == NULL || **filename == '/')
1083 return;
1085 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1086 *filename = tmp;
1087 } /* }}} static int get_abs_path */
1089 static int flush_file (const char *filename) /* {{{ */
1090 {
1091 cache_item_t *ci;
1093 pthread_mutex_lock (&cache_lock);
1095 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1096 if (ci == NULL)
1097 {
1098 pthread_mutex_unlock (&cache_lock);
1099 return (ENOENT);
1100 }
1102 if (ci->values_num > 0)
1103 {
1104 /* Enqueue at head */
1105 enqueue_cache_item (ci, HEAD);
1106 pthread_cond_wait(&ci->flushed, &cache_lock);
1107 }
1109 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1110 * may have been purged during our cond_wait() */
1112 pthread_mutex_unlock(&cache_lock);
1114 return (0);
1115 } /* }}} int flush_file */
1117 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1118 {
1119 char *err = "Syntax error.\n";
1121 if (cmd && cmd->syntax)
1122 err = cmd->syntax;
1124 return send_response(sock, RESP_ERR, "Usage: %s", err);
1125 } /* }}} static int syntax_error() */
1127 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1128 {
1129 uint64_t copy_queue_length;
1130 uint64_t copy_updates_received;
1131 uint64_t copy_flush_received;
1132 uint64_t copy_updates_written;
1133 uint64_t copy_data_sets_written;
1134 uint64_t copy_journal_bytes;
1135 uint64_t copy_journal_rotate;
1137 uint64_t tree_nodes_number;
1138 uint64_t tree_depth;
1140 pthread_mutex_lock (&stats_lock);
1141 copy_queue_length = stats_queue_length;
1142 copy_updates_received = stats_updates_received;
1143 copy_flush_received = stats_flush_received;
1144 copy_updates_written = stats_updates_written;
1145 copy_data_sets_written = stats_data_sets_written;
1146 copy_journal_bytes = stats_journal_bytes;
1147 copy_journal_rotate = stats_journal_rotate;
1148 pthread_mutex_unlock (&stats_lock);
1150 pthread_mutex_lock (&cache_lock);
1151 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1152 tree_depth = (uint64_t) g_tree_height (cache_tree);
1153 pthread_mutex_unlock (&cache_lock);
1155 add_response_info(sock,
1156 "QueueLength: %"PRIu64"\n", copy_queue_length);
1157 add_response_info(sock,
1158 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1159 add_response_info(sock,
1160 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1161 add_response_info(sock,
1162 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1163 add_response_info(sock,
1164 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1165 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1166 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1167 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1168 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1170 send_response(sock, RESP_OK, "Statistics follow\n");
1172 return (0);
1173 } /* }}} int handle_request_stats */
1175 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1176 {
1177 char *file, file_tmp[PATH_MAX];
1178 int status;
1180 status = buffer_get_field (&buffer, &buffer_size, &file);
1181 if (status != 0)
1182 {
1183 return syntax_error(sock,cmd);
1184 }
1185 else
1186 {
1187 pthread_mutex_lock(&stats_lock);
1188 stats_flush_received++;
1189 pthread_mutex_unlock(&stats_lock);
1191 get_abs_path(&file, file_tmp);
1192 if (!check_file_access(file, sock)) return 0;
1194 status = flush_file (file);
1195 if (status == 0)
1196 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1197 else if (status == ENOENT)
1198 {
1199 /* no file in our tree; see whether it exists at all */
1200 struct stat statbuf;
1202 memset(&statbuf, 0, sizeof(statbuf));
1203 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1204 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1205 else
1206 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1207 }
1208 else if (status < 0)
1209 return send_response(sock, RESP_ERR, "Internal error.\n");
1210 else
1211 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1212 }
1214 /* NOTREACHED */
1215 assert(1==0);
1216 } /* }}} int handle_request_flush */
1218 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1219 {
1220 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1222 pthread_mutex_lock(&cache_lock);
1223 flush_old_values(-1);
1224 pthread_mutex_unlock(&cache_lock);
1226 return send_response(sock, RESP_OK, "Started flush.\n");
1227 } /* }}} static int handle_request_flushall */
1229 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1230 {
1231 int status;
1232 char *file, file_tmp[PATH_MAX];
1233 cache_item_t *ci;
1235 status = buffer_get_field(&buffer, &buffer_size, &file);
1236 if (status != 0)
1237 return syntax_error(sock,cmd);
1239 get_abs_path(&file, file_tmp);
1241 pthread_mutex_lock(&cache_lock);
1242 ci = g_tree_lookup(cache_tree, file);
1243 if (ci == NULL)
1244 {
1245 pthread_mutex_unlock(&cache_lock);
1246 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1247 }
1249 for (size_t i=0; i < ci->values_num; i++)
1250 add_response_info(sock, "%s\n", ci->values[i]);
1252 pthread_mutex_unlock(&cache_lock);
1253 return send_response(sock, RESP_OK, "updates pending\n");
1254 } /* }}} static int handle_request_pending */
1256 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1257 {
1258 int status;
1259 gboolean found;
1260 char *file, file_tmp[PATH_MAX];
1262 status = buffer_get_field(&buffer, &buffer_size, &file);
1263 if (status != 0)
1264 return syntax_error(sock,cmd);
1266 get_abs_path(&file, file_tmp);
1267 if (!check_file_access(file, sock)) return 0;
1269 pthread_mutex_lock(&cache_lock);
1270 found = g_tree_remove(cache_tree, file);
1271 pthread_mutex_unlock(&cache_lock);
1273 if (found == TRUE)
1274 {
1275 if (sock != NULL)
1276 journal_write("forget", file);
1278 return send_response(sock, RESP_OK, "Gone!\n");
1279 }
1280 else
1281 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1283 /* NOTREACHED */
1284 assert(1==0);
1285 } /* }}} static int handle_request_forget */
1287 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1288 {
1289 cache_item_t *ci;
1291 pthread_mutex_lock(&cache_lock);
1293 ci = cache_queue_head;
1294 while (ci != NULL)
1295 {
1296 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1297 ci = ci->next;
1298 }
1300 pthread_mutex_unlock(&cache_lock);
1302 return send_response(sock, RESP_OK, "in queue.\n");
1303 } /* }}} int handle_request_queue */
1305 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1306 {
1307 char *file, file_tmp[PATH_MAX];
1308 int values_num = 0;
1309 int status;
1310 char orig_buf[CMD_MAX];
1312 cache_item_t *ci;
1314 /* save it for the journal later */
1315 if (sock != NULL)
1316 strncpy(orig_buf, buffer, buffer_size);
1318 status = buffer_get_field (&buffer, &buffer_size, &file);
1319 if (status != 0)
1320 return syntax_error(sock,cmd);
1322 pthread_mutex_lock(&stats_lock);
1323 stats_updates_received++;
1324 pthread_mutex_unlock(&stats_lock);
1326 get_abs_path(&file, file_tmp);
1327 if (!check_file_access(file, sock)) return 0;
1329 pthread_mutex_lock (&cache_lock);
1330 ci = g_tree_lookup (cache_tree, file);
1332 if (ci == NULL) /* {{{ */
1333 {
1334 struct stat statbuf;
1335 cache_item_t *tmp;
1337 /* don't hold the lock while we setup; stat(2) might block */
1338 pthread_mutex_unlock(&cache_lock);
1340 memset (&statbuf, 0, sizeof (statbuf));
1341 status = stat (file, &statbuf);
1342 if (status != 0)
1343 {
1344 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1346 status = errno;
1347 if (status == ENOENT)
1348 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1349 else
1350 return send_response(sock, RESP_ERR,
1351 "stat failed with error %i.\n", status);
1352 }
1353 if (!S_ISREG (statbuf.st_mode))
1354 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1356 if (access(file, R_OK|W_OK) != 0)
1357 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1358 file, rrd_strerror(errno));
1360 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1361 if (ci == NULL)
1362 {
1363 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1365 return send_response(sock, RESP_ERR, "malloc failed.\n");
1366 }
1367 memset (ci, 0, sizeof (cache_item_t));
1369 ci->file = strdup (file);
1370 if (ci->file == NULL)
1371 {
1372 free (ci);
1373 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1375 return send_response(sock, RESP_ERR, "strdup failed.\n");
1376 }
1378 wipe_ci_values(ci, now);
1379 ci->flags = CI_FLAGS_IN_TREE;
1380 pthread_cond_init(&ci->flushed, NULL);
1382 pthread_mutex_lock(&cache_lock);
1384 /* another UPDATE might have added this entry in the meantime */
1385 tmp = g_tree_lookup (cache_tree, file);
1386 if (tmp == NULL)
1387 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1388 else
1389 {
1390 free_cache_item (ci);
1391 ci = tmp;
1392 }
1394 /* state may have changed while we were unlocked */
1395 if (state == SHUTDOWN)
1396 return -1;
1397 } /* }}} */
1398 assert (ci != NULL);
1400 /* don't re-write updates in replay mode */
1401 if (sock != NULL)
1402 journal_write("update", orig_buf);
1404 while (buffer_size > 0)
1405 {
1406 char *value;
1407 time_t stamp;
1408 char *eostamp;
1410 status = buffer_get_field (&buffer, &buffer_size, &value);
1411 if (status != 0)
1412 {
1413 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1414 break;
1415 }
1417 /* make sure update time is always moving forward */
1418 stamp = strtol(value, &eostamp, 10);
1419 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1420 {
1421 pthread_mutex_unlock(&cache_lock);
1422 return send_response(sock, RESP_ERR,
1423 "Cannot find timestamp in '%s'!\n", value);
1424 }
1425 else if (stamp <= ci->last_update_stamp)
1426 {
1427 pthread_mutex_unlock(&cache_lock);
1428 return send_response(sock, RESP_ERR,
1429 "illegal attempt to update using time %ld when last"
1430 " update time is %ld (minimum one second step)\n",
1431 stamp, ci->last_update_stamp);
1432 }
1433 else
1434 ci->last_update_stamp = stamp;
1436 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1437 {
1438 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1439 continue;
1440 }
1442 values_num++;
1443 }
1445 if (((now - ci->last_flush_time) >= config_write_interval)
1446 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1447 && (ci->values_num > 0))
1448 {
1449 enqueue_cache_item (ci, TAIL);
1450 }
1452 pthread_mutex_unlock (&cache_lock);
1454 if (values_num < 1)
1455 return send_response(sock, RESP_ERR, "No values updated.\n");
1456 else
1457 return send_response(sock, RESP_OK,
1458 "errors, enqueued %i value(s).\n", values_num);
1460 /* NOTREACHED */
1461 assert(1==0);
1463 } /* }}} int handle_request_update */
1465 /* we came across a "WROTE" entry during journal replay.
1466 * throw away any values that we have accumulated for this file
1467 */
1468 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1469 {
1470 cache_item_t *ci;
1471 const char *file = buffer;
1473 pthread_mutex_lock(&cache_lock);
1475 ci = g_tree_lookup(cache_tree, file);
1476 if (ci == NULL)
1477 {
1478 pthread_mutex_unlock(&cache_lock);
1479 return (0);
1480 }
1482 if (ci->values)
1483 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1485 wipe_ci_values(ci, now);
1486 remove_from_queue(ci);
1488 pthread_mutex_unlock(&cache_lock);
1489 return (0);
1490 } /* }}} int handle_request_wrote */
1492 /* start "BATCH" processing */
1493 static int batch_start (HANDLER_PROTO) /* {{{ */
1494 {
1495 int status;
1496 if (sock->batch_start)
1497 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1499 status = send_response(sock, RESP_OK,
1500 "Go ahead. End with dot '.' on its own line.\n");
1501 sock->batch_start = time(NULL);
1502 sock->batch_cmd = 0;
1504 return status;
1505 } /* }}} static int batch_start */
1507 /* finish "BATCH" processing and return results to the client */
1508 static int batch_done (HANDLER_PROTO) /* {{{ */
1509 {
1510 assert(sock->batch_start);
1511 sock->batch_start = 0;
1512 sock->batch_cmd = 0;
1513 return send_response(sock, RESP_OK, "errors\n");
1514 } /* }}} static int batch_done */
1516 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1517 {
1518 return -1;
1519 } /* }}} static int handle_request_quit */
1521 static command_t list_of_commands[] = { /* {{{ */
1522 {
1523 "UPDATE",
1524 handle_request_update,
1525 CMD_CONTEXT_ANY,
1526 "UPDATE <filename> <values> [<values> ...]\n"
1527 ,
1528 "Adds the given file to the internal cache if it is not yet known and\n"
1529 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1530 "for details.\n"
1531 "\n"
1532 "Each <values> has the following form:\n"
1533 " <values> = <time>:<value>[:<value>[...]]\n"
1534 "See the rrdupdate(1) manpage for details.\n"
1535 },
1536 {
1537 "WROTE",
1538 handle_request_wrote,
1539 CMD_CONTEXT_JOURNAL,
1540 NULL,
1541 NULL
1542 },
1543 {
1544 "FLUSH",
1545 handle_request_flush,
1546 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1547 "FLUSH <filename>\n"
1548 ,
1549 "Adds the given filename to the head of the update queue and returns\n"
1550 "after it has been dequeued.\n"
1551 },
1552 {
1553 "FLUSHALL",
1554 handle_request_flushall,
1555 CMD_CONTEXT_CLIENT,
1556 "FLUSHALL\n"
1557 ,
1558 "Triggers writing of all pending updates. Returns immediately.\n"
1559 },
1560 {
1561 "PENDING",
1562 handle_request_pending,
1563 CMD_CONTEXT_CLIENT,
1564 "PENDING <filename>\n"
1565 ,
1566 "Shows any 'pending' updates for a file, in order.\n"
1567 "The updates shown have not yet been written to the underlying RRD file.\n"
1568 },
1569 {
1570 "FORGET",
1571 handle_request_forget,
1572 CMD_CONTEXT_ANY,
1573 "FORGET <filename>\n"
1574 ,
1575 "Removes the file completely from the cache.\n"
1576 "Any pending updates for the file will be lost.\n"
1577 },
1578 {
1579 "QUEUE",
1580 handle_request_queue,
1581 CMD_CONTEXT_CLIENT,
1582 "QUEUE\n"
1583 ,
1584 "Shows all files in the output queue.\n"
1585 "The output is zero or more lines in the following format:\n"
1586 "(where <num_vals> is the number of values to be written)\n"
1587 "\n"
1588 "<num_vals> <filename>\n"
1589 },
1590 {
1591 "STATS",
1592 handle_request_stats,
1593 CMD_CONTEXT_CLIENT,
1594 "STATS\n"
1595 ,
1596 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1597 "a description of the values.\n"
1598 },
1599 {
1600 "HELP",
1601 handle_request_help,
1602 CMD_CONTEXT_CLIENT,
1603 "HELP [<command>]\n",
1604 NULL, /* special! */
1605 },
1606 {
1607 "BATCH",
1608 batch_start,
1609 CMD_CONTEXT_CLIENT,
1610 "BATCH\n"
1611 ,
1612 "The 'BATCH' command permits the client to initiate a bulk load\n"
1613 " of commands to rrdcached.\n"
1614 "\n"
1615 "Usage:\n"
1616 "\n"
1617 " client: BATCH\n"
1618 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1619 " client: command #1\n"
1620 " client: command #2\n"
1621 " client: ... and so on\n"
1622 " client: .\n"
1623 " server: 2 errors\n"
1624 " server: 7 message for command #7\n"
1625 " server: 9 message for command #9\n"
1626 "\n"
1627 "For more information, consult the rrdcached(1) documentation.\n"
1628 },
1629 {
1630 ".", /* BATCH terminator */
1631 batch_done,
1632 CMD_CONTEXT_BATCH,
1633 NULL,
1634 NULL
1635 },
1636 {
1637 "QUIT",
1638 handle_request_quit,
1639 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1640 "QUIT\n"
1641 ,
1642 "Disconnect from rrdcached.\n"
1643 }
1644 }; /* }}} command_t list_of_commands[] */
1645 static size_t list_of_commands_len = sizeof (list_of_commands)
1646 / sizeof (list_of_commands[0]);
1648 static command_t *find_command(char *cmd)
1649 {
1650 size_t i;
1652 for (i = 0; i < list_of_commands_len; i++)
1653 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1654 return (&list_of_commands[i]);
1655 return NULL;
1656 }
1658 /* We currently use the index in the `list_of_commands' array as a bit position
1659 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1660 * outside these functions so that switching to a more elegant storage method
1661 * is easily possible. */
1662 static ssize_t find_command_index (const char *cmd) /* {{{ */
1663 {
1664 size_t i;
1666 for (i = 0; i < list_of_commands_len; i++)
1667 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1668 return ((ssize_t) i);
1669 return (-1);
1670 } /* }}} ssize_t find_command_index */
1672 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1673 const char *cmd)
1674 {
1675 ssize_t i;
1677 if (sock == NULL) /* journal replay */
1678 return (1);
1680 if (cmd == NULL)
1681 return (-1);
1683 if ((strcasecmp ("QUIT", cmd) == 0)
1684 || (strcasecmp ("HELP", cmd) == 0))
1685 return (1);
1686 else if (strcmp (".", cmd) == 0)
1687 cmd = "BATCH";
1689 i = find_command_index (cmd);
1690 if (i < 0)
1691 return (-1);
1692 assert (i < 32);
1694 if ((sock->permissions & (1 << i)) != 0)
1695 return (1);
1696 return (0);
1697 } /* }}} int socket_permission_check */
1699 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1700 const char *cmd)
1701 {
1702 ssize_t i;
1704 i = find_command_index (cmd);
1705 if (i < 0)
1706 return (-1);
1707 assert (i < 32);
1709 sock->permissions |= (1 << i);
1710 return (0);
1711 } /* }}} int socket_permission_add */
1713 /* check whether commands are received in the expected context */
1714 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1715 {
1716 if (sock == NULL)
1717 return (cmd->context & CMD_CONTEXT_JOURNAL);
1718 else if (sock->batch_start)
1719 return (cmd->context & CMD_CONTEXT_BATCH);
1720 else
1721 return (cmd->context & CMD_CONTEXT_CLIENT);
1723 /* NOTREACHED */
1724 assert(1==0);
1725 }
1727 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1728 {
1729 int status;
1730 char *cmd_str;
1731 char *resp_txt;
1732 command_t *help = NULL;
1734 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1735 if (status == 0)
1736 help = find_command(cmd_str);
1738 if (help && (help->syntax || help->help))
1739 {
1740 char tmp[CMD_MAX];
1742 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1743 resp_txt = tmp;
1745 if (help->syntax)
1746 add_response_info(sock, "Usage: %s\n", help->syntax);
1748 if (help->help)
1749 add_response_info(sock, "%s\n", help->help);
1750 }
1751 else
1752 {
1753 size_t i;
1755 resp_txt = "Command overview\n";
1757 for (i = 0; i < list_of_commands_len; i++)
1758 {
1759 if (list_of_commands[i].syntax == NULL)
1760 continue;
1761 add_response_info (sock, "%s", list_of_commands[i].syntax);
1762 }
1763 }
1765 return send_response(sock, RESP_OK, resp_txt);
1766 } /* }}} int handle_request_help */
1768 /* if sock==NULL, we are in journal replay mode */
1769 static int handle_request (DISPATCH_PROTO) /* {{{ */
1770 {
1771 char *buffer_ptr = buffer;
1772 char *cmd_str = NULL;
1773 command_t *cmd = NULL;
1774 int status;
1776 assert (buffer[buffer_size - 1] == '\0');
1778 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1779 if (status != 0)
1780 {
1781 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1782 return (-1);
1783 }
1785 if (sock != NULL && sock->batch_start)
1786 sock->batch_cmd++;
1788 cmd = find_command(cmd_str);
1789 if (!cmd)
1790 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1792 if (!socket_permission_check (sock, cmd->cmd))
1793 return send_response(sock, RESP_ERR, "Permission denied.\n");
1795 if (!command_check_context(sock, cmd))
1796 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1798 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1799 } /* }}} int handle_request */
1801 static void journal_set_free (journal_set *js) /* {{{ */
1802 {
1803 if (js == NULL)
1804 return;
1806 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1808 free(js);
1809 } /* }}} journal_set_free */
1811 static void journal_set_remove (journal_set *js) /* {{{ */
1812 {
1813 if (js == NULL)
1814 return;
1816 for (uint i=0; i < js->files_num; i++)
1817 {
1818 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1819 unlink(js->files[i]);
1820 }
1821 } /* }}} journal_set_remove */
1823 /* close current journal file handle.
1824 * MUST hold journal_lock before calling */
1825 static void journal_close(void) /* {{{ */
1826 {
1827 if (journal_fh != NULL)
1828 {
1829 if (fclose(journal_fh) != 0)
1830 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1831 }
1833 journal_fh = NULL;
1834 journal_size = 0;
1835 } /* }}} journal_close */
1837 /* MUST hold journal_lock before calling */
1838 static void journal_new_file(void) /* {{{ */
1839 {
1840 struct timeval now;
1841 int new_fd;
1842 char new_file[PATH_MAX + 1];
1844 assert(journal_dir != NULL);
1845 assert(journal_cur != NULL);
1847 journal_close();
1849 gettimeofday(&now, NULL);
1850 /* this format assures that the files sort in strcmp() order */
1851 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1852 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1854 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1855 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1856 if (new_fd < 0)
1857 goto error;
1859 journal_fh = fdopen(new_fd, "a");
1860 if (journal_fh == NULL)
1861 goto error;
1863 journal_size = ftell(journal_fh);
1864 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1866 /* record the file in the journal set */
1867 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1869 return;
1871 error:
1872 RRDD_LOG(LOG_CRIT,
1873 "JOURNALING DISABLED: Error while trying to create %s : %s",
1874 new_file, rrd_strerror(errno));
1875 RRDD_LOG(LOG_CRIT,
1876 "JOURNALING DISABLED: All values will be flushed at shutdown");
1878 close(new_fd);
1879 config_flush_at_shutdown = 1;
1881 } /* }}} journal_new_file */
1883 /* MUST NOT hold journal_lock before calling this */
1884 static void journal_rotate(void) /* {{{ */
1885 {
1886 journal_set *old_js = NULL;
1888 if (journal_dir == NULL)
1889 return;
1891 RRDD_LOG(LOG_DEBUG, "rotating journals");
1893 pthread_mutex_lock(&stats_lock);
1894 ++stats_journal_rotate;
1895 pthread_mutex_unlock(&stats_lock);
1897 pthread_mutex_lock(&journal_lock);
1899 journal_close();
1901 /* rotate the journal sets */
1902 old_js = journal_old;
1903 journal_old = journal_cur;
1904 journal_cur = calloc(1, sizeof(journal_set));
1906 if (journal_cur != NULL)
1907 journal_new_file();
1908 else
1909 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1911 pthread_mutex_unlock(&journal_lock);
1913 journal_set_remove(old_js);
1914 journal_set_free (old_js);
1916 } /* }}} static void journal_rotate */
1918 /* MUST hold journal_lock when calling */
1919 static void journal_done(void) /* {{{ */
1920 {
1921 if (journal_cur == NULL)
1922 return;
1924 journal_close();
1926 if (config_flush_at_shutdown)
1927 {
1928 RRDD_LOG(LOG_INFO, "removing journals");
1929 journal_set_remove(journal_old);
1930 journal_set_remove(journal_cur);
1931 }
1932 else
1933 {
1934 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1935 "journals will be used at next startup");
1936 }
1938 journal_set_free(journal_cur);
1939 journal_set_free(journal_old);
1940 free(journal_dir);
1942 } /* }}} static void journal_done */
1944 static int journal_write(char *cmd, char *args) /* {{{ */
1945 {
1946 int chars;
1948 if (journal_fh == NULL)
1949 return 0;
1951 pthread_mutex_lock(&journal_lock);
1952 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1953 journal_size += chars;
1955 if (journal_size > JOURNAL_MAX)
1956 journal_new_file();
1958 pthread_mutex_unlock(&journal_lock);
1960 if (chars > 0)
1961 {
1962 pthread_mutex_lock(&stats_lock);
1963 stats_journal_bytes += chars;
1964 pthread_mutex_unlock(&stats_lock);
1965 }
1967 return chars;
1968 } /* }}} static int journal_write */
1970 static int journal_replay (const char *file) /* {{{ */
1971 {
1972 FILE *fh;
1973 int entry_cnt = 0;
1974 int fail_cnt = 0;
1975 uint64_t line = 0;
1976 char entry[CMD_MAX];
1977 time_t now;
1979 if (file == NULL) return 0;
1981 {
1982 char *reason = "unknown error";
1983 int status = 0;
1984 struct stat statbuf;
1986 memset(&statbuf, 0, sizeof(statbuf));
1987 if (stat(file, &statbuf) != 0)
1988 {
1989 reason = "stat error";
1990 status = errno;
1991 }
1992 else if (!S_ISREG(statbuf.st_mode))
1993 {
1994 reason = "not a regular file";
1995 status = EPERM;
1996 }
1997 if (statbuf.st_uid != daemon_uid)
1998 {
1999 reason = "not owned by daemon user";
2000 status = EACCES;
2001 }
2002 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2003 {
2004 reason = "must not be user/group writable";
2005 status = EACCES;
2006 }
2008 if (status != 0)
2009 {
2010 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2011 file, rrd_strerror(status), reason);
2012 return 0;
2013 }
2014 }
2016 fh = fopen(file, "r");
2017 if (fh == NULL)
2018 {
2019 if (errno != ENOENT)
2020 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2021 file, rrd_strerror(errno));
2022 return 0;
2023 }
2024 else
2025 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2027 now = time(NULL);
2029 while(!feof(fh))
2030 {
2031 size_t entry_len;
2033 ++line;
2034 if (fgets(entry, sizeof(entry), fh) == NULL)
2035 break;
2036 entry_len = strlen(entry);
2038 /* check \n termination in case journal writing crashed mid-line */
2039 if (entry_len == 0)
2040 continue;
2041 else if (entry[entry_len - 1] != '\n')
2042 {
2043 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2044 ++fail_cnt;
2045 continue;
2046 }
2048 entry[entry_len - 1] = '\0';
2050 if (handle_request(NULL, now, entry, entry_len) == 0)
2051 ++entry_cnt;
2052 else
2053 ++fail_cnt;
2054 }
2056 fclose(fh);
2058 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2059 entry_cnt, fail_cnt);
2061 return entry_cnt > 0 ? 1 : 0;
2062 } /* }}} static int journal_replay */
2064 static int journal_sort(const void *v1, const void *v2)
2065 {
2066 char **jn1 = (char **) v1;
2067 char **jn2 = (char **) v2;
2069 return strcmp(*jn1,*jn2);
2070 }
2072 static void journal_init(void) /* {{{ */
2073 {
2074 int had_journal = 0;
2075 DIR *dir;
2076 struct dirent *dent;
2077 char path[PATH_MAX+1];
2079 if (journal_dir == NULL) return;
2081 pthread_mutex_lock(&journal_lock);
2083 journal_cur = calloc(1, sizeof(journal_set));
2084 if (journal_cur == NULL)
2085 {
2086 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2087 return;
2088 }
2090 RRDD_LOG(LOG_INFO, "checking for journal files");
2092 /* Handle old journal files during transition. This gives them the
2093 * correct sort order. TODO: remove after first release
2094 */
2095 {
2096 char old_path[PATH_MAX+1];
2097 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2098 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2099 rename(old_path, path);
2101 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2102 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2103 rename(old_path, path);
2104 }
2106 dir = opendir(journal_dir);
2107 while ((dent = readdir(dir)) != NULL)
2108 {
2109 /* looks like a journal file? */
2110 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2111 continue;
2113 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2115 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2116 {
2117 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2118 dent->d_name);
2119 break;
2120 }
2121 }
2122 closedir(dir);
2124 qsort(journal_cur->files, journal_cur->files_num,
2125 sizeof(journal_cur->files[0]), journal_sort);
2127 for (uint i=0; i < journal_cur->files_num; i++)
2128 had_journal += journal_replay(journal_cur->files[i]);
2130 journal_new_file();
2132 /* it must have been a crash. start a flush */
2133 if (had_journal && config_flush_at_shutdown)
2134 flush_old_values(-1);
2136 pthread_mutex_unlock(&journal_lock);
2138 RRDD_LOG(LOG_INFO, "journal processing complete");
2140 } /* }}} static void journal_init */
2142 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2143 {
2144 assert(sock != NULL);
2146 free(sock->rbuf); sock->rbuf = NULL;
2147 free(sock->wbuf); sock->wbuf = NULL;
2148 free(sock);
2149 } /* }}} void free_listen_socket */
2151 static void close_connection(listen_socket_t *sock) /* {{{ */
2152 {
2153 if (sock->fd >= 0)
2154 {
2155 close(sock->fd);
2156 sock->fd = -1;
2157 }
2159 free_listen_socket(sock);
2161 } /* }}} void close_connection */
2163 static void *connection_thread_main (void *args) /* {{{ */
2164 {
2165 listen_socket_t *sock;
2166 int fd;
2168 sock = (listen_socket_t *) args;
2169 fd = sock->fd;
2171 /* init read buffers */
2172 sock->next_read = sock->next_cmd = 0;
2173 sock->rbuf = malloc(RBUF_SIZE);
2174 if (sock->rbuf == NULL)
2175 {
2176 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2177 close_connection(sock);
2178 return NULL;
2179 }
2181 pthread_mutex_lock (&connection_threads_lock);
2182 connection_threads_num++;
2183 pthread_mutex_unlock (&connection_threads_lock);
2185 while (state == RUNNING)
2186 {
2187 char *cmd;
2188 ssize_t cmd_len;
2189 ssize_t rbytes;
2190 time_t now;
2192 struct pollfd pollfd;
2193 int status;
2195 pollfd.fd = fd;
2196 pollfd.events = POLLIN | POLLPRI;
2197 pollfd.revents = 0;
2199 status = poll (&pollfd, 1, /* timeout = */ 500);
2200 if (state != RUNNING)
2201 break;
2202 else if (status == 0) /* timeout */
2203 continue;
2204 else if (status < 0) /* error */
2205 {
2206 status = errno;
2207 if (status != EINTR)
2208 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2209 continue;
2210 }
2212 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2213 break;
2214 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2215 {
2216 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2217 "poll(2) returned something unexpected: %#04hx",
2218 pollfd.revents);
2219 break;
2220 }
2222 rbytes = read(fd, sock->rbuf + sock->next_read,
2223 RBUF_SIZE - sock->next_read);
2224 if (rbytes < 0)
2225 {
2226 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2227 break;
2228 }
2229 else if (rbytes == 0)
2230 break; /* eof */
2232 sock->next_read += rbytes;
2234 if (sock->batch_start)
2235 now = sock->batch_start;
2236 else
2237 now = time(NULL);
2239 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2240 {
2241 status = handle_request (sock, now, cmd, cmd_len+1);
2242 if (status != 0)
2243 goto out_close;
2244 }
2245 }
2247 out_close:
2248 close_connection(sock);
2250 /* Remove this thread from the connection threads list */
2251 pthread_mutex_lock (&connection_threads_lock);
2252 connection_threads_num--;
2253 if (connection_threads_num <= 0)
2254 pthread_cond_broadcast(&connection_threads_done);
2255 pthread_mutex_unlock (&connection_threads_lock);
2257 return (NULL);
2258 } /* }}} void *connection_thread_main */
2260 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2261 {
2262 int fd;
2263 struct sockaddr_un sa;
2264 listen_socket_t *temp;
2265 int status;
2266 const char *path;
2267 char *path_copy, *dir;
2269 path = sock->addr;
2270 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2271 path += strlen("unix:");
2273 /* dirname may modify its argument */
2274 path_copy = strdup(path);
2275 if (path_copy == NULL)
2276 {
2277 fprintf(stderr, "rrdcached: strdup(): %s\n",
2278 rrd_strerror(errno));
2279 return (-1);
2280 }
2282 dir = dirname(path_copy);
2283 if (rrd_mkdir_p(dir, 0777) != 0)
2284 {
2285 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2286 dir, rrd_strerror(errno));
2287 return (-1);
2288 }
2290 free(path_copy);
2292 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2293 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2294 if (temp == NULL)
2295 {
2296 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2297 return (-1);
2298 }
2299 listen_fds = temp;
2300 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2302 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2303 if (fd < 0)
2304 {
2305 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2306 rrd_strerror(errno));
2307 return (-1);
2308 }
2310 memset (&sa, 0, sizeof (sa));
2311 sa.sun_family = AF_UNIX;
2312 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2314 /* if we've gotten this far, we own the pid file. any daemon started
2315 * with the same args must not be alive. therefore, ensure that we can
2316 * create the socket...
2317 */
2318 unlink(path);
2320 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2321 if (status != 0)
2322 {
2323 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2324 path, rrd_strerror(errno));
2325 close (fd);
2326 return (-1);
2327 }
2329 status = listen (fd, /* backlog = */ 10);
2330 if (status != 0)
2331 {
2332 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2333 path, rrd_strerror(errno));
2334 close (fd);
2335 unlink (path);
2336 return (-1);
2337 }
2339 listen_fds[listen_fds_num].fd = fd;
2340 listen_fds[listen_fds_num].family = PF_UNIX;
2341 strncpy(listen_fds[listen_fds_num].addr, path,
2342 sizeof (listen_fds[listen_fds_num].addr) - 1);
2343 listen_fds_num++;
2345 return (0);
2346 } /* }}} int open_listen_socket_unix */
2348 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2349 {
2350 struct addrinfo ai_hints;
2351 struct addrinfo *ai_res;
2352 struct addrinfo *ai_ptr;
2353 char addr_copy[NI_MAXHOST];
2354 char *addr;
2355 char *port;
2356 int status;
2358 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2359 addr_copy[sizeof (addr_copy) - 1] = 0;
2360 addr = addr_copy;
2362 memset (&ai_hints, 0, sizeof (ai_hints));
2363 ai_hints.ai_flags = 0;
2364 #ifdef AI_ADDRCONFIG
2365 ai_hints.ai_flags |= AI_ADDRCONFIG;
2366 #endif
2367 ai_hints.ai_family = AF_UNSPEC;
2368 ai_hints.ai_socktype = SOCK_STREAM;
2370 port = NULL;
2371 if (*addr == '[') /* IPv6+port format */
2372 {
2373 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2374 addr++;
2376 port = strchr (addr, ']');
2377 if (port == NULL)
2378 {
2379 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2380 return (-1);
2381 }
2382 *port = 0;
2383 port++;
2385 if (*port == ':')
2386 port++;
2387 else if (*port == 0)
2388 port = NULL;
2389 else
2390 {
2391 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2392 return (-1);
2393 }
2394 } /* if (*addr == '[') */
2395 else
2396 {
2397 port = rindex(addr, ':');
2398 if (port != NULL)
2399 {
2400 *port = 0;
2401 port++;
2402 }
2403 }
2404 ai_res = NULL;
2405 status = getaddrinfo (addr,
2406 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2407 &ai_hints, &ai_res);
2408 if (status != 0)
2409 {
2410 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2411 addr, gai_strerror (status));
2412 return (-1);
2413 }
2415 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2416 {
2417 int fd;
2418 listen_socket_t *temp;
2419 int one = 1;
2421 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2422 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2423 if (temp == NULL)
2424 {
2425 fprintf (stderr,
2426 "rrdcached: open_listen_socket_network: realloc failed.\n");
2427 continue;
2428 }
2429 listen_fds = temp;
2430 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2432 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2433 if (fd < 0)
2434 {
2435 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2436 rrd_strerror(errno));
2437 continue;
2438 }
2440 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2442 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2443 if (status != 0)
2444 {
2445 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2446 sock->addr, rrd_strerror(errno));
2447 close (fd);
2448 continue;
2449 }
2451 status = listen (fd, /* backlog = */ 10);
2452 if (status != 0)
2453 {
2454 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2455 sock->addr, rrd_strerror(errno));
2456 close (fd);
2457 freeaddrinfo(ai_res);
2458 return (-1);
2459 }
2461 listen_fds[listen_fds_num].fd = fd;
2462 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2463 listen_fds_num++;
2464 } /* for (ai_ptr) */
2466 freeaddrinfo(ai_res);
2467 return (0);
2468 } /* }}} static int open_listen_socket_network */
2470 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2471 {
2472 assert(sock != NULL);
2473 assert(sock->addr != NULL);
2475 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2476 || sock->addr[0] == '/')
2477 return (open_listen_socket_unix(sock));
2478 else
2479 return (open_listen_socket_network(sock));
2480 } /* }}} int open_listen_socket */
2482 static int close_listen_sockets (void) /* {{{ */
2483 {
2484 size_t i;
2486 for (i = 0; i < listen_fds_num; i++)
2487 {
2488 close (listen_fds[i].fd);
2490 if (listen_fds[i].family == PF_UNIX)
2491 unlink(listen_fds[i].addr);
2492 }
2494 free (listen_fds);
2495 listen_fds = NULL;
2496 listen_fds_num = 0;
2498 return (0);
2499 } /* }}} int close_listen_sockets */
2501 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2502 {
2503 struct pollfd *pollfds;
2504 int pollfds_num;
2505 int status;
2506 int i;
2508 if (listen_fds_num < 1)
2509 {
2510 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2511 return (NULL);
2512 }
2514 pollfds_num = listen_fds_num;
2515 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2516 if (pollfds == NULL)
2517 {
2518 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2519 return (NULL);
2520 }
2521 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2523 RRDD_LOG(LOG_INFO, "listening for connections");
2525 while (state == RUNNING)
2526 {
2527 for (i = 0; i < pollfds_num; i++)
2528 {
2529 pollfds[i].fd = listen_fds[i].fd;
2530 pollfds[i].events = POLLIN | POLLPRI;
2531 pollfds[i].revents = 0;
2532 }
2534 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2535 if (state != RUNNING)
2536 break;
2537 else if (status == 0) /* timeout */
2538 continue;
2539 else if (status < 0) /* error */
2540 {
2541 status = errno;
2542 if (status != EINTR)
2543 {
2544 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2545 }
2546 continue;
2547 }
2549 for (i = 0; i < pollfds_num; i++)
2550 {
2551 listen_socket_t *client_sock;
2552 struct sockaddr_storage client_sa;
2553 socklen_t client_sa_size;
2554 pthread_t tid;
2555 pthread_attr_t attr;
2557 if (pollfds[i].revents == 0)
2558 continue;
2560 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2561 {
2562 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2563 "poll(2) returned something unexpected for listen FD #%i.",
2564 pollfds[i].fd);
2565 continue;
2566 }
2568 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2569 if (client_sock == NULL)
2570 {
2571 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2572 continue;
2573 }
2574 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2576 client_sa_size = sizeof (client_sa);
2577 client_sock->fd = accept (pollfds[i].fd,
2578 (struct sockaddr *) &client_sa, &client_sa_size);
2579 if (client_sock->fd < 0)
2580 {
2581 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2582 free(client_sock);
2583 continue;
2584 }
2586 pthread_attr_init (&attr);
2587 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2589 status = pthread_create (&tid, &attr, connection_thread_main,
2590 client_sock);
2591 if (status != 0)
2592 {
2593 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2594 close_connection(client_sock);
2595 continue;
2596 }
2597 } /* for (pollfds_num) */
2598 } /* while (state == RUNNING) */
2600 RRDD_LOG(LOG_INFO, "starting shutdown");
2602 close_listen_sockets ();
2604 pthread_mutex_lock (&connection_threads_lock);
2605 while (connection_threads_num > 0)
2606 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2607 pthread_mutex_unlock (&connection_threads_lock);
2609 free(pollfds);
2611 return (NULL);
2612 } /* }}} void *listen_thread_main */
2614 static int daemonize (void) /* {{{ */
2615 {
2616 int pid_fd;
2617 char *base_dir;
2619 daemon_uid = geteuid();
2621 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2622 if (pid_fd < 0)
2623 pid_fd = check_pidfile();
2624 if (pid_fd < 0)
2625 return pid_fd;
2627 /* open all the listen sockets */
2628 if (config_listen_address_list_len > 0)
2629 {
2630 for (size_t i = 0; i < config_listen_address_list_len; i++)
2631 open_listen_socket (config_listen_address_list[i]);
2633 rrd_free_ptrs((void ***) &config_listen_address_list,
2634 &config_listen_address_list_len);
2635 }
2636 else
2637 {
2638 listen_socket_t sock;
2639 memset(&sock, 0, sizeof(sock));
2640 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2641 open_listen_socket (&sock);
2642 }
2644 if (listen_fds_num < 1)
2645 {
2646 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2647 goto error;
2648 }
2650 if (!stay_foreground)
2651 {
2652 pid_t child;
2654 child = fork ();
2655 if (child < 0)
2656 {
2657 fprintf (stderr, "daemonize: fork(2) failed.\n");
2658 goto error;
2659 }
2660 else if (child > 0)
2661 exit(0);
2663 /* Become session leader */
2664 setsid ();
2666 /* Open the first three file descriptors to /dev/null */
2667 close (2);
2668 close (1);
2669 close (0);
2671 open ("/dev/null", O_RDWR);
2672 if (dup(0) == -1 || dup(0) == -1){
2673 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2674 }
2675 } /* if (!stay_foreground) */
2677 /* Change into the /tmp directory. */
2678 base_dir = (config_base_dir != NULL)
2679 ? config_base_dir
2680 : "/tmp";
2682 if (chdir (base_dir) != 0)
2683 {
2684 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2685 goto error;
2686 }
2688 install_signal_handlers();
2690 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2691 RRDD_LOG(LOG_INFO, "starting up");
2693 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2694 (GDestroyNotify) free_cache_item);
2695 if (cache_tree == NULL)
2696 {
2697 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2698 goto error;
2699 }
2701 return write_pidfile (pid_fd);
2703 error:
2704 remove_pidfile();
2705 return -1;
2706 } /* }}} int daemonize */
2708 static int cleanup (void) /* {{{ */
2709 {
2710 pthread_cond_broadcast (&flush_cond);
2711 pthread_join (flush_thread, NULL);
2713 pthread_cond_broadcast (&queue_cond);
2714 for (int i = 0; i < config_queue_threads; i++)
2715 pthread_join (queue_threads[i], NULL);
2717 if (config_flush_at_shutdown)
2718 {
2719 assert(cache_queue_head == NULL);
2720 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2721 }
2723 free(queue_threads);
2724 free(config_base_dir);
2726 pthread_mutex_lock(&cache_lock);
2727 g_tree_destroy(cache_tree);
2729 pthread_mutex_lock(&journal_lock);
2730 journal_done();
2732 RRDD_LOG(LOG_INFO, "goodbye");
2733 closelog ();
2735 remove_pidfile ();
2736 free(config_pid_file);
2738 return (0);
2739 } /* }}} int cleanup */
2741 static int read_options (int argc, char **argv) /* {{{ */
2742 {
2743 int option;
2744 int status = 0;
2746 char **permissions = NULL;
2747 size_t permissions_len = 0;
2749 while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2750 {
2751 switch (option)
2752 {
2753 case 'g':
2754 stay_foreground=1;
2755 break;
2757 case 'l':
2758 {
2759 listen_socket_t *new;
2761 new = malloc(sizeof(listen_socket_t));
2762 if (new == NULL)
2763 {
2764 fprintf(stderr, "read_options: malloc failed.\n");
2765 return(2);
2766 }
2767 memset(new, 0, sizeof(listen_socket_t));
2769 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2771 /* Add permissions to the socket {{{ */
2772 if (permissions_len != 0)
2773 {
2774 size_t i;
2775 for (i = 0; i < permissions_len; i++)
2776 {
2777 status = socket_permission_add (new, permissions[i]);
2778 if (status != 0)
2779 {
2780 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2781 "socket failed. Most likely, this permission doesn't "
2782 "exist. Check your command line.\n", permissions[i]);
2783 status = 4;
2784 }
2785 }
2786 }
2787 else /* if (permissions_len == 0) */
2788 {
2789 /* Add permission for ALL commands to the socket. */
2790 size_t i;
2791 for (i = 0; i < list_of_commands_len; i++)
2792 {
2793 status = socket_permission_add (new, list_of_commands[i].cmd);
2794 if (status != 0)
2795 {
2796 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2797 "socket failed. This should never happen, ever! Sorry.\n",
2798 permissions[i]);
2799 status = 4;
2800 }
2801 }
2802 }
2803 /* }}} Done adding permissions. */
2805 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2806 &config_listen_address_list_len, new))
2807 {
2808 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2809 return (2);
2810 }
2811 }
2812 break;
2814 case 'P':
2815 {
2816 char *optcopy;
2817 char *saveptr;
2818 char *dummy;
2819 char *ptr;
2821 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2823 optcopy = strdup (optarg);
2824 dummy = optcopy;
2825 saveptr = NULL;
2826 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2827 {
2828 dummy = NULL;
2829 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2830 }
2832 free (optcopy);
2833 }
2834 break;
2836 case 'f':
2837 {
2838 int temp;
2840 temp = atoi (optarg);
2841 if (temp > 0)
2842 config_flush_interval = temp;
2843 else
2844 {
2845 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2846 status = 3;
2847 }
2848 }
2849 break;
2851 case 'w':
2852 {
2853 int temp;
2855 temp = atoi (optarg);
2856 if (temp > 0)
2857 config_write_interval = temp;
2858 else
2859 {
2860 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2861 status = 2;
2862 }
2863 }
2864 break;
2866 case 'z':
2867 {
2868 int temp;
2870 temp = atoi(optarg);
2871 if (temp > 0)
2872 config_write_jitter = temp;
2873 else
2874 {
2875 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2876 status = 2;
2877 }
2879 break;
2880 }
2882 case 't':
2883 {
2884 int threads;
2885 threads = atoi(optarg);
2886 if (threads >= 1)
2887 config_queue_threads = threads;
2888 else
2889 {
2890 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2891 return 1;
2892 }
2893 }
2894 break;
2896 case 'B':
2897 config_write_base_only = 1;
2898 break;
2900 case 'b':
2901 {
2902 size_t len;
2903 char base_realpath[PATH_MAX];
2905 if (config_base_dir != NULL)
2906 free (config_base_dir);
2907 config_base_dir = strdup (optarg);
2908 if (config_base_dir == NULL)
2909 {
2910 fprintf (stderr, "read_options: strdup failed.\n");
2911 return (3);
2912 }
2914 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2915 {
2916 fprintf (stderr, "Failed to create base directory '%s': %s\n",
2917 config_base_dir, rrd_strerror (errno));
2918 return (3);
2919 }
2921 /* make sure that the base directory is not resolved via
2922 * symbolic links. this makes some performance-enhancing
2923 * assumptions possible (we don't have to resolve paths
2924 * that start with a "/")
2925 */
2926 if (realpath(config_base_dir, base_realpath) == NULL)
2927 {
2928 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2929 "%s\n", config_base_dir, rrd_strerror(errno));
2930 return 5;
2931 }
2933 len = strlen (config_base_dir);
2934 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2935 {
2936 config_base_dir[len - 1] = 0;
2937 len--;
2938 }
2940 if (len < 1)
2941 {
2942 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2943 return (4);
2944 }
2946 _config_base_dir_len = len;
2948 len = strlen (base_realpath);
2949 while ((len > 0) && (base_realpath[len - 1] == '/'))
2950 {
2951 base_realpath[len - 1] = '\0';
2952 len--;
2953 }
2955 if (strncmp(config_base_dir,
2956 base_realpath, sizeof(base_realpath)) != 0)
2957 {
2958 fprintf(stderr,
2959 "Base directory (-b) resolved via file system links!\n"
2960 "Please consult rrdcached '-b' documentation!\n"
2961 "Consider specifying the real directory (%s)\n",
2962 base_realpath);
2963 return 5;
2964 }
2965 }
2966 break;
2968 case 'p':
2969 {
2970 if (config_pid_file != NULL)
2971 free (config_pid_file);
2972 config_pid_file = strdup (optarg);
2973 if (config_pid_file == NULL)
2974 {
2975 fprintf (stderr, "read_options: strdup failed.\n");
2976 return (3);
2977 }
2978 }
2979 break;
2981 case 'F':
2982 config_flush_at_shutdown = 1;
2983 break;
2985 case 'j':
2986 {
2987 const char *dir = journal_dir = strdup(optarg);
2989 status = rrd_mkdir_p(dir, 0777);
2990 if (status != 0)
2991 {
2992 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
2993 dir, rrd_strerror(errno));
2994 return 6;
2995 }
2997 if (access(dir, R_OK|W_OK|X_OK) != 0)
2998 {
2999 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3000 errno ? rrd_strerror(errno) : "");
3001 return 6;
3002 }
3003 }
3004 break;
3006 case 'h':
3007 case '?':
3008 printf ("RRDCacheD %s\n"
3009 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3010 "\n"
3011 "Usage: rrdcached [options]\n"
3012 "\n"
3013 "Valid options are:\n"
3014 " -l <address> Socket address to listen to.\n"
3015 " -P <perms> Sets the permissions to assign to all following "
3016 "sockets\n"
3017 " -w <seconds> Interval in which to write data.\n"
3018 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3019 " -t <threads> Number of write threads.\n"
3020 " -f <seconds> Interval in which to flush dead data.\n"
3021 " -p <file> Location of the PID-file.\n"
3022 " -b <dir> Base directory to change to.\n"
3023 " -B Restrict file access to paths within -b <dir>\n"
3024 " -g Do not fork and run in the foreground.\n"
3025 " -j <dir> Directory in which to create the journal files.\n"
3026 " -F Always flush all updates at shutdown\n"
3027 "\n"
3028 "For more information and a detailed description of all options "
3029 "please refer\n"
3030 "to the rrdcached(1) manual page.\n",
3031 VERSION);
3032 status = -1;
3033 break;
3034 } /* switch (option) */
3035 } /* while (getopt) */
3037 /* advise the user when values are not sane */
3038 if (config_flush_interval < 2 * config_write_interval)
3039 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3040 " 2x write interval (-w) !\n");
3041 if (config_write_jitter > config_write_interval)
3042 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3043 " write interval (-w) !\n");
3045 if (config_write_base_only && config_base_dir == NULL)
3046 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3047 " Consult the rrdcached documentation\n");
3049 if (journal_dir == NULL)
3050 config_flush_at_shutdown = 1;
3052 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3054 return (status);
3055 } /* }}} int read_options */
3057 int main (int argc, char **argv)
3058 {
3059 int status;
3061 status = read_options (argc, argv);
3062 if (status != 0)
3063 {
3064 if (status < 0)
3065 status = 0;
3066 return (status);
3067 }
3069 status = daemonize ();
3070 if (status != 0)
3071 {
3072 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3073 return (1);
3074 }
3076 journal_init();
3078 /* start the queue threads */
3079 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3080 if (queue_threads == NULL)
3081 {
3082 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3083 cleanup();
3084 return (1);
3085 }
3086 for (int i = 0; i < config_queue_threads; i++)
3087 {
3088 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3089 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3090 if (status != 0)
3091 {
3092 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3093 cleanup();
3094 return (1);
3095 }
3096 }
3098 /* start the flush thread */
3099 memset(&flush_thread, 0, sizeof(flush_thread));
3100 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3101 if (status != 0)
3102 {
3103 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3104 cleanup();
3105 return (1);
3106 }
3108 listen_thread_main (NULL);
3109 cleanup ();
3111 return (0);
3112 } /* int main */
3114 /*
3115 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3116 */