4c84f19a0304c1206a5b121decc2770ca39b2843
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
110 #include <glib-2.0/glib.h>
111 /* }}} */
/* Log a printf-style message through syslog(3) at the given severity. */
#define RRDD_LOG(severity, ...) \
  syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC get `__attribute__' expanded to nothing. */
#if !defined(__GNUC__)
# define __attribute__(x) /**/
#endif
119 /*
120 * Types
121 */
/* Result codes handed to send_response(): RESP_OK on success, RESP_ERR on
 * failure. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;

/*
 * Per-socket state: the descriptor plus the BATCH bookkeeping and the
 * read/write buffers used by next_cmd(), add_to_wbuf() and send_response().
 */
typedef struct listen_socket_s
{
  int fd;                  /* descriptor the responses are written to */
  char addr[PATH_MAX + 1]; /* NOTE(review): presumably the socket address */
  int family;              /* NOTE(review): presumably the address family */

  /* state for BATCH processing */
  time_t batch_start;      /* non-zero while a BATCH is in progress */
  int batch_cmd;           /* sent as the leading number of BATCH errors */

  /* buffered IO */
  char *rbuf;              /* read buffer */
  off_t next_cmd;          /* offset in rbuf where the next command starts */
  off_t next_read;         /* offset in rbuf just past the buffered data */

  char *wbuf;              /* pending response text, see add_to_wbuf() */
  ssize_t wbuf_len;        /* length of wbuf, not counting the final NUL */

  uint32_t permissions;    /* NOTE(review): not used in the code shown here */
} listen_socket_t;
146 struct command_s;
147 typedef struct command_s command_t;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
150 time_t now __attribute__((unused)),\
151 char *buffer __attribute__((unused)),\
152 size_t buffer_size __attribute__((unused))
154 #define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
155 DISPATCH_PROTO
157 struct command_s {
158 char *cmd;
159 int (*handler)(HANDLER_PROTO);
161 char context; /* where we expect to see it */
162 #define CMD_CONTEXT_CLIENT (1<<0)
163 #define CMD_CONTEXT_BATCH (1<<1)
164 #define CMD_CONTEXT_JOURNAL (1<<2)
165 #define CMD_CONTEXT_ANY (0x7f)
167 char *syntax;
168 char *help;
169 };
typedef struct cache_item_s cache_item_t;

/* Bit flags stored in cache_item_s.flags. */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/*
 * One cached file: its pending update strings plus the links that place it
 * in the doubly linked write queue.
 */
struct cache_item_s
{
  char *file;               /* file name; also the key in the cache tree */
  char **values;            /* pending update strings */
  size_t values_num;        /* number of entries in `values' */
  time_t last_flush_time;   /* set by wipe_ci_values() */
  time_t last_update_stamp; /* NOTE(review): presumably the newest update's
                             * timestamp - not used in the code shown here */
  int flags;                /* CI_FLAGS_* */
  pthread_cond_t flushed;   /* broadcast once the values have been written */
  cache_item_t *prev;       /* queue neighbour towards the head */
  cache_item_t *next;       /* queue neighbour towards the tail */
};
/* User data that flush_old_values() passes to tree_callback_flush(). */
typedef struct callback_flush_data_s
{
  time_t now;         /* time at which the tree walk started */
  time_t abs_timeout; /* items last flushed at or before this get queued */
  char **keys;        /* keys of the entries to drop from the tree */
  size_t keys_num;    /* number of entries in `keys' */
} callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should insert at. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* A set of journal files: an array of file names and its length. */
typedef struct {
  char **files;
  size_t files_num;
} journal_set;

/* Upper bound on the length of a socket command or response ... */
#define CMD_MAX 4096
/* ... and the read buffer size derived from it (two commands). */
#define RBUF_SIZE (CMD_MAX*2)
214 /*
215 * Variables
216 */
217 static int stay_foreground = 0;
218 static uid_t daemon_uid;
220 static listen_socket_t *listen_fds = NULL;
221 static size_t listen_fds_num = 0;
223 enum {
224 RUNNING, /* normal operation */
225 FLUSHING, /* flushing remaining values */
226 SHUTDOWN /* shutting down */
227 } state = RUNNING;
229 static pthread_t *queue_threads;
230 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
231 static int config_queue_threads = 4;
233 static pthread_t flush_thread;
234 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
236 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
237 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
238 static int connection_threads_num = 0;
240 /* Cache stuff */
241 static GTree *cache_tree = NULL;
242 static cache_item_t *cache_queue_head = NULL;
243 static cache_item_t *cache_queue_tail = NULL;
244 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
246 static int config_write_interval = 300;
247 static int config_write_jitter = 0;
248 static int config_flush_interval = 3600;
249 static int config_flush_at_shutdown = 0;
250 static char *config_pid_file = NULL;
251 static char *config_base_dir = NULL;
252 static size_t _config_base_dir_len = 0;
253 static int config_write_base_only = 0;
255 static listen_socket_t **config_listen_address_list = NULL;
256 static size_t config_listen_address_list_len = 0;
258 static uint64_t stats_queue_length = 0;
259 static uint64_t stats_updates_received = 0;
260 static uint64_t stats_flush_received = 0;
261 static uint64_t stats_updates_written = 0;
262 static uint64_t stats_data_sets_written = 0;
263 static uint64_t stats_journal_bytes = 0;
264 static uint64_t stats_journal_rotate = 0;
265 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
267 /* Journaled updates */
268 #define JOURNAL_REPLAY(s) ((s) == NULL)
269 #define JOURNAL_BASE "rrd.journal"
270 static journal_set *journal_cur = NULL;
271 static journal_set *journal_old = NULL;
272 static char *journal_dir = NULL;
273 static FILE *journal_fh = NULL; /* current journal file handle */
274 static long journal_size = 0; /* current journal size */
275 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
276 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
277 static int journal_write(char *cmd, char *args);
278 static void journal_done(void);
279 static void journal_rotate(void);
281 /* prototypes for forward refernces */
282 static int handle_request_help (HANDLER_PROTO);
284 /*
285 * Functions
286 */
287 static void sig_common (const char *sig) /* {{{ */
288 {
289 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
290 state = FLUSHING;
291 pthread_cond_broadcast(&flush_cond);
292 pthread_cond_broadcast(&queue_cond);
293 } /* }}} void sig_common */
/* SIGINT: start the common shutdown sequence. */
static void sig_int_handler (int signum) /* {{{ */
{
  (void) signum; /* not needed */

  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: start the common shutdown sequence. */
static void sig_term_handler (int signum) /* {{{ */
{
  (void) signum; /* not needed */

  sig_common("TERM");
} /* }}} void sig_term_handler */
305 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
306 {
307 config_flush_at_shutdown = 1;
308 sig_common("USR1");
309 } /* }}} void sig_usr1_handler */
311 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
312 {
313 config_flush_at_shutdown = 0;
314 sig_common("USR2");
315 } /* }}} void sig_usr2_handler */
317 static void install_signal_handlers(void) /* {{{ */
318 {
319 /* These structures are static, because `sigaction' behaves weird if the are
320 * overwritten.. */
321 static struct sigaction sa_int;
322 static struct sigaction sa_term;
323 static struct sigaction sa_pipe;
324 static struct sigaction sa_usr1;
325 static struct sigaction sa_usr2;
327 /* Install signal handlers */
328 memset (&sa_int, 0, sizeof (sa_int));
329 sa_int.sa_handler = sig_int_handler;
330 sigaction (SIGINT, &sa_int, NULL);
332 memset (&sa_term, 0, sizeof (sa_term));
333 sa_term.sa_handler = sig_term_handler;
334 sigaction (SIGTERM, &sa_term, NULL);
336 memset (&sa_pipe, 0, sizeof (sa_pipe));
337 sa_pipe.sa_handler = SIG_IGN;
338 sigaction (SIGPIPE, &sa_pipe, NULL);
340 memset (&sa_pipe, 0, sizeof (sa_usr1));
341 sa_usr1.sa_handler = sig_usr1_handler;
342 sigaction (SIGUSR1, &sa_usr1, NULL);
344 memset (&sa_usr2, 0, sizeof (sa_usr2));
345 sa_usr2.sa_handler = sig_usr2_handler;
346 sigaction (SIGUSR2, &sa_usr2, NULL);
348 } /* }}} void install_signal_handlers */
350 static int open_pidfile(char *action, int oflag) /* {{{ */
351 {
352 int fd;
353 const char *file;
354 char *file_copy, *dir;
356 file = (config_pid_file != NULL)
357 ? config_pid_file
358 : LOCALSTATEDIR "/run/rrdcached.pid";
360 /* dirname may modify its argument */
361 file_copy = strdup(file);
362 if (file_copy == NULL)
363 {
364 fprintf(stderr, "rrdcached: strdup(): %s\n",
365 rrd_strerror(errno));
366 return -1;
367 }
369 dir = dirname(file_copy);
370 if (rrd_mkdir_p(dir, 0777) != 0)
371 {
372 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
373 dir, rrd_strerror(errno));
374 return -1;
375 }
377 free(file_copy);
379 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
380 if (fd < 0)
381 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
382 action, file, rrd_strerror(errno));
384 return(fd);
385 } /* }}} static int open_pidfile */
/*
 * check_pidfile:
 *   Check an existing pid file to see whether a daemon is running.
 *
 *   Returns a descriptor for the truncated pid file when the recorded
 *   process is gone (or is this process). Returns a negative value when the
 *   file cannot be opened or read, holds no valid pid, cannot be truncated,
 *   or names another process that we are able to signal. The descriptor is
 *   closed on every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator; read() does not add one */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile:
 *   Writes the pid of the calling process, followed by a newline, to `fd'.
 *   The descriptor is closed in every case.
 *
 *   Returns 0 on success and -1 if the stream could not be created or the
 *   pid could not be written out.
 */
static int write_pidfile (int fd) /* {{{ */
{
  pid_t pid;
  FILE *fh;
  int failed;

  pid = getpid ();

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  failed = (fprintf (fh, "%i\n", (int) pid) < 0);

  /* fclose() flushes the buffer, so write errors may only show up here */
  if (fclose (fh) != 0)
    failed = 1;

  if (failed)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: writing the pid file failed.");
    return (-1);
  }

  return (0);
} /* }}} int write_pidfile */
452 static int remove_pidfile (void) /* {{{ */
453 {
454 char *file;
455 int status;
457 file = (config_pid_file != NULL)
458 ? config_pid_file
459 : LOCALSTATEDIR "/run/rrdcached.pid";
461 status = unlink (file);
462 if (status == 0)
463 return (0);
464 return (errno);
465 } /* }}} int remove_pidfile */
467 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
468 {
469 char *eol;
471 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
472 sock->next_read - sock->next_cmd);
474 if (eol == NULL)
475 {
476 /* no commands left, move remainder back to front of rbuf */
477 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
478 sock->next_read - sock->next_cmd);
479 sock->next_read -= sock->next_cmd;
480 sock->next_cmd = 0;
481 *len = 0;
482 return NULL;
483 }
484 else
485 {
486 char *cmd = sock->rbuf + sock->next_cmd;
487 *eol = '\0';
489 sock->next_cmd = eol - sock->rbuf + 1;
491 if (eol > sock->rbuf && *(eol-1) == '\r')
492 *(--eol) = '\0'; /* handle "\r\n" EOL */
494 *len = eol - cmd;
496 return cmd;
497 }
499 /* NOTREACHED */
500 assert(1==0);
501 } /* }}} char *next_cmd */
503 /* add the characters directly to the write buffer */
504 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
505 {
506 char *new_buf;
508 assert(sock != NULL);
510 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
511 if (new_buf == NULL)
512 {
513 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
514 return -1;
515 }
517 strncpy(new_buf + sock->wbuf_len, str, len + 1);
519 sock->wbuf = new_buf;
520 sock->wbuf_len += len;
522 return 0;
523 } /* }}} static int add_to_wbuf */
525 /* add the text to the "extra" info that's sent after the status line */
526 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
527 {
528 va_list argp;
529 char buffer[CMD_MAX];
530 int len;
532 if (JOURNAL_REPLAY(sock)) return 0;
533 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
535 va_start(argp, fmt);
536 #ifdef HAVE_VSNPRINTF
537 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
538 #else
539 len = vsprintf(buffer, fmt, argp);
540 #endif
541 va_end(argp);
542 if (len < 0)
543 {
544 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
545 return -1;
546 }
548 return add_to_wbuf(sock, buffer, len);
549 } /* }}} static int add_response_info */
/*
 * count_lines:
 *   Returns the number of newline characters in `str'; NULL counts as an
 *   empty string.
 */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  char *cursor;

  if (str == NULL)
    return 0;

  for (cursor = strchr(str, '\n');
       cursor != NULL;
       cursor = strchr(cursor + 1, '\n'))
    newlines++;

  return newlines;
} /* }}} static int count_lines */
567 /* send the response back to the user.
568 * returns 0 on success, -1 on error
569 * write buffer is always zeroed after this call */
570 static int send_response (listen_socket_t *sock, response_code rc,
571 char *fmt, ...) /* {{{ */
572 {
573 va_list argp;
574 char buffer[CMD_MAX];
575 int lines;
576 ssize_t wrote;
577 int rclen, len;
579 if (JOURNAL_REPLAY(sock)) return rc;
581 if (sock->batch_start)
582 {
583 if (rc == RESP_OK)
584 return rc; /* no response on success during BATCH */
585 lines = sock->batch_cmd;
586 }
587 else if (rc == RESP_OK)
588 lines = count_lines(sock->wbuf);
589 else
590 lines = -1;
592 rclen = sprintf(buffer, "%d ", lines);
593 va_start(argp, fmt);
594 #ifdef HAVE_VSNPRINTF
595 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
596 #else
597 len = vsprintf(buffer+rclen, fmt, argp);
598 #endif
599 va_end(argp);
600 if (len < 0)
601 return -1;
603 len += rclen;
605 /* append the result to the wbuf, don't write to the user */
606 if (sock->batch_start)
607 return add_to_wbuf(sock, buffer, len);
609 /* first write must be complete */
610 if (len != write(sock->fd, buffer, len))
611 {
612 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
613 return -1;
614 }
616 if (sock->wbuf != NULL && rc == RESP_OK)
617 {
618 wrote = 0;
619 while (wrote < sock->wbuf_len)
620 {
621 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
622 if (wb <= 0)
623 {
624 RRDD_LOG(LOG_INFO, "send_response: could not write results");
625 return -1;
626 }
627 wrote += wb;
628 }
629 }
631 free(sock->wbuf); sock->wbuf = NULL;
632 sock->wbuf_len = 0;
634 return 0;
635 } /* }}} */
637 static void wipe_ci_values(cache_item_t *ci, time_t when)
638 {
639 ci->values = NULL;
640 ci->values_num = 0;
642 ci->last_flush_time = when;
643 if (config_write_jitter > 0)
644 ci->last_flush_time += (rrd_random() % config_write_jitter);
645 }
647 /* remove_from_queue
648 * remove a "cache_item_t" item from the queue.
649 * must hold 'cache_lock' when calling this
650 */
651 static void remove_from_queue(cache_item_t *ci) /* {{{ */
652 {
653 if (ci == NULL) return;
654 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
656 if (ci->prev == NULL)
657 cache_queue_head = ci->next; /* reset head */
658 else
659 ci->prev->next = ci->next;
661 if (ci->next == NULL)
662 cache_queue_tail = ci->prev; /* reset the tail */
663 else
664 ci->next->prev = ci->prev;
666 ci->next = ci->prev = NULL;
667 ci->flags &= ~CI_FLAGS_IN_QUEUE;
669 pthread_mutex_lock (&stats_lock);
670 assert (stats_queue_length > 0);
671 stats_queue_length--;
672 pthread_mutex_unlock (&stats_lock);
674 } /* }}} static void remove_from_queue */
676 /* free the resources associated with the cache_item_t
677 * must hold cache_lock when calling this function
678 */
679 static void *free_cache_item(cache_item_t *ci) /* {{{ */
680 {
681 if (ci == NULL) return NULL;
683 remove_from_queue(ci);
685 for (size_t i=0; i < ci->values_num; i++)
686 free(ci->values[i]);
688 free (ci->values);
689 free (ci->file);
691 /* in case anyone is waiting */
692 pthread_cond_broadcast(&ci->flushed);
693 pthread_cond_destroy(&ci->flushed);
695 free (ci);
697 return NULL;
698 } /* }}} static void *free_cache_item */
700 /*
701 * enqueue_cache_item:
702 * `cache_lock' must be acquired before calling this function!
703 */
704 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
705 queue_side_t side)
706 {
707 if (ci == NULL)
708 return (-1);
710 if (ci->values_num == 0)
711 return (0);
713 if (side == HEAD)
714 {
715 if (cache_queue_head == ci)
716 return 0;
718 /* remove if further down in queue */
719 remove_from_queue(ci);
721 ci->prev = NULL;
722 ci->next = cache_queue_head;
723 if (ci->next != NULL)
724 ci->next->prev = ci;
725 cache_queue_head = ci;
727 if (cache_queue_tail == NULL)
728 cache_queue_tail = cache_queue_head;
729 }
730 else /* (side == TAIL) */
731 {
732 /* We don't move values back in the list.. */
733 if (ci->flags & CI_FLAGS_IN_QUEUE)
734 return (0);
736 assert (ci->next == NULL);
737 assert (ci->prev == NULL);
739 ci->prev = cache_queue_tail;
741 if (cache_queue_tail == NULL)
742 cache_queue_head = ci;
743 else
744 cache_queue_tail->next = ci;
746 cache_queue_tail = ci;
747 }
749 ci->flags |= CI_FLAGS_IN_QUEUE;
751 pthread_cond_signal(&queue_cond);
752 pthread_mutex_lock (&stats_lock);
753 stats_queue_length++;
754 pthread_mutex_unlock (&stats_lock);
756 return (0);
757 } /* }}} int enqueue_cache_item */
759 /*
760 * tree_callback_flush:
761 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
762 * while this is in progress.
763 */
764 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
765 gpointer data)
766 {
767 cache_item_t *ci;
768 callback_flush_data_t *cfd;
770 ci = (cache_item_t *) value;
771 cfd = (callback_flush_data_t *) data;
773 if (ci->flags & CI_FLAGS_IN_QUEUE)
774 return FALSE;
776 if (ci->values_num > 0
777 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
778 {
779 enqueue_cache_item (ci, TAIL);
780 }
781 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
782 && (ci->values_num <= 0))
783 {
784 assert ((char *) key == ci->file);
785 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
786 {
787 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
788 return (FALSE);
789 }
790 }
792 return (FALSE);
793 } /* }}} gboolean tree_callback_flush */
795 static int flush_old_values (int max_age)
796 {
797 callback_flush_data_t cfd;
798 size_t k;
800 memset (&cfd, 0, sizeof (cfd));
801 /* Pass the current time as user data so that we don't need to call
802 * `time' for each node. */
803 cfd.now = time (NULL);
804 cfd.keys = NULL;
805 cfd.keys_num = 0;
807 if (max_age > 0)
808 cfd.abs_timeout = cfd.now - max_age;
809 else
810 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
812 /* `tree_callback_flush' will return the keys of all values that haven't
813 * been touched in the last `config_flush_interval' seconds in `cfd'.
814 * The char*'s in this array point to the same memory as ci->file, so we
815 * don't need to free them separately. */
816 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
818 for (k = 0; k < cfd.keys_num; k++)
819 {
820 /* should never fail, since we have held the cache_lock
821 * the entire time */
822 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
823 }
825 if (cfd.keys != NULL)
826 {
827 free (cfd.keys);
828 cfd.keys = NULL;
829 }
831 return (0);
832 } /* int flush_old_values */
834 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
835 {
836 struct timeval now;
837 struct timespec next_flush;
838 int status;
840 gettimeofday (&now, NULL);
841 next_flush.tv_sec = now.tv_sec + config_flush_interval;
842 next_flush.tv_nsec = 1000 * now.tv_usec;
844 pthread_mutex_lock(&cache_lock);
846 while (state == RUNNING)
847 {
848 gettimeofday (&now, NULL);
849 if ((now.tv_sec > next_flush.tv_sec)
850 || ((now.tv_sec == next_flush.tv_sec)
851 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
852 {
853 RRDD_LOG(LOG_DEBUG, "flushing old values");
855 /* Determine the time of the next cache flush. */
856 next_flush.tv_sec = now.tv_sec + config_flush_interval;
858 /* Flush all values that haven't been written in the last
859 * `config_write_interval' seconds. */
860 flush_old_values (config_write_interval);
862 /* unlock the cache while we rotate so we don't block incoming
863 * updates if the fsync() blocks on disk I/O */
864 pthread_mutex_unlock(&cache_lock);
865 journal_rotate();
866 pthread_mutex_lock(&cache_lock);
867 }
869 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
870 if (status != 0 && status != ETIMEDOUT)
871 {
872 RRDD_LOG (LOG_ERR, "flush_thread_main: "
873 "pthread_cond_timedwait returned %i.", status);
874 }
875 }
877 if (config_flush_at_shutdown)
878 flush_old_values (-1); /* flush everything */
880 state = SHUTDOWN;
882 pthread_mutex_unlock(&cache_lock);
884 return NULL;
885 } /* void *flush_thread_main */
887 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
888 {
889 pthread_mutex_lock (&cache_lock);
891 while (state != SHUTDOWN
892 || (cache_queue_head != NULL && config_flush_at_shutdown))
893 {
894 cache_item_t *ci;
895 char *file;
896 char **values;
897 size_t values_num;
898 int status;
900 /* Now, check if there's something to store away. If not, wait until
901 * something comes in. */
902 if (cache_queue_head == NULL)
903 {
904 status = pthread_cond_wait (&queue_cond, &cache_lock);
905 if ((status != 0) && (status != ETIMEDOUT))
906 {
907 RRDD_LOG (LOG_ERR, "queue_thread_main: "
908 "pthread_cond_wait returned %i.", status);
909 }
910 }
912 /* Check if a value has arrived. This may be NULL if we timed out or there
913 * was an interrupt such as a signal. */
914 if (cache_queue_head == NULL)
915 continue;
917 ci = cache_queue_head;
919 /* copy the relevant parts */
920 file = strdup (ci->file);
921 if (file == NULL)
922 {
923 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
924 continue;
925 }
927 assert(ci->values != NULL);
928 assert(ci->values_num > 0);
930 values = ci->values;
931 values_num = ci->values_num;
933 wipe_ci_values(ci, time(NULL));
934 remove_from_queue(ci);
936 pthread_mutex_unlock (&cache_lock);
938 rrd_clear_error ();
939 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
940 if (status != 0)
941 {
942 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
943 "rrd_update_r (%s) failed with status %i. (%s)",
944 file, status, rrd_get_error());
945 }
947 journal_write("wrote", file);
949 /* Search again in the tree. It's possible someone issued a "FORGET"
950 * while we were writing the update values. */
951 pthread_mutex_lock(&cache_lock);
952 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
953 if (ci)
954 pthread_cond_broadcast(&ci->flushed);
955 pthread_mutex_unlock(&cache_lock);
957 if (status == 0)
958 {
959 pthread_mutex_lock (&stats_lock);
960 stats_updates_written++;
961 stats_data_sets_written += values_num;
962 pthread_mutex_unlock (&stats_lock);
963 }
965 rrd_free_ptrs((void ***) &values, &values_num);
966 free(file);
968 pthread_mutex_lock (&cache_lock);
969 }
970 pthread_mutex_unlock (&cache_lock);
972 return (NULL);
973 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 *   Splits the next field off a NUL-terminated command buffer. Fields end at
 *   a space or at the end of the buffer; a backslash makes the following
 *   character part of the field. The field is unescaped in place, so the
 *   buffer is modified.
 *
 *   buffer_ret       in: start of the buffer; out: first byte after the field
 *   buffer_size_ret  in: bytes in the buffer, including the terminating NUL;
 *                    out: bytes remaining
 *   field_ret        out: the NUL-terminated field
 *
 *   Returns 0 on success. Returns -1, leaving all arguments untouched, if
 *   the buffer is empty or ends in the middle of an escape sequence.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const start = *buffer_ret;
  const size_t total = *buffer_size_ret;
  size_t rd = 0; /* next byte to read */
  size_t wr = 0; /* next byte of the field to write */
  int terminated = 0;

  if (total == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (start[total - 1] == '\0');

  while (!terminated && rd < total)
  {
    const char c = start[rd];

    if (c == ' ' || c == '\0')
    {
      /* end-of-field or end-of-buffer */
      start[wr] = '\0';
      rd++;
      terminated = 1;
    }
    else if (c == '\\')
    {
      /* escaped character: needs another byte before the final NUL */
      if (rd >= (total - 1))
        break;
      start[wr++] = start[rd + 1];
      rd += 2;
    }
    else
    {
      start[wr++] = c;
      rd++;
    }
  }

  if (!terminated)
    return (-1);

  *buffer_ret = start + rd;
  *buffer_size_ret = total - rd;
  *field_ret = start;

  return (0);
} /* }}} int buffer_get_field */
1038 /* if we're restricting writes to the base directory,
1039 * check whether the file falls within the dir
1040 * returns 1 if OK, otherwise 0
1041 */
1042 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1043 {
1044 assert(file != NULL);
1046 if (!config_write_base_only
1047 || JOURNAL_REPLAY(sock)
1048 || config_base_dir == NULL)
1049 return 1;
1051 if (strstr(file, "../") != NULL) goto err;
1053 /* relative paths without "../" are ok */
1054 if (*file != '/') return 1;
1056 /* file must be of the format base + "/" + <1+ char filename> */
1057 if (strlen(file) < _config_base_dir_len + 2) goto err;
1058 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1059 if (*(file + _config_base_dir_len) != '/') goto err;
1061 return 1;
1063 err:
1064 if (sock != NULL && sock->fd >= 0)
1065 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1067 return 0;
1068 } /* }}} static int check_file_access */
1070 /* when using a base dir, convert relative paths to absolute paths.
1071 * if necessary, modifies the "filename" pointer to point
1072 * to the new path created in "tmp". "tmp" is provided
1073 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1074 *
1075 * this allows us to optimize for the expected case (absolute path)
1076 * with a no-op.
1077 */
1078 static void get_abs_path(char **filename, char *tmp)
1079 {
1080 assert(tmp != NULL);
1081 assert(filename != NULL && *filename != NULL);
1083 if (config_base_dir == NULL || **filename == '/')
1084 return;
1086 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1087 *filename = tmp;
1088 } /* }}} static int get_abs_path */
1090 static int flush_file (const char *filename) /* {{{ */
1091 {
1092 cache_item_t *ci;
1094 pthread_mutex_lock (&cache_lock);
1096 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1097 if (ci == NULL)
1098 {
1099 pthread_mutex_unlock (&cache_lock);
1100 return (ENOENT);
1101 }
1103 if (ci->values_num > 0)
1104 {
1105 /* Enqueue at head */
1106 enqueue_cache_item (ci, HEAD);
1107 pthread_cond_wait(&ci->flushed, &cache_lock);
1108 }
1110 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1111 * may have been purged during our cond_wait() */
1113 pthread_mutex_unlock(&cache_lock);
1115 return (0);
1116 } /* }}} int flush_file */
1118 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1119 {
1120 char *err = "Syntax error.\n";
1122 if (cmd && cmd->syntax)
1123 err = cmd->syntax;
1125 return send_response(sock, RESP_ERR, "Usage: %s", err);
1126 } /* }}} static int syntax_error() */
1128 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1129 {
1130 uint64_t copy_queue_length;
1131 uint64_t copy_updates_received;
1132 uint64_t copy_flush_received;
1133 uint64_t copy_updates_written;
1134 uint64_t copy_data_sets_written;
1135 uint64_t copy_journal_bytes;
1136 uint64_t copy_journal_rotate;
1138 uint64_t tree_nodes_number;
1139 uint64_t tree_depth;
1141 pthread_mutex_lock (&stats_lock);
1142 copy_queue_length = stats_queue_length;
1143 copy_updates_received = stats_updates_received;
1144 copy_flush_received = stats_flush_received;
1145 copy_updates_written = stats_updates_written;
1146 copy_data_sets_written = stats_data_sets_written;
1147 copy_journal_bytes = stats_journal_bytes;
1148 copy_journal_rotate = stats_journal_rotate;
1149 pthread_mutex_unlock (&stats_lock);
1151 pthread_mutex_lock (&cache_lock);
1152 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1153 tree_depth = (uint64_t) g_tree_height (cache_tree);
1154 pthread_mutex_unlock (&cache_lock);
1156 add_response_info(sock,
1157 "QueueLength: %"PRIu64"\n", copy_queue_length);
1158 add_response_info(sock,
1159 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1160 add_response_info(sock,
1161 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1162 add_response_info(sock,
1163 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1164 add_response_info(sock,
1165 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1166 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1167 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1168 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1169 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1171 send_response(sock, RESP_OK, "Statistics follow\n");
1173 return (0);
1174 } /* }}} int handle_request_stats */
1176 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1177 {
1178 char *file, file_tmp[PATH_MAX];
1179 int status;
1181 status = buffer_get_field (&buffer, &buffer_size, &file);
1182 if (status != 0)
1183 {
1184 return syntax_error(sock,cmd);
1185 }
1186 else
1187 {
1188 pthread_mutex_lock(&stats_lock);
1189 stats_flush_received++;
1190 pthread_mutex_unlock(&stats_lock);
1192 get_abs_path(&file, file_tmp);
1193 if (!check_file_access(file, sock)) return 0;
1195 status = flush_file (file);
1196 if (status == 0)
1197 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1198 else if (status == ENOENT)
1199 {
1200 /* no file in our tree; see whether it exists at all */
1201 struct stat statbuf;
1203 memset(&statbuf, 0, sizeof(statbuf));
1204 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1205 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1206 else
1207 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1208 }
1209 else if (status < 0)
1210 return send_response(sock, RESP_ERR, "Internal error.\n");
1211 else
1212 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1213 }
1215 /* NOTREACHED */
1216 assert(1==0);
1217 } /* }}} int handle_request_flush */
1219 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1220 {
1221 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1223 pthread_mutex_lock(&cache_lock);
1224 flush_old_values(-1);
1225 pthread_mutex_unlock(&cache_lock);
1227 return send_response(sock, RESP_OK, "Started flush.\n");
1228 } /* }}} static int handle_request_flushall */
1230 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1231 {
1232 int status;
1233 char *file, file_tmp[PATH_MAX];
1234 cache_item_t *ci;
1236 status = buffer_get_field(&buffer, &buffer_size, &file);
1237 if (status != 0)
1238 return syntax_error(sock,cmd);
1240 get_abs_path(&file, file_tmp);
1242 pthread_mutex_lock(&cache_lock);
1243 ci = g_tree_lookup(cache_tree, file);
1244 if (ci == NULL)
1245 {
1246 pthread_mutex_unlock(&cache_lock);
1247 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1248 }
1250 for (size_t i=0; i < ci->values_num; i++)
1251 add_response_info(sock, "%s\n", ci->values[i]);
1253 pthread_mutex_unlock(&cache_lock);
1254 return send_response(sock, RESP_OK, "updates pending\n");
1255 } /* }}} static int handle_request_pending */
1257 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1258 {
1259 int status;
1260 gboolean found;
1261 char *file, file_tmp[PATH_MAX];
1263 status = buffer_get_field(&buffer, &buffer_size, &file);
1264 if (status != 0)
1265 return syntax_error(sock,cmd);
1267 get_abs_path(&file, file_tmp);
1268 if (!check_file_access(file, sock)) return 0;
1270 pthread_mutex_lock(&cache_lock);
1271 found = g_tree_remove(cache_tree, file);
1272 pthread_mutex_unlock(&cache_lock);
1274 if (found == TRUE)
1275 {
1276 if (!JOURNAL_REPLAY(sock))
1277 journal_write("forget", file);
1279 return send_response(sock, RESP_OK, "Gone!\n");
1280 }
1281 else
1282 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1284 /* NOTREACHED */
1285 assert(1==0);
1286 } /* }}} static int handle_request_forget */
1288 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1289 {
1290 cache_item_t *ci;
1292 pthread_mutex_lock(&cache_lock);
1294 ci = cache_queue_head;
1295 while (ci != NULL)
1296 {
1297 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1298 ci = ci->next;
1299 }
1301 pthread_mutex_unlock(&cache_lock);
1303 return send_response(sock, RESP_OK, "in queue.\n");
1304 } /* }}} int handle_request_queue */
1306 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1307 {
1308 char *file, file_tmp[PATH_MAX];
1309 int values_num = 0;
1310 int status;
1311 char orig_buf[CMD_MAX];
1313 cache_item_t *ci;
1315 /* save it for the journal later */
1316 if (!JOURNAL_REPLAY(sock))
1317 strncpy(orig_buf, buffer, buffer_size);
1319 status = buffer_get_field (&buffer, &buffer_size, &file);
1320 if (status != 0)
1321 return syntax_error(sock,cmd);
1323 pthread_mutex_lock(&stats_lock);
1324 stats_updates_received++;
1325 pthread_mutex_unlock(&stats_lock);
1327 get_abs_path(&file, file_tmp);
1328 if (!check_file_access(file, sock)) return 0;
1330 pthread_mutex_lock (&cache_lock);
1331 ci = g_tree_lookup (cache_tree, file);
1333 if (ci == NULL) /* {{{ */
1334 {
1335 struct stat statbuf;
1336 cache_item_t *tmp;
1338 /* don't hold the lock while we setup; stat(2) might block */
1339 pthread_mutex_unlock(&cache_lock);
1341 memset (&statbuf, 0, sizeof (statbuf));
1342 status = stat (file, &statbuf);
1343 if (status != 0)
1344 {
1345 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1347 status = errno;
1348 if (status == ENOENT)
1349 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1350 else
1351 return send_response(sock, RESP_ERR,
1352 "stat failed with error %i.\n", status);
1353 }
1354 if (!S_ISREG (statbuf.st_mode))
1355 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1357 if (access(file, R_OK|W_OK) != 0)
1358 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1359 file, rrd_strerror(errno));
1361 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1362 if (ci == NULL)
1363 {
1364 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1366 return send_response(sock, RESP_ERR, "malloc failed.\n");
1367 }
1368 memset (ci, 0, sizeof (cache_item_t));
1370 ci->file = strdup (file);
1371 if (ci->file == NULL)
1372 {
1373 free (ci);
1374 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1376 return send_response(sock, RESP_ERR, "strdup failed.\n");
1377 }
1379 wipe_ci_values(ci, now);
1380 ci->flags = CI_FLAGS_IN_TREE;
1381 pthread_cond_init(&ci->flushed, NULL);
1383 pthread_mutex_lock(&cache_lock);
1385 /* another UPDATE might have added this entry in the meantime */
1386 tmp = g_tree_lookup (cache_tree, file);
1387 if (tmp == NULL)
1388 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1389 else
1390 {
1391 free_cache_item (ci);
1392 ci = tmp;
1393 }
1395 /* state may have changed while we were unlocked */
1396 if (state == SHUTDOWN)
1397 return -1;
1398 } /* }}} */
1399 assert (ci != NULL);
1401 /* don't re-write updates in replay mode */
1402 if (!JOURNAL_REPLAY(sock))
1403 journal_write("update", orig_buf);
1405 while (buffer_size > 0)
1406 {
1407 char *value;
1408 time_t stamp;
1409 char *eostamp;
1411 status = buffer_get_field (&buffer, &buffer_size, &value);
1412 if (status != 0)
1413 {
1414 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1415 break;
1416 }
1418 /* make sure update time is always moving forward */
1419 stamp = strtol(value, &eostamp, 10);
1420 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1421 {
1422 pthread_mutex_unlock(&cache_lock);
1423 return send_response(sock, RESP_ERR,
1424 "Cannot find timestamp in '%s'!\n", value);
1425 }
1426 else if (stamp <= ci->last_update_stamp)
1427 {
1428 pthread_mutex_unlock(&cache_lock);
1429 return send_response(sock, RESP_ERR,
1430 "illegal attempt to update using time %ld when last"
1431 " update time is %ld (minimum one second step)\n",
1432 stamp, ci->last_update_stamp);
1433 }
1434 else
1435 ci->last_update_stamp = stamp;
1437 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1438 {
1439 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1440 continue;
1441 }
1443 values_num++;
1444 }
1446 if (((now - ci->last_flush_time) >= config_write_interval)
1447 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1448 && (ci->values_num > 0))
1449 {
1450 enqueue_cache_item (ci, TAIL);
1451 }
1453 pthread_mutex_unlock (&cache_lock);
1455 if (values_num < 1)
1456 return send_response(sock, RESP_ERR, "No values updated.\n");
1457 else
1458 return send_response(sock, RESP_OK,
1459 "errors, enqueued %i value(s).\n", values_num);
1461 /* NOTREACHED */
1462 assert(1==0);
1464 } /* }}} int handle_request_update */
1466 /* we came across a "WROTE" entry during journal replay.
1467 * throw away any values that we have accumulated for this file
1468 */
1469 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1470 {
1471 cache_item_t *ci;
1472 const char *file = buffer;
1474 pthread_mutex_lock(&cache_lock);
1476 ci = g_tree_lookup(cache_tree, file);
1477 if (ci == NULL)
1478 {
1479 pthread_mutex_unlock(&cache_lock);
1480 return (0);
1481 }
1483 if (ci->values)
1484 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1486 wipe_ci_values(ci, now);
1487 remove_from_queue(ci);
1489 pthread_mutex_unlock(&cache_lock);
1490 return (0);
1491 } /* }}} int handle_request_wrote */
1493 /* start "BATCH" processing */
1494 static int batch_start (HANDLER_PROTO) /* {{{ */
1495 {
1496 int status;
1497 if (sock->batch_start)
1498 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1500 status = send_response(sock, RESP_OK,
1501 "Go ahead. End with dot '.' on its own line.\n");
1502 sock->batch_start = time(NULL);
1503 sock->batch_cmd = 0;
1505 return status;
1506 } /* }}} static int batch_start */
1508 /* finish "BATCH" processing and return results to the client */
1509 static int batch_done (HANDLER_PROTO) /* {{{ */
1510 {
1511 assert(sock->batch_start);
1512 sock->batch_start = 0;
1513 sock->batch_cmd = 0;
1514 return send_response(sock, RESP_OK, "errors\n");
1515 } /* }}} static int batch_done */
/* QUIT: always reports a non-zero status.  connection_thread_main() closes
 * the connection whenever handle_request() returns non-zero. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
1522 static command_t list_of_commands[] = { /* {{{ */
1523 {
1524 "UPDATE",
1525 handle_request_update,
1526 CMD_CONTEXT_ANY,
1527 "UPDATE <filename> <values> [<values> ...]\n"
1528 ,
1529 "Adds the given file to the internal cache if it is not yet known and\n"
1530 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1531 "for details.\n"
1532 "\n"
1533 "Each <values> has the following form:\n"
1534 " <values> = <time>:<value>[:<value>[...]]\n"
1535 "See the rrdupdate(1) manpage for details.\n"
1536 },
1537 {
1538 "WROTE",
1539 handle_request_wrote,
1540 CMD_CONTEXT_JOURNAL,
1541 NULL,
1542 NULL
1543 },
1544 {
1545 "FLUSH",
1546 handle_request_flush,
1547 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1548 "FLUSH <filename>\n"
1549 ,
1550 "Adds the given filename to the head of the update queue and returns\n"
1551 "after it has been dequeued.\n"
1552 },
1553 {
1554 "FLUSHALL",
1555 handle_request_flushall,
1556 CMD_CONTEXT_CLIENT,
1557 "FLUSHALL\n"
1558 ,
1559 "Triggers writing of all pending updates. Returns immediately.\n"
1560 },
1561 {
1562 "PENDING",
1563 handle_request_pending,
1564 CMD_CONTEXT_CLIENT,
1565 "PENDING <filename>\n"
1566 ,
1567 "Shows any 'pending' updates for a file, in order.\n"
1568 "The updates shown have not yet been written to the underlying RRD file.\n"
1569 },
1570 {
1571 "FORGET",
1572 handle_request_forget,
1573 CMD_CONTEXT_ANY,
1574 "FORGET <filename>\n"
1575 ,
1576 "Removes the file completely from the cache.\n"
1577 "Any pending updates for the file will be lost.\n"
1578 },
1579 {
1580 "QUEUE",
1581 handle_request_queue,
1582 CMD_CONTEXT_CLIENT,
1583 "QUEUE\n"
1584 ,
1585 "Shows all files in the output queue.\n"
1586 "The output is zero or more lines in the following format:\n"
1587 "(where <num_vals> is the number of values to be written)\n"
1588 "\n"
1589 "<num_vals> <filename>\n"
1590 },
1591 {
1592 "STATS",
1593 handle_request_stats,
1594 CMD_CONTEXT_CLIENT,
1595 "STATS\n"
1596 ,
1597 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1598 "a description of the values.\n"
1599 },
1600 {
1601 "HELP",
1602 handle_request_help,
1603 CMD_CONTEXT_CLIENT,
1604 "HELP [<command>]\n",
1605 NULL, /* special! */
1606 },
1607 {
1608 "BATCH",
1609 batch_start,
1610 CMD_CONTEXT_CLIENT,
1611 "BATCH\n"
1612 ,
1613 "The 'BATCH' command permits the client to initiate a bulk load\n"
1614 " of commands to rrdcached.\n"
1615 "\n"
1616 "Usage:\n"
1617 "\n"
1618 " client: BATCH\n"
1619 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1620 " client: command #1\n"
1621 " client: command #2\n"
1622 " client: ... and so on\n"
1623 " client: .\n"
1624 " server: 2 errors\n"
1625 " server: 7 message for command #7\n"
1626 " server: 9 message for command #9\n"
1627 "\n"
1628 "For more information, consult the rrdcached(1) documentation.\n"
1629 },
1630 {
1631 ".", /* BATCH terminator */
1632 batch_done,
1633 CMD_CONTEXT_BATCH,
1634 NULL,
1635 NULL
1636 },
1637 {
1638 "QUIT",
1639 handle_request_quit,
1640 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1641 "QUIT\n"
1642 ,
1643 "Disconnect from rrdcached.\n"
1644 }
1645 }; /* }}} command_t list_of_commands[] */
1646 static size_t list_of_commands_len = sizeof (list_of_commands)
1647 / sizeof (list_of_commands[0]);
1649 static command_t *find_command(char *cmd)
1650 {
1651 size_t i;
1653 for (i = 0; i < list_of_commands_len; i++)
1654 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1655 return (&list_of_commands[i]);
1656 return NULL;
1657 }
1659 /* We currently use the index in the `list_of_commands' array as a bit position
1660 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1661 * outside these functions so that switching to a more elegant storage method
1662 * is easily possible. */
1663 static ssize_t find_command_index (const char *cmd) /* {{{ */
1664 {
1665 size_t i;
1667 for (i = 0; i < list_of_commands_len; i++)
1668 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1669 return ((ssize_t) i);
1670 return (-1);
1671 } /* }}} ssize_t find_command_index */
1673 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1674 const char *cmd)
1675 {
1676 ssize_t i;
1678 if (JOURNAL_REPLAY(sock))
1679 return (1);
1681 if (cmd == NULL)
1682 return (-1);
1684 if ((strcasecmp ("QUIT", cmd) == 0)
1685 || (strcasecmp ("HELP", cmd) == 0))
1686 return (1);
1687 else if (strcmp (".", cmd) == 0)
1688 cmd = "BATCH";
1690 i = find_command_index (cmd);
1691 if (i < 0)
1692 return (-1);
1693 assert (i < 32);
1695 if ((sock->permissions & (1 << i)) != 0)
1696 return (1);
1697 return (0);
1698 } /* }}} int socket_permission_check */
1700 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1701 const char *cmd)
1702 {
1703 ssize_t i;
1705 i = find_command_index (cmd);
1706 if (i < 0)
1707 return (-1);
1708 assert (i < 32);
1710 sock->permissions |= (1 << i);
1711 return (0);
1712 } /* }}} int socket_permission_add */
1714 /* check whether commands are received in the expected context */
1715 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1716 {
1717 if (JOURNAL_REPLAY(sock))
1718 return (cmd->context & CMD_CONTEXT_JOURNAL);
1719 else if (sock->batch_start)
1720 return (cmd->context & CMD_CONTEXT_BATCH);
1721 else
1722 return (cmd->context & CMD_CONTEXT_CLIENT);
1724 /* NOTREACHED */
1725 assert(1==0);
1726 }
1728 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1729 {
1730 int status;
1731 char *cmd_str;
1732 char *resp_txt;
1733 command_t *help = NULL;
1735 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1736 if (status == 0)
1737 help = find_command(cmd_str);
1739 if (help && (help->syntax || help->help))
1740 {
1741 char tmp[CMD_MAX];
1743 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1744 resp_txt = tmp;
1746 if (help->syntax)
1747 add_response_info(sock, "Usage: %s\n", help->syntax);
1749 if (help->help)
1750 add_response_info(sock, "%s\n", help->help);
1751 }
1752 else
1753 {
1754 size_t i;
1756 resp_txt = "Command overview\n";
1758 for (i = 0; i < list_of_commands_len; i++)
1759 {
1760 if (list_of_commands[i].syntax == NULL)
1761 continue;
1762 add_response_info (sock, "%s", list_of_commands[i].syntax);
1763 }
1764 }
1766 return send_response(sock, RESP_OK, resp_txt);
1767 } /* }}} int handle_request_help */
1769 static int handle_request (DISPATCH_PROTO) /* {{{ */
1770 {
1771 char *buffer_ptr = buffer;
1772 char *cmd_str = NULL;
1773 command_t *cmd = NULL;
1774 int status;
1776 assert (buffer[buffer_size - 1] == '\0');
1778 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1779 if (status != 0)
1780 {
1781 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1782 return (-1);
1783 }
1785 if (sock != NULL && sock->batch_start)
1786 sock->batch_cmd++;
1788 cmd = find_command(cmd_str);
1789 if (!cmd)
1790 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1792 if (!socket_permission_check (sock, cmd->cmd))
1793 return send_response(sock, RESP_ERR, "Permission denied.\n");
1795 if (!command_check_context(sock, cmd))
1796 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1798 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1799 } /* }}} int handle_request */
1801 static void journal_set_free (journal_set *js) /* {{{ */
1802 {
1803 if (js == NULL)
1804 return;
1806 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1808 free(js);
1809 } /* }}} journal_set_free */
1811 static void journal_set_remove (journal_set *js) /* {{{ */
1812 {
1813 if (js == NULL)
1814 return;
1816 for (uint i=0; i < js->files_num; i++)
1817 {
1818 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1819 unlink(js->files[i]);
1820 }
1821 } /* }}} journal_set_remove */
1823 /* close current journal file handle.
1824 * MUST hold journal_lock before calling */
1825 static void journal_close(void) /* {{{ */
1826 {
1827 if (journal_fh != NULL)
1828 {
1829 if (fclose(journal_fh) != 0)
1830 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1831 }
1833 journal_fh = NULL;
1834 journal_size = 0;
1835 } /* }}} journal_close */
1837 /* MUST hold journal_lock before calling */
1838 static void journal_new_file(void) /* {{{ */
1839 {
1840 struct timeval now;
1841 int new_fd;
1842 char new_file[PATH_MAX + 1];
1844 assert(journal_dir != NULL);
1845 assert(journal_cur != NULL);
1847 journal_close();
1849 gettimeofday(&now, NULL);
1850 /* this format assures that the files sort in strcmp() order */
1851 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1852 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1854 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1855 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1856 if (new_fd < 0)
1857 goto error;
1859 journal_fh = fdopen(new_fd, "a");
1860 if (journal_fh == NULL)
1861 goto error;
1863 journal_size = ftell(journal_fh);
1864 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1866 /* record the file in the journal set */
1867 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1869 return;
1871 error:
1872 RRDD_LOG(LOG_CRIT,
1873 "JOURNALING DISABLED: Error while trying to create %s : %s",
1874 new_file, rrd_strerror(errno));
1875 RRDD_LOG(LOG_CRIT,
1876 "JOURNALING DISABLED: All values will be flushed at shutdown");
1878 close(new_fd);
1879 config_flush_at_shutdown = 1;
1881 } /* }}} journal_new_file */
1883 /* MUST NOT hold journal_lock before calling this */
1884 static void journal_rotate(void) /* {{{ */
1885 {
1886 journal_set *old_js = NULL;
1888 if (journal_dir == NULL)
1889 return;
1891 RRDD_LOG(LOG_DEBUG, "rotating journals");
1893 pthread_mutex_lock(&stats_lock);
1894 ++stats_journal_rotate;
1895 pthread_mutex_unlock(&stats_lock);
1897 pthread_mutex_lock(&journal_lock);
1899 journal_close();
1901 /* rotate the journal sets */
1902 old_js = journal_old;
1903 journal_old = journal_cur;
1904 journal_cur = calloc(1, sizeof(journal_set));
1906 if (journal_cur != NULL)
1907 journal_new_file();
1908 else
1909 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1911 pthread_mutex_unlock(&journal_lock);
1913 journal_set_remove(old_js);
1914 journal_set_free (old_js);
1916 } /* }}} static void journal_rotate */
1918 /* MUST hold journal_lock when calling */
1919 static void journal_done(void) /* {{{ */
1920 {
1921 if (journal_cur == NULL)
1922 return;
1924 journal_close();
1926 if (config_flush_at_shutdown)
1927 {
1928 RRDD_LOG(LOG_INFO, "removing journals");
1929 journal_set_remove(journal_old);
1930 journal_set_remove(journal_cur);
1931 }
1932 else
1933 {
1934 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1935 "journals will be used at next startup");
1936 }
1938 journal_set_free(journal_cur);
1939 journal_set_free(journal_old);
1940 free(journal_dir);
1942 } /* }}} static void journal_done */
1944 static int journal_write(char *cmd, char *args) /* {{{ */
1945 {
1946 int chars;
1948 if (journal_fh == NULL)
1949 return 0;
1951 pthread_mutex_lock(&journal_lock);
1952 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1953 journal_size += chars;
1955 if (journal_size > JOURNAL_MAX)
1956 journal_new_file();
1958 pthread_mutex_unlock(&journal_lock);
1960 if (chars > 0)
1961 {
1962 pthread_mutex_lock(&stats_lock);
1963 stats_journal_bytes += chars;
1964 pthread_mutex_unlock(&stats_lock);
1965 }
1967 return chars;
1968 } /* }}} static int journal_write */
1970 static int journal_replay (const char *file) /* {{{ */
1971 {
1972 FILE *fh;
1973 int entry_cnt = 0;
1974 int fail_cnt = 0;
1975 uint64_t line = 0;
1976 char entry[CMD_MAX];
1977 time_t now;
1979 if (file == NULL) return 0;
1981 {
1982 char *reason = "unknown error";
1983 int status = 0;
1984 struct stat statbuf;
1986 memset(&statbuf, 0, sizeof(statbuf));
1987 if (stat(file, &statbuf) != 0)
1988 {
1989 reason = "stat error";
1990 status = errno;
1991 }
1992 else if (!S_ISREG(statbuf.st_mode))
1993 {
1994 reason = "not a regular file";
1995 status = EPERM;
1996 }
1997 if (statbuf.st_uid != daemon_uid)
1998 {
1999 reason = "not owned by daemon user";
2000 status = EACCES;
2001 }
2002 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2003 {
2004 reason = "must not be user/group writable";
2005 status = EACCES;
2006 }
2008 if (status != 0)
2009 {
2010 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2011 file, rrd_strerror(status), reason);
2012 return 0;
2013 }
2014 }
2016 fh = fopen(file, "r");
2017 if (fh == NULL)
2018 {
2019 if (errno != ENOENT)
2020 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2021 file, rrd_strerror(errno));
2022 return 0;
2023 }
2024 else
2025 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2027 now = time(NULL);
2029 while(!feof(fh))
2030 {
2031 size_t entry_len;
2033 ++line;
2034 if (fgets(entry, sizeof(entry), fh) == NULL)
2035 break;
2036 entry_len = strlen(entry);
2038 /* check \n termination in case journal writing crashed mid-line */
2039 if (entry_len == 0)
2040 continue;
2041 else if (entry[entry_len - 1] != '\n')
2042 {
2043 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2044 ++fail_cnt;
2045 continue;
2046 }
2048 entry[entry_len - 1] = '\0';
2050 if (handle_request(NULL, now, entry, entry_len) == 0)
2051 ++entry_cnt;
2052 else
2053 ++fail_cnt;
2054 }
2056 fclose(fh);
2058 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2059 entry_cnt, fail_cnt);
2061 return entry_cnt > 0 ? 1 : 0;
2062 } /* }}} static int journal_replay */
/* qsort() comparison callback for an array of C strings: each argument
 * points at a `char *' element; the strings are ordered with strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *lhs = *(const char *const *) v1;
  const char *rhs = *(const char *const *) v2;

  return strcmp(lhs, rhs);
}
2072 static void journal_init(void) /* {{{ */
2073 {
2074 int had_journal = 0;
2075 DIR *dir;
2076 struct dirent *dent;
2077 char path[PATH_MAX+1];
2079 if (journal_dir == NULL) return;
2081 pthread_mutex_lock(&journal_lock);
2083 journal_cur = calloc(1, sizeof(journal_set));
2084 if (journal_cur == NULL)
2085 {
2086 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2087 return;
2088 }
2090 RRDD_LOG(LOG_INFO, "checking for journal files");
2092 /* Handle old journal files during transition. This gives them the
2093 * correct sort order. TODO: remove after first release
2094 */
2095 {
2096 char old_path[PATH_MAX+1];
2097 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2098 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2099 rename(old_path, path);
2101 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2102 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2103 rename(old_path, path);
2104 }
2106 dir = opendir(journal_dir);
2107 while ((dent = readdir(dir)) != NULL)
2108 {
2109 /* looks like a journal file? */
2110 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2111 continue;
2113 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2115 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2116 {
2117 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2118 dent->d_name);
2119 break;
2120 }
2121 }
2122 closedir(dir);
2124 qsort(journal_cur->files, journal_cur->files_num,
2125 sizeof(journal_cur->files[0]), journal_sort);
2127 for (uint i=0; i < journal_cur->files_num; i++)
2128 had_journal += journal_replay(journal_cur->files[i]);
2130 journal_new_file();
2132 /* it must have been a crash. start a flush */
2133 if (had_journal && config_flush_at_shutdown)
2134 flush_old_values(-1);
2136 pthread_mutex_unlock(&journal_lock);
2138 RRDD_LOG(LOG_INFO, "journal processing complete");
2140 } /* }}} static void journal_init */
2142 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2143 {
2144 assert(sock != NULL);
2146 free(sock->rbuf); sock->rbuf = NULL;
2147 free(sock->wbuf); sock->wbuf = NULL;
2148 free(sock);
2149 } /* }}} void free_listen_socket */
2151 static void close_connection(listen_socket_t *sock) /* {{{ */
2152 {
2153 if (sock->fd >= 0)
2154 {
2155 close(sock->fd);
2156 sock->fd = -1;
2157 }
2159 free_listen_socket(sock);
2161 } /* }}} void close_connection */
2163 static void *connection_thread_main (void *args) /* {{{ */
2164 {
2165 listen_socket_t *sock;
2166 int fd;
2168 sock = (listen_socket_t *) args;
2169 fd = sock->fd;
2171 /* init read buffers */
2172 sock->next_read = sock->next_cmd = 0;
2173 sock->rbuf = malloc(RBUF_SIZE);
2174 if (sock->rbuf == NULL)
2175 {
2176 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2177 close_connection(sock);
2178 return NULL;
2179 }
2181 pthread_mutex_lock (&connection_threads_lock);
2182 connection_threads_num++;
2183 pthread_mutex_unlock (&connection_threads_lock);
2185 while (state == RUNNING)
2186 {
2187 char *cmd;
2188 ssize_t cmd_len;
2189 ssize_t rbytes;
2190 time_t now;
2192 struct pollfd pollfd;
2193 int status;
2195 pollfd.fd = fd;
2196 pollfd.events = POLLIN | POLLPRI;
2197 pollfd.revents = 0;
2199 status = poll (&pollfd, 1, /* timeout = */ 500);
2200 if (state != RUNNING)
2201 break;
2202 else if (status == 0) /* timeout */
2203 continue;
2204 else if (status < 0) /* error */
2205 {
2206 status = errno;
2207 if (status != EINTR)
2208 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2209 continue;
2210 }
2212 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2213 break;
2214 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2215 {
2216 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2217 "poll(2) returned something unexpected: %#04hx",
2218 pollfd.revents);
2219 break;
2220 }
2222 rbytes = read(fd, sock->rbuf + sock->next_read,
2223 RBUF_SIZE - sock->next_read);
2224 if (rbytes < 0)
2225 {
2226 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2227 break;
2228 }
2229 else if (rbytes == 0)
2230 break; /* eof */
2232 sock->next_read += rbytes;
2234 if (sock->batch_start)
2235 now = sock->batch_start;
2236 else
2237 now = time(NULL);
2239 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2240 {
2241 status = handle_request (sock, now, cmd, cmd_len+1);
2242 if (status != 0)
2243 goto out_close;
2244 }
2245 }
2247 out_close:
2248 close_connection(sock);
2250 /* Remove this thread from the connection threads list */
2251 pthread_mutex_lock (&connection_threads_lock);
2252 connection_threads_num--;
2253 if (connection_threads_num <= 0)
2254 pthread_cond_broadcast(&connection_threads_done);
2255 pthread_mutex_unlock (&connection_threads_lock);
2257 return (NULL);
2258 } /* }}} void *connection_thread_main */
2260 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2261 {
2262 int fd;
2263 struct sockaddr_un sa;
2264 listen_socket_t *temp;
2265 int status;
2266 const char *path;
2267 char *path_copy, *dir;
2269 path = sock->addr;
2270 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2271 path += strlen("unix:");
2273 /* dirname may modify its argument */
2274 path_copy = strdup(path);
2275 if (path_copy == NULL)
2276 {
2277 fprintf(stderr, "rrdcached: strdup(): %s\n",
2278 rrd_strerror(errno));
2279 return (-1);
2280 }
2282 dir = dirname(path_copy);
2283 if (rrd_mkdir_p(dir, 0777) != 0)
2284 {
2285 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2286 dir, rrd_strerror(errno));
2287 return (-1);
2288 }
2290 free(path_copy);
2292 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2293 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2294 if (temp == NULL)
2295 {
2296 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2297 return (-1);
2298 }
2299 listen_fds = temp;
2300 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2302 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2303 if (fd < 0)
2304 {
2305 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2306 rrd_strerror(errno));
2307 return (-1);
2308 }
2310 memset (&sa, 0, sizeof (sa));
2311 sa.sun_family = AF_UNIX;
2312 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2314 /* if we've gotten this far, we own the pid file. any daemon started
2315 * with the same args must not be alive. therefore, ensure that we can
2316 * create the socket...
2317 */
2318 unlink(path);
2320 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2321 if (status != 0)
2322 {
2323 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2324 path, rrd_strerror(errno));
2325 close (fd);
2326 return (-1);
2327 }
2329 status = listen (fd, /* backlog = */ 10);
2330 if (status != 0)
2331 {
2332 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2333 path, rrd_strerror(errno));
2334 close (fd);
2335 unlink (path);
2336 return (-1);
2337 }
2339 listen_fds[listen_fds_num].fd = fd;
2340 listen_fds[listen_fds_num].family = PF_UNIX;
2341 strncpy(listen_fds[listen_fds_num].addr, path,
2342 sizeof (listen_fds[listen_fds_num].addr) - 1);
2343 listen_fds_num++;
2345 return (0);
2346 } /* }}} int open_listen_socket_unix */
/* Open network listening sockets for `sock->addr' and append them to
 * listen_fds.
 *
 * Accepted address forms are "host", "host:port", "[ipv6]" and
 * "[ipv6]:port"; without a port RRDCACHED_DEFAULT_PORT is used.  One socket
 * is opened for every address getaddrinfo() returns; addresses for which
 * realloc, socket() or bind() fail are skipped.
 *
 * Returns -1 for a malformed address, a getaddrinfo() failure or a
 * listen() failure, and 0 otherwise. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* split at the last colon; strrchr() is the standard C equivalent of
     * the legacy rindex() */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* start from a copy of the template; fd and family are set below */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2470 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2471 {
2472 assert(sock != NULL);
2473 assert(sock->addr != NULL);
2475 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2476 || sock->addr[0] == '/')
2477 return (open_listen_socket_unix(sock));
2478 else
2479 return (open_listen_socket_network(sock));
2480 } /* }}} int open_listen_socket */
2482 static int close_listen_sockets (void) /* {{{ */
2483 {
2484 size_t i;
2486 for (i = 0; i < listen_fds_num; i++)
2487 {
2488 close (listen_fds[i].fd);
2490 if (listen_fds[i].family == PF_UNIX)
2491 unlink(listen_fds[i].addr);
2492 }
2494 free (listen_fds);
2495 listen_fds = NULL;
2496 listen_fds_num = 0;
2498 return (0);
2499 } /* }}} int close_listen_sockets */
2501 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2502 {
2503 struct pollfd *pollfds;
2504 int pollfds_num;
2505 int status;
2506 int i;
2508 if (listen_fds_num < 1)
2509 {
2510 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2511 return (NULL);
2512 }
2514 pollfds_num = listen_fds_num;
2515 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2516 if (pollfds == NULL)
2517 {
2518 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2519 return (NULL);
2520 }
2521 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2523 RRDD_LOG(LOG_INFO, "listening for connections");
2525 while (state == RUNNING)
2526 {
2527 for (i = 0; i < pollfds_num; i++)
2528 {
2529 pollfds[i].fd = listen_fds[i].fd;
2530 pollfds[i].events = POLLIN | POLLPRI;
2531 pollfds[i].revents = 0;
2532 }
2534 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2535 if (state != RUNNING)
2536 break;
2537 else if (status == 0) /* timeout */
2538 continue;
2539 else if (status < 0) /* error */
2540 {
2541 status = errno;
2542 if (status != EINTR)
2543 {
2544 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2545 }
2546 continue;
2547 }
2549 for (i = 0; i < pollfds_num; i++)
2550 {
2551 listen_socket_t *client_sock;
2552 struct sockaddr_storage client_sa;
2553 socklen_t client_sa_size;
2554 pthread_t tid;
2555 pthread_attr_t attr;
2557 if (pollfds[i].revents == 0)
2558 continue;
2560 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2561 {
2562 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2563 "poll(2) returned something unexpected for listen FD #%i.",
2564 pollfds[i].fd);
2565 continue;
2566 }
2568 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2569 if (client_sock == NULL)
2570 {
2571 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2572 continue;
2573 }
2574 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2576 client_sa_size = sizeof (client_sa);
2577 client_sock->fd = accept (pollfds[i].fd,
2578 (struct sockaddr *) &client_sa, &client_sa_size);
2579 if (client_sock->fd < 0)
2580 {
2581 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2582 free(client_sock);
2583 continue;
2584 }
2586 pthread_attr_init (&attr);
2587 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2589 status = pthread_create (&tid, &attr, connection_thread_main,
2590 client_sock);
2591 if (status != 0)
2592 {
2593 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2594 close_connection(client_sock);
2595 continue;
2596 }
2597 } /* for (pollfds_num) */
2598 } /* while (state == RUNNING) */
2600 RRDD_LOG(LOG_INFO, "starting shutdown");
2602 close_listen_sockets ();
2604 pthread_mutex_lock (&connection_threads_lock);
2605 while (connection_threads_num > 0)
2606 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2607 pthread_mutex_unlock (&connection_threads_lock);
2609 free(pollfds);
2611 return (NULL);
2612 } /* }}} void *listen_thread_main */
2614 static int daemonize (void) /* {{{ */
2615 {
2616 int pid_fd;
2617 char *base_dir;
2619 daemon_uid = geteuid();
2621 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2622 if (pid_fd < 0)
2623 pid_fd = check_pidfile();
2624 if (pid_fd < 0)
2625 return pid_fd;
2627 /* open all the listen sockets */
2628 if (config_listen_address_list_len > 0)
2629 {
2630 for (size_t i = 0; i < config_listen_address_list_len; i++)
2631 open_listen_socket (config_listen_address_list[i]);
2633 rrd_free_ptrs((void ***) &config_listen_address_list,
2634 &config_listen_address_list_len);
2635 }
2636 else
2637 {
2638 listen_socket_t sock;
2639 memset(&sock, 0, sizeof(sock));
2640 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2641 open_listen_socket (&sock);
2642 }
2644 if (listen_fds_num < 1)
2645 {
2646 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2647 goto error;
2648 }
2650 if (!stay_foreground)
2651 {
2652 pid_t child;
2654 child = fork ();
2655 if (child < 0)
2656 {
2657 fprintf (stderr, "daemonize: fork(2) failed.\n");
2658 goto error;
2659 }
2660 else if (child > 0)
2661 exit(0);
2663 /* Become session leader */
2664 setsid ();
2666 /* Open the first three file descriptors to /dev/null */
2667 close (2);
2668 close (1);
2669 close (0);
2671 open ("/dev/null", O_RDWR);
2672 if (dup(0) == -1 || dup(0) == -1){
2673 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2674 }
2675 } /* if (!stay_foreground) */
2677 /* Change into the /tmp directory. */
2678 base_dir = (config_base_dir != NULL)
2679 ? config_base_dir
2680 : "/tmp";
2682 if (chdir (base_dir) != 0)
2683 {
2684 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2685 goto error;
2686 }
2688 install_signal_handlers();
2690 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2691 RRDD_LOG(LOG_INFO, "starting up");
2693 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2694 (GDestroyNotify) free_cache_item);
2695 if (cache_tree == NULL)
2696 {
2697 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2698 goto error;
2699 }
2701 return write_pidfile (pid_fd);
2703 error:
2704 remove_pidfile();
2705 return -1;
2706 } /* }}} int daemonize */
2708 static int cleanup (void) /* {{{ */
2709 {
2710 pthread_cond_broadcast (&flush_cond);
2711 pthread_join (flush_thread, NULL);
2713 pthread_cond_broadcast (&queue_cond);
2714 for (int i = 0; i < config_queue_threads; i++)
2715 pthread_join (queue_threads[i], NULL);
2717 if (config_flush_at_shutdown)
2718 {
2719 assert(cache_queue_head == NULL);
2720 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2721 }
2723 free(queue_threads);
2724 free(config_base_dir);
2726 pthread_mutex_lock(&cache_lock);
2727 g_tree_destroy(cache_tree);
2729 pthread_mutex_lock(&journal_lock);
2730 journal_done();
2732 RRDD_LOG(LOG_INFO, "goodbye");
2733 closelog ();
2735 remove_pidfile ();
2736 free(config_pid_file);
2738 return (0);
2739 } /* }}} int cleanup */
2741 static int read_options (int argc, char **argv) /* {{{ */
2742 {
2743 int option;
2744 int status = 0;
2746 char **permissions = NULL;
2747 size_t permissions_len = 0;
2749 while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2750 {
2751 switch (option)
2752 {
2753 case 'g':
2754 stay_foreground=1;
2755 break;
2757 case 'l':
2758 {
2759 listen_socket_t *new;
2761 new = malloc(sizeof(listen_socket_t));
2762 if (new == NULL)
2763 {
2764 fprintf(stderr, "read_options: malloc failed.\n");
2765 return(2);
2766 }
2767 memset(new, 0, sizeof(listen_socket_t));
2769 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2771 /* Add permissions to the socket {{{ */
2772 if (permissions_len != 0)
2773 {
2774 size_t i;
2775 for (i = 0; i < permissions_len; i++)
2776 {
2777 status = socket_permission_add (new, permissions[i]);
2778 if (status != 0)
2779 {
2780 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2781 "socket failed. Most likely, this permission doesn't "
2782 "exist. Check your command line.\n", permissions[i]);
2783 status = 4;
2784 }
2785 }
2786 }
2787 else /* if (permissions_len == 0) */
2788 {
2789 /* Add permission for ALL commands to the socket. */
2790 size_t i;
2791 for (i = 0; i < list_of_commands_len; i++)
2792 {
2793 status = socket_permission_add (new, list_of_commands[i].cmd);
2794 if (status != 0)
2795 {
2796 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2797 "socket failed. This should never happen, ever! Sorry.\n",
2798 permissions[i]);
2799 status = 4;
2800 }
2801 }
2802 }
2803 /* }}} Done adding permissions. */
2805 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2806 &config_listen_address_list_len, new))
2807 {
2808 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2809 return (2);
2810 }
2811 }
2812 break;
2814 case 'P':
2815 {
2816 char *optcopy;
2817 char *saveptr;
2818 char *dummy;
2819 char *ptr;
2821 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2823 optcopy = strdup (optarg);
2824 dummy = optcopy;
2825 saveptr = NULL;
2826 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2827 {
2828 dummy = NULL;
2829 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2830 }
2832 free (optcopy);
2833 }
2834 break;
2836 case 'f':
2837 {
2838 int temp;
2840 temp = atoi (optarg);
2841 if (temp > 0)
2842 config_flush_interval = temp;
2843 else
2844 {
2845 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2846 status = 3;
2847 }
2848 }
2849 break;
2851 case 'w':
2852 {
2853 int temp;
2855 temp = atoi (optarg);
2856 if (temp > 0)
2857 config_write_interval = temp;
2858 else
2859 {
2860 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2861 status = 2;
2862 }
2863 }
2864 break;
2866 case 'z':
2867 {
2868 int temp;
2870 temp = atoi(optarg);
2871 if (temp > 0)
2872 config_write_jitter = temp;
2873 else
2874 {
2875 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2876 status = 2;
2877 }
2879 break;
2880 }
2882 case 't':
2883 {
2884 int threads;
2885 threads = atoi(optarg);
2886 if (threads >= 1)
2887 config_queue_threads = threads;
2888 else
2889 {
2890 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2891 return 1;
2892 }
2893 }
2894 break;
2896 case 'B':
2897 config_write_base_only = 1;
2898 break;
2900 case 'b':
2901 {
2902 size_t len;
2903 char base_realpath[PATH_MAX];
2905 if (config_base_dir != NULL)
2906 free (config_base_dir);
2907 config_base_dir = strdup (optarg);
2908 if (config_base_dir == NULL)
2909 {
2910 fprintf (stderr, "read_options: strdup failed.\n");
2911 return (3);
2912 }
2914 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2915 {
2916 fprintf (stderr, "Failed to create base directory '%s': %s\n",
2917 config_base_dir, rrd_strerror (errno));
2918 return (3);
2919 }
2921 /* make sure that the base directory is not resolved via
2922 * symbolic links. this makes some performance-enhancing
2923 * assumptions possible (we don't have to resolve paths
2924 * that start with a "/")
2925 */
2926 if (realpath(config_base_dir, base_realpath) == NULL)
2927 {
2928 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2929 "%s\n", config_base_dir, rrd_strerror(errno));
2930 return 5;
2931 }
2933 len = strlen (config_base_dir);
2934 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2935 {
2936 config_base_dir[len - 1] = 0;
2937 len--;
2938 }
2940 if (len < 1)
2941 {
2942 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2943 return (4);
2944 }
2946 _config_base_dir_len = len;
2948 len = strlen (base_realpath);
2949 while ((len > 0) && (base_realpath[len - 1] == '/'))
2950 {
2951 base_realpath[len - 1] = '\0';
2952 len--;
2953 }
2955 if (strncmp(config_base_dir,
2956 base_realpath, sizeof(base_realpath)) != 0)
2957 {
2958 fprintf(stderr,
2959 "Base directory (-b) resolved via file system links!\n"
2960 "Please consult rrdcached '-b' documentation!\n"
2961 "Consider specifying the real directory (%s)\n",
2962 base_realpath);
2963 return 5;
2964 }
2965 }
2966 break;
2968 case 'p':
2969 {
2970 if (config_pid_file != NULL)
2971 free (config_pid_file);
2972 config_pid_file = strdup (optarg);
2973 if (config_pid_file == NULL)
2974 {
2975 fprintf (stderr, "read_options: strdup failed.\n");
2976 return (3);
2977 }
2978 }
2979 break;
2981 case 'F':
2982 config_flush_at_shutdown = 1;
2983 break;
2985 case 'j':
2986 {
2987 const char *dir = journal_dir = strdup(optarg);
2989 status = rrd_mkdir_p(dir, 0777);
2990 if (status != 0)
2991 {
2992 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
2993 dir, rrd_strerror(errno));
2994 return 6;
2995 }
2997 if (access(dir, R_OK|W_OK|X_OK) != 0)
2998 {
2999 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3000 errno ? rrd_strerror(errno) : "");
3001 return 6;
3002 }
3003 }
3004 break;
3006 case 'h':
3007 case '?':
3008 printf ("RRDCacheD %s\n"
3009 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3010 "\n"
3011 "Usage: rrdcached [options]\n"
3012 "\n"
3013 "Valid options are:\n"
3014 " -l <address> Socket address to listen to.\n"
3015 " -P <perms> Sets the permissions to assign to all following "
3016 "sockets\n"
3017 " -w <seconds> Interval in which to write data.\n"
3018 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3019 " -t <threads> Number of write threads.\n"
3020 " -f <seconds> Interval in which to flush dead data.\n"
3021 " -p <file> Location of the PID-file.\n"
3022 " -b <dir> Base directory to change to.\n"
3023 " -B Restrict file access to paths within -b <dir>\n"
3024 " -g Do not fork and run in the foreground.\n"
3025 " -j <dir> Directory in which to create the journal files.\n"
3026 " -F Always flush all updates at shutdown\n"
3027 "\n"
3028 "For more information and a detailed description of all options "
3029 "please refer\n"
3030 "to the rrdcached(1) manual page.\n",
3031 VERSION);
3032 status = -1;
3033 break;
3034 } /* switch (option) */
3035 } /* while (getopt) */
3037 /* advise the user when values are not sane */
3038 if (config_flush_interval < 2 * config_write_interval)
3039 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3040 " 2x write interval (-w) !\n");
3041 if (config_write_jitter > config_write_interval)
3042 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3043 " write interval (-w) !\n");
3045 if (config_write_base_only && config_base_dir == NULL)
3046 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3047 " Consult the rrdcached documentation\n");
3049 if (journal_dir == NULL)
3050 config_flush_at_shutdown = 1;
3052 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3054 return (status);
3055 } /* }}} int read_options */
3057 int main (int argc, char **argv)
3058 {
3059 int status;
3061 status = read_options (argc, argv);
3062 if (status != 0)
3063 {
3064 if (status < 0)
3065 status = 0;
3066 return (status);
3067 }
3069 status = daemonize ();
3070 if (status != 0)
3071 {
3072 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3073 return (1);
3074 }
3076 journal_init();
3078 /* start the queue threads */
3079 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3080 if (queue_threads == NULL)
3081 {
3082 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3083 cleanup();
3084 return (1);
3085 }
3086 for (int i = 0; i < config_queue_threads; i++)
3087 {
3088 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3089 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3090 if (status != 0)
3091 {
3092 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3093 cleanup();
3094 return (1);
3095 }
3096 }
3098 /* start the flush thread */
3099 memset(&flush_thread, 0, sizeof(flush_thread));
3100 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3101 if (status != 0)
3102 {
3103 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3104 cleanup();
3105 return (1);
3106 }
3108 listen_thread_main (NULL);
3109 cleanup ();
3111 return (0);
3112 } /* int main */
3114 /*
3115 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3116 */