b290bcc0861619f3eed716208882689fefdd3862
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
111 #include <glib-2.0/glib.h>
112 /* }}} */
/* Log a printf-style message through syslog(3) at the given severity. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers that do not define __GNUC__ are not expected to understand
 * __attribute__((...)); make it expand to nothing for them. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
/*
 * Types
 */

/* Outcome of a command as passed to send_response(): RESP_OK for success,
 * RESP_ERR for failure. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125 struct listen_socket_s
126 {
127 int fd;
128 char addr[PATH_MAX + 1];
129 int family;
131 /* state for BATCH processing */
132 time_t batch_start;
133 int batch_cmd;
135 /* buffered IO */
136 char *rbuf;
137 off_t next_cmd;
138 off_t next_read;
140 char *wbuf;
141 ssize_t wbuf_len;
143 uint32_t permissions;
145 gid_t socket_group;
146 mode_t socket_permissions;
147 };
148 typedef struct listen_socket_s listen_socket_t;
struct command_s;
typedef struct command_s command_t;

/* Parameter list shared by the command handlers: the client socket, a
 * time stamp, and the remaining argument text with its size.  Every
 * parameter is tagged "unused" so that handlers which ignore some of them
 * still compile without warnings. */
#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
  time_t now __attribute__((unused)),\
  char *buffer __attribute__((unused)),\
  size_t buffer_size __attribute__((unused))

/* A handler receives its own command_t entry in front of the
 * DISPATCH_PROTO parameters. */
#define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
  DISPATCH_PROTO
/* Description of one protocol command. */
struct command_s {
  char *cmd;                      /* presumably the command keyword */
  int (*handler)(HANDLER_PROTO);  /* function implementing the command */

  char context; /* where we expect to see it (bit mask of CMD_CONTEXT_*) */
#define CMD_CONTEXT_CLIENT (1<<0)
#define CMD_CONTEXT_BATCH (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY (0x7f)

  char *syntax; /* usage text; syntax_error() sends it after "Usage: " */
  char *help;   /* presumably the long help text -- its use is not visible
                 * in this part of the file */
};
/* Bits stored in cache_item_s.flags. */
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)  /* item is linked into the write queue */

/* One cached RRD file together with the updates waiting to be written. */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;                /* file name; also the key in the cache tree */
  char **values;             /* pending update strings */
  size_t values_num;         /* number of entries in `values' */
  time_t last_flush_time;    /* set by wipe_ci_values() when the pending
                              * values are handed over for writing */
  time_t last_update_stamp;
  int flags;                 /* combination of CI_FLAGS_* */
  pthread_cond_t flushed;    /* broadcast once the item has been written */
  cache_item_t *prev;        /* neighbours in the doubly linked */
  cache_item_t *next;        /* write queue */
};
/* Scratch data handed to tree_callback_flush() while the cache tree is
 * being walked. */
typedef struct callback_flush_data_s
{
  time_t now;          /* time at which the walk started */
  time_t abs_timeout;  /* items last flushed at or before this get queued */
  char **keys;         /* keys of the tree entries to remove afterwards */
  size_t keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* End of the write queue at which enqueue_cache_item() inserts an item. */
typedef enum queue_side_e
{
  HEAD,  /* front of the queue */
  TAIL   /* back of the queue */
} queue_side_t;
/* describe a set of journal files */
typedef struct {
  char **files;      /* array of file names */
  size_t files_num;  /* number of entries in `files' */
} journal_set;
/* max length of socket command or response; also the size of the stack
 * buffers used to format responses */
#define CMD_MAX 4096
/* NOTE(review): presumably the size of listen_socket_t.rbuf -- the
 * allocation is not visible in this part of the file */
#define RBUF_SIZE (CMD_MAX*2)
/*
 * Variables
 */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Life cycle of the daemon.  The signal handlers switch it to FLUSHING;
 * flush_thread_main() switches it to SHUTDOWN once its loop has ended. */
enum {
  RUNNING, /* normal operation */
  FLUSHING, /* flushing remaining values */
  SHUTDOWN /* shutting down */
} state = RUNNING;

static pthread_t *queue_threads;
/* signalled whenever an item is put on the write queue */
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

static pthread_t flush_thread;
/* the flush thread sleeps on this between two flush runs */
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
static GTree *cache_tree = NULL;
/* doubly linked queue of the items waiting to be written */
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
/* guards cache_tree and the write queue */
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Counters reported by the STATS handler; guarded by stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
/* true when a command is processed without a client socket (s == NULL),
 * which the handlers treat as journal replay */
#define JOURNAL_REPLAY(s) ((s) == NULL)
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL; /* current journal file handle */
static long journal_size = 0; /* current journal size */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
288 /*
289 * Functions
290 */
291 static void sig_common (const char *sig) /* {{{ */
292 {
293 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
294 state = FLUSHING;
295 pthread_cond_broadcast(&flush_cond);
296 pthread_cond_broadcast(&queue_cond);
297 } /* }}} void sig_common */
/* SIGINT: start the shutdown sequence. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: start the shutdown sequence. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
309 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
310 {
311 config_flush_at_shutdown = 1;
312 sig_common("USR1");
313 } /* }}} void sig_usr1_handler */
315 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
316 {
317 config_flush_at_shutdown = 0;
318 sig_common("USR2");
319 } /* }}} void sig_usr2_handler */
321 static void install_signal_handlers(void) /* {{{ */
322 {
323 /* These structures are static, because `sigaction' behaves weird if the are
324 * overwritten.. */
325 static struct sigaction sa_int;
326 static struct sigaction sa_term;
327 static struct sigaction sa_pipe;
328 static struct sigaction sa_usr1;
329 static struct sigaction sa_usr2;
331 /* Install signal handlers */
332 memset (&sa_int, 0, sizeof (sa_int));
333 sa_int.sa_handler = sig_int_handler;
334 sigaction (SIGINT, &sa_int, NULL);
336 memset (&sa_term, 0, sizeof (sa_term));
337 sa_term.sa_handler = sig_term_handler;
338 sigaction (SIGTERM, &sa_term, NULL);
340 memset (&sa_pipe, 0, sizeof (sa_pipe));
341 sa_pipe.sa_handler = SIG_IGN;
342 sigaction (SIGPIPE, &sa_pipe, NULL);
344 memset (&sa_pipe, 0, sizeof (sa_usr1));
345 sa_usr1.sa_handler = sig_usr1_handler;
346 sigaction (SIGUSR1, &sa_usr1, NULL);
348 memset (&sa_usr2, 0, sizeof (sa_usr2));
349 sa_usr2.sa_handler = sig_usr2_handler;
350 sigaction (SIGUSR2, &sa_usr2, NULL);
352 } /* }}} void install_signal_handlers */
354 static int open_pidfile(char *action, int oflag) /* {{{ */
355 {
356 int fd;
357 const char *file;
358 char *file_copy, *dir;
360 file = (config_pid_file != NULL)
361 ? config_pid_file
362 : LOCALSTATEDIR "/run/rrdcached.pid";
364 /* dirname may modify its argument */
365 file_copy = strdup(file);
366 if (file_copy == NULL)
367 {
368 fprintf(stderr, "rrdcached: strdup(): %s\n",
369 rrd_strerror(errno));
370 return -1;
371 }
373 dir = dirname(file_copy);
374 if (rrd_mkdir_p(dir, 0777) != 0)
375 {
376 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
377 dir, rrd_strerror(errno));
378 return -1;
379 }
381 free(file_copy);
383 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
384 if (fd < 0)
385 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
386 action, file, rrd_strerror(errno));
388 return(fd);
389 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns the descriptor of the (now truncated) PID file when it is stale
 * and may be reused.  Returns a negative value when the file cannot be
 * opened or read, does not contain a valid PID, cannot be truncated, or
 * names a process that can still be signalled.  The descriptor is closed
 * on every failure path. */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  long parsed;
  ssize_t got;
  char pid_str[16];

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not add one */
  got = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (got <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[got] = '\0';

  errno = 0;
  parsed = strtol(pid_str, NULL, 10);
  pid = (pid_t) parsed;
  /* reject range errors and values that do not fit into pid_t */
  if (errno != 0 || parsed <= 0 || (long) pid != parsed)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
        (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* write_pidfile
 * Write the PID of the calling process, followed by a newline, to the open
 * descriptor `fd'.  The descriptor is closed in every case.
 * Returns 0 on success and -1 if no stream could be attached to `fd'.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
456 static int remove_pidfile (void) /* {{{ */
457 {
458 char *file;
459 int status;
461 file = (config_pid_file != NULL)
462 ? config_pid_file
463 : LOCALSTATEDIR "/run/rrdcached.pid";
465 status = unlink (file);
466 if (status == 0)
467 return (0);
468 return (errno);
469 } /* }}} int remove_pidfile */
471 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
472 {
473 char *eol;
475 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
476 sock->next_read - sock->next_cmd);
478 if (eol == NULL)
479 {
480 /* no commands left, move remainder back to front of rbuf */
481 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
482 sock->next_read - sock->next_cmd);
483 sock->next_read -= sock->next_cmd;
484 sock->next_cmd = 0;
485 *len = 0;
486 return NULL;
487 }
488 else
489 {
490 char *cmd = sock->rbuf + sock->next_cmd;
491 *eol = '\0';
493 sock->next_cmd = eol - sock->rbuf + 1;
495 if (eol > sock->rbuf && *(eol-1) == '\r')
496 *(--eol) = '\0'; /* handle "\r\n" EOL */
498 *len = eol - cmd;
500 return cmd;
501 }
503 /* NOTREACHED */
504 assert(1==0);
505 } /* }}} char *next_cmd */
507 /* add the characters directly to the write buffer */
508 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
509 {
510 char *new_buf;
512 assert(sock != NULL);
514 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
515 if (new_buf == NULL)
516 {
517 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
518 return -1;
519 }
521 strncpy(new_buf + sock->wbuf_len, str, len + 1);
523 sock->wbuf = new_buf;
524 sock->wbuf_len += len;
526 return 0;
527 } /* }}} static int add_to_wbuf */
529 /* add the text to the "extra" info that's sent after the status line */
530 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
531 {
532 va_list argp;
533 char buffer[CMD_MAX];
534 int len;
536 if (JOURNAL_REPLAY(sock)) return 0;
537 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
539 va_start(argp, fmt);
540 #ifdef HAVE_VSNPRINTF
541 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
542 #else
543 len = vsprintf(buffer, fmt, argp);
544 #endif
545 va_end(argp);
546 if (len < 0)
547 {
548 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
549 return -1;
550 }
552 return add_to_wbuf(sock, buffer, len);
553 } /* }}} static int add_response_info */
/* count_lines
 * Return the number of newline characters in `str'; a NULL pointer counts
 * as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;

  if (str == NULL)
    return 0;

  for (const char *p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      total++;
  }

  return total;
} /* }}} static int count_lines */
571 /* send the response back to the user.
572 * returns 0 on success, -1 on error
573 * write buffer is always zeroed after this call */
574 static int send_response (listen_socket_t *sock, response_code rc,
575 char *fmt, ...) /* {{{ */
576 {
577 va_list argp;
578 char buffer[CMD_MAX];
579 int lines;
580 ssize_t wrote;
581 int rclen, len;
583 if (JOURNAL_REPLAY(sock)) return rc;
585 if (sock->batch_start)
586 {
587 if (rc == RESP_OK)
588 return rc; /* no response on success during BATCH */
589 lines = sock->batch_cmd;
590 }
591 else if (rc == RESP_OK)
592 lines = count_lines(sock->wbuf);
593 else
594 lines = -1;
596 rclen = sprintf(buffer, "%d ", lines);
597 va_start(argp, fmt);
598 #ifdef HAVE_VSNPRINTF
599 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
600 #else
601 len = vsprintf(buffer+rclen, fmt, argp);
602 #endif
603 va_end(argp);
604 if (len < 0)
605 return -1;
607 len += rclen;
609 /* append the result to the wbuf, don't write to the user */
610 if (sock->batch_start)
611 return add_to_wbuf(sock, buffer, len);
613 /* first write must be complete */
614 if (len != write(sock->fd, buffer, len))
615 {
616 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
617 return -1;
618 }
620 if (sock->wbuf != NULL && rc == RESP_OK)
621 {
622 wrote = 0;
623 while (wrote < sock->wbuf_len)
624 {
625 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
626 if (wb <= 0)
627 {
628 RRDD_LOG(LOG_INFO, "send_response: could not write results");
629 return -1;
630 }
631 wrote += wb;
632 }
633 }
635 free(sock->wbuf); sock->wbuf = NULL;
636 sock->wbuf_len = 0;
638 return 0;
639 } /* }}} */
641 static void wipe_ci_values(cache_item_t *ci, time_t when)
642 {
643 ci->values = NULL;
644 ci->values_num = 0;
646 ci->last_flush_time = when;
647 if (config_write_jitter > 0)
648 ci->last_flush_time += (rrd_random() % config_write_jitter);
649 }
651 /* remove_from_queue
652 * remove a "cache_item_t" item from the queue.
653 * must hold 'cache_lock' when calling this
654 */
655 static void remove_from_queue(cache_item_t *ci) /* {{{ */
656 {
657 if (ci == NULL) return;
658 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
660 if (ci->prev == NULL)
661 cache_queue_head = ci->next; /* reset head */
662 else
663 ci->prev->next = ci->next;
665 if (ci->next == NULL)
666 cache_queue_tail = ci->prev; /* reset the tail */
667 else
668 ci->next->prev = ci->prev;
670 ci->next = ci->prev = NULL;
671 ci->flags &= ~CI_FLAGS_IN_QUEUE;
673 pthread_mutex_lock (&stats_lock);
674 assert (stats_queue_length > 0);
675 stats_queue_length--;
676 pthread_mutex_unlock (&stats_lock);
678 } /* }}} static void remove_from_queue */
680 /* free the resources associated with the cache_item_t
681 * must hold cache_lock when calling this function
682 */
683 static void *free_cache_item(cache_item_t *ci) /* {{{ */
684 {
685 if (ci == NULL) return NULL;
687 remove_from_queue(ci);
689 for (size_t i=0; i < ci->values_num; i++)
690 free(ci->values[i]);
692 free (ci->values);
693 free (ci->file);
695 /* in case anyone is waiting */
696 pthread_cond_broadcast(&ci->flushed);
697 pthread_cond_destroy(&ci->flushed);
699 free (ci);
701 return NULL;
702 } /* }}} static void *free_cache_item */
704 /*
705 * enqueue_cache_item:
706 * `cache_lock' must be acquired before calling this function!
707 */
708 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
709 queue_side_t side)
710 {
711 if (ci == NULL)
712 return (-1);
714 if (ci->values_num == 0)
715 return (0);
717 if (side == HEAD)
718 {
719 if (cache_queue_head == ci)
720 return 0;
722 /* remove if further down in queue */
723 remove_from_queue(ci);
725 ci->prev = NULL;
726 ci->next = cache_queue_head;
727 if (ci->next != NULL)
728 ci->next->prev = ci;
729 cache_queue_head = ci;
731 if (cache_queue_tail == NULL)
732 cache_queue_tail = cache_queue_head;
733 }
734 else /* (side == TAIL) */
735 {
736 /* We don't move values back in the list.. */
737 if (ci->flags & CI_FLAGS_IN_QUEUE)
738 return (0);
740 assert (ci->next == NULL);
741 assert (ci->prev == NULL);
743 ci->prev = cache_queue_tail;
745 if (cache_queue_tail == NULL)
746 cache_queue_head = ci;
747 else
748 cache_queue_tail->next = ci;
750 cache_queue_tail = ci;
751 }
753 ci->flags |= CI_FLAGS_IN_QUEUE;
755 pthread_cond_signal(&queue_cond);
756 pthread_mutex_lock (&stats_lock);
757 stats_queue_length++;
758 pthread_mutex_unlock (&stats_lock);
760 return (0);
761 } /* }}} int enqueue_cache_item */
763 /*
764 * tree_callback_flush:
765 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
766 * while this is in progress.
767 */
768 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
769 gpointer data)
770 {
771 cache_item_t *ci;
772 callback_flush_data_t *cfd;
774 ci = (cache_item_t *) value;
775 cfd = (callback_flush_data_t *) data;
777 if (ci->flags & CI_FLAGS_IN_QUEUE)
778 return FALSE;
780 if (ci->values_num > 0
781 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
782 {
783 enqueue_cache_item (ci, TAIL);
784 }
785 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
786 && (ci->values_num <= 0))
787 {
788 assert ((char *) key == ci->file);
789 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
790 {
791 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
792 return (FALSE);
793 }
794 }
796 return (FALSE);
797 } /* }}} gboolean tree_callback_flush */
799 static int flush_old_values (int max_age)
800 {
801 callback_flush_data_t cfd;
802 size_t k;
804 memset (&cfd, 0, sizeof (cfd));
805 /* Pass the current time as user data so that we don't need to call
806 * `time' for each node. */
807 cfd.now = time (NULL);
808 cfd.keys = NULL;
809 cfd.keys_num = 0;
811 if (max_age > 0)
812 cfd.abs_timeout = cfd.now - max_age;
813 else
814 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
816 /* `tree_callback_flush' will return the keys of all values that haven't
817 * been touched in the last `config_flush_interval' seconds in `cfd'.
818 * The char*'s in this array point to the same memory as ci->file, so we
819 * don't need to free them separately. */
820 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
822 for (k = 0; k < cfd.keys_num; k++)
823 {
824 /* should never fail, since we have held the cache_lock
825 * the entire time */
826 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
827 }
829 if (cfd.keys != NULL)
830 {
831 free (cfd.keys);
832 cfd.keys = NULL;
833 }
835 return (0);
836 } /* int flush_old_values */
838 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
839 {
840 struct timeval now;
841 struct timespec next_flush;
842 int status;
844 gettimeofday (&now, NULL);
845 next_flush.tv_sec = now.tv_sec + config_flush_interval;
846 next_flush.tv_nsec = 1000 * now.tv_usec;
848 pthread_mutex_lock(&cache_lock);
850 while (state == RUNNING)
851 {
852 gettimeofday (&now, NULL);
853 if ((now.tv_sec > next_flush.tv_sec)
854 || ((now.tv_sec == next_flush.tv_sec)
855 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
856 {
857 RRDD_LOG(LOG_DEBUG, "flushing old values");
859 /* Determine the time of the next cache flush. */
860 next_flush.tv_sec = now.tv_sec + config_flush_interval;
862 /* Flush all values that haven't been written in the last
863 * `config_write_interval' seconds. */
864 flush_old_values (config_write_interval);
866 /* unlock the cache while we rotate so we don't block incoming
867 * updates if the fsync() blocks on disk I/O */
868 pthread_mutex_unlock(&cache_lock);
869 journal_rotate();
870 pthread_mutex_lock(&cache_lock);
871 }
873 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
874 if (status != 0 && status != ETIMEDOUT)
875 {
876 RRDD_LOG (LOG_ERR, "flush_thread_main: "
877 "pthread_cond_timedwait returned %i.", status);
878 }
879 }
881 if (config_flush_at_shutdown)
882 flush_old_values (-1); /* flush everything */
884 state = SHUTDOWN;
886 pthread_mutex_unlock(&cache_lock);
888 return NULL;
889 } /* void *flush_thread_main */
891 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
892 {
893 pthread_mutex_lock (&cache_lock);
895 while (state != SHUTDOWN
896 || (cache_queue_head != NULL && config_flush_at_shutdown))
897 {
898 cache_item_t *ci;
899 char *file;
900 char **values;
901 size_t values_num;
902 int status;
904 /* Now, check if there's something to store away. If not, wait until
905 * something comes in. */
906 if (cache_queue_head == NULL)
907 {
908 status = pthread_cond_wait (&queue_cond, &cache_lock);
909 if ((status != 0) && (status != ETIMEDOUT))
910 {
911 RRDD_LOG (LOG_ERR, "queue_thread_main: "
912 "pthread_cond_wait returned %i.", status);
913 }
914 }
916 /* Check if a value has arrived. This may be NULL if we timed out or there
917 * was an interrupt such as a signal. */
918 if (cache_queue_head == NULL)
919 continue;
921 ci = cache_queue_head;
923 /* copy the relevant parts */
924 file = strdup (ci->file);
925 if (file == NULL)
926 {
927 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
928 continue;
929 }
931 assert(ci->values != NULL);
932 assert(ci->values_num > 0);
934 values = ci->values;
935 values_num = ci->values_num;
937 wipe_ci_values(ci, time(NULL));
938 remove_from_queue(ci);
940 pthread_mutex_unlock (&cache_lock);
942 rrd_clear_error ();
943 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
944 if (status != 0)
945 {
946 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
947 "rrd_update_r (%s) failed with status %i. (%s)",
948 file, status, rrd_get_error());
949 }
951 journal_write("wrote", file);
953 /* Search again in the tree. It's possible someone issued a "FORGET"
954 * while we were writing the update values. */
955 pthread_mutex_lock(&cache_lock);
956 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
957 if (ci)
958 pthread_cond_broadcast(&ci->flushed);
959 pthread_mutex_unlock(&cache_lock);
961 if (status == 0)
962 {
963 pthread_mutex_lock (&stats_lock);
964 stats_updates_written++;
965 stats_data_sets_written += values_num;
966 pthread_mutex_unlock (&stats_lock);
967 }
969 rrd_free_ptrs((void ***) &values, &values_num);
970 free(file);
972 pthread_mutex_lock (&cache_lock);
973 }
974 pthread_mutex_unlock (&cache_lock);
976 return (NULL);
977 } /* }}} void *queue_thread_main */
/* buffer_get_field
 * Split the next space-separated field off a NUL-terminated buffer.
 *
 * The field is unescaped in place (a backslash makes the following
 * character literal) and terminated.  On success *field_ret points to the
 * field while *buffer_ret / *buffer_size_ret are advanced to the text
 * behind it, and 0 is returned.
 *
 * Returns -1, leaving the three output arguments untouched, when the
 * buffer is empty or ends before a field terminator was found.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const src = *buffer_ret;
  char *const dst = src;          /* the field is rewritten in place */
  const size_t avail = *buffer_size_ret;
  size_t rd = 0;                  /* read position in src */
  size_t wr = 0;                  /* write position in dst, never > rd */
  int terminated = 0;

  if (avail == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[avail - 1] == '\0');

  while (!terminated && rd < avail)
  {
    switch (src[rd])
    {
      case ' ':
      case '\0':
        /* end-of-field or end-of-buffer */
        dst[wr++] = '\0';
        rd++;
        terminated = 1;
        break;

      case '\\':
        /* escaped character: needs at least one more byte behind it */
        if (rd >= (avail - 1))
          return (-1);
        rd++;
        dst[wr++] = src[rd++];
        break;

      default:
        dst[wr++] = src[rd++];
        break;
    }
  }

  if (!terminated)
    return (-1);

  *buffer_ret = src + rd;
  *buffer_size_ret = avail - rd;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
1042 /* if we're restricting writes to the base directory,
1043 * check whether the file falls within the dir
1044 * returns 1 if OK, otherwise 0
1045 */
1046 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1047 {
1048 assert(file != NULL);
1050 if (!config_write_base_only
1051 || JOURNAL_REPLAY(sock)
1052 || config_base_dir == NULL)
1053 return 1;
1055 if (strstr(file, "../") != NULL) goto err;
1057 /* relative paths without "../" are ok */
1058 if (*file != '/') return 1;
1060 /* file must be of the format base + "/" + <1+ char filename> */
1061 if (strlen(file) < _config_base_dir_len + 2) goto err;
1062 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1063 if (*(file + _config_base_dir_len) != '/') goto err;
1065 return 1;
1067 err:
1068 if (sock != NULL && sock->fd >= 0)
1069 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1071 return 0;
1072 } /* }}} static int check_file_access */
1074 /* when using a base dir, convert relative paths to absolute paths.
1075 * if necessary, modifies the "filename" pointer to point
1076 * to the new path created in "tmp". "tmp" is provided
1077 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1078 *
1079 * this allows us to optimize for the expected case (absolute path)
1080 * with a no-op.
1081 */
1082 static void get_abs_path(char **filename, char *tmp)
1083 {
1084 assert(tmp != NULL);
1085 assert(filename != NULL && *filename != NULL);
1087 if (config_base_dir == NULL || **filename == '/')
1088 return;
1090 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1091 *filename = tmp;
1092 } /* }}} static int get_abs_path */
1094 static int flush_file (const char *filename) /* {{{ */
1095 {
1096 cache_item_t *ci;
1098 pthread_mutex_lock (&cache_lock);
1100 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1101 if (ci == NULL)
1102 {
1103 pthread_mutex_unlock (&cache_lock);
1104 return (ENOENT);
1105 }
1107 if (ci->values_num > 0)
1108 {
1109 /* Enqueue at head */
1110 enqueue_cache_item (ci, HEAD);
1111 pthread_cond_wait(&ci->flushed, &cache_lock);
1112 }
1114 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1115 * may have been purged during our cond_wait() */
1117 pthread_mutex_unlock(&cache_lock);
1119 return (0);
1120 } /* }}} int flush_file */
1122 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1123 {
1124 char *err = "Syntax error.\n";
1126 if (cmd && cmd->syntax)
1127 err = cmd->syntax;
1129 return send_response(sock, RESP_ERR, "Usage: %s", err);
1130 } /* }}} static int syntax_error() */
1132 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1133 {
1134 uint64_t copy_queue_length;
1135 uint64_t copy_updates_received;
1136 uint64_t copy_flush_received;
1137 uint64_t copy_updates_written;
1138 uint64_t copy_data_sets_written;
1139 uint64_t copy_journal_bytes;
1140 uint64_t copy_journal_rotate;
1142 uint64_t tree_nodes_number;
1143 uint64_t tree_depth;
1145 pthread_mutex_lock (&stats_lock);
1146 copy_queue_length = stats_queue_length;
1147 copy_updates_received = stats_updates_received;
1148 copy_flush_received = stats_flush_received;
1149 copy_updates_written = stats_updates_written;
1150 copy_data_sets_written = stats_data_sets_written;
1151 copy_journal_bytes = stats_journal_bytes;
1152 copy_journal_rotate = stats_journal_rotate;
1153 pthread_mutex_unlock (&stats_lock);
1155 pthread_mutex_lock (&cache_lock);
1156 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1157 tree_depth = (uint64_t) g_tree_height (cache_tree);
1158 pthread_mutex_unlock (&cache_lock);
1160 add_response_info(sock,
1161 "QueueLength: %"PRIu64"\n", copy_queue_length);
1162 add_response_info(sock,
1163 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1164 add_response_info(sock,
1165 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1166 add_response_info(sock,
1167 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1168 add_response_info(sock,
1169 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1170 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1171 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1172 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1173 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1175 send_response(sock, RESP_OK, "Statistics follow\n");
1177 return (0);
1178 } /* }}} int handle_request_stats */
1180 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1181 {
1182 char *file, file_tmp[PATH_MAX];
1183 int status;
1185 status = buffer_get_field (&buffer, &buffer_size, &file);
1186 if (status != 0)
1187 {
1188 return syntax_error(sock,cmd);
1189 }
1190 else
1191 {
1192 pthread_mutex_lock(&stats_lock);
1193 stats_flush_received++;
1194 pthread_mutex_unlock(&stats_lock);
1196 get_abs_path(&file, file_tmp);
1197 if (!check_file_access(file, sock)) return 0;
1199 status = flush_file (file);
1200 if (status == 0)
1201 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1202 else if (status == ENOENT)
1203 {
1204 /* no file in our tree; see whether it exists at all */
1205 struct stat statbuf;
1207 memset(&statbuf, 0, sizeof(statbuf));
1208 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1209 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1210 else
1211 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1212 }
1213 else if (status < 0)
1214 return send_response(sock, RESP_ERR, "Internal error.\n");
1215 else
1216 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1217 }
1219 /* NOTREACHED */
1220 assert(1==0);
1221 } /* }}} int handle_request_flush */
1223 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1224 {
1225 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1227 pthread_mutex_lock(&cache_lock);
1228 flush_old_values(-1);
1229 pthread_mutex_unlock(&cache_lock);
1231 return send_response(sock, RESP_OK, "Started flush.\n");
1232 } /* }}} static int handle_request_flushall */
1234 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1235 {
1236 int status;
1237 char *file, file_tmp[PATH_MAX];
1238 cache_item_t *ci;
1240 status = buffer_get_field(&buffer, &buffer_size, &file);
1241 if (status != 0)
1242 return syntax_error(sock,cmd);
1244 get_abs_path(&file, file_tmp);
1246 pthread_mutex_lock(&cache_lock);
1247 ci = g_tree_lookup(cache_tree, file);
1248 if (ci == NULL)
1249 {
1250 pthread_mutex_unlock(&cache_lock);
1251 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1252 }
1254 for (size_t i=0; i < ci->values_num; i++)
1255 add_response_info(sock, "%s\n", ci->values[i]);
1257 pthread_mutex_unlock(&cache_lock);
1258 return send_response(sock, RESP_OK, "updates pending\n");
1259 } /* }}} static int handle_request_pending */
1261 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1262 {
1263 int status;
1264 gboolean found;
1265 char *file, file_tmp[PATH_MAX];
1267 status = buffer_get_field(&buffer, &buffer_size, &file);
1268 if (status != 0)
1269 return syntax_error(sock,cmd);
1271 get_abs_path(&file, file_tmp);
1272 if (!check_file_access(file, sock)) return 0;
1274 pthread_mutex_lock(&cache_lock);
1275 found = g_tree_remove(cache_tree, file);
1276 pthread_mutex_unlock(&cache_lock);
1278 if (found == TRUE)
1279 {
1280 if (!JOURNAL_REPLAY(sock))
1281 journal_write("forget", file);
1283 return send_response(sock, RESP_OK, "Gone!\n");
1284 }
1285 else
1286 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1288 /* NOTREACHED */
1289 assert(1==0);
1290 } /* }}} static int handle_request_forget */
1292 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1293 {
1294 cache_item_t *ci;
1296 pthread_mutex_lock(&cache_lock);
1298 ci = cache_queue_head;
1299 while (ci != NULL)
1300 {
1301 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1302 ci = ci->next;
1303 }
1305 pthread_mutex_unlock(&cache_lock);
1307 return send_response(sock, RESP_OK, "in queue.\n");
1308 } /* }}} int handle_request_queue */
1310 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1311 {
1312 char *file, file_tmp[PATH_MAX];
1313 int values_num = 0;
1314 int status;
1315 char orig_buf[CMD_MAX];
1317 cache_item_t *ci;
1319 /* save it for the journal later */
1320 if (!JOURNAL_REPLAY(sock))
1321 strncpy(orig_buf, buffer, buffer_size);
1323 status = buffer_get_field (&buffer, &buffer_size, &file);
1324 if (status != 0)
1325 return syntax_error(sock,cmd);
1327 pthread_mutex_lock(&stats_lock);
1328 stats_updates_received++;
1329 pthread_mutex_unlock(&stats_lock);
1331 get_abs_path(&file, file_tmp);
1332 if (!check_file_access(file, sock)) return 0;
1334 pthread_mutex_lock (&cache_lock);
1335 ci = g_tree_lookup (cache_tree, file);
1337 if (ci == NULL) /* {{{ */
1338 {
1339 struct stat statbuf;
1340 cache_item_t *tmp;
1342 /* don't hold the lock while we setup; stat(2) might block */
1343 pthread_mutex_unlock(&cache_lock);
1345 memset (&statbuf, 0, sizeof (statbuf));
1346 status = stat (file, &statbuf);
1347 if (status != 0)
1348 {
1349 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1351 status = errno;
1352 if (status == ENOENT)
1353 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1354 else
1355 return send_response(sock, RESP_ERR,
1356 "stat failed with error %i.\n", status);
1357 }
1358 if (!S_ISREG (statbuf.st_mode))
1359 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1361 if (access(file, R_OK|W_OK) != 0)
1362 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1363 file, rrd_strerror(errno));
1365 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1366 if (ci == NULL)
1367 {
1368 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1370 return send_response(sock, RESP_ERR, "malloc failed.\n");
1371 }
1372 memset (ci, 0, sizeof (cache_item_t));
1374 ci->file = strdup (file);
1375 if (ci->file == NULL)
1376 {
1377 free (ci);
1378 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1380 return send_response(sock, RESP_ERR, "strdup failed.\n");
1381 }
1383 wipe_ci_values(ci, now);
1384 ci->flags = CI_FLAGS_IN_TREE;
1385 pthread_cond_init(&ci->flushed, NULL);
1387 pthread_mutex_lock(&cache_lock);
1389 /* another UPDATE might have added this entry in the meantime */
1390 tmp = g_tree_lookup (cache_tree, file);
1391 if (tmp == NULL)
1392 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1393 else
1394 {
1395 free_cache_item (ci);
1396 ci = tmp;
1397 }
1399 /* state may have changed while we were unlocked */
1400 if (state == SHUTDOWN)
1401 return -1;
1402 } /* }}} */
1403 assert (ci != NULL);
1405 /* don't re-write updates in replay mode */
1406 if (!JOURNAL_REPLAY(sock))
1407 journal_write("update", orig_buf);
1409 while (buffer_size > 0)
1410 {
1411 char *value;
1412 time_t stamp;
1413 char *eostamp;
1415 status = buffer_get_field (&buffer, &buffer_size, &value);
1416 if (status != 0)
1417 {
1418 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1419 break;
1420 }
1422 /* make sure update time is always moving forward */
1423 stamp = strtol(value, &eostamp, 10);
1424 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1425 {
1426 pthread_mutex_unlock(&cache_lock);
1427 return send_response(sock, RESP_ERR,
1428 "Cannot find timestamp in '%s'!\n", value);
1429 }
1430 else if (stamp <= ci->last_update_stamp)
1431 {
1432 pthread_mutex_unlock(&cache_lock);
1433 return send_response(sock, RESP_ERR,
1434 "illegal attempt to update using time %ld when last"
1435 " update time is %ld (minimum one second step)\n",
1436 stamp, ci->last_update_stamp);
1437 }
1438 else
1439 ci->last_update_stamp = stamp;
1441 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1442 {
1443 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1444 continue;
1445 }
1447 values_num++;
1448 }
1450 if (((now - ci->last_flush_time) >= config_write_interval)
1451 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1452 && (ci->values_num > 0))
1453 {
1454 enqueue_cache_item (ci, TAIL);
1455 }
1457 pthread_mutex_unlock (&cache_lock);
1459 if (values_num < 1)
1460 return send_response(sock, RESP_ERR, "No values updated.\n");
1461 else
1462 return send_response(sock, RESP_OK,
1463 "errors, enqueued %i value(s).\n", values_num);
1465 /* NOTREACHED */
1466 assert(1==0);
1468 } /* }}} int handle_request_update */
1470 /* we came across a "WROTE" entry during journal replay.
1471 * throw away any values that we have accumulated for this file
1472 */
1473 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1474 {
1475 cache_item_t *ci;
1476 const char *file = buffer;
1478 pthread_mutex_lock(&cache_lock);
1480 ci = g_tree_lookup(cache_tree, file);
1481 if (ci == NULL)
1482 {
1483 pthread_mutex_unlock(&cache_lock);
1484 return (0);
1485 }
1487 if (ci->values)
1488 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1490 wipe_ci_values(ci, now);
1491 remove_from_queue(ci);
1493 pthread_mutex_unlock(&cache_lock);
1494 return (0);
1495 } /* }}} int handle_request_wrote */
1497 /* start "BATCH" processing */
1498 static int batch_start (HANDLER_PROTO) /* {{{ */
1499 {
1500 int status;
1501 if (sock->batch_start)
1502 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1504 status = send_response(sock, RESP_OK,
1505 "Go ahead. End with dot '.' on its own line.\n");
1506 sock->batch_start = time(NULL);
1507 sock->batch_cmd = 0;
1509 return status;
1510 } /* }}} static int batch_start */
1512 /* finish "BATCH" processing and return results to the client */
1513 static int batch_done (HANDLER_PROTO) /* {{{ */
1514 {
1515 assert(sock->batch_start);
1516 sock->batch_start = 0;
1517 sock->batch_cmd = 0;
1518 return send_response(sock, RESP_OK, "errors\n");
1519 } /* }}} static int batch_done */
/*
 * QUIT
 *
 * Always returns -1.  connection_thread_main() closes the connection on
 * any non-zero handle_request() status, which is how the client gets
 * disconnected.
 */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
1526 static command_t list_of_commands[] = { /* {{{ */
1527 {
1528 "UPDATE",
1529 handle_request_update,
1530 CMD_CONTEXT_ANY,
1531 "UPDATE <filename> <values> [<values> ...]\n"
1532 ,
1533 "Adds the given file to the internal cache if it is not yet known and\n"
1534 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1535 "for details.\n"
1536 "\n"
1537 "Each <values> has the following form:\n"
1538 " <values> = <time>:<value>[:<value>[...]]\n"
1539 "See the rrdupdate(1) manpage for details.\n"
1540 },
1541 {
1542 "WROTE",
1543 handle_request_wrote,
1544 CMD_CONTEXT_JOURNAL,
1545 NULL,
1546 NULL
1547 },
1548 {
1549 "FLUSH",
1550 handle_request_flush,
1551 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1552 "FLUSH <filename>\n"
1553 ,
1554 "Adds the given filename to the head of the update queue and returns\n"
1555 "after it has been dequeued.\n"
1556 },
1557 {
1558 "FLUSHALL",
1559 handle_request_flushall,
1560 CMD_CONTEXT_CLIENT,
1561 "FLUSHALL\n"
1562 ,
1563 "Triggers writing of all pending updates. Returns immediately.\n"
1564 },
1565 {
1566 "PENDING",
1567 handle_request_pending,
1568 CMD_CONTEXT_CLIENT,
1569 "PENDING <filename>\n"
1570 ,
1571 "Shows any 'pending' updates for a file, in order.\n"
1572 "The updates shown have not yet been written to the underlying RRD file.\n"
1573 },
1574 {
1575 "FORGET",
1576 handle_request_forget,
1577 CMD_CONTEXT_ANY,
1578 "FORGET <filename>\n"
1579 ,
1580 "Removes the file completely from the cache.\n"
1581 "Any pending updates for the file will be lost.\n"
1582 },
1583 {
1584 "QUEUE",
1585 handle_request_queue,
1586 CMD_CONTEXT_CLIENT,
1587 "QUEUE\n"
1588 ,
1589 "Shows all files in the output queue.\n"
1590 "The output is zero or more lines in the following format:\n"
1591 "(where <num_vals> is the number of values to be written)\n"
1592 "\n"
1593 "<num_vals> <filename>\n"
1594 },
1595 {
1596 "STATS",
1597 handle_request_stats,
1598 CMD_CONTEXT_CLIENT,
1599 "STATS\n"
1600 ,
1601 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1602 "a description of the values.\n"
1603 },
1604 {
1605 "HELP",
1606 handle_request_help,
1607 CMD_CONTEXT_CLIENT,
1608 "HELP [<command>]\n",
1609 NULL, /* special! */
1610 },
1611 {
1612 "BATCH",
1613 batch_start,
1614 CMD_CONTEXT_CLIENT,
1615 "BATCH\n"
1616 ,
1617 "The 'BATCH' command permits the client to initiate a bulk load\n"
1618 " of commands to rrdcached.\n"
1619 "\n"
1620 "Usage:\n"
1621 "\n"
1622 " client: BATCH\n"
1623 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1624 " client: command #1\n"
1625 " client: command #2\n"
1626 " client: ... and so on\n"
1627 " client: .\n"
1628 " server: 2 errors\n"
1629 " server: 7 message for command #7\n"
1630 " server: 9 message for command #9\n"
1631 "\n"
1632 "For more information, consult the rrdcached(1) documentation.\n"
1633 },
1634 {
1635 ".", /* BATCH terminator */
1636 batch_done,
1637 CMD_CONTEXT_BATCH,
1638 NULL,
1639 NULL
1640 },
1641 {
1642 "QUIT",
1643 handle_request_quit,
1644 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1645 "QUIT\n"
1646 ,
1647 "Disconnect from rrdcached.\n"
1648 }
1649 }; /* }}} command_t list_of_commands[] */
1650 static size_t list_of_commands_len = sizeof (list_of_commands)
1651 / sizeof (list_of_commands[0]);
1653 static command_t *find_command(char *cmd)
1654 {
1655 size_t i;
1657 for (i = 0; i < list_of_commands_len; i++)
1658 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1659 return (&list_of_commands[i]);
1660 return NULL;
1661 }
1663 /* We currently use the index in the `list_of_commands' array as a bit position
1664 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1665 * outside these functions so that switching to a more elegant storage method
1666 * is easily possible. */
1667 static ssize_t find_command_index (const char *cmd) /* {{{ */
1668 {
1669 size_t i;
1671 for (i = 0; i < list_of_commands_len; i++)
1672 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1673 return ((ssize_t) i);
1674 return (-1);
1675 } /* }}} ssize_t find_command_index */
1677 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1678 const char *cmd)
1679 {
1680 ssize_t i;
1682 if (JOURNAL_REPLAY(sock))
1683 return (1);
1685 if (cmd == NULL)
1686 return (-1);
1688 if ((strcasecmp ("QUIT", cmd) == 0)
1689 || (strcasecmp ("HELP", cmd) == 0))
1690 return (1);
1691 else if (strcmp (".", cmd) == 0)
1692 cmd = "BATCH";
1694 i = find_command_index (cmd);
1695 if (i < 0)
1696 return (-1);
1697 assert (i < 32);
1699 if ((sock->permissions & (1 << i)) != 0)
1700 return (1);
1701 return (0);
1702 } /* }}} int socket_permission_check */
1704 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1705 const char *cmd)
1706 {
1707 ssize_t i;
1709 i = find_command_index (cmd);
1710 if (i < 0)
1711 return (-1);
1712 assert (i < 32);
1714 sock->permissions |= (1 << i);
1715 return (0);
1716 } /* }}} int socket_permission_add */
1718 /* check whether commands are received in the expected context */
1719 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1720 {
1721 if (JOURNAL_REPLAY(sock))
1722 return (cmd->context & CMD_CONTEXT_JOURNAL);
1723 else if (sock->batch_start)
1724 return (cmd->context & CMD_CONTEXT_BATCH);
1725 else
1726 return (cmd->context & CMD_CONTEXT_CLIENT);
1728 /* NOTREACHED */
1729 assert(1==0);
1730 }
1732 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1733 {
1734 int status;
1735 char *cmd_str;
1736 char *resp_txt;
1737 command_t *help = NULL;
1739 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1740 if (status == 0)
1741 help = find_command(cmd_str);
1743 if (help && (help->syntax || help->help))
1744 {
1745 char tmp[CMD_MAX];
1747 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1748 resp_txt = tmp;
1750 if (help->syntax)
1751 add_response_info(sock, "Usage: %s\n", help->syntax);
1753 if (help->help)
1754 add_response_info(sock, "%s\n", help->help);
1755 }
1756 else
1757 {
1758 size_t i;
1760 resp_txt = "Command overview\n";
1762 for (i = 0; i < list_of_commands_len; i++)
1763 {
1764 if (list_of_commands[i].syntax == NULL)
1765 continue;
1766 add_response_info (sock, "%s", list_of_commands[i].syntax);
1767 }
1768 }
1770 return send_response(sock, RESP_OK, resp_txt);
1771 } /* }}} int handle_request_help */
1773 static int handle_request (DISPATCH_PROTO) /* {{{ */
1774 {
1775 char *buffer_ptr = buffer;
1776 char *cmd_str = NULL;
1777 command_t *cmd = NULL;
1778 int status;
1780 assert (buffer[buffer_size - 1] == '\0');
1782 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1783 if (status != 0)
1784 {
1785 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1786 return (-1);
1787 }
1789 if (sock != NULL && sock->batch_start)
1790 sock->batch_cmd++;
1792 cmd = find_command(cmd_str);
1793 if (!cmd)
1794 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1796 if (!socket_permission_check (sock, cmd->cmd))
1797 return send_response(sock, RESP_ERR, "Permission denied.\n");
1799 if (!command_check_context(sock, cmd))
1800 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1802 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1803 } /* }}} int handle_request */
1805 static void journal_set_free (journal_set *js) /* {{{ */
1806 {
1807 if (js == NULL)
1808 return;
1810 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1812 free(js);
1813 } /* }}} journal_set_free */
1815 static void journal_set_remove (journal_set *js) /* {{{ */
1816 {
1817 if (js == NULL)
1818 return;
1820 for (uint i=0; i < js->files_num; i++)
1821 {
1822 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1823 unlink(js->files[i]);
1824 }
1825 } /* }}} journal_set_remove */
1827 /* close current journal file handle.
1828 * MUST hold journal_lock before calling */
1829 static void journal_close(void) /* {{{ */
1830 {
1831 if (journal_fh != NULL)
1832 {
1833 if (fclose(journal_fh) != 0)
1834 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1835 }
1837 journal_fh = NULL;
1838 journal_size = 0;
1839 } /* }}} journal_close */
1841 /* MUST hold journal_lock before calling */
1842 static void journal_new_file(void) /* {{{ */
1843 {
1844 struct timeval now;
1845 int new_fd;
1846 char new_file[PATH_MAX + 1];
1848 assert(journal_dir != NULL);
1849 assert(journal_cur != NULL);
1851 journal_close();
1853 gettimeofday(&now, NULL);
1854 /* this format assures that the files sort in strcmp() order */
1855 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1856 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1858 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1859 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1860 if (new_fd < 0)
1861 goto error;
1863 journal_fh = fdopen(new_fd, "a");
1864 if (journal_fh == NULL)
1865 goto error;
1867 journal_size = ftell(journal_fh);
1868 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1870 /* record the file in the journal set */
1871 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1873 return;
1875 error:
1876 RRDD_LOG(LOG_CRIT,
1877 "JOURNALING DISABLED: Error while trying to create %s : %s",
1878 new_file, rrd_strerror(errno));
1879 RRDD_LOG(LOG_CRIT,
1880 "JOURNALING DISABLED: All values will be flushed at shutdown");
1882 close(new_fd);
1883 config_flush_at_shutdown = 1;
1885 } /* }}} journal_new_file */
1887 /* MUST NOT hold journal_lock before calling this */
1888 static void journal_rotate(void) /* {{{ */
1889 {
1890 journal_set *old_js = NULL;
1892 if (journal_dir == NULL)
1893 return;
1895 RRDD_LOG(LOG_DEBUG, "rotating journals");
1897 pthread_mutex_lock(&stats_lock);
1898 ++stats_journal_rotate;
1899 pthread_mutex_unlock(&stats_lock);
1901 pthread_mutex_lock(&journal_lock);
1903 journal_close();
1905 /* rotate the journal sets */
1906 old_js = journal_old;
1907 journal_old = journal_cur;
1908 journal_cur = calloc(1, sizeof(journal_set));
1910 if (journal_cur != NULL)
1911 journal_new_file();
1912 else
1913 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1915 pthread_mutex_unlock(&journal_lock);
1917 journal_set_remove(old_js);
1918 journal_set_free (old_js);
1920 } /* }}} static void journal_rotate */
1922 /* MUST hold journal_lock when calling */
1923 static void journal_done(void) /* {{{ */
1924 {
1925 if (journal_cur == NULL)
1926 return;
1928 journal_close();
1930 if (config_flush_at_shutdown)
1931 {
1932 RRDD_LOG(LOG_INFO, "removing journals");
1933 journal_set_remove(journal_old);
1934 journal_set_remove(journal_cur);
1935 }
1936 else
1937 {
1938 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1939 "journals will be used at next startup");
1940 }
1942 journal_set_free(journal_cur);
1943 journal_set_free(journal_old);
1944 free(journal_dir);
1946 } /* }}} static void journal_done */
1948 static int journal_write(char *cmd, char *args) /* {{{ */
1949 {
1950 int chars;
1952 if (journal_fh == NULL)
1953 return 0;
1955 pthread_mutex_lock(&journal_lock);
1956 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1957 journal_size += chars;
1959 if (journal_size > JOURNAL_MAX)
1960 journal_new_file();
1962 pthread_mutex_unlock(&journal_lock);
1964 if (chars > 0)
1965 {
1966 pthread_mutex_lock(&stats_lock);
1967 stats_journal_bytes += chars;
1968 pthread_mutex_unlock(&stats_lock);
1969 }
1971 return chars;
1972 } /* }}} static int journal_write */
1974 static int journal_replay (const char *file) /* {{{ */
1975 {
1976 FILE *fh;
1977 int entry_cnt = 0;
1978 int fail_cnt = 0;
1979 uint64_t line = 0;
1980 char entry[CMD_MAX];
1981 time_t now;
1983 if (file == NULL) return 0;
1985 {
1986 char *reason = "unknown error";
1987 int status = 0;
1988 struct stat statbuf;
1990 memset(&statbuf, 0, sizeof(statbuf));
1991 if (stat(file, &statbuf) != 0)
1992 {
1993 reason = "stat error";
1994 status = errno;
1995 }
1996 else if (!S_ISREG(statbuf.st_mode))
1997 {
1998 reason = "not a regular file";
1999 status = EPERM;
2000 }
2001 if (statbuf.st_uid != daemon_uid)
2002 {
2003 reason = "not owned by daemon user";
2004 status = EACCES;
2005 }
2006 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2007 {
2008 reason = "must not be user/group writable";
2009 status = EACCES;
2010 }
2012 if (status != 0)
2013 {
2014 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2015 file, rrd_strerror(status), reason);
2016 return 0;
2017 }
2018 }
2020 fh = fopen(file, "r");
2021 if (fh == NULL)
2022 {
2023 if (errno != ENOENT)
2024 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2025 file, rrd_strerror(errno));
2026 return 0;
2027 }
2028 else
2029 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2031 now = time(NULL);
2033 while(!feof(fh))
2034 {
2035 size_t entry_len;
2037 ++line;
2038 if (fgets(entry, sizeof(entry), fh) == NULL)
2039 break;
2040 entry_len = strlen(entry);
2042 /* check \n termination in case journal writing crashed mid-line */
2043 if (entry_len == 0)
2044 continue;
2045 else if (entry[entry_len - 1] != '\n')
2046 {
2047 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2048 ++fail_cnt;
2049 continue;
2050 }
2052 entry[entry_len - 1] = '\0';
2054 if (handle_request(NULL, now, entry, entry_len) == 0)
2055 ++entry_cnt;
2056 else
2057 ++fail_cnt;
2058 }
2060 fclose(fh);
2062 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2063 entry_cnt, fail_cnt);
2065 return entry_cnt > 0 ? 1 : 0;
2066 } /* }}} static int journal_replay */
/*
 * qsort(3) comparison callback for an array of C strings: each argument
 * points at a `char *' element; the strings are ordered with strcmp().
 */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *lhs = v1;
  const char *const *rhs = v2;

  return strcmp(*lhs, *rhs);
}
2076 static void journal_init(void) /* {{{ */
2077 {
2078 int had_journal = 0;
2079 DIR *dir;
2080 struct dirent *dent;
2081 char path[PATH_MAX+1];
2083 if (journal_dir == NULL) return;
2085 pthread_mutex_lock(&journal_lock);
2087 journal_cur = calloc(1, sizeof(journal_set));
2088 if (journal_cur == NULL)
2089 {
2090 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2091 return;
2092 }
2094 RRDD_LOG(LOG_INFO, "checking for journal files");
2096 /* Handle old journal files during transition. This gives them the
2097 * correct sort order. TODO: remove after first release
2098 */
2099 {
2100 char old_path[PATH_MAX+1];
2101 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2102 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2103 rename(old_path, path);
2105 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2106 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2107 rename(old_path, path);
2108 }
2110 dir = opendir(journal_dir);
2111 while ((dent = readdir(dir)) != NULL)
2112 {
2113 /* looks like a journal file? */
2114 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2115 continue;
2117 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2119 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2120 {
2121 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2122 dent->d_name);
2123 break;
2124 }
2125 }
2126 closedir(dir);
2128 qsort(journal_cur->files, journal_cur->files_num,
2129 sizeof(journal_cur->files[0]), journal_sort);
2131 for (uint i=0; i < journal_cur->files_num; i++)
2132 had_journal += journal_replay(journal_cur->files[i]);
2134 journal_new_file();
2136 /* it must have been a crash. start a flush */
2137 if (had_journal && config_flush_at_shutdown)
2138 flush_old_values(-1);
2140 pthread_mutex_unlock(&journal_lock);
2142 RRDD_LOG(LOG_INFO, "journal processing complete");
2144 } /* }}} static void journal_init */
2146 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2147 {
2148 assert(sock != NULL);
2150 free(sock->rbuf); sock->rbuf = NULL;
2151 free(sock->wbuf); sock->wbuf = NULL;
2152 free(sock);
2153 } /* }}} void free_listen_socket */
2155 static void close_connection(listen_socket_t *sock) /* {{{ */
2156 {
2157 if (sock->fd >= 0)
2158 {
2159 close(sock->fd);
2160 sock->fd = -1;
2161 }
2163 free_listen_socket(sock);
2165 } /* }}} void close_connection */
2167 static void *connection_thread_main (void *args) /* {{{ */
2168 {
2169 listen_socket_t *sock;
2170 int fd;
2172 sock = (listen_socket_t *) args;
2173 fd = sock->fd;
2175 /* init read buffers */
2176 sock->next_read = sock->next_cmd = 0;
2177 sock->rbuf = malloc(RBUF_SIZE);
2178 if (sock->rbuf == NULL)
2179 {
2180 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2181 close_connection(sock);
2182 return NULL;
2183 }
2185 pthread_mutex_lock (&connection_threads_lock);
2186 connection_threads_num++;
2187 pthread_mutex_unlock (&connection_threads_lock);
2189 while (state == RUNNING)
2190 {
2191 char *cmd;
2192 ssize_t cmd_len;
2193 ssize_t rbytes;
2194 time_t now;
2196 struct pollfd pollfd;
2197 int status;
2199 pollfd.fd = fd;
2200 pollfd.events = POLLIN | POLLPRI;
2201 pollfd.revents = 0;
2203 status = poll (&pollfd, 1, /* timeout = */ 500);
2204 if (state != RUNNING)
2205 break;
2206 else if (status == 0) /* timeout */
2207 continue;
2208 else if (status < 0) /* error */
2209 {
2210 status = errno;
2211 if (status != EINTR)
2212 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2213 continue;
2214 }
2216 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2217 break;
2218 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2219 {
2220 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2221 "poll(2) returned something unexpected: %#04hx",
2222 pollfd.revents);
2223 break;
2224 }
2226 rbytes = read(fd, sock->rbuf + sock->next_read,
2227 RBUF_SIZE - sock->next_read);
2228 if (rbytes < 0)
2229 {
2230 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2231 break;
2232 }
2233 else if (rbytes == 0)
2234 break; /* eof */
2236 sock->next_read += rbytes;
2238 if (sock->batch_start)
2239 now = sock->batch_start;
2240 else
2241 now = time(NULL);
2243 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2244 {
2245 status = handle_request (sock, now, cmd, cmd_len+1);
2246 if (status != 0)
2247 goto out_close;
2248 }
2249 }
2251 out_close:
2252 close_connection(sock);
2254 /* Remove this thread from the connection threads list */
2255 pthread_mutex_lock (&connection_threads_lock);
2256 connection_threads_num--;
2257 if (connection_threads_num <= 0)
2258 pthread_cond_broadcast(&connection_threads_done);
2259 pthread_mutex_unlock (&connection_threads_lock);
2261 return (NULL);
2262 } /* }}} void *connection_thread_main */
2264 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2265 {
2266 int fd;
2267 struct sockaddr_un sa;
2268 listen_socket_t *temp;
2269 int status;
2270 const char *path;
2271 char *path_copy, *dir;
2273 path = sock->addr;
2274 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2275 path += strlen("unix:");
2277 /* dirname may modify its argument */
2278 path_copy = strdup(path);
2279 if (path_copy == NULL)
2280 {
2281 fprintf(stderr, "rrdcached: strdup(): %s\n",
2282 rrd_strerror(errno));
2283 return (-1);
2284 }
2286 dir = dirname(path_copy);
2287 if (rrd_mkdir_p(dir, 0777) != 0)
2288 {
2289 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2290 dir, rrd_strerror(errno));
2291 return (-1);
2292 }
2294 free(path_copy);
2296 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2297 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2298 if (temp == NULL)
2299 {
2300 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2301 return (-1);
2302 }
2303 listen_fds = temp;
2304 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2306 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2307 if (fd < 0)
2308 {
2309 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2310 rrd_strerror(errno));
2311 return (-1);
2312 }
2314 memset (&sa, 0, sizeof (sa));
2315 sa.sun_family = AF_UNIX;
2316 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2318 /* if we've gotten this far, we own the pid file. any daemon started
2319 * with the same args must not be alive. therefore, ensure that we can
2320 * create the socket...
2321 */
2322 unlink(path);
2324 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2325 if (status != 0)
2326 {
2327 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2328 path, rrd_strerror(errno));
2329 close (fd);
2330 return (-1);
2331 }
2333 /* tweak the sockets group ownership */
2334 if (sock->socket_group != (gid_t)-1)
2335 {
2336 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2337 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2338 {
2339 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2340 }
2341 }
2343 if (sock->socket_permissions != (mode_t)-1)
2344 {
2345 if (chmod(path, sock->socket_permissions) != 0)
2346 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2347 (unsigned int)sock->socket_permissions, strerror(errno));
2348 }
2350 status = listen (fd, /* backlog = */ 10);
2351 if (status != 0)
2352 {
2353 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2354 path, rrd_strerror(errno));
2355 close (fd);
2356 unlink (path);
2357 return (-1);
2358 }
2360 listen_fds[listen_fds_num].fd = fd;
2361 listen_fds[listen_fds_num].family = PF_UNIX;
2362 strncpy(listen_fds[listen_fds_num].addr, path,
2363 sizeof (listen_fds[listen_fds_num].addr) - 1);
2364 listen_fds_num++;
2366 return (0);
2367 } /* }}} int open_listen_socket_unix */
/*
 * Open network (TCP) listen sockets for the address in sock->addr and
 * append them to the global listen_fds array, one per address returned
 * by getaddrinfo().
 *
 * Accepted forms are "host", "host:port", "[ipv6]" and "[ipv6]:port";
 * RRDCACHED_DEFAULT_PORT is used when no port is given.  Addresses for
 * which socket() or bind() fails are skipped.
 *
 * Returns 0 when the address list has been processed, -1 on a malformed
 * address, a getaddrinfo() failure or a listen() failure (a message is
 * printed to stderr).
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the host/port separator is overwritten with NUL */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* strrchr() is the standard C spelling of the legacy rindex() */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2491 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2492 {
2493 assert(sock != NULL);
2494 assert(sock->addr != NULL);
2496 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2497 || sock->addr[0] == '/')
2498 return (open_listen_socket_unix(sock));
2499 else
2500 return (open_listen_socket_network(sock));
2501 } /* }}} int open_listen_socket */
2503 static int close_listen_sockets (void) /* {{{ */
2504 {
2505 size_t i;
2507 for (i = 0; i < listen_fds_num; i++)
2508 {
2509 close (listen_fds[i].fd);
2511 if (listen_fds[i].family == PF_UNIX)
2512 unlink(listen_fds[i].addr);
2513 }
2515 free (listen_fds);
2516 listen_fds = NULL;
2517 listen_fds_num = 0;
2519 return (0);
2520 } /* }}} int close_listen_sockets */
2522 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2523 {
2524 struct pollfd *pollfds;
2525 int pollfds_num;
2526 int status;
2527 int i;
2529 if (listen_fds_num < 1)
2530 {
2531 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2532 return (NULL);
2533 }
2535 pollfds_num = listen_fds_num;
2536 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2537 if (pollfds == NULL)
2538 {
2539 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2540 return (NULL);
2541 }
2542 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2544 RRDD_LOG(LOG_INFO, "listening for connections");
2546 while (state == RUNNING)
2547 {
2548 for (i = 0; i < pollfds_num; i++)
2549 {
2550 pollfds[i].fd = listen_fds[i].fd;
2551 pollfds[i].events = POLLIN | POLLPRI;
2552 pollfds[i].revents = 0;
2553 }
2555 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2556 if (state != RUNNING)
2557 break;
2558 else if (status == 0) /* timeout */
2559 continue;
2560 else if (status < 0) /* error */
2561 {
2562 status = errno;
2563 if (status != EINTR)
2564 {
2565 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2566 }
2567 continue;
2568 }
2570 for (i = 0; i < pollfds_num; i++)
2571 {
2572 listen_socket_t *client_sock;
2573 struct sockaddr_storage client_sa;
2574 socklen_t client_sa_size;
2575 pthread_t tid;
2576 pthread_attr_t attr;
2578 if (pollfds[i].revents == 0)
2579 continue;
2581 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2582 {
2583 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2584 "poll(2) returned something unexpected for listen FD #%i.",
2585 pollfds[i].fd);
2586 continue;
2587 }
2589 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2590 if (client_sock == NULL)
2591 {
2592 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2593 continue;
2594 }
2595 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2597 client_sa_size = sizeof (client_sa);
2598 client_sock->fd = accept (pollfds[i].fd,
2599 (struct sockaddr *) &client_sa, &client_sa_size);
2600 if (client_sock->fd < 0)
2601 {
2602 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2603 free(client_sock);
2604 continue;
2605 }
2607 pthread_attr_init (&attr);
2608 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2610 status = pthread_create (&tid, &attr, connection_thread_main,
2611 client_sock);
2612 if (status != 0)
2613 {
2614 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2615 close_connection(client_sock);
2616 continue;
2617 }
2618 } /* for (pollfds_num) */
2619 } /* while (state == RUNNING) */
2621 RRDD_LOG(LOG_INFO, "starting shutdown");
2623 close_listen_sockets ();
2625 pthread_mutex_lock (&connection_threads_lock);
2626 while (connection_threads_num > 0)
2627 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2628 pthread_mutex_unlock (&connection_threads_lock);
2630 free(pollfds);
2632 return (NULL);
2633 } /* }}} void *listen_thread_main */
2635 static int daemonize (void) /* {{{ */
2636 {
2637 int pid_fd;
2638 char *base_dir;
2640 daemon_uid = geteuid();
2642 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2643 if (pid_fd < 0)
2644 pid_fd = check_pidfile();
2645 if (pid_fd < 0)
2646 return pid_fd;
2648 /* open all the listen sockets */
2649 if (config_listen_address_list_len > 0)
2650 {
2651 for (size_t i = 0; i < config_listen_address_list_len; i++)
2652 open_listen_socket (config_listen_address_list[i]);
2654 rrd_free_ptrs((void ***) &config_listen_address_list,
2655 &config_listen_address_list_len);
2656 }
2657 else
2658 {
2659 listen_socket_t sock;
2660 memset(&sock, 0, sizeof(sock));
2661 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2662 open_listen_socket (&sock);
2663 }
2665 if (listen_fds_num < 1)
2666 {
2667 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2668 goto error;
2669 }
2671 if (!stay_foreground)
2672 {
2673 pid_t child;
2675 child = fork ();
2676 if (child < 0)
2677 {
2678 fprintf (stderr, "daemonize: fork(2) failed.\n");
2679 goto error;
2680 }
2681 else if (child > 0)
2682 exit(0);
2684 /* Become session leader */
2685 setsid ();
2687 /* Open the first three file descriptors to /dev/null */
2688 close (2);
2689 close (1);
2690 close (0);
2692 open ("/dev/null", O_RDWR);
2693 if (dup(0) == -1 || dup(0) == -1){
2694 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2695 }
2696 } /* if (!stay_foreground) */
2698 /* Change into the /tmp directory. */
2699 base_dir = (config_base_dir != NULL)
2700 ? config_base_dir
2701 : "/tmp";
2703 if (chdir (base_dir) != 0)
2704 {
2705 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2706 goto error;
2707 }
2709 install_signal_handlers();
2711 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2712 RRDD_LOG(LOG_INFO, "starting up");
2714 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2715 (GDestroyNotify) free_cache_item);
2716 if (cache_tree == NULL)
2717 {
2718 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2719 goto error;
2720 }
2722 return write_pidfile (pid_fd);
2724 error:
2725 remove_pidfile();
2726 return -1;
2727 } /* }}} int daemonize */
2729 static int cleanup (void) /* {{{ */
2730 {
2731 pthread_cond_broadcast (&flush_cond);
2732 pthread_join (flush_thread, NULL);
2734 pthread_cond_broadcast (&queue_cond);
2735 for (int i = 0; i < config_queue_threads; i++)
2736 pthread_join (queue_threads[i], NULL);
2738 if (config_flush_at_shutdown)
2739 {
2740 assert(cache_queue_head == NULL);
2741 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2742 }
2744 free(queue_threads);
2745 free(config_base_dir);
2747 pthread_mutex_lock(&cache_lock);
2748 g_tree_destroy(cache_tree);
2750 pthread_mutex_lock(&journal_lock);
2751 journal_done();
2753 RRDD_LOG(LOG_INFO, "goodbye");
2754 closelog ();
2756 remove_pidfile ();
2757 free(config_pid_file);
2759 return (0);
2760 } /* }}} int cleanup */
2762 static int read_options (int argc, char **argv) /* {{{ */
2763 {
2764 int option;
2765 int status = 0;
2767 char **permissions = NULL;
2768 size_t permissions_len = 0;
2770 gid_t socket_group = (gid_t)-1;
2771 mode_t socket_permissions = (mode_t)-1;
2773 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2774 {
2775 switch (option)
2776 {
2777 case 'g':
2778 stay_foreground=1;
2779 break;
2781 case 'l':
2782 {
2783 listen_socket_t *new;
2785 new = malloc(sizeof(listen_socket_t));
2786 if (new == NULL)
2787 {
2788 fprintf(stderr, "read_options: malloc failed.\n");
2789 return(2);
2790 }
2791 memset(new, 0, sizeof(listen_socket_t));
2793 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2795 /* Add permissions to the socket {{{ */
2796 if (permissions_len != 0)
2797 {
2798 size_t i;
2799 for (i = 0; i < permissions_len; i++)
2800 {
2801 status = socket_permission_add (new, permissions[i]);
2802 if (status != 0)
2803 {
2804 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2805 "socket failed. Most likely, this permission doesn't "
2806 "exist. Check your command line.\n", permissions[i]);
2807 status = 4;
2808 }
2809 }
2810 }
2811 else /* if (permissions_len == 0) */
2812 {
2813 /* Add permission for ALL commands to the socket. */
2814 size_t i;
2815 for (i = 0; i < list_of_commands_len; i++)
2816 {
2817 status = socket_permission_add (new, list_of_commands[i].cmd);
2818 if (status != 0)
2819 {
2820 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2821 "socket failed. This should never happen, ever! Sorry.\n",
2822 permissions[i]);
2823 status = 4;
2824 }
2825 }
2826 }
2827 /* }}} Done adding permissions. */
2829 new->socket_group = socket_group;
2830 new->socket_permissions = socket_permissions;
2832 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2833 &config_listen_address_list_len, new))
2834 {
2835 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2836 return (2);
2837 }
2838 }
2839 break;
2841 /* set socket group permissions */
2842 case 's':
2843 {
2844 gid_t group_gid;
2845 struct group *grp;
2847 group_gid = strtoul(optarg, NULL, 10);
2848 if (errno != EINVAL && group_gid>0)
2849 {
2850 /* we were passed a number */
2851 grp = getgrgid(group_gid);
2852 }
2853 else
2854 {
2855 grp = getgrnam(optarg);
2856 }
2858 if (grp)
2859 {
2860 socket_group = grp->gr_gid;
2861 }
2862 else
2863 {
2864 /* no idea what the user wanted... */
2865 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2866 return (5);
2867 }
2868 }
2869 break;
2871 /* set socket file permissions */
2872 case 'm':
2873 {
2874 long tmp;
2875 char *endptr = NULL;
2877 tmp = strtol (optarg, &endptr, 8);
2878 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2879 || (tmp > 07777) || (tmp < 0)) {
2880 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2881 optarg);
2882 return (5);
2883 }
2885 socket_permissions = (mode_t)tmp;
2886 }
2887 break;
2889 case 'P':
2890 {
2891 char *optcopy;
2892 char *saveptr;
2893 char *dummy;
2894 char *ptr;
2896 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2898 optcopy = strdup (optarg);
2899 dummy = optcopy;
2900 saveptr = NULL;
2901 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2902 {
2903 dummy = NULL;
2904 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2905 }
2907 free (optcopy);
2908 }
2909 break;
2911 case 'f':
2912 {
2913 int temp;
2915 temp = atoi (optarg);
2916 if (temp > 0)
2917 config_flush_interval = temp;
2918 else
2919 {
2920 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2921 status = 3;
2922 }
2923 }
2924 break;
2926 case 'w':
2927 {
2928 int temp;
2930 temp = atoi (optarg);
2931 if (temp > 0)
2932 config_write_interval = temp;
2933 else
2934 {
2935 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2936 status = 2;
2937 }
2938 }
2939 break;
2941 case 'z':
2942 {
2943 int temp;
2945 temp = atoi(optarg);
2946 if (temp > 0)
2947 config_write_jitter = temp;
2948 else
2949 {
2950 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2951 status = 2;
2952 }
2954 break;
2955 }
2957 case 't':
2958 {
2959 int threads;
2960 threads = atoi(optarg);
2961 if (threads >= 1)
2962 config_queue_threads = threads;
2963 else
2964 {
2965 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2966 return 1;
2967 }
2968 }
2969 break;
2971 case 'B':
2972 config_write_base_only = 1;
2973 break;
2975 case 'b':
2976 {
2977 size_t len;
2978 char base_realpath[PATH_MAX];
2980 if (config_base_dir != NULL)
2981 free (config_base_dir);
2982 config_base_dir = strdup (optarg);
2983 if (config_base_dir == NULL)
2984 {
2985 fprintf (stderr, "read_options: strdup failed.\n");
2986 return (3);
2987 }
2989 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2990 {
2991 fprintf (stderr, "Failed to create base directory '%s': %s\n",
2992 config_base_dir, rrd_strerror (errno));
2993 return (3);
2994 }
2996 /* make sure that the base directory is not resolved via
2997 * symbolic links. this makes some performance-enhancing
2998 * assumptions possible (we don't have to resolve paths
2999 * that start with a "/")
3000 */
3001 if (realpath(config_base_dir, base_realpath) == NULL)
3002 {
3003 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3004 "%s\n", config_base_dir, rrd_strerror(errno));
3005 return 5;
3006 }
3008 len = strlen (config_base_dir);
3009 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3010 {
3011 config_base_dir[len - 1] = 0;
3012 len--;
3013 }
3015 if (len < 1)
3016 {
3017 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3018 return (4);
3019 }
3021 _config_base_dir_len = len;
3023 len = strlen (base_realpath);
3024 while ((len > 0) && (base_realpath[len - 1] == '/'))
3025 {
3026 base_realpath[len - 1] = '\0';
3027 len--;
3028 }
3030 if (strncmp(config_base_dir,
3031 base_realpath, sizeof(base_realpath)) != 0)
3032 {
3033 fprintf(stderr,
3034 "Base directory (-b) resolved via file system links!\n"
3035 "Please consult rrdcached '-b' documentation!\n"
3036 "Consider specifying the real directory (%s)\n",
3037 base_realpath);
3038 return 5;
3039 }
3040 }
3041 break;
3043 case 'p':
3044 {
3045 if (config_pid_file != NULL)
3046 free (config_pid_file);
3047 config_pid_file = strdup (optarg);
3048 if (config_pid_file == NULL)
3049 {
3050 fprintf (stderr, "read_options: strdup failed.\n");
3051 return (3);
3052 }
3053 }
3054 break;
3056 case 'F':
3057 config_flush_at_shutdown = 1;
3058 break;
3060 case 'j':
3061 {
3062 const char *dir = journal_dir = strdup(optarg);
3064 status = rrd_mkdir_p(dir, 0777);
3065 if (status != 0)
3066 {
3067 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3068 dir, rrd_strerror(errno));
3069 return 6;
3070 }
3072 if (access(dir, R_OK|W_OK|X_OK) != 0)
3073 {
3074 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3075 errno ? rrd_strerror(errno) : "");
3076 return 6;
3077 }
3078 }
3079 break;
3081 case 'h':
3082 case '?':
3083 printf ("RRDCacheD %s\n"
3084 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3085 "\n"
3086 "Usage: rrdcached [options]\n"
3087 "\n"
3088 "Valid options are:\n"
3089 " -l <address> Socket address to listen to.\n"
3090 " -P <perms> Sets the permissions to assign to all following "
3091 "sockets\n"
3092 " -w <seconds> Interval in which to write data.\n"
3093 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3094 " -t <threads> Number of write threads.\n"
3095 " -f <seconds> Interval in which to flush dead data.\n"
3096 " -p <file> Location of the PID-file.\n"
3097 " -b <dir> Base directory to change to.\n"
3098 " -B Restrict file access to paths within -b <dir>\n"
3099 " -g Do not fork and run in the foreground.\n"
3100 " -j <dir> Directory in which to create the journal files.\n"
3101 " -F Always flush all updates at shutdown\n"
3102 " -s <id|name> Make socket g+rw to named group\n"
3103 "\n"
3104 "For more information and a detailed description of all options "
3105 "please refer\n"
3106 "to the rrdcached(1) manual page.\n",
3107 VERSION);
3108 status = -1;
3109 break;
3110 } /* switch (option) */
3111 } /* while (getopt) */
3113 /* advise the user when values are not sane */
3114 if (config_flush_interval < 2 * config_write_interval)
3115 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3116 " 2x write interval (-w) !\n");
3117 if (config_write_jitter > config_write_interval)
3118 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3119 " write interval (-w) !\n");
3121 if (config_write_base_only && config_base_dir == NULL)
3122 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3123 " Consult the rrdcached documentation\n");
3125 if (journal_dir == NULL)
3126 config_flush_at_shutdown = 1;
3128 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3130 return (status);
3131 } /* }}} int read_options */
3133 int main (int argc, char **argv)
3134 {
3135 int status;
3137 status = read_options (argc, argv);
3138 if (status != 0)
3139 {
3140 if (status < 0)
3141 status = 0;
3142 return (status);
3143 }
3145 status = daemonize ();
3146 if (status != 0)
3147 {
3148 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3149 return (1);
3150 }
3152 journal_init();
3154 /* start the queue threads */
3155 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3156 if (queue_threads == NULL)
3157 {
3158 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3159 cleanup();
3160 return (1);
3161 }
3162 for (int i = 0; i < config_queue_threads; i++)
3163 {
3164 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3165 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3166 if (status != 0)
3167 {
3168 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3169 cleanup();
3170 return (1);
3171 }
3172 }
3174 /* start the flush thread */
3175 memset(&flush_thread, 0, sizeof(flush_thread));
3176 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3177 if (status != 0)
3178 {
3179 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3180 cleanup();
3181 return (1);
3182 }
3184 listen_thread_main (NULL);
3185 cleanup ();
3187 return (0);
3188 } /* int main */
3190 /*
3191 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3192 */