1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
110 #include <glib-2.0/glib.h>
111 /* }}} */
/* Daemon logging front end: forwards the priority and the printf-style
 * arguments to syslog(3) unchanged. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers that are not GCC-compatible do not know `__attribute__';
 * make every use of it expand to nothing there. */
#if !defined(__GNUC__)
# define __attribute__(x) /**/
#endif
119 /*
120 * Types
121 */
/* Outcome of a command, as handed to send_response(). */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK  = 0
} response_code;
124 struct listen_socket_s
125 {
126 int fd;
127 char addr[PATH_MAX + 1];
128 int family;
130 /* state for BATCH processing */
131 time_t batch_start;
132 int batch_cmd;
134 /* buffered IO */
135 char *rbuf;
136 off_t next_cmd;
137 off_t next_read;
139 char *wbuf;
140 ssize_t wbuf_len;
142 uint32_t permissions;
143 };
144 typedef struct listen_socket_s listen_socket_t;
146 struct command_s;
147 typedef struct command_s command_t;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
150 time_t now __attribute__((unused)),\
151 char *buffer __attribute__((unused)),\
152 size_t buffer_size __attribute__((unused))
154 #define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
155 DISPATCH_PROTO
157 struct command_s {
158 char *cmd;
159 int (*handler)(HANDLER_PROTO);
161 char context; /* where we expect to see it */
162 #define CMD_CONTEXT_CLIENT (1<<0)
163 #define CMD_CONTEXT_BATCH (1<<1)
164 #define CMD_CONTEXT_JOURNAL (1<<2)
165 #define CMD_CONTEXT_ANY (0x7f)
167 char *syntax;
168 char *help;
169 };
/* Bit values for cache_item_s.flags. */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/*
 * One cached RRD file: the update strings waiting to be written plus the
 * links that place the entry in the doubly linked write queue.
 */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;                 /* file name; also the key in cache_tree */
  char **values;              /* pending update strings */
  size_t values_num;          /* number of entries in `values' */
  time_t last_flush_time;     /* set by wipe_ci_values() when values are taken */
  time_t last_update_stamp;
  int flags;                  /* OR of CI_FLAGS_* bits */
  pthread_cond_t flushed;     /* broadcast once the item has been written */
  cache_item_t *prev;         /* queue neighbour towards the head */
  cache_item_t *next;         /* queue neighbour towards the tail */
};
/* User data handed to tree_callback_flush() for one pass over the tree. */
typedef struct callback_flush_data_s
{
  time_t now;          /* time at which the pass started */
  time_t abs_timeout;  /* items last flushed at or before this get queued */
  char **keys;         /* keys of entries to drop from the tree afterwards */
  size_t keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should use. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* A set of journal files: an array of names and its length. */
typedef struct
{
  char **files;
  size_t files_num;
} journal_set;
/* Upper bound, in bytes, for one socket command or response. */
#define CMD_MAX 4096
/* Size of the per-connection read buffer: room for two full commands. */
#define RBUF_SIZE (CMD_MAX*2)
214 /*
215 * Variables
216 */
217 static int stay_foreground = 0;
218 static uid_t daemon_uid;
220 static listen_socket_t *listen_fds = NULL;
221 static size_t listen_fds_num = 0;
223 enum {
224 RUNNING, /* normal operation */
225 FLUSHING, /* flushing remaining values */
226 SHUTDOWN /* shutting down */
227 } state = RUNNING;
229 static pthread_t *queue_threads;
230 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
231 static int config_queue_threads = 4;
233 static pthread_t flush_thread;
234 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
236 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
237 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
238 static int connection_threads_num = 0;
240 /* Cache stuff */
241 static GTree *cache_tree = NULL;
242 static cache_item_t *cache_queue_head = NULL;
243 static cache_item_t *cache_queue_tail = NULL;
244 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
246 static int config_write_interval = 300;
247 static int config_write_jitter = 0;
248 static int config_flush_interval = 3600;
249 static int config_flush_at_shutdown = 0;
250 static char *config_pid_file = NULL;
251 static char *config_base_dir = NULL;
252 static size_t _config_base_dir_len = 0;
253 static int config_write_base_only = 0;
255 static listen_socket_t **config_listen_address_list = NULL;
256 static size_t config_listen_address_list_len = 0;
/* Counters reported by the STATS command; guarded by stats_lock. */
static uint64_t stats_queue_length      = 0;
static uint64_t stats_updates_received  = 0;
static uint64_t stats_flush_received    = 0;
static uint64_t stats_updates_written   = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes     = 0;
static uint64_t stats_journal_rotate    = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
267 /* Journaled updates */
268 #define JOURNAL_BASE "rrd.journal"
269 static journal_set *journal_cur = NULL;
270 static journal_set *journal_old = NULL;
271 static char *journal_dir = NULL;
272 static FILE *journal_fh = NULL; /* current journal file handle */
273 static long journal_size = 0; /* current journal size */
274 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
275 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
276 static int journal_write(char *cmd, char *args);
277 static void journal_done(void);
278 static void journal_rotate(void);
280 /* prototypes for forward refernces */
281 static int handle_request_help (HANDLER_PROTO);
283 /*
284 * Functions
285 */
286 static void sig_common (const char *sig) /* {{{ */
287 {
288 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
289 state = FLUSHING;
290 pthread_cond_broadcast(&flush_cond);
291 pthread_cond_broadcast(&queue_cond);
292 } /* }}} void sig_common */
/* SIGINT: begin shutdown; the flush-at-shutdown setting is left alone. */
static void sig_int_handler (int signo __attribute__((unused))) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: begin shutdown; the flush-at-shutdown setting is left alone. */
static void sig_term_handler (int signo __attribute__((unused))) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
304 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
305 {
306 config_flush_at_shutdown = 1;
307 sig_common("USR1");
308 } /* }}} void sig_usr1_handler */
310 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
311 {
312 config_flush_at_shutdown = 0;
313 sig_common("USR2");
314 } /* }}} void sig_usr2_handler */
316 static void install_signal_handlers(void) /* {{{ */
317 {
318 /* These structures are static, because `sigaction' behaves weird if the are
319 * overwritten.. */
320 static struct sigaction sa_int;
321 static struct sigaction sa_term;
322 static struct sigaction sa_pipe;
323 static struct sigaction sa_usr1;
324 static struct sigaction sa_usr2;
326 /* Install signal handlers */
327 memset (&sa_int, 0, sizeof (sa_int));
328 sa_int.sa_handler = sig_int_handler;
329 sigaction (SIGINT, &sa_int, NULL);
331 memset (&sa_term, 0, sizeof (sa_term));
332 sa_term.sa_handler = sig_term_handler;
333 sigaction (SIGTERM, &sa_term, NULL);
335 memset (&sa_pipe, 0, sizeof (sa_pipe));
336 sa_pipe.sa_handler = SIG_IGN;
337 sigaction (SIGPIPE, &sa_pipe, NULL);
339 memset (&sa_pipe, 0, sizeof (sa_usr1));
340 sa_usr1.sa_handler = sig_usr1_handler;
341 sigaction (SIGUSR1, &sa_usr1, NULL);
343 memset (&sa_usr2, 0, sizeof (sa_usr2));
344 sa_usr2.sa_handler = sig_usr2_handler;
345 sigaction (SIGUSR2, &sa_usr2, NULL);
347 } /* }}} void install_signal_handlers */
349 static int open_pidfile(char *action, int oflag) /* {{{ */
350 {
351 int fd;
352 const char *file;
353 char *file_copy, *dir;
355 file = (config_pid_file != NULL)
356 ? config_pid_file
357 : LOCALSTATEDIR "/run/rrdcached.pid";
359 /* dirname may modify its argument */
360 file_copy = strdup(file);
361 if (file_copy == NULL)
362 {
363 fprintf(stderr, "rrdcached: strdup(): %s\n",
364 rrd_strerror(errno));
365 return -1;
366 }
368 dir = dirname(file_copy);
369 if (rrd_mkdir_p(dir, 0777) != 0)
370 {
371 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
372 dir, rrd_strerror(errno));
373 return -1;
374 }
376 free(file_copy);
378 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
379 if (fd < 0)
380 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
381 action, file, rrd_strerror(errno));
383 return(fd);
384 } /* }}} static int open_pidfile */
/*
 * check_pidfile:
 * Check an existing pid file to see whether a daemon is running.
 *
 * Opens the PID file read/write and parses the PID stored in it.  If that
 * PID belongs to another process that we are able to signal, the file is
 * left alone and an error is returned.  Otherwise the file is considered
 * stale and is truncated for reuse.
 *
 * Returns the open descriptor of the truncated file on success and a
 * negative value on any failure.  The descriptor is closed on every
 * failure path.
 */
static int check_pidfile(void)
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: atoi() needs a NUL-terminated string */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile:
 * Write our PID, followed by a newline, to the already opened descriptor
 * `fd'.  The descriptor is closed in every case.  Returns 0 on success and
 * -1 if no stdio stream could be attached to the descriptor.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
451 static int remove_pidfile (void) /* {{{ */
452 {
453 char *file;
454 int status;
456 file = (config_pid_file != NULL)
457 ? config_pid_file
458 : LOCALSTATEDIR "/run/rrdcached.pid";
460 status = unlink (file);
461 if (status == 0)
462 return (0);
463 return (errno);
464 } /* }}} int remove_pidfile */
466 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
467 {
468 char *eol;
470 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
471 sock->next_read - sock->next_cmd);
473 if (eol == NULL)
474 {
475 /* no commands left, move remainder back to front of rbuf */
476 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
477 sock->next_read - sock->next_cmd);
478 sock->next_read -= sock->next_cmd;
479 sock->next_cmd = 0;
480 *len = 0;
481 return NULL;
482 }
483 else
484 {
485 char *cmd = sock->rbuf + sock->next_cmd;
486 *eol = '\0';
488 sock->next_cmd = eol - sock->rbuf + 1;
490 if (eol > sock->rbuf && *(eol-1) == '\r')
491 *(--eol) = '\0'; /* handle "\r\n" EOL */
493 *len = eol - cmd;
495 return cmd;
496 }
498 /* NOTREACHED */
499 assert(1==0);
500 } /* }}} char *next_cmd */
502 /* add the characters directly to the write buffer */
503 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
504 {
505 char *new_buf;
507 assert(sock != NULL);
509 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
510 if (new_buf == NULL)
511 {
512 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
513 return -1;
514 }
516 strncpy(new_buf + sock->wbuf_len, str, len + 1);
518 sock->wbuf = new_buf;
519 sock->wbuf_len += len;
521 return 0;
522 } /* }}} static int add_to_wbuf */
524 /* add the text to the "extra" info that's sent after the status line */
525 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
526 {
527 va_list argp;
528 char buffer[CMD_MAX];
529 int len;
531 if (sock == NULL) return 0; /* journal replay mode */
532 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
534 va_start(argp, fmt);
535 #ifdef HAVE_VSNPRINTF
536 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
537 #else
538 len = vsprintf(buffer, fmt, argp);
539 #endif
540 va_end(argp);
541 if (len < 0)
542 {
543 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
544 return -1;
545 }
547 return add_to_wbuf(sock, buffer, len);
548 } /* }}} static int add_response_info */
/*
 * count_lines:
 * Return the number of newline characters in `str'; a NULL pointer counts
 * as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
566 /* send the response back to the user.
567 * returns 0 on success, -1 on error
568 * write buffer is always zeroed after this call */
569 static int send_response (listen_socket_t *sock, response_code rc,
570 char *fmt, ...) /* {{{ */
571 {
572 va_list argp;
573 char buffer[CMD_MAX];
574 int lines;
575 ssize_t wrote;
576 int rclen, len;
578 if (sock == NULL) return rc; /* journal replay mode */
580 if (sock->batch_start)
581 {
582 if (rc == RESP_OK)
583 return rc; /* no response on success during BATCH */
584 lines = sock->batch_cmd;
585 }
586 else if (rc == RESP_OK)
587 lines = count_lines(sock->wbuf);
588 else
589 lines = -1;
591 rclen = sprintf(buffer, "%d ", lines);
592 va_start(argp, fmt);
593 #ifdef HAVE_VSNPRINTF
594 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
595 #else
596 len = vsprintf(buffer+rclen, fmt, argp);
597 #endif
598 va_end(argp);
599 if (len < 0)
600 return -1;
602 len += rclen;
604 /* append the result to the wbuf, don't write to the user */
605 if (sock->batch_start)
606 return add_to_wbuf(sock, buffer, len);
608 /* first write must be complete */
609 if (len != write(sock->fd, buffer, len))
610 {
611 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
612 return -1;
613 }
615 if (sock->wbuf != NULL && rc == RESP_OK)
616 {
617 wrote = 0;
618 while (wrote < sock->wbuf_len)
619 {
620 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
621 if (wb <= 0)
622 {
623 RRDD_LOG(LOG_INFO, "send_response: could not write results");
624 return -1;
625 }
626 wrote += wb;
627 }
628 }
630 free(sock->wbuf); sock->wbuf = NULL;
631 sock->wbuf_len = 0;
633 return 0;
634 } /* }}} */
636 static void wipe_ci_values(cache_item_t *ci, time_t when)
637 {
638 ci->values = NULL;
639 ci->values_num = 0;
641 ci->last_flush_time = when;
642 if (config_write_jitter > 0)
643 ci->last_flush_time += (rrd_random() % config_write_jitter);
644 }
646 /* remove_from_queue
647 * remove a "cache_item_t" item from the queue.
648 * must hold 'cache_lock' when calling this
649 */
650 static void remove_from_queue(cache_item_t *ci) /* {{{ */
651 {
652 if (ci == NULL) return;
653 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
655 if (ci->prev == NULL)
656 cache_queue_head = ci->next; /* reset head */
657 else
658 ci->prev->next = ci->next;
660 if (ci->next == NULL)
661 cache_queue_tail = ci->prev; /* reset the tail */
662 else
663 ci->next->prev = ci->prev;
665 ci->next = ci->prev = NULL;
666 ci->flags &= ~CI_FLAGS_IN_QUEUE;
668 pthread_mutex_lock (&stats_lock);
669 assert (stats_queue_length > 0);
670 stats_queue_length--;
671 pthread_mutex_unlock (&stats_lock);
673 } /* }}} static void remove_from_queue */
675 /* free the resources associated with the cache_item_t
676 * must hold cache_lock when calling this function
677 */
678 static void *free_cache_item(cache_item_t *ci) /* {{{ */
679 {
680 if (ci == NULL) return NULL;
682 remove_from_queue(ci);
684 for (size_t i=0; i < ci->values_num; i++)
685 free(ci->values[i]);
687 free (ci->values);
688 free (ci->file);
690 /* in case anyone is waiting */
691 pthread_cond_broadcast(&ci->flushed);
692 pthread_cond_destroy(&ci->flushed);
694 free (ci);
696 return NULL;
697 } /* }}} static void *free_cache_item */
699 /*
700 * enqueue_cache_item:
701 * `cache_lock' must be acquired before calling this function!
702 */
703 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
704 queue_side_t side)
705 {
706 if (ci == NULL)
707 return (-1);
709 if (ci->values_num == 0)
710 return (0);
712 if (side == HEAD)
713 {
714 if (cache_queue_head == ci)
715 return 0;
717 /* remove if further down in queue */
718 remove_from_queue(ci);
720 ci->prev = NULL;
721 ci->next = cache_queue_head;
722 if (ci->next != NULL)
723 ci->next->prev = ci;
724 cache_queue_head = ci;
726 if (cache_queue_tail == NULL)
727 cache_queue_tail = cache_queue_head;
728 }
729 else /* (side == TAIL) */
730 {
731 /* We don't move values back in the list.. */
732 if (ci->flags & CI_FLAGS_IN_QUEUE)
733 return (0);
735 assert (ci->next == NULL);
736 assert (ci->prev == NULL);
738 ci->prev = cache_queue_tail;
740 if (cache_queue_tail == NULL)
741 cache_queue_head = ci;
742 else
743 cache_queue_tail->next = ci;
745 cache_queue_tail = ci;
746 }
748 ci->flags |= CI_FLAGS_IN_QUEUE;
750 pthread_cond_signal(&queue_cond);
751 pthread_mutex_lock (&stats_lock);
752 stats_queue_length++;
753 pthread_mutex_unlock (&stats_lock);
755 return (0);
756 } /* }}} int enqueue_cache_item */
758 /*
759 * tree_callback_flush:
760 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
761 * while this is in progress.
762 */
763 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
764 gpointer data)
765 {
766 cache_item_t *ci;
767 callback_flush_data_t *cfd;
769 ci = (cache_item_t *) value;
770 cfd = (callback_flush_data_t *) data;
772 if (ci->flags & CI_FLAGS_IN_QUEUE)
773 return FALSE;
775 if (ci->values_num > 0
776 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
777 {
778 enqueue_cache_item (ci, TAIL);
779 }
780 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
781 && (ci->values_num <= 0))
782 {
783 assert ((char *) key == ci->file);
784 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
785 {
786 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
787 return (FALSE);
788 }
789 }
791 return (FALSE);
792 } /* }}} gboolean tree_callback_flush */
794 static int flush_old_values (int max_age)
795 {
796 callback_flush_data_t cfd;
797 size_t k;
799 memset (&cfd, 0, sizeof (cfd));
800 /* Pass the current time as user data so that we don't need to call
801 * `time' for each node. */
802 cfd.now = time (NULL);
803 cfd.keys = NULL;
804 cfd.keys_num = 0;
806 if (max_age > 0)
807 cfd.abs_timeout = cfd.now - max_age;
808 else
809 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
811 /* `tree_callback_flush' will return the keys of all values that haven't
812 * been touched in the last `config_flush_interval' seconds in `cfd'.
813 * The char*'s in this array point to the same memory as ci->file, so we
814 * don't need to free them separately. */
815 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
817 for (k = 0; k < cfd.keys_num; k++)
818 {
819 /* should never fail, since we have held the cache_lock
820 * the entire time */
821 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
822 }
824 if (cfd.keys != NULL)
825 {
826 free (cfd.keys);
827 cfd.keys = NULL;
828 }
830 return (0);
831 } /* int flush_old_values */
833 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
834 {
835 struct timeval now;
836 struct timespec next_flush;
837 int status;
839 gettimeofday (&now, NULL);
840 next_flush.tv_sec = now.tv_sec + config_flush_interval;
841 next_flush.tv_nsec = 1000 * now.tv_usec;
843 pthread_mutex_lock(&cache_lock);
845 while (state == RUNNING)
846 {
847 gettimeofday (&now, NULL);
848 if ((now.tv_sec > next_flush.tv_sec)
849 || ((now.tv_sec == next_flush.tv_sec)
850 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
851 {
852 RRDD_LOG(LOG_DEBUG, "flushing old values");
854 /* Determine the time of the next cache flush. */
855 next_flush.tv_sec = now.tv_sec + config_flush_interval;
857 /* Flush all values that haven't been written in the last
858 * `config_write_interval' seconds. */
859 flush_old_values (config_write_interval);
861 /* unlock the cache while we rotate so we don't block incoming
862 * updates if the fsync() blocks on disk I/O */
863 pthread_mutex_unlock(&cache_lock);
864 journal_rotate();
865 pthread_mutex_lock(&cache_lock);
866 }
868 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
869 if (status != 0 && status != ETIMEDOUT)
870 {
871 RRDD_LOG (LOG_ERR, "flush_thread_main: "
872 "pthread_cond_timedwait returned %i.", status);
873 }
874 }
876 if (config_flush_at_shutdown)
877 flush_old_values (-1); /* flush everything */
879 state = SHUTDOWN;
881 pthread_mutex_unlock(&cache_lock);
883 return NULL;
884 } /* void *flush_thread_main */
886 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
887 {
888 pthread_mutex_lock (&cache_lock);
890 while (state != SHUTDOWN
891 || (cache_queue_head != NULL && config_flush_at_shutdown))
892 {
893 cache_item_t *ci;
894 char *file;
895 char **values;
896 size_t values_num;
897 int status;
899 /* Now, check if there's something to store away. If not, wait until
900 * something comes in. */
901 if (cache_queue_head == NULL)
902 {
903 status = pthread_cond_wait (&queue_cond, &cache_lock);
904 if ((status != 0) && (status != ETIMEDOUT))
905 {
906 RRDD_LOG (LOG_ERR, "queue_thread_main: "
907 "pthread_cond_wait returned %i.", status);
908 }
909 }
911 /* Check if a value has arrived. This may be NULL if we timed out or there
912 * was an interrupt such as a signal. */
913 if (cache_queue_head == NULL)
914 continue;
916 ci = cache_queue_head;
918 /* copy the relevant parts */
919 file = strdup (ci->file);
920 if (file == NULL)
921 {
922 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
923 continue;
924 }
926 assert(ci->values != NULL);
927 assert(ci->values_num > 0);
929 values = ci->values;
930 values_num = ci->values_num;
932 wipe_ci_values(ci, time(NULL));
933 remove_from_queue(ci);
935 pthread_mutex_unlock (&cache_lock);
937 rrd_clear_error ();
938 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
939 if (status != 0)
940 {
941 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
942 "rrd_update_r (%s) failed with status %i. (%s)",
943 file, status, rrd_get_error());
944 }
946 journal_write("wrote", file);
948 /* Search again in the tree. It's possible someone issued a "FORGET"
949 * while we were writing the update values. */
950 pthread_mutex_lock(&cache_lock);
951 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
952 if (ci)
953 pthread_cond_broadcast(&ci->flushed);
954 pthread_mutex_unlock(&cache_lock);
956 if (status == 0)
957 {
958 pthread_mutex_lock (&stats_lock);
959 stats_updates_written++;
960 stats_data_sets_written += values_num;
961 pthread_mutex_unlock (&stats_lock);
962 }
964 rrd_free_ptrs((void ***) &values, &values_num);
965 free(file);
967 pthread_mutex_lock (&cache_lock);
968 }
969 pthread_mutex_unlock (&cache_lock);
971 return (NULL);
972 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Split the next field off the front of a NUL-terminated command buffer.
 *
 * Fields are separated by a single space or end at the terminating NUL.  A
 * backslash takes the following character literally.  The field is
 * unescaped and terminated in place, at the start of the buffer.
 *
 * buffer_ret:      in: start of the unparsed text; out: first character
 *                  after the field's separator.
 * buffer_size_ret: in: size of the buffer including the final NUL; out:
 *                  what is left after the field.
 * field_ret:       out: the NUL-terminated field.
 *
 * Returns 0 on success.  Returns -1, leaving the three outputs unchanged,
 * if the buffer is empty or no field terminator is found (for example
 * because a backslash is the last character).
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const start = *buffer_ret;
  const size_t total = *buffer_size_ret;
  size_t rd = 0; /* read position */
  size_t wr = 0; /* write position, never ahead of rd */

  if (total == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (start[total - 1] == '\0');

  for (;;)
  {
    char c;

    if (rd >= total)
      return (-1); /* ran off the end without a terminator */

    c = start[rd];

    /* end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
      break;

    /* escaped character: copy whatever follows the backslash */
    if (c == '\\')
    {
      if (rd >= (total - 1))
        return (-1);
      rd++;
      c = start[rd];
    }

    start[wr] = c;
    wr++;
    rd++;
  }

  start[wr] = '\0';
  rd++; /* step over the separator */

  *buffer_ret = start + rd;
  *buffer_size_ret = total - rd;
  *field_ret = start;

  return (0);
} /* }}} int buffer_get_field */
1037 /* if we're restricting writes to the base directory,
1038 * check whether the file falls within the dir
1039 * returns 1 if OK, otherwise 0
1040 */
1041 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1042 {
1043 assert(file != NULL);
1045 if (!config_write_base_only
1046 || sock == NULL /* journal replay */
1047 || config_base_dir == NULL)
1048 return 1;
1050 if (strstr(file, "../") != NULL) goto err;
1052 /* relative paths without "../" are ok */
1053 if (*file != '/') return 1;
1055 /* file must be of the format base + "/" + <1+ char filename> */
1056 if (strlen(file) < _config_base_dir_len + 2) goto err;
1057 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1058 if (*(file + _config_base_dir_len) != '/') goto err;
1060 return 1;
1062 err:
1063 if (sock != NULL && sock->fd >= 0)
1064 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1066 return 0;
1067 } /* }}} static int check_file_access */
1069 /* when using a base dir, convert relative paths to absolute paths.
1070 * if necessary, modifies the "filename" pointer to point
1071 * to the new path created in "tmp". "tmp" is provided
1072 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1073 *
1074 * this allows us to optimize for the expected case (absolute path)
1075 * with a no-op.
1076 */
1077 static void get_abs_path(char **filename, char *tmp)
1078 {
1079 assert(tmp != NULL);
1080 assert(filename != NULL && *filename != NULL);
1082 if (config_base_dir == NULL || **filename == '/')
1083 return;
1085 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1086 *filename = tmp;
1087 } /* }}} static int get_abs_path */
1089 static int flush_file (const char *filename) /* {{{ */
1090 {
1091 cache_item_t *ci;
1093 pthread_mutex_lock (&cache_lock);
1095 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1096 if (ci == NULL)
1097 {
1098 pthread_mutex_unlock (&cache_lock);
1099 return (ENOENT);
1100 }
1102 if (ci->values_num > 0)
1103 {
1104 /* Enqueue at head */
1105 enqueue_cache_item (ci, HEAD);
1106 pthread_cond_wait(&ci->flushed, &cache_lock);
1107 }
1109 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1110 * may have been purged during our cond_wait() */
1112 pthread_mutex_unlock(&cache_lock);
1114 return (0);
1115 } /* }}} int flush_file */
1117 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1118 {
1119 char *err = "Syntax error.\n";
1121 if (cmd && cmd->syntax)
1122 err = cmd->syntax;
1124 return send_response(sock, RESP_ERR, "Usage: %s", err);
1125 } /* }}} static int syntax_error() */
1127 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1128 {
1129 uint64_t copy_queue_length;
1130 uint64_t copy_updates_received;
1131 uint64_t copy_flush_received;
1132 uint64_t copy_updates_written;
1133 uint64_t copy_data_sets_written;
1134 uint64_t copy_journal_bytes;
1135 uint64_t copy_journal_rotate;
1137 uint64_t tree_nodes_number;
1138 uint64_t tree_depth;
1140 pthread_mutex_lock (&stats_lock);
1141 copy_queue_length = stats_queue_length;
1142 copy_updates_received = stats_updates_received;
1143 copy_flush_received = stats_flush_received;
1144 copy_updates_written = stats_updates_written;
1145 copy_data_sets_written = stats_data_sets_written;
1146 copy_journal_bytes = stats_journal_bytes;
1147 copy_journal_rotate = stats_journal_rotate;
1148 pthread_mutex_unlock (&stats_lock);
1150 pthread_mutex_lock (&cache_lock);
1151 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1152 tree_depth = (uint64_t) g_tree_height (cache_tree);
1153 pthread_mutex_unlock (&cache_lock);
1155 add_response_info(sock,
1156 "QueueLength: %"PRIu64"\n", copy_queue_length);
1157 add_response_info(sock,
1158 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1159 add_response_info(sock,
1160 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1161 add_response_info(sock,
1162 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1163 add_response_info(sock,
1164 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1165 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1166 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1167 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1168 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1170 send_response(sock, RESP_OK, "Statistics follow\n");
1172 return (0);
1173 } /* }}} int handle_request_stats */
1175 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1176 {
1177 char *file, file_tmp[PATH_MAX];
1178 int status;
1180 status = buffer_get_field (&buffer, &buffer_size, &file);
1181 if (status != 0)
1182 {
1183 return syntax_error(sock,cmd);
1184 }
1185 else
1186 {
1187 pthread_mutex_lock(&stats_lock);
1188 stats_flush_received++;
1189 pthread_mutex_unlock(&stats_lock);
1191 get_abs_path(&file, file_tmp);
1192 if (!check_file_access(file, sock)) return 0;
1194 status = flush_file (file);
1195 if (status == 0)
1196 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1197 else if (status == ENOENT)
1198 {
1199 /* no file in our tree; see whether it exists at all */
1200 struct stat statbuf;
1202 memset(&statbuf, 0, sizeof(statbuf));
1203 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1204 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1205 else
1206 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1207 }
1208 else if (status < 0)
1209 return send_response(sock, RESP_ERR, "Internal error.\n");
1210 else
1211 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1212 }
1214 /* NOTREACHED */
1215 assert(1==0);
1216 } /* }}} int handle_request_flush */
1218 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1219 {
1220 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1222 pthread_mutex_lock(&cache_lock);
1223 flush_old_values(-1);
1224 pthread_mutex_unlock(&cache_lock);
1226 return send_response(sock, RESP_OK, "Started flush.\n");
1227 } /* }}} static int handle_request_flushall */
1229 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1230 {
1231 int status;
1232 char *file, file_tmp[PATH_MAX];
1233 cache_item_t *ci;
1235 status = buffer_get_field(&buffer, &buffer_size, &file);
1236 if (status != 0)
1237 return syntax_error(sock,cmd);
1239 get_abs_path(&file, file_tmp);
1241 pthread_mutex_lock(&cache_lock);
1242 ci = g_tree_lookup(cache_tree, file);
1243 if (ci == NULL)
1244 {
1245 pthread_mutex_unlock(&cache_lock);
1246 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1247 }
1249 for (size_t i=0; i < ci->values_num; i++)
1250 add_response_info(sock, "%s\n", ci->values[i]);
1252 pthread_mutex_unlock(&cache_lock);
1253 return send_response(sock, RESP_OK, "updates pending\n");
1254 } /* }}} static int handle_request_pending */
1256 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1257 {
1258 int status;
1259 gboolean found;
1260 char *file, file_tmp[PATH_MAX];
1262 status = buffer_get_field(&buffer, &buffer_size, &file);
1263 if (status != 0)
1264 return syntax_error(sock,cmd);
1266 get_abs_path(&file, file_tmp);
1267 if (!check_file_access(file, sock)) return 0;
1269 pthread_mutex_lock(&cache_lock);
1270 found = g_tree_remove(cache_tree, file);
1271 pthread_mutex_unlock(&cache_lock);
1273 if (found == TRUE)
1274 {
1275 if (sock != NULL)
1276 journal_write("forget", file);
1278 return send_response(sock, RESP_OK, "Gone!\n");
1279 }
1280 else
1281 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1283 /* NOTREACHED */
1284 assert(1==0);
1285 } /* }}} static int handle_request_forget */
1287 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1288 {
1289 cache_item_t *ci;
1291 pthread_mutex_lock(&cache_lock);
1293 ci = cache_queue_head;
1294 while (ci != NULL)
1295 {
1296 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1297 ci = ci->next;
1298 }
1300 pthread_mutex_unlock(&cache_lock);
1302 return send_response(sock, RESP_OK, "in queue.\n");
1303 } /* }}} int handle_request_queue */
1305 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1306 {
1307 char *file, file_tmp[PATH_MAX];
1308 int values_num = 0;
1309 int status;
1310 char orig_buf[CMD_MAX];
1312 cache_item_t *ci;
1314 /* save it for the journal later */
1315 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1317 status = buffer_get_field (&buffer, &buffer_size, &file);
1318 if (status != 0)
1319 return syntax_error(sock,cmd);
1321 pthread_mutex_lock(&stats_lock);
1322 stats_updates_received++;
1323 pthread_mutex_unlock(&stats_lock);
1325 get_abs_path(&file, file_tmp);
1326 if (!check_file_access(file, sock)) return 0;
1328 pthread_mutex_lock (&cache_lock);
1329 ci = g_tree_lookup (cache_tree, file);
1331 if (ci == NULL) /* {{{ */
1332 {
1333 struct stat statbuf;
1334 cache_item_t *tmp;
1336 /* don't hold the lock while we setup; stat(2) might block */
1337 pthread_mutex_unlock(&cache_lock);
1339 memset (&statbuf, 0, sizeof (statbuf));
1340 status = stat (file, &statbuf);
1341 if (status != 0)
1342 {
1343 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1345 status = errno;
1346 if (status == ENOENT)
1347 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1348 else
1349 return send_response(sock, RESP_ERR,
1350 "stat failed with error %i.\n", status);
1351 }
1352 if (!S_ISREG (statbuf.st_mode))
1353 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1355 if (access(file, R_OK|W_OK) != 0)
1356 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1357 file, rrd_strerror(errno));
1359 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1360 if (ci == NULL)
1361 {
1362 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1364 return send_response(sock, RESP_ERR, "malloc failed.\n");
1365 }
1366 memset (ci, 0, sizeof (cache_item_t));
1368 ci->file = strdup (file);
1369 if (ci->file == NULL)
1370 {
1371 free (ci);
1372 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1374 return send_response(sock, RESP_ERR, "strdup failed.\n");
1375 }
1377 wipe_ci_values(ci, now);
1378 ci->flags = CI_FLAGS_IN_TREE;
1379 pthread_cond_init(&ci->flushed, NULL);
1381 pthread_mutex_lock(&cache_lock);
1383 /* another UPDATE might have added this entry in the meantime */
1384 tmp = g_tree_lookup (cache_tree, file);
1385 if (tmp == NULL)
1386 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1387 else
1388 {
1389 free_cache_item (ci);
1390 ci = tmp;
1391 }
1393 /* state may have changed while we were unlocked */
1394 if (state == SHUTDOWN)
1395 return -1;
1396 } /* }}} */
1397 assert (ci != NULL);
1399 /* don't re-write updates in replay mode */
1400 if (sock != NULL)
1401 journal_write("update", orig_buf);
1403 while (buffer_size > 0)
1404 {
1405 char *value;
1406 time_t stamp;
1407 char *eostamp;
1409 status = buffer_get_field (&buffer, &buffer_size, &value);
1410 if (status != 0)
1411 {
1412 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1413 break;
1414 }
1416 /* make sure update time is always moving forward */
1417 stamp = strtol(value, &eostamp, 10);
1418 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1419 {
1420 pthread_mutex_unlock(&cache_lock);
1421 return send_response(sock, RESP_ERR,
1422 "Cannot find timestamp in '%s'!\n", value);
1423 }
1424 else if (stamp <= ci->last_update_stamp)
1425 {
1426 pthread_mutex_unlock(&cache_lock);
1427 return send_response(sock, RESP_ERR,
1428 "illegal attempt to update using time %ld when last"
1429 " update time is %ld (minimum one second step)\n",
1430 stamp, ci->last_update_stamp);
1431 }
1432 else
1433 ci->last_update_stamp = stamp;
1435 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1436 {
1437 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1438 continue;
1439 }
1441 values_num++;
1442 }
1444 if (((now - ci->last_flush_time) >= config_write_interval)
1445 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1446 && (ci->values_num > 0))
1447 {
1448 enqueue_cache_item (ci, TAIL);
1449 }
1451 pthread_mutex_unlock (&cache_lock);
1453 if (values_num < 1)
1454 return send_response(sock, RESP_ERR, "No values updated.\n");
1455 else
1456 return send_response(sock, RESP_OK,
1457 "errors, enqueued %i value(s).\n", values_num);
1459 /* NOTREACHED */
1460 assert(1==0);
1462 } /* }}} int handle_request_update */
1464 /* we came across a "WROTE" entry during journal replay.
1465 * throw away any values that we have accumulated for this file
1466 */
1467 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1468 {
1469 cache_item_t *ci;
1470 const char *file = buffer;
1472 pthread_mutex_lock(&cache_lock);
1474 ci = g_tree_lookup(cache_tree, file);
1475 if (ci == NULL)
1476 {
1477 pthread_mutex_unlock(&cache_lock);
1478 return (0);
1479 }
1481 if (ci->values)
1482 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1484 wipe_ci_values(ci, now);
1485 remove_from_queue(ci);
1487 pthread_mutex_unlock(&cache_lock);
1488 return (0);
1489 } /* }}} int handle_request_wrote */
1491 /* start "BATCH" processing */
1492 static int batch_start (HANDLER_PROTO) /* {{{ */
1493 {
1494 int status;
1495 if (sock->batch_start)
1496 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1498 status = send_response(sock, RESP_OK,
1499 "Go ahead. End with dot '.' on its own line.\n");
1500 sock->batch_start = time(NULL);
1501 sock->batch_cmd = 0;
1503 return status;
1504 } /* }}} static int batch_start */
1506 /* finish "BATCH" processing and return results to the client */
1507 static int batch_done (HANDLER_PROTO) /* {{{ */
1508 {
1509 assert(sock->batch_start);
1510 sock->batch_start = 0;
1511 sock->batch_cmd = 0;
1512 return send_response(sock, RESP_OK, "errors\n");
1513 } /* }}} static int batch_done */
/* QUIT handler: does no work of its own and always reports -1.
 * connection_thread_main() leaves its loop and closes the connection on
 * any non-zero handler status. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int close_connection_status = -1;

  return close_connection_status;
} /* }}} static int handle_request_quit */
1520 static command_t list_of_commands[] = { /* {{{ */
1521 {
1522 "UPDATE",
1523 handle_request_update,
1524 CMD_CONTEXT_ANY,
1525 "UPDATE <filename> <values> [<values> ...]\n"
1526 ,
1527 "Adds the given file to the internal cache if it is not yet known and\n"
1528 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1529 "for details.\n"
1530 "\n"
1531 "Each <values> has the following form:\n"
1532 " <values> = <time>:<value>[:<value>[...]]\n"
1533 "See the rrdupdate(1) manpage for details.\n"
1534 },
1535 {
1536 "WROTE",
1537 handle_request_wrote,
1538 CMD_CONTEXT_JOURNAL,
1539 NULL,
1540 NULL
1541 },
1542 {
1543 "FLUSH",
1544 handle_request_flush,
1545 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1546 "FLUSH <filename>\n"
1547 ,
1548 "Adds the given filename to the head of the update queue and returns\n"
1549 "after it has been dequeued.\n"
1550 },
1551 {
1552 "FLUSHALL",
1553 handle_request_flushall,
1554 CMD_CONTEXT_CLIENT,
1555 "FLUSHALL\n"
1556 ,
1557 "Triggers writing of all pending updates. Returns immediately.\n"
1558 },
1559 {
1560 "PENDING",
1561 handle_request_pending,
1562 CMD_CONTEXT_CLIENT,
1563 "PENDING <filename>\n"
1564 ,
1565 "Shows any 'pending' updates for a file, in order.\n"
1566 "The updates shown have not yet been written to the underlying RRD file.\n"
1567 },
1568 {
1569 "FORGET",
1570 handle_request_forget,
1571 CMD_CONTEXT_ANY,
1572 "FORGET <filename>\n"
1573 ,
1574 "Removes the file completely from the cache.\n"
1575 "Any pending updates for the file will be lost.\n"
1576 },
1577 {
1578 "QUEUE",
1579 handle_request_queue,
1580 CMD_CONTEXT_CLIENT,
1581 "QUEUE\n"
1582 ,
1583 "Shows all files in the output queue.\n"
1584 "The output is zero or more lines in the following format:\n"
1585 "(where <num_vals> is the number of values to be written)\n"
1586 "\n"
1587 "<num_vals> <filename>\n"
1588 },
1589 {
1590 "STATS",
1591 handle_request_stats,
1592 CMD_CONTEXT_CLIENT,
1593 "STATS\n"
1594 ,
1595 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1596 "a description of the values.\n"
1597 },
1598 {
1599 "HELP",
1600 handle_request_help,
1601 CMD_CONTEXT_CLIENT,
1602 "HELP [<command>]\n",
1603 NULL, /* special! */
1604 },
1605 {
1606 "BATCH",
1607 batch_start,
1608 CMD_CONTEXT_CLIENT,
1609 "BATCH\n"
1610 ,
1611 "The 'BATCH' command permits the client to initiate a bulk load\n"
1612 " of commands to rrdcached.\n"
1613 "\n"
1614 "Usage:\n"
1615 "\n"
1616 " client: BATCH\n"
1617 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1618 " client: command #1\n"
1619 " client: command #2\n"
1620 " client: ... and so on\n"
1621 " client: .\n"
1622 " server: 2 errors\n"
1623 " server: 7 message for command #7\n"
1624 " server: 9 message for command #9\n"
1625 "\n"
1626 "For more information, consult the rrdcached(1) documentation.\n"
1627 },
1628 {
1629 ".", /* BATCH terminator */
1630 batch_done,
1631 CMD_CONTEXT_BATCH,
1632 NULL,
1633 NULL
1634 },
1635 {
1636 "QUIT",
1637 handle_request_quit,
1638 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1639 "QUIT\n"
1640 ,
1641 "Disconnect from rrdcached.\n"
1642 }
1643 }; /* }}} command_t list_of_commands[] */
1644 static size_t list_of_commands_len = sizeof (list_of_commands)
1645 / sizeof (list_of_commands[0]);
1647 static command_t *find_command(char *cmd)
1648 {
1649 size_t i;
1651 for (i = 0; i < list_of_commands_len; i++)
1652 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1653 return (&list_of_commands[i]);
1654 return NULL;
1655 }
1657 /* We currently use the index in the `list_of_commands' array as a bit position
1658 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1659 * outside these functions so that switching to a more elegant storage method
1660 * is easily possible. */
1661 static ssize_t find_command_index (const char *cmd) /* {{{ */
1662 {
1663 size_t i;
1665 for (i = 0; i < list_of_commands_len; i++)
1666 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1667 return ((ssize_t) i);
1668 return (-1);
1669 } /* }}} ssize_t find_command_index */
1671 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1672 const char *cmd)
1673 {
1674 ssize_t i;
1676 if (sock == NULL) /* journal replay */
1677 return (1);
1679 if (cmd == NULL)
1680 return (-1);
1682 if ((strcasecmp ("QUIT", cmd) == 0)
1683 || (strcasecmp ("HELP", cmd) == 0))
1684 return (1);
1685 else if (strcmp (".", cmd) == 0)
1686 cmd = "BATCH";
1688 i = find_command_index (cmd);
1689 if (i < 0)
1690 return (-1);
1691 assert (i < 32);
1693 if ((sock->permissions & (1 << i)) != 0)
1694 return (1);
1695 return (0);
1696 } /* }}} int socket_permission_check */
1698 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1699 const char *cmd)
1700 {
1701 ssize_t i;
1703 i = find_command_index (cmd);
1704 if (i < 0)
1705 return (-1);
1706 assert (i < 32);
1708 sock->permissions |= (1 << i);
1709 return (0);
1710 } /* }}} int socket_permission_add */
1712 /* check whether commands are received in the expected context */
1713 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1714 {
1715 if (sock == NULL)
1716 return (cmd->context & CMD_CONTEXT_JOURNAL);
1717 else if (sock->batch_start)
1718 return (cmd->context & CMD_CONTEXT_BATCH);
1719 else
1720 return (cmd->context & CMD_CONTEXT_CLIENT);
1722 /* NOTREACHED */
1723 assert(1==0);
1724 }
1726 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1727 {
1728 int status;
1729 char *cmd_str;
1730 char *resp_txt;
1731 command_t *help = NULL;
1733 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1734 if (status == 0)
1735 help = find_command(cmd_str);
1737 if (help && (help->syntax || help->help))
1738 {
1739 char tmp[CMD_MAX];
1741 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1742 resp_txt = tmp;
1744 if (help->syntax)
1745 add_response_info(sock, "Usage: %s\n", help->syntax);
1747 if (help->help)
1748 add_response_info(sock, "%s\n", help->help);
1749 }
1750 else
1751 {
1752 size_t i;
1754 resp_txt = "Command overview\n";
1756 for (i = 0; i < list_of_commands_len; i++)
1757 {
1758 if (list_of_commands[i].syntax == NULL)
1759 continue;
1760 add_response_info (sock, "%s", list_of_commands[i].syntax);
1761 }
1762 }
1764 return send_response(sock, RESP_OK, resp_txt);
1765 } /* }}} int handle_request_help */
1767 /* if sock==NULL, we are in journal replay mode */
1768 static int handle_request (DISPATCH_PROTO) /* {{{ */
1769 {
1770 char *buffer_ptr = buffer;
1771 char *cmd_str = NULL;
1772 command_t *cmd = NULL;
1773 int status;
1775 assert (buffer[buffer_size - 1] == '\0');
1777 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1778 if (status != 0)
1779 {
1780 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1781 return (-1);
1782 }
1784 if (sock != NULL && sock->batch_start)
1785 sock->batch_cmd++;
1787 cmd = find_command(cmd_str);
1788 if (!cmd)
1789 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1791 if (!socket_permission_check (sock, cmd->cmd))
1792 return send_response(sock, RESP_ERR, "Permission denied.\n");
1794 if (!command_check_context(sock, cmd))
1795 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1797 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1798 } /* }}} int handle_request */
1800 static void journal_set_free (journal_set *js) /* {{{ */
1801 {
1802 if (js == NULL)
1803 return;
1805 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1807 free(js);
1808 } /* }}} journal_set_free */
1810 static void journal_set_remove (journal_set *js) /* {{{ */
1811 {
1812 if (js == NULL)
1813 return;
1815 for (uint i=0; i < js->files_num; i++)
1816 {
1817 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1818 unlink(js->files[i]);
1819 }
1820 } /* }}} journal_set_remove */
1822 /* close current journal file handle.
1823 * MUST hold journal_lock before calling */
1824 static void journal_close(void) /* {{{ */
1825 {
1826 if (journal_fh != NULL)
1827 {
1828 if (fclose(journal_fh) != 0)
1829 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1830 }
1832 journal_fh = NULL;
1833 journal_size = 0;
1834 } /* }}} journal_close */
1836 /* MUST hold journal_lock before calling */
1837 static void journal_new_file(void) /* {{{ */
1838 {
1839 struct timeval now;
1840 int new_fd;
1841 char new_file[PATH_MAX + 1];
1843 assert(journal_dir != NULL);
1844 assert(journal_cur != NULL);
1846 journal_close();
1848 gettimeofday(&now, NULL);
1849 /* this format assures that the files sort in strcmp() order */
1850 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1851 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1853 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1854 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1855 if (new_fd < 0)
1856 goto error;
1858 journal_fh = fdopen(new_fd, "a");
1859 if (journal_fh == NULL)
1860 goto error;
1862 journal_size = ftell(journal_fh);
1863 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1865 /* record the file in the journal set */
1866 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1868 return;
1870 error:
1871 RRDD_LOG(LOG_CRIT,
1872 "JOURNALING DISABLED: Error while trying to create %s : %s",
1873 new_file, rrd_strerror(errno));
1874 RRDD_LOG(LOG_CRIT,
1875 "JOURNALING DISABLED: All values will be flushed at shutdown");
1877 close(new_fd);
1878 config_flush_at_shutdown = 1;
1880 } /* }}} journal_new_file */
1882 /* MUST NOT hold journal_lock before calling this */
1883 static void journal_rotate(void) /* {{{ */
1884 {
1885 journal_set *old_js = NULL;
1887 if (journal_dir == NULL)
1888 return;
1890 RRDD_LOG(LOG_DEBUG, "rotating journals");
1892 pthread_mutex_lock(&stats_lock);
1893 ++stats_journal_rotate;
1894 pthread_mutex_unlock(&stats_lock);
1896 pthread_mutex_lock(&journal_lock);
1898 journal_close();
1900 /* rotate the journal sets */
1901 old_js = journal_old;
1902 journal_old = journal_cur;
1903 journal_cur = calloc(1, sizeof(journal_set));
1905 if (journal_cur != NULL)
1906 journal_new_file();
1907 else
1908 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1910 pthread_mutex_unlock(&journal_lock);
1912 journal_set_remove(old_js);
1913 journal_set_free (old_js);
1915 } /* }}} static void journal_rotate */
1917 /* MUST hold journal_lock when calling */
1918 static void journal_done(void) /* {{{ */
1919 {
1920 if (journal_cur == NULL)
1921 return;
1923 journal_close();
1925 if (config_flush_at_shutdown)
1926 {
1927 RRDD_LOG(LOG_INFO, "removing journals");
1928 journal_set_remove(journal_old);
1929 journal_set_remove(journal_cur);
1930 }
1931 else
1932 {
1933 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1934 "journals will be used at next startup");
1935 }
1937 journal_set_free(journal_cur);
1938 journal_set_free(journal_old);
1939 free(journal_dir);
1941 } /* }}} static void journal_done */
1943 static int journal_write(char *cmd, char *args) /* {{{ */
1944 {
1945 int chars;
1947 if (journal_fh == NULL)
1948 return 0;
1950 pthread_mutex_lock(&journal_lock);
1951 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1952 journal_size += chars;
1954 if (journal_size > JOURNAL_MAX)
1955 journal_new_file();
1957 pthread_mutex_unlock(&journal_lock);
1959 if (chars > 0)
1960 {
1961 pthread_mutex_lock(&stats_lock);
1962 stats_journal_bytes += chars;
1963 pthread_mutex_unlock(&stats_lock);
1964 }
1966 return chars;
1967 } /* }}} static int journal_write */
1969 static int journal_replay (const char *file) /* {{{ */
1970 {
1971 FILE *fh;
1972 int entry_cnt = 0;
1973 int fail_cnt = 0;
1974 uint64_t line = 0;
1975 char entry[CMD_MAX];
1976 time_t now;
1978 if (file == NULL) return 0;
1980 {
1981 char *reason = "unknown error";
1982 int status = 0;
1983 struct stat statbuf;
1985 memset(&statbuf, 0, sizeof(statbuf));
1986 if (stat(file, &statbuf) != 0)
1987 {
1988 reason = "stat error";
1989 status = errno;
1990 }
1991 else if (!S_ISREG(statbuf.st_mode))
1992 {
1993 reason = "not a regular file";
1994 status = EPERM;
1995 }
1996 if (statbuf.st_uid != daemon_uid)
1997 {
1998 reason = "not owned by daemon user";
1999 status = EACCES;
2000 }
2001 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2002 {
2003 reason = "must not be user/group writable";
2004 status = EACCES;
2005 }
2007 if (status != 0)
2008 {
2009 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2010 file, rrd_strerror(status), reason);
2011 return 0;
2012 }
2013 }
2015 fh = fopen(file, "r");
2016 if (fh == NULL)
2017 {
2018 if (errno != ENOENT)
2019 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2020 file, rrd_strerror(errno));
2021 return 0;
2022 }
2023 else
2024 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2026 now = time(NULL);
2028 while(!feof(fh))
2029 {
2030 size_t entry_len;
2032 ++line;
2033 if (fgets(entry, sizeof(entry), fh) == NULL)
2034 break;
2035 entry_len = strlen(entry);
2037 /* check \n termination in case journal writing crashed mid-line */
2038 if (entry_len == 0)
2039 continue;
2040 else if (entry[entry_len - 1] != '\n')
2041 {
2042 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2043 ++fail_cnt;
2044 continue;
2045 }
2047 entry[entry_len - 1] = '\0';
2049 if (handle_request(NULL, now, entry, entry_len) == 0)
2050 ++entry_cnt;
2051 else
2052 ++fail_cnt;
2053 }
2055 fclose(fh);
2057 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2058 entry_cnt, fail_cnt);
2060 return entry_cnt > 0 ? 1 : 0;
2061 } /* }}} static int journal_replay */
/* qsort() comparison callback for an array of C strings: each argument
 * points to an array element (a `char *'), and the strings themselves are
 * compared with strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *lhs = v1;
  const char *const *rhs = v2;

  return strcmp(*lhs, *rhs);
}
2071 static void journal_init(void) /* {{{ */
2072 {
2073 int had_journal = 0;
2074 DIR *dir;
2075 struct dirent *dent;
2076 char path[PATH_MAX+1];
2078 if (journal_dir == NULL) return;
2080 pthread_mutex_lock(&journal_lock);
2082 journal_cur = calloc(1, sizeof(journal_set));
2083 if (journal_cur == NULL)
2084 {
2085 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2086 return;
2087 }
2089 RRDD_LOG(LOG_INFO, "checking for journal files");
2091 /* Handle old journal files during transition. This gives them the
2092 * correct sort order. TODO: remove after first release
2093 */
2094 {
2095 char old_path[PATH_MAX+1];
2096 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2097 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2098 rename(old_path, path);
2100 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2101 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2102 rename(old_path, path);
2103 }
2105 dir = opendir(journal_dir);
2106 while ((dent = readdir(dir)) != NULL)
2107 {
2108 /* looks like a journal file? */
2109 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2110 continue;
2112 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2114 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2115 {
2116 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2117 dent->d_name);
2118 break;
2119 }
2120 }
2121 closedir(dir);
2123 qsort(journal_cur->files, journal_cur->files_num,
2124 sizeof(journal_cur->files[0]), journal_sort);
2126 for (uint i=0; i < journal_cur->files_num; i++)
2127 had_journal += journal_replay(journal_cur->files[i]);
2129 journal_new_file();
2131 /* it must have been a crash. start a flush */
2132 if (had_journal && config_flush_at_shutdown)
2133 flush_old_values(-1);
2135 pthread_mutex_unlock(&journal_lock);
2137 RRDD_LOG(LOG_INFO, "journal processing complete");
2139 } /* }}} static void journal_init */
2141 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2142 {
2143 assert(sock != NULL);
2145 free(sock->rbuf); sock->rbuf = NULL;
2146 free(sock->wbuf); sock->wbuf = NULL;
2147 free(sock);
2148 } /* }}} void free_listen_socket */
2150 static void close_connection(listen_socket_t *sock) /* {{{ */
2151 {
2152 if (sock->fd >= 0)
2153 {
2154 close(sock->fd);
2155 sock->fd = -1;
2156 }
2158 free_listen_socket(sock);
2160 } /* }}} void close_connection */
2162 static void *connection_thread_main (void *args) /* {{{ */
2163 {
2164 listen_socket_t *sock;
2165 int fd;
2167 sock = (listen_socket_t *) args;
2168 fd = sock->fd;
2170 /* init read buffers */
2171 sock->next_read = sock->next_cmd = 0;
2172 sock->rbuf = malloc(RBUF_SIZE);
2173 if (sock->rbuf == NULL)
2174 {
2175 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2176 close_connection(sock);
2177 return NULL;
2178 }
2180 pthread_mutex_lock (&connection_threads_lock);
2181 connection_threads_num++;
2182 pthread_mutex_unlock (&connection_threads_lock);
2184 while (state == RUNNING)
2185 {
2186 char *cmd;
2187 ssize_t cmd_len;
2188 ssize_t rbytes;
2189 time_t now;
2191 struct pollfd pollfd;
2192 int status;
2194 pollfd.fd = fd;
2195 pollfd.events = POLLIN | POLLPRI;
2196 pollfd.revents = 0;
2198 status = poll (&pollfd, 1, /* timeout = */ 500);
2199 if (state != RUNNING)
2200 break;
2201 else if (status == 0) /* timeout */
2202 continue;
2203 else if (status < 0) /* error */
2204 {
2205 status = errno;
2206 if (status != EINTR)
2207 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2208 continue;
2209 }
2211 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2212 break;
2213 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2214 {
2215 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2216 "poll(2) returned something unexpected: %#04hx",
2217 pollfd.revents);
2218 break;
2219 }
2221 rbytes = read(fd, sock->rbuf + sock->next_read,
2222 RBUF_SIZE - sock->next_read);
2223 if (rbytes < 0)
2224 {
2225 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2226 break;
2227 }
2228 else if (rbytes == 0)
2229 break; /* eof */
2231 sock->next_read += rbytes;
2233 if (sock->batch_start)
2234 now = sock->batch_start;
2235 else
2236 now = time(NULL);
2238 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2239 {
2240 status = handle_request (sock, now, cmd, cmd_len+1);
2241 if (status != 0)
2242 goto out_close;
2243 }
2244 }
2246 out_close:
2247 close_connection(sock);
2249 /* Remove this thread from the connection threads list */
2250 pthread_mutex_lock (&connection_threads_lock);
2251 connection_threads_num--;
2252 if (connection_threads_num <= 0)
2253 pthread_cond_broadcast(&connection_threads_done);
2254 pthread_mutex_unlock (&connection_threads_lock);
2256 return (NULL);
2257 } /* }}} void *connection_thread_main */
2259 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2260 {
2261 int fd;
2262 struct sockaddr_un sa;
2263 listen_socket_t *temp;
2264 int status;
2265 const char *path;
2266 char *path_copy, *dir;
2268 path = sock->addr;
2269 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2270 path += strlen("unix:");
2272 /* dirname may modify its argument */
2273 path_copy = strdup(path);
2274 if (path_copy == NULL)
2275 {
2276 fprintf(stderr, "rrdcached: strdup(): %s\n",
2277 rrd_strerror(errno));
2278 return (-1);
2279 }
2281 dir = dirname(path_copy);
2282 if (rrd_mkdir_p(dir, 0777) != 0)
2283 {
2284 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2285 dir, rrd_strerror(errno));
2286 return (-1);
2287 }
2289 free(path_copy);
2291 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2292 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2293 if (temp == NULL)
2294 {
2295 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2296 return (-1);
2297 }
2298 listen_fds = temp;
2299 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2301 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2302 if (fd < 0)
2303 {
2304 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2305 rrd_strerror(errno));
2306 return (-1);
2307 }
2309 memset (&sa, 0, sizeof (sa));
2310 sa.sun_family = AF_UNIX;
2311 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2313 /* if we've gotten this far, we own the pid file. any daemon started
2314 * with the same args must not be alive. therefore, ensure that we can
2315 * create the socket...
2316 */
2317 unlink(path);
2319 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2320 if (status != 0)
2321 {
2322 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2323 path, rrd_strerror(errno));
2324 close (fd);
2325 return (-1);
2326 }
2328 status = listen (fd, /* backlog = */ 10);
2329 if (status != 0)
2330 {
2331 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2332 path, rrd_strerror(errno));
2333 close (fd);
2334 unlink (path);
2335 return (-1);
2336 }
2338 listen_fds[listen_fds_num].fd = fd;
2339 listen_fds[listen_fds_num].family = PF_UNIX;
2340 strncpy(listen_fds[listen_fds_num].addr, path,
2341 sizeof (listen_fds[listen_fds_num].addr) - 1);
2342 listen_fds_num++;
2344 return (0);
2345 } /* }}} int open_listen_socket_unix */
/* Resolve `sock->addr' and open a listening TCP socket for every address
 * returned by getaddrinfo(), appending each to `listen_fds'.
 *
 * Accepted address forms: "[<ipv6>]", "[<ipv6>]:<port>", "<host>" and
 * "<host>:<port>" (split at the last ':'). Without a port,
 * RRDCACHED_DEFAULT_PORT is used.
 *
 * Returns -1 on a malformed address, a getaddrinfo() failure or a listen()
 * failure. Addresses for which realloc, socket() or bind() fail are
 * reported on stderr and skipped; 0 is returned once the list has been
 * processed. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* strrchr() is the standard C equivalent of the legacy rindex() */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2469 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2470 {
2471 assert(sock != NULL);
2472 assert(sock->addr != NULL);
2474 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2475 || sock->addr[0] == '/')
2476 return (open_listen_socket_unix(sock));
2477 else
2478 return (open_listen_socket_network(sock));
2479 } /* }}} int open_listen_socket */
2481 static int close_listen_sockets (void) /* {{{ */
2482 {
2483 size_t i;
2485 for (i = 0; i < listen_fds_num; i++)
2486 {
2487 close (listen_fds[i].fd);
2489 if (listen_fds[i].family == PF_UNIX)
2490 unlink(listen_fds[i].addr);
2491 }
2493 free (listen_fds);
2494 listen_fds = NULL;
2495 listen_fds_num = 0;
2497 return (0);
2498 } /* }}} int close_listen_sockets */
2500 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2501 {
2502 struct pollfd *pollfds;
2503 int pollfds_num;
2504 int status;
2505 int i;
2507 if (listen_fds_num < 1)
2508 {
2509 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2510 return (NULL);
2511 }
2513 pollfds_num = listen_fds_num;
2514 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2515 if (pollfds == NULL)
2516 {
2517 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2518 return (NULL);
2519 }
2520 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2522 RRDD_LOG(LOG_INFO, "listening for connections");
2524 while (state == RUNNING)
2525 {
2526 for (i = 0; i < pollfds_num; i++)
2527 {
2528 pollfds[i].fd = listen_fds[i].fd;
2529 pollfds[i].events = POLLIN | POLLPRI;
2530 pollfds[i].revents = 0;
2531 }
2533 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2534 if (state != RUNNING)
2535 break;
2536 else if (status == 0) /* timeout */
2537 continue;
2538 else if (status < 0) /* error */
2539 {
2540 status = errno;
2541 if (status != EINTR)
2542 {
2543 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2544 }
2545 continue;
2546 }
2548 for (i = 0; i < pollfds_num; i++)
2549 {
2550 listen_socket_t *client_sock;
2551 struct sockaddr_storage client_sa;
2552 socklen_t client_sa_size;
2553 pthread_t tid;
2554 pthread_attr_t attr;
2556 if (pollfds[i].revents == 0)
2557 continue;
2559 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2560 {
2561 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2562 "poll(2) returned something unexpected for listen FD #%i.",
2563 pollfds[i].fd);
2564 continue;
2565 }
2567 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2568 if (client_sock == NULL)
2569 {
2570 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2571 continue;
2572 }
2573 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2575 client_sa_size = sizeof (client_sa);
2576 client_sock->fd = accept (pollfds[i].fd,
2577 (struct sockaddr *) &client_sa, &client_sa_size);
2578 if (client_sock->fd < 0)
2579 {
2580 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2581 free(client_sock);
2582 continue;
2583 }
2585 pthread_attr_init (&attr);
2586 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2588 status = pthread_create (&tid, &attr, connection_thread_main,
2589 client_sock);
2590 if (status != 0)
2591 {
2592 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2593 close_connection(client_sock);
2594 continue;
2595 }
2596 } /* for (pollfds_num) */
2597 } /* while (state == RUNNING) */
2599 RRDD_LOG(LOG_INFO, "starting shutdown");
2601 close_listen_sockets ();
2603 pthread_mutex_lock (&connection_threads_lock);
2604 while (connection_threads_num > 0)
2605 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2606 pthread_mutex_unlock (&connection_threads_lock);
2608 free(pollfds);
2610 return (NULL);
2611 } /* }}} void *listen_thread_main */
2613 static int daemonize (void) /* {{{ */
2614 {
2615 int pid_fd;
2616 char *base_dir;
2618 daemon_uid = geteuid();
2620 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2621 if (pid_fd < 0)
2622 pid_fd = check_pidfile();
2623 if (pid_fd < 0)
2624 return pid_fd;
2626 /* open all the listen sockets */
2627 if (config_listen_address_list_len > 0)
2628 {
2629 for (size_t i = 0; i < config_listen_address_list_len; i++)
2630 open_listen_socket (config_listen_address_list[i]);
2632 rrd_free_ptrs((void ***) &config_listen_address_list,
2633 &config_listen_address_list_len);
2634 }
2635 else
2636 {
2637 listen_socket_t sock;
2638 memset(&sock, 0, sizeof(sock));
2639 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2640 open_listen_socket (&sock);
2641 }
2643 if (listen_fds_num < 1)
2644 {
2645 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2646 goto error;
2647 }
2649 if (!stay_foreground)
2650 {
2651 pid_t child;
2653 child = fork ();
2654 if (child < 0)
2655 {
2656 fprintf (stderr, "daemonize: fork(2) failed.\n");
2657 goto error;
2658 }
2659 else if (child > 0)
2660 exit(0);
2662 /* Become session leader */
2663 setsid ();
2665 /* Open the first three file descriptors to /dev/null */
2666 close (2);
2667 close (1);
2668 close (0);
2670 open ("/dev/null", O_RDWR);
2671 if (dup(0) == -1 || dup(0) == -1){
2672 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2673 }
2674 } /* if (!stay_foreground) */
2676 /* Change into the /tmp directory. */
2677 base_dir = (config_base_dir != NULL)
2678 ? config_base_dir
2679 : "/tmp";
2681 if (chdir (base_dir) != 0)
2682 {
2683 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2684 goto error;
2685 }
2687 install_signal_handlers();
2689 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2690 RRDD_LOG(LOG_INFO, "starting up");
2692 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2693 (GDestroyNotify) free_cache_item);
2694 if (cache_tree == NULL)
2695 {
2696 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2697 goto error;
2698 }
2700 return write_pidfile (pid_fd);
2702 error:
2703 remove_pidfile();
2704 return -1;
2705 } /* }}} int daemonize */
2707 static int cleanup (void) /* {{{ */
2708 {
2709 pthread_cond_broadcast (&flush_cond);
2710 pthread_join (flush_thread, NULL);
2712 pthread_cond_broadcast (&queue_cond);
2713 for (int i = 0; i < config_queue_threads; i++)
2714 pthread_join (queue_threads[i], NULL);
2716 if (config_flush_at_shutdown)
2717 {
2718 assert(cache_queue_head == NULL);
2719 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2720 }
2722 free(queue_threads);
2723 free(config_base_dir);
2725 pthread_mutex_lock(&cache_lock);
2726 g_tree_destroy(cache_tree);
2728 pthread_mutex_lock(&journal_lock);
2729 journal_done();
2731 RRDD_LOG(LOG_INFO, "goodbye");
2732 closelog ();
2734 remove_pidfile ();
2735 free(config_pid_file);
2737 return (0);
2738 } /* }}} int cleanup */
2740 static int read_options (int argc, char **argv) /* {{{ */
2741 {
2742 int option;
2743 int status = 0;
2745 char **permissions = NULL;
2746 size_t permissions_len = 0;
2748 while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2749 {
2750 switch (option)
2751 {
2752 case 'g':
2753 stay_foreground=1;
2754 break;
2756 case 'l':
2757 {
2758 listen_socket_t *new;
2760 new = malloc(sizeof(listen_socket_t));
2761 if (new == NULL)
2762 {
2763 fprintf(stderr, "read_options: malloc failed.\n");
2764 return(2);
2765 }
2766 memset(new, 0, sizeof(listen_socket_t));
2768 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2770 /* Add permissions to the socket {{{ */
2771 if (permissions_len != 0)
2772 {
2773 size_t i;
2774 for (i = 0; i < permissions_len; i++)
2775 {
2776 status = socket_permission_add (new, permissions[i]);
2777 if (status != 0)
2778 {
2779 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2780 "socket failed. Most likely, this permission doesn't "
2781 "exist. Check your command line.\n", permissions[i]);
2782 status = 4;
2783 }
2784 }
2785 }
2786 else /* if (permissions_len == 0) */
2787 {
2788 /* Add permission for ALL commands to the socket. */
2789 size_t i;
2790 for (i = 0; i < list_of_commands_len; i++)
2791 {
2792 status = socket_permission_add (new, list_of_commands[i].cmd);
2793 if (status != 0)
2794 {
2795 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2796 "socket failed. This should never happen, ever! Sorry.\n",
2797 permissions[i]);
2798 status = 4;
2799 }
2800 }
2801 }
2802 /* }}} Done adding permissions. */
2804 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2805 &config_listen_address_list_len, new))
2806 {
2807 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2808 return (2);
2809 }
2810 }
2811 break;
2813 case 'P':
2814 {
2815 char *optcopy;
2816 char *saveptr;
2817 char *dummy;
2818 char *ptr;
2820 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2822 optcopy = strdup (optarg);
2823 dummy = optcopy;
2824 saveptr = NULL;
2825 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2826 {
2827 dummy = NULL;
2828 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2829 }
2831 free (optcopy);
2832 }
2833 break;
2835 case 'f':
2836 {
2837 int temp;
2839 temp = atoi (optarg);
2840 if (temp > 0)
2841 config_flush_interval = temp;
2842 else
2843 {
2844 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2845 status = 3;
2846 }
2847 }
2848 break;
2850 case 'w':
2851 {
2852 int temp;
2854 temp = atoi (optarg);
2855 if (temp > 0)
2856 config_write_interval = temp;
2857 else
2858 {
2859 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2860 status = 2;
2861 }
2862 }
2863 break;
2865 case 'z':
2866 {
2867 int temp;
2869 temp = atoi(optarg);
2870 if (temp > 0)
2871 config_write_jitter = temp;
2872 else
2873 {
2874 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2875 status = 2;
2876 }
2878 break;
2879 }
2881 case 't':
2882 {
2883 int threads;
2884 threads = atoi(optarg);
2885 if (threads >= 1)
2886 config_queue_threads = threads;
2887 else
2888 {
2889 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2890 return 1;
2891 }
2892 }
2893 break;
2895 case 'B':
2896 config_write_base_only = 1;
2897 break;
2899 case 'b':
2900 {
2901 size_t len;
2902 char base_realpath[PATH_MAX];
2904 if (config_base_dir != NULL)
2905 free (config_base_dir);
2906 config_base_dir = strdup (optarg);
2907 if (config_base_dir == NULL)
2908 {
2909 fprintf (stderr, "read_options: strdup failed.\n");
2910 return (3);
2911 }
2913 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2914 {
2915 fprintf (stderr, "Failed to create base directory '%s': %s\n",
2916 config_base_dir, rrd_strerror (errno));
2917 return (3);
2918 }
2920 /* make sure that the base directory is not resolved via
2921 * symbolic links. this makes some performance-enhancing
2922 * assumptions possible (we don't have to resolve paths
2923 * that start with a "/")
2924 */
2925 if (realpath(config_base_dir, base_realpath) == NULL)
2926 {
2927 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2928 "%s\n", config_base_dir, rrd_strerror(errno));
2929 return 5;
2930 }
2932 len = strlen (config_base_dir);
2933 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2934 {
2935 config_base_dir[len - 1] = 0;
2936 len--;
2937 }
2939 if (len < 1)
2940 {
2941 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2942 return (4);
2943 }
2945 _config_base_dir_len = len;
2947 len = strlen (base_realpath);
2948 while ((len > 0) && (base_realpath[len - 1] == '/'))
2949 {
2950 base_realpath[len - 1] = '\0';
2951 len--;
2952 }
2954 if (strncmp(config_base_dir,
2955 base_realpath, sizeof(base_realpath)) != 0)
2956 {
2957 fprintf(stderr,
2958 "Base directory (-b) resolved via file system links!\n"
2959 "Please consult rrdcached '-b' documentation!\n"
2960 "Consider specifying the real directory (%s)\n",
2961 base_realpath);
2962 return 5;
2963 }
2964 }
2965 break;
2967 case 'p':
2968 {
2969 if (config_pid_file != NULL)
2970 free (config_pid_file);
2971 config_pid_file = strdup (optarg);
2972 if (config_pid_file == NULL)
2973 {
2974 fprintf (stderr, "read_options: strdup failed.\n");
2975 return (3);
2976 }
2977 }
2978 break;
2980 case 'F':
2981 config_flush_at_shutdown = 1;
2982 break;
2984 case 'j':
2985 {
2986 const char *dir = journal_dir = strdup(optarg);
2988 status = rrd_mkdir_p(dir, 0777);
2989 if (status != 0)
2990 {
2991 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
2992 dir, rrd_strerror(errno));
2993 return 6;
2994 }
2996 if (access(dir, R_OK|W_OK|X_OK) != 0)
2997 {
2998 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2999 errno ? rrd_strerror(errno) : "");
3000 return 6;
3001 }
3002 }
3003 break;
3005 case 'h':
3006 case '?':
3007 printf ("RRDCacheD %s\n"
3008 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3009 "\n"
3010 "Usage: rrdcached [options]\n"
3011 "\n"
3012 "Valid options are:\n"
3013 " -l <address> Socket address to listen to.\n"
3014 " -P <perms> Sets the permissions to assign to all following "
3015 "sockets\n"
3016 " -w <seconds> Interval in which to write data.\n"
3017 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3018 " -t <threads> Number of write threads.\n"
3019 " -f <seconds> Interval in which to flush dead data.\n"
3020 " -p <file> Location of the PID-file.\n"
3021 " -b <dir> Base directory to change to.\n"
3022 " -B Restrict file access to paths within -b <dir>\n"
3023 " -g Do not fork and run in the foreground.\n"
3024 " -j <dir> Directory in which to create the journal files.\n"
3025 " -F Always flush all updates at shutdown\n"
3026 "\n"
3027 "For more information and a detailed description of all options "
3028 "please refer\n"
3029 "to the rrdcached(1) manual page.\n",
3030 VERSION);
3031 status = -1;
3032 break;
3033 } /* switch (option) */
3034 } /* while (getopt) */
3036 /* advise the user when values are not sane */
3037 if (config_flush_interval < 2 * config_write_interval)
3038 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3039 " 2x write interval (-w) !\n");
3040 if (config_write_jitter > config_write_interval)
3041 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3042 " write interval (-w) !\n");
3044 if (config_write_base_only && config_base_dir == NULL)
3045 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3046 " Consult the rrdcached documentation\n");
3048 if (journal_dir == NULL)
3049 config_flush_at_shutdown = 1;
3051 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3053 return (status);
3054 } /* }}} int read_options */
3056 int main (int argc, char **argv)
3057 {
3058 int status;
3060 status = read_options (argc, argv);
3061 if (status != 0)
3062 {
3063 if (status < 0)
3064 status = 0;
3065 return (status);
3066 }
3068 status = daemonize ();
3069 if (status != 0)
3070 {
3071 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3072 return (1);
3073 }
3075 journal_init();
3077 /* start the queue threads */
3078 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3079 if (queue_threads == NULL)
3080 {
3081 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3082 cleanup();
3083 return (1);
3084 }
3085 for (int i = 0; i < config_queue_threads; i++)
3086 {
3087 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3088 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3089 if (status != 0)
3090 {
3091 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3092 cleanup();
3093 return (1);
3094 }
3095 }
3097 /* start the flush thread */
3098 memset(&flush_thread, 0, sizeof(flush_thread));
3099 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3100 if (status != 0)
3101 {
3102 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3103 cleanup();
3104 return (1);
3105 }
3107 listen_thread_main (NULL);
3108 cleanup ();
3110 return (0);
3111 } /* int main */
3113 /*
3114 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3115 */