1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116 do { \
117 if (stay_foreground) { \
118 fprintf(stderr, __VA_ARGS__); \
119 fprintf(stderr, "\n"); } \
120 syslog ((severity), __VA_ARGS__); \
121 } while (0)
123 /*
124 * Types
125 */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
128 struct listen_socket_s
129 {
130 int fd;
131 char addr[PATH_MAX + 1];
132 int family;
134 /* state for BATCH processing */
135 time_t batch_start;
136 int batch_cmd;
138 /* buffered IO */
139 char *rbuf;
140 off_t next_cmd;
141 off_t next_read;
143 char *wbuf;
144 ssize_t wbuf_len;
146 uint32_t permissions;
148 gid_t socket_group;
149 mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
157 time_t UNUSED(now),\
158 char UNUSED(*buffer),\
159 size_t UNUSED(buffer_size)
161 #define HANDLER_PROTO command_t UNUSED(*cmd),\
162 DISPATCH_PROTO
164 struct command_s {
165 char *cmd;
166 int (*handler)(HANDLER_PROTO);
168 char context; /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT (1<<0)
170 #define CMD_CONTEXT_BATCH (1<<1)
171 #define CMD_CONTEXT_JOURNAL (1<<2)
172 #define CMD_CONTEXT_ANY (0x7f)
174 char *syntax;
175 char *help;
176 };
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
181 {
182 char *file;
183 char **values;
184 size_t values_num;
185 time_t last_flush_time;
186 time_t last_update_stamp;
187 #define CI_FLAGS_IN_TREE (1<<0)
188 #define CI_FLAGS_IN_QUEUE (1<<1)
189 int flags;
190 pthread_cond_t flushed;
191 cache_item_t *prev;
192 cache_item_t *next;
193 };
195 struct callback_flush_data_s
196 {
197 time_t now;
198 time_t abs_timeout;
199 char **keys;
200 size_t keys_num;
201 };
202 typedef struct callback_flush_data_s callback_flush_data_t;
204 enum queue_side_e
205 {
206 HEAD,
207 TAIL
208 };
209 typedef enum queue_side_e queue_side_t;
211 /* describe a set of journal files */
212 typedef struct {
213 char **files;
214 size_t files_num;
215 } journal_set;
217 /* max length of socket command or response */
218 #define CMD_MAX 4096
219 #define RBUF_SIZE (CMD_MAX*2)
221 /*
222 * Variables
223 */
224 static int stay_foreground = 0;
225 static uid_t daemon_uid;
227 static listen_socket_t *listen_fds = NULL;
228 static size_t listen_fds_num = 0;
230 enum {
231 RUNNING, /* normal operation */
232 FLUSHING, /* flushing remaining values */
233 SHUTDOWN /* shutting down */
234 } state = RUNNING;
236 static pthread_t *queue_threads;
237 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
238 static int config_queue_threads = 4;
240 static pthread_t flush_thread;
241 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
243 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
244 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
245 static int connection_threads_num = 0;
247 /* Cache stuff */
248 static GTree *cache_tree = NULL;
249 static cache_item_t *cache_queue_head = NULL;
250 static cache_item_t *cache_queue_tail = NULL;
251 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
253 static int config_write_interval = 300;
254 static int config_write_jitter = 0;
255 static int config_flush_interval = 3600;
256 static int config_flush_at_shutdown = 0;
257 static char *config_pid_file = NULL;
258 static char *config_base_dir = NULL;
259 static size_t _config_base_dir_len = 0;
260 static int config_write_base_only = 0;
262 static listen_socket_t **config_listen_address_list = NULL;
263 static size_t config_listen_address_list_len = 0;
265 static uint64_t stats_queue_length = 0;
266 static uint64_t stats_updates_received = 0;
267 static uint64_t stats_flush_received = 0;
268 static uint64_t stats_updates_written = 0;
269 static uint64_t stats_data_sets_written = 0;
270 static uint64_t stats_journal_bytes = 0;
271 static uint64_t stats_journal_rotate = 0;
272 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
274 /* Journaled updates */
275 #define JOURNAL_REPLAY(s) ((s) == NULL)
276 #define JOURNAL_BASE "rrd.journal"
277 static journal_set *journal_cur = NULL;
278 static journal_set *journal_old = NULL;
279 static char *journal_dir = NULL;
280 static FILE *journal_fh = NULL; /* current journal file handle */
281 static long journal_size = 0; /* current journal size */
282 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
283 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
284 static int journal_write(char *cmd, char *args);
285 static void journal_done(void);
286 static void journal_rotate(void);
288 /* prototypes for forward refernces */
289 static int handle_request_help (HANDLER_PROTO);
291 /*
292 * Functions
293 */
294 static void sig_common (const char *sig) /* {{{ */
295 {
296 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
297 state = FLUSHING;
298 pthread_cond_broadcast(&flush_cond);
299 pthread_cond_broadcast(&queue_cond);
300 } /* }}} void sig_common */
302 static void sig_int_handler (int UNUSED(s)) /* {{{ */
303 {
304 sig_common("INT");
305 } /* }}} void sig_int_handler */
307 static void sig_term_handler (int UNUSED(s)) /* {{{ */
308 {
309 sig_common("TERM");
310 } /* }}} void sig_term_handler */
312 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
313 {
314 config_flush_at_shutdown = 1;
315 sig_common("USR1");
316 } /* }}} void sig_usr1_handler */
318 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
319 {
320 config_flush_at_shutdown = 0;
321 sig_common("USR2");
322 } /* }}} void sig_usr2_handler */
324 static void install_signal_handlers(void) /* {{{ */
325 {
326 /* These structures are static, because `sigaction' behaves weird if the are
327 * overwritten.. */
328 static struct sigaction sa_int;
329 static struct sigaction sa_term;
330 static struct sigaction sa_pipe;
331 static struct sigaction sa_usr1;
332 static struct sigaction sa_usr2;
334 /* Install signal handlers */
335 memset (&sa_int, 0, sizeof (sa_int));
336 sa_int.sa_handler = sig_int_handler;
337 sigaction (SIGINT, &sa_int, NULL);
339 memset (&sa_term, 0, sizeof (sa_term));
340 sa_term.sa_handler = sig_term_handler;
341 sigaction (SIGTERM, &sa_term, NULL);
343 memset (&sa_pipe, 0, sizeof (sa_pipe));
344 sa_pipe.sa_handler = SIG_IGN;
345 sigaction (SIGPIPE, &sa_pipe, NULL);
347 memset (&sa_pipe, 0, sizeof (sa_usr1));
348 sa_usr1.sa_handler = sig_usr1_handler;
349 sigaction (SIGUSR1, &sa_usr1, NULL);
351 memset (&sa_usr2, 0, sizeof (sa_usr2));
352 sa_usr2.sa_handler = sig_usr2_handler;
353 sigaction (SIGUSR2, &sa_usr2, NULL);
355 } /* }}} void install_signal_handlers */
357 static int open_pidfile(char *action, int oflag) /* {{{ */
358 {
359 int fd;
360 const char *file;
361 char *file_copy, *dir;
363 file = (config_pid_file != NULL)
364 ? config_pid_file
365 : LOCALSTATEDIR "/run/rrdcached.pid";
367 /* dirname may modify its argument */
368 file_copy = strdup(file);
369 if (file_copy == NULL)
370 {
371 fprintf(stderr, "rrdcached: strdup(): %s\n",
372 rrd_strerror(errno));
373 return -1;
374 }
376 dir = dirname(file_copy);
377 if (rrd_mkdir_p(dir, 0777) != 0)
378 {
379 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
380 dir, rrd_strerror(errno));
381 return -1;
382 }
384 free(file_copy);
386 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
387 if (fd < 0)
388 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
389 action, file, rrd_strerror(errno));
391 return(fd);
392 } /* }}} static int open_pidfile */
394 /* check existing pid file to see whether a daemon is running */
395 static int check_pidfile(void)
396 {
397 int pid_fd;
398 pid_t pid;
399 char pid_str[16];
401 pid_fd = open_pidfile("open", O_RDWR);
402 if (pid_fd < 0)
403 return pid_fd;
405 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
406 return -1;
408 pid = atoi(pid_str);
409 if (pid <= 0)
410 return -1;
412 /* another running process that we can signal COULD be
413 * a competing rrdcached */
414 if (pid != getpid() && kill(pid, 0) == 0)
415 {
416 fprintf(stderr,
417 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
418 close(pid_fd);
419 return -1;
420 }
422 lseek(pid_fd, 0, SEEK_SET);
423 if (ftruncate(pid_fd, 0) == -1)
424 {
425 fprintf(stderr,
426 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
427 close(pid_fd);
428 return -1;
429 }
431 fprintf(stderr,
432 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
433 "rrdcached: starting normally.\n", pid);
435 return pid_fd;
436 } /* }}} static int check_pidfile */
438 static int write_pidfile (int fd) /* {{{ */
439 {
440 pid_t pid;
441 FILE *fh;
443 pid = getpid ();
445 fh = fdopen (fd, "w");
446 if (fh == NULL)
447 {
448 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
449 close(fd);
450 return (-1);
451 }
453 fprintf (fh, "%i\n", (int) pid);
454 fclose (fh);
456 return (0);
457 } /* }}} int write_pidfile */
459 static int remove_pidfile (void) /* {{{ */
460 {
461 char *file;
462 int status;
464 file = (config_pid_file != NULL)
465 ? config_pid_file
466 : LOCALSTATEDIR "/run/rrdcached.pid";
468 status = unlink (file);
469 if (status == 0)
470 return (0);
471 return (errno);
472 } /* }}} int remove_pidfile */
474 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
475 {
476 char *eol;
478 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
479 sock->next_read - sock->next_cmd);
481 if (eol == NULL)
482 {
483 /* no commands left, move remainder back to front of rbuf */
484 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
485 sock->next_read - sock->next_cmd);
486 sock->next_read -= sock->next_cmd;
487 sock->next_cmd = 0;
488 *len = 0;
489 return NULL;
490 }
491 else
492 {
493 char *cmd = sock->rbuf + sock->next_cmd;
494 *eol = '\0';
496 sock->next_cmd = eol - sock->rbuf + 1;
498 if (eol > sock->rbuf && *(eol-1) == '\r')
499 *(--eol) = '\0'; /* handle "\r\n" EOL */
501 *len = eol - cmd;
503 return cmd;
504 }
506 /* NOTREACHED */
507 assert(1==0);
508 } /* }}} char *next_cmd */
510 /* add the characters directly to the write buffer */
511 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
512 {
513 char *new_buf;
515 assert(sock != NULL);
517 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
518 if (new_buf == NULL)
519 {
520 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
521 return -1;
522 }
524 strncpy(new_buf + sock->wbuf_len, str, len + 1);
526 sock->wbuf = new_buf;
527 sock->wbuf_len += len;
529 return 0;
530 } /* }}} static int add_to_wbuf */
532 /* add the text to the "extra" info that's sent after the status line */
533 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
534 {
535 va_list argp;
536 char buffer[CMD_MAX];
537 int len;
539 if (JOURNAL_REPLAY(sock)) return 0;
540 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
542 va_start(argp, fmt);
543 #ifdef HAVE_VSNPRINTF
544 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
545 #else
546 len = vsprintf(buffer, fmt, argp);
547 #endif
548 va_end(argp);
549 if (len < 0)
550 {
551 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
552 return -1;
553 }
555 return add_to_wbuf(sock, buffer, len);
556 } /* }}} static int add_response_info */
558 static int count_lines(char *str) /* {{{ */
559 {
560 int lines = 0;
562 if (str != NULL)
563 {
564 while ((str = strchr(str, '\n')) != NULL)
565 {
566 ++lines;
567 ++str;
568 }
569 }
571 return lines;
572 } /* }}} static int count_lines */
574 /* send the response back to the user.
575 * returns 0 on success, -1 on error
576 * write buffer is always zeroed after this call */
577 static int send_response (listen_socket_t *sock, response_code rc,
578 char *fmt, ...) /* {{{ */
579 {
580 va_list argp;
581 char buffer[CMD_MAX];
582 int lines;
583 ssize_t wrote;
584 int rclen, len;
586 if (JOURNAL_REPLAY(sock)) return rc;
588 if (sock->batch_start)
589 {
590 if (rc == RESP_OK)
591 return rc; /* no response on success during BATCH */
592 lines = sock->batch_cmd;
593 }
594 else if (rc == RESP_OK)
595 lines = count_lines(sock->wbuf);
596 else
597 lines = -1;
599 rclen = sprintf(buffer, "%d ", lines);
600 va_start(argp, fmt);
601 #ifdef HAVE_VSNPRINTF
602 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
603 #else
604 len = vsprintf(buffer+rclen, fmt, argp);
605 #endif
606 va_end(argp);
607 if (len < 0)
608 return -1;
610 len += rclen;
612 /* append the result to the wbuf, don't write to the user */
613 if (sock->batch_start)
614 return add_to_wbuf(sock, buffer, len);
616 /* first write must be complete */
617 if (len != write(sock->fd, buffer, len))
618 {
619 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
620 return -1;
621 }
623 if (sock->wbuf != NULL && rc == RESP_OK)
624 {
625 wrote = 0;
626 while (wrote < sock->wbuf_len)
627 {
628 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
629 if (wb <= 0)
630 {
631 RRDD_LOG(LOG_INFO, "send_response: could not write results");
632 return -1;
633 }
634 wrote += wb;
635 }
636 }
638 free(sock->wbuf); sock->wbuf = NULL;
639 sock->wbuf_len = 0;
641 return 0;
642 } /* }}} */
644 static void wipe_ci_values(cache_item_t *ci, time_t when)
645 {
646 ci->values = NULL;
647 ci->values_num = 0;
649 ci->last_flush_time = when;
650 if (config_write_jitter > 0)
651 ci->last_flush_time += (rrd_random() % config_write_jitter);
652 }
654 /* remove_from_queue
655 * remove a "cache_item_t" item from the queue.
656 * must hold 'cache_lock' when calling this
657 */
658 static void remove_from_queue(cache_item_t *ci) /* {{{ */
659 {
660 if (ci == NULL) return;
661 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
663 if (ci->prev == NULL)
664 cache_queue_head = ci->next; /* reset head */
665 else
666 ci->prev->next = ci->next;
668 if (ci->next == NULL)
669 cache_queue_tail = ci->prev; /* reset the tail */
670 else
671 ci->next->prev = ci->prev;
673 ci->next = ci->prev = NULL;
674 ci->flags &= ~CI_FLAGS_IN_QUEUE;
676 pthread_mutex_lock (&stats_lock);
677 assert (stats_queue_length > 0);
678 stats_queue_length--;
679 pthread_mutex_unlock (&stats_lock);
681 } /* }}} static void remove_from_queue */
683 /* free the resources associated with the cache_item_t
684 * must hold cache_lock when calling this function
685 */
686 static void *free_cache_item(cache_item_t *ci) /* {{{ */
687 {
688 if (ci == NULL) return NULL;
690 remove_from_queue(ci);
692 for (size_t i=0; i < ci->values_num; i++)
693 free(ci->values[i]);
695 free (ci->values);
696 free (ci->file);
698 /* in case anyone is waiting */
699 pthread_cond_broadcast(&ci->flushed);
700 pthread_cond_destroy(&ci->flushed);
702 free (ci);
704 return NULL;
705 } /* }}} static void *free_cache_item */
707 /*
708 * enqueue_cache_item:
709 * `cache_lock' must be acquired before calling this function!
710 */
711 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
712 queue_side_t side)
713 {
714 if (ci == NULL)
715 return (-1);
717 if (ci->values_num == 0)
718 return (0);
720 if (side == HEAD)
721 {
722 if (cache_queue_head == ci)
723 return 0;
725 /* remove if further down in queue */
726 remove_from_queue(ci);
728 ci->prev = NULL;
729 ci->next = cache_queue_head;
730 if (ci->next != NULL)
731 ci->next->prev = ci;
732 cache_queue_head = ci;
734 if (cache_queue_tail == NULL)
735 cache_queue_tail = cache_queue_head;
736 }
737 else /* (side == TAIL) */
738 {
739 /* We don't move values back in the list.. */
740 if (ci->flags & CI_FLAGS_IN_QUEUE)
741 return (0);
743 assert (ci->next == NULL);
744 assert (ci->prev == NULL);
746 ci->prev = cache_queue_tail;
748 if (cache_queue_tail == NULL)
749 cache_queue_head = ci;
750 else
751 cache_queue_tail->next = ci;
753 cache_queue_tail = ci;
754 }
756 ci->flags |= CI_FLAGS_IN_QUEUE;
758 pthread_cond_signal(&queue_cond);
759 pthread_mutex_lock (&stats_lock);
760 stats_queue_length++;
761 pthread_mutex_unlock (&stats_lock);
763 return (0);
764 } /* }}} int enqueue_cache_item */
766 /*
767 * tree_callback_flush:
768 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
769 * while this is in progress.
770 */
771 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
772 gpointer data)
773 {
774 cache_item_t *ci;
775 callback_flush_data_t *cfd;
777 ci = (cache_item_t *) value;
778 cfd = (callback_flush_data_t *) data;
780 if (ci->flags & CI_FLAGS_IN_QUEUE)
781 return FALSE;
783 if (ci->values_num > 0
784 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
785 {
786 enqueue_cache_item (ci, TAIL);
787 }
788 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
789 && (ci->values_num <= 0))
790 {
791 assert ((char *) key == ci->file);
792 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
793 {
794 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
795 return (FALSE);
796 }
797 }
799 return (FALSE);
800 } /* }}} gboolean tree_callback_flush */
802 static int flush_old_values (int max_age)
803 {
804 callback_flush_data_t cfd;
805 size_t k;
807 memset (&cfd, 0, sizeof (cfd));
808 /* Pass the current time as user data so that we don't need to call
809 * `time' for each node. */
810 cfd.now = time (NULL);
811 cfd.keys = NULL;
812 cfd.keys_num = 0;
814 if (max_age > 0)
815 cfd.abs_timeout = cfd.now - max_age;
816 else
817 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
819 /* `tree_callback_flush' will return the keys of all values that haven't
820 * been touched in the last `config_flush_interval' seconds in `cfd'.
821 * The char*'s in this array point to the same memory as ci->file, so we
822 * don't need to free them separately. */
823 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
825 for (k = 0; k < cfd.keys_num; k++)
826 {
827 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
828 /* should never fail, since we have held the cache_lock
829 * the entire time */
830 assert(status == TRUE);
831 }
833 if (cfd.keys != NULL)
834 {
835 free (cfd.keys);
836 cfd.keys = NULL;
837 }
839 return (0);
840 } /* int flush_old_values */
842 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
843 {
844 struct timeval now;
845 struct timespec next_flush;
846 int status;
848 gettimeofday (&now, NULL);
849 next_flush.tv_sec = now.tv_sec + config_flush_interval;
850 next_flush.tv_nsec = 1000 * now.tv_usec;
852 pthread_mutex_lock(&cache_lock);
854 while (state == RUNNING)
855 {
856 gettimeofday (&now, NULL);
857 if ((now.tv_sec > next_flush.tv_sec)
858 || ((now.tv_sec == next_flush.tv_sec)
859 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
860 {
861 RRDD_LOG(LOG_DEBUG, "flushing old values");
863 /* Determine the time of the next cache flush. */
864 next_flush.tv_sec = now.tv_sec + config_flush_interval;
866 /* Flush all values that haven't been written in the last
867 * `config_write_interval' seconds. */
868 flush_old_values (config_write_interval);
870 /* unlock the cache while we rotate so we don't block incoming
871 * updates if the fsync() blocks on disk I/O */
872 pthread_mutex_unlock(&cache_lock);
873 journal_rotate();
874 pthread_mutex_lock(&cache_lock);
875 }
877 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
878 if (status != 0 && status != ETIMEDOUT)
879 {
880 RRDD_LOG (LOG_ERR, "flush_thread_main: "
881 "pthread_cond_timedwait returned %i.", status);
882 }
883 }
885 if (config_flush_at_shutdown)
886 flush_old_values (-1); /* flush everything */
888 state = SHUTDOWN;
890 pthread_mutex_unlock(&cache_lock);
892 return NULL;
893 } /* void *flush_thread_main */
895 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
896 {
897 pthread_mutex_lock (&cache_lock);
899 while (state != SHUTDOWN
900 || (cache_queue_head != NULL && config_flush_at_shutdown))
901 {
902 cache_item_t *ci;
903 char *file;
904 char **values;
905 size_t values_num;
906 int status;
908 /* Now, check if there's something to store away. If not, wait until
909 * something comes in. */
910 if (cache_queue_head == NULL)
911 {
912 status = pthread_cond_wait (&queue_cond, &cache_lock);
913 if ((status != 0) && (status != ETIMEDOUT))
914 {
915 RRDD_LOG (LOG_ERR, "queue_thread_main: "
916 "pthread_cond_wait returned %i.", status);
917 }
918 }
920 /* Check if a value has arrived. This may be NULL if we timed out or there
921 * was an interrupt such as a signal. */
922 if (cache_queue_head == NULL)
923 continue;
925 ci = cache_queue_head;
927 /* copy the relevant parts */
928 file = strdup (ci->file);
929 if (file == NULL)
930 {
931 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
932 continue;
933 }
935 assert(ci->values != NULL);
936 assert(ci->values_num > 0);
938 values = ci->values;
939 values_num = ci->values_num;
941 wipe_ci_values(ci, time(NULL));
942 remove_from_queue(ci);
944 pthread_mutex_unlock (&cache_lock);
946 rrd_clear_error ();
947 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
948 if (status != 0)
949 {
950 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
951 "rrd_update_r (%s) failed with status %i. (%s)",
952 file, status, rrd_get_error());
953 }
955 journal_write("wrote", file);
957 /* Search again in the tree. It's possible someone issued a "FORGET"
958 * while we were writing the update values. */
959 pthread_mutex_lock(&cache_lock);
960 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
961 if (ci)
962 pthread_cond_broadcast(&ci->flushed);
963 pthread_mutex_unlock(&cache_lock);
965 if (status == 0)
966 {
967 pthread_mutex_lock (&stats_lock);
968 stats_updates_written++;
969 stats_data_sets_written += values_num;
970 pthread_mutex_unlock (&stats_lock);
971 }
973 rrd_free_ptrs((void ***) &values, &values_num);
974 free(file);
976 pthread_mutex_lock (&cache_lock);
977 }
978 pthread_mutex_unlock (&cache_lock);
980 return (NULL);
981 } /* }}} void *queue_thread_main */
983 static int buffer_get_field (char **buffer_ret, /* {{{ */
984 size_t *buffer_size_ret, char **field_ret)
985 {
986 char *buffer;
987 size_t buffer_pos;
988 size_t buffer_size;
989 char *field;
990 size_t field_size;
991 int status;
993 buffer = *buffer_ret;
994 buffer_pos = 0;
995 buffer_size = *buffer_size_ret;
996 field = *buffer_ret;
997 field_size = 0;
999 if (buffer_size <= 0)
1000 return (-1);
1002 /* This is ensured by `handle_request'. */
1003 assert (buffer[buffer_size - 1] == '\0');
1005 status = -1;
1006 while (buffer_pos < buffer_size)
1007 {
1008 /* Check for end-of-field or end-of-buffer */
1009 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1010 {
1011 field[field_size] = 0;
1012 field_size++;
1013 buffer_pos++;
1014 status = 0;
1015 break;
1016 }
1017 /* Handle escaped characters. */
1018 else if (buffer[buffer_pos] == '\\')
1019 {
1020 if (buffer_pos >= (buffer_size - 1))
1021 break;
1022 buffer_pos++;
1023 field[field_size] = buffer[buffer_pos];
1024 field_size++;
1025 buffer_pos++;
1026 }
1027 /* Normal operation */
1028 else
1029 {
1030 field[field_size] = buffer[buffer_pos];
1031 field_size++;
1032 buffer_pos++;
1033 }
1034 } /* while (buffer_pos < buffer_size) */
1036 if (status != 0)
1037 return (status);
1039 *buffer_ret = buffer + buffer_pos;
1040 *buffer_size_ret = buffer_size - buffer_pos;
1041 *field_ret = field;
1043 return (0);
1044 } /* }}} int buffer_get_field */
1046 /* if we're restricting writes to the base directory,
1047 * check whether the file falls within the dir
1048 * returns 1 if OK, otherwise 0
1049 */
1050 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1051 {
1052 assert(file != NULL);
1054 if (!config_write_base_only
1055 || JOURNAL_REPLAY(sock)
1056 || config_base_dir == NULL)
1057 return 1;
1059 if (strstr(file, "../") != NULL) goto err;
1061 /* relative paths without "../" are ok */
1062 if (*file != '/') return 1;
1064 /* file must be of the format base + "/" + <1+ char filename> */
1065 if (strlen(file) < _config_base_dir_len + 2) goto err;
1066 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1067 if (*(file + _config_base_dir_len) != '/') goto err;
1069 return 1;
1071 err:
1072 if (sock != NULL && sock->fd >= 0)
1073 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1075 return 0;
1076 } /* }}} static int check_file_access */
1078 /* when using a base dir, convert relative paths to absolute paths.
1079 * if necessary, modifies the "filename" pointer to point
1080 * to the new path created in "tmp". "tmp" is provided
1081 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1082 *
1083 * this allows us to optimize for the expected case (absolute path)
1084 * with a no-op.
1085 */
1086 static void get_abs_path(char **filename, char *tmp)
1087 {
1088 assert(tmp != NULL);
1089 assert(filename != NULL && *filename != NULL);
1091 if (config_base_dir == NULL || **filename == '/')
1092 return;
1094 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1095 *filename = tmp;
1096 } /* }}} static int get_abs_path */
1098 static int flush_file (const char *filename) /* {{{ */
1099 {
1100 cache_item_t *ci;
1102 pthread_mutex_lock (&cache_lock);
1104 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1105 if (ci == NULL)
1106 {
1107 pthread_mutex_unlock (&cache_lock);
1108 return (ENOENT);
1109 }
1111 if (ci->values_num > 0)
1112 {
1113 /* Enqueue at head */
1114 enqueue_cache_item (ci, HEAD);
1115 pthread_cond_wait(&ci->flushed, &cache_lock);
1116 }
1118 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1119 * may have been purged during our cond_wait() */
1121 pthread_mutex_unlock(&cache_lock);
1123 return (0);
1124 } /* }}} int flush_file */
1126 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1127 {
1128 char *err = "Syntax error.\n";
1130 if (cmd && cmd->syntax)
1131 err = cmd->syntax;
1133 return send_response(sock, RESP_ERR, "Usage: %s", err);
1134 } /* }}} static int syntax_error() */
1136 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1137 {
1138 uint64_t copy_queue_length;
1139 uint64_t copy_updates_received;
1140 uint64_t copy_flush_received;
1141 uint64_t copy_updates_written;
1142 uint64_t copy_data_sets_written;
1143 uint64_t copy_journal_bytes;
1144 uint64_t copy_journal_rotate;
1146 uint64_t tree_nodes_number;
1147 uint64_t tree_depth;
1149 pthread_mutex_lock (&stats_lock);
1150 copy_queue_length = stats_queue_length;
1151 copy_updates_received = stats_updates_received;
1152 copy_flush_received = stats_flush_received;
1153 copy_updates_written = stats_updates_written;
1154 copy_data_sets_written = stats_data_sets_written;
1155 copy_journal_bytes = stats_journal_bytes;
1156 copy_journal_rotate = stats_journal_rotate;
1157 pthread_mutex_unlock (&stats_lock);
1159 pthread_mutex_lock (&cache_lock);
1160 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1161 tree_depth = (uint64_t) g_tree_height (cache_tree);
1162 pthread_mutex_unlock (&cache_lock);
1164 add_response_info(sock,
1165 "QueueLength: %"PRIu64"\n", copy_queue_length);
1166 add_response_info(sock,
1167 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1168 add_response_info(sock,
1169 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1170 add_response_info(sock,
1171 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1172 add_response_info(sock,
1173 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1174 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1175 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1176 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1177 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1179 send_response(sock, RESP_OK, "Statistics follow\n");
1181 return (0);
1182 } /* }}} int handle_request_stats */
1184 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1185 {
1186 char *file, file_tmp[PATH_MAX];
1187 int status;
1189 status = buffer_get_field (&buffer, &buffer_size, &file);
1190 if (status != 0)
1191 {
1192 return syntax_error(sock,cmd);
1193 }
1194 else
1195 {
1196 pthread_mutex_lock(&stats_lock);
1197 stats_flush_received++;
1198 pthread_mutex_unlock(&stats_lock);
1200 get_abs_path(&file, file_tmp);
1201 if (!check_file_access(file, sock)) return 0;
1203 status = flush_file (file);
1204 if (status == 0)
1205 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1206 else if (status == ENOENT)
1207 {
1208 /* no file in our tree; see whether it exists at all */
1209 struct stat statbuf;
1211 memset(&statbuf, 0, sizeof(statbuf));
1212 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1213 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1214 else
1215 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1216 }
1217 else if (status < 0)
1218 return send_response(sock, RESP_ERR, "Internal error.\n");
1219 else
1220 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1221 }
1223 /* NOTREACHED */
1224 assert(1==0);
1225 } /* }}} int handle_request_flush */
1227 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1228 {
1229 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1231 pthread_mutex_lock(&cache_lock);
1232 flush_old_values(-1);
1233 pthread_mutex_unlock(&cache_lock);
1235 return send_response(sock, RESP_OK, "Started flush.\n");
1236 } /* }}} static int handle_request_flushall */
1238 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1239 {
1240 int status;
1241 char *file, file_tmp[PATH_MAX];
1242 cache_item_t *ci;
1244 status = buffer_get_field(&buffer, &buffer_size, &file);
1245 if (status != 0)
1246 return syntax_error(sock,cmd);
1248 get_abs_path(&file, file_tmp);
1250 pthread_mutex_lock(&cache_lock);
1251 ci = g_tree_lookup(cache_tree, file);
1252 if (ci == NULL)
1253 {
1254 pthread_mutex_unlock(&cache_lock);
1255 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1256 }
1258 for (size_t i=0; i < ci->values_num; i++)
1259 add_response_info(sock, "%s\n", ci->values[i]);
1261 pthread_mutex_unlock(&cache_lock);
1262 return send_response(sock, RESP_OK, "updates pending\n");
1263 } /* }}} static int handle_request_pending */
1265 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1266 {
1267 int status;
1268 gboolean found;
1269 char *file, file_tmp[PATH_MAX];
1271 status = buffer_get_field(&buffer, &buffer_size, &file);
1272 if (status != 0)
1273 return syntax_error(sock,cmd);
1275 get_abs_path(&file, file_tmp);
1276 if (!check_file_access(file, sock)) return 0;
1278 pthread_mutex_lock(&cache_lock);
1279 found = g_tree_remove(cache_tree, file);
1280 pthread_mutex_unlock(&cache_lock);
1282 if (found == TRUE)
1283 {
1284 if (!JOURNAL_REPLAY(sock))
1285 journal_write("forget", file);
1287 return send_response(sock, RESP_OK, "Gone!\n");
1288 }
1289 else
1290 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1292 /* NOTREACHED */
1293 assert(1==0);
1294 } /* }}} static int handle_request_forget */
1296 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1297 {
1298 cache_item_t *ci;
1300 pthread_mutex_lock(&cache_lock);
1302 ci = cache_queue_head;
1303 while (ci != NULL)
1304 {
1305 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1306 ci = ci->next;
1307 }
1309 pthread_mutex_unlock(&cache_lock);
1311 return send_response(sock, RESP_OK, "in queue.\n");
1312 } /* }}} int handle_request_queue */
1314 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1315 {
1316 char *file, file_tmp[PATH_MAX];
1317 int values_num = 0;
1318 int status;
1319 char orig_buf[CMD_MAX];
1321 cache_item_t *ci;
1323 /* save it for the journal later */
1324 if (!JOURNAL_REPLAY(sock))
1325 strncpy(orig_buf, buffer, buffer_size);
1327 status = buffer_get_field (&buffer, &buffer_size, &file);
1328 if (status != 0)
1329 return syntax_error(sock,cmd);
1331 pthread_mutex_lock(&stats_lock);
1332 stats_updates_received++;
1333 pthread_mutex_unlock(&stats_lock);
1335 get_abs_path(&file, file_tmp);
1336 if (!check_file_access(file, sock)) return 0;
1338 pthread_mutex_lock (&cache_lock);
1339 ci = g_tree_lookup (cache_tree, file);
1341 if (ci == NULL) /* {{{ */
1342 {
1343 struct stat statbuf;
1344 cache_item_t *tmp;
1346 /* don't hold the lock while we setup; stat(2) might block */
1347 pthread_mutex_unlock(&cache_lock);
1349 memset (&statbuf, 0, sizeof (statbuf));
1350 status = stat (file, &statbuf);
1351 if (status != 0)
1352 {
1353 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1355 status = errno;
1356 if (status == ENOENT)
1357 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1358 else
1359 return send_response(sock, RESP_ERR,
1360 "stat failed with error %i.\n", status);
1361 }
1362 if (!S_ISREG (statbuf.st_mode))
1363 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1365 if (access(file, R_OK|W_OK) != 0)
1366 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1367 file, rrd_strerror(errno));
1369 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1370 if (ci == NULL)
1371 {
1372 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1374 return send_response(sock, RESP_ERR, "malloc failed.\n");
1375 }
1376 memset (ci, 0, sizeof (cache_item_t));
1378 ci->file = strdup (file);
1379 if (ci->file == NULL)
1380 {
1381 free (ci);
1382 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1384 return send_response(sock, RESP_ERR, "strdup failed.\n");
1385 }
1387 wipe_ci_values(ci, now);
1388 ci->flags = CI_FLAGS_IN_TREE;
1389 pthread_cond_init(&ci->flushed, NULL);
1391 pthread_mutex_lock(&cache_lock);
1393 /* another UPDATE might have added this entry in the meantime */
1394 tmp = g_tree_lookup (cache_tree, file);
1395 if (tmp == NULL)
1396 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1397 else
1398 {
1399 free_cache_item (ci);
1400 ci = tmp;
1401 }
1403 /* state may have changed while we were unlocked */
1404 if (state == SHUTDOWN)
1405 return -1;
1406 } /* }}} */
1407 assert (ci != NULL);
1409 /* don't re-write updates in replay mode */
1410 if (!JOURNAL_REPLAY(sock))
1411 journal_write("update", orig_buf);
1413 while (buffer_size > 0)
1414 {
1415 char *value;
1416 time_t stamp;
1417 char *eostamp;
1419 status = buffer_get_field (&buffer, &buffer_size, &value);
1420 if (status != 0)
1421 {
1422 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1423 break;
1424 }
1426 /* make sure update time is always moving forward */
1427 stamp = strtol(value, &eostamp, 10);
1428 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1429 {
1430 pthread_mutex_unlock(&cache_lock);
1431 return send_response(sock, RESP_ERR,
1432 "Cannot find timestamp in '%s'!\n", value);
1433 }
1434 else if (stamp <= ci->last_update_stamp)
1435 {
1436 pthread_mutex_unlock(&cache_lock);
1437 return send_response(sock, RESP_ERR,
1438 "illegal attempt to update using time %ld when last"
1439 " update time is %ld (minimum one second step)\n",
1440 stamp, ci->last_update_stamp);
1441 }
1442 else
1443 ci->last_update_stamp = stamp;
1445 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1446 {
1447 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1448 continue;
1449 }
1451 values_num++;
1452 }
1454 if (((now - ci->last_flush_time) >= config_write_interval)
1455 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1456 && (ci->values_num > 0))
1457 {
1458 enqueue_cache_item (ci, TAIL);
1459 }
1461 pthread_mutex_unlock (&cache_lock);
1463 if (values_num < 1)
1464 return send_response(sock, RESP_ERR, "No values updated.\n");
1465 else
1466 return send_response(sock, RESP_OK,
1467 "errors, enqueued %i value(s).\n", values_num);
1469 /* NOTREACHED */
1470 assert(1==0);
1472 } /* }}} int handle_request_update */
1474 /* we came across a "WROTE" entry during journal replay.
1475 * throw away any values that we have accumulated for this file
1476 */
1477 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1478 {
1479 cache_item_t *ci;
1480 const char *file = buffer;
1482 pthread_mutex_lock(&cache_lock);
1484 ci = g_tree_lookup(cache_tree, file);
1485 if (ci == NULL)
1486 {
1487 pthread_mutex_unlock(&cache_lock);
1488 return (0);
1489 }
1491 if (ci->values)
1492 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1494 wipe_ci_values(ci, now);
1495 remove_from_queue(ci);
1497 pthread_mutex_unlock(&cache_lock);
1498 return (0);
1499 } /* }}} int handle_request_wrote */
1501 /* start "BATCH" processing */
1502 static int batch_start (HANDLER_PROTO) /* {{{ */
1503 {
1504 int status;
1505 if (sock->batch_start)
1506 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1508 status = send_response(sock, RESP_OK,
1509 "Go ahead. End with dot '.' on its own line.\n");
1510 sock->batch_start = time(NULL);
1511 sock->batch_cmd = 0;
1513 return status;
1514 } /* }}} static int batch_start */
1516 /* finish "BATCH" processing and return results to the client */
1517 static int batch_done (HANDLER_PROTO) /* {{{ */
1518 {
1519 assert(sock->batch_start);
1520 sock->batch_start = 0;
1521 sock->batch_cmd = 0;
1522 return send_response(sock, RESP_OK, "errors\n");
1523 } /* }}} static int batch_done */
1525 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1526 {
1527 return -1;
1528 } /* }}} static int handle_request_quit */
1530 static command_t list_of_commands[] = { /* {{{ */
1531 {
1532 "UPDATE",
1533 handle_request_update,
1534 CMD_CONTEXT_ANY,
1535 "UPDATE <filename> <values> [<values> ...]\n"
1536 ,
1537 "Adds the given file to the internal cache if it is not yet known and\n"
1538 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1539 "for details.\n"
1540 "\n"
1541 "Each <values> has the following form:\n"
1542 " <values> = <time>:<value>[:<value>[...]]\n"
1543 "See the rrdupdate(1) manpage for details.\n"
1544 },
1545 {
1546 "WROTE",
1547 handle_request_wrote,
1548 CMD_CONTEXT_JOURNAL,
1549 NULL,
1550 NULL
1551 },
1552 {
1553 "FLUSH",
1554 handle_request_flush,
1555 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1556 "FLUSH <filename>\n"
1557 ,
1558 "Adds the given filename to the head of the update queue and returns\n"
1559 "after it has been dequeued.\n"
1560 },
1561 {
1562 "FLUSHALL",
1563 handle_request_flushall,
1564 CMD_CONTEXT_CLIENT,
1565 "FLUSHALL\n"
1566 ,
1567 "Triggers writing of all pending updates. Returns immediately.\n"
1568 },
1569 {
1570 "PENDING",
1571 handle_request_pending,
1572 CMD_CONTEXT_CLIENT,
1573 "PENDING <filename>\n"
1574 ,
1575 "Shows any 'pending' updates for a file, in order.\n"
1576 "The updates shown have not yet been written to the underlying RRD file.\n"
1577 },
1578 {
1579 "FORGET",
1580 handle_request_forget,
1581 CMD_CONTEXT_ANY,
1582 "FORGET <filename>\n"
1583 ,
1584 "Removes the file completely from the cache.\n"
1585 "Any pending updates for the file will be lost.\n"
1586 },
1587 {
1588 "QUEUE",
1589 handle_request_queue,
1590 CMD_CONTEXT_CLIENT,
1591 "QUEUE\n"
1592 ,
1593 "Shows all files in the output queue.\n"
1594 "The output is zero or more lines in the following format:\n"
1595 "(where <num_vals> is the number of values to be written)\n"
1596 "\n"
1597 "<num_vals> <filename>\n"
1598 },
1599 {
1600 "STATS",
1601 handle_request_stats,
1602 CMD_CONTEXT_CLIENT,
1603 "STATS\n"
1604 ,
1605 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1606 "a description of the values.\n"
1607 },
1608 {
1609 "HELP",
1610 handle_request_help,
1611 CMD_CONTEXT_CLIENT,
1612 "HELP [<command>]\n",
1613 NULL, /* special! */
1614 },
1615 {
1616 "BATCH",
1617 batch_start,
1618 CMD_CONTEXT_CLIENT,
1619 "BATCH\n"
1620 ,
1621 "The 'BATCH' command permits the client to initiate a bulk load\n"
1622 " of commands to rrdcached.\n"
1623 "\n"
1624 "Usage:\n"
1625 "\n"
1626 " client: BATCH\n"
1627 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1628 " client: command #1\n"
1629 " client: command #2\n"
1630 " client: ... and so on\n"
1631 " client: .\n"
1632 " server: 2 errors\n"
1633 " server: 7 message for command #7\n"
1634 " server: 9 message for command #9\n"
1635 "\n"
1636 "For more information, consult the rrdcached(1) documentation.\n"
1637 },
1638 {
1639 ".", /* BATCH terminator */
1640 batch_done,
1641 CMD_CONTEXT_BATCH,
1642 NULL,
1643 NULL
1644 },
1645 {
1646 "QUIT",
1647 handle_request_quit,
1648 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1649 "QUIT\n"
1650 ,
1651 "Disconnect from rrdcached.\n"
1652 }
1653 }; /* }}} command_t list_of_commands[] */
1654 static size_t list_of_commands_len = sizeof (list_of_commands)
1655 / sizeof (list_of_commands[0]);
1657 static command_t *find_command(char *cmd)
1658 {
1659 size_t i;
1661 for (i = 0; i < list_of_commands_len; i++)
1662 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1663 return (&list_of_commands[i]);
1664 return NULL;
1665 }
1667 /* We currently use the index in the `list_of_commands' array as a bit position
1668 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1669 * outside these functions so that switching to a more elegant storage method
1670 * is easily possible. */
1671 static ssize_t find_command_index (const char *cmd) /* {{{ */
1672 {
1673 size_t i;
1675 for (i = 0; i < list_of_commands_len; i++)
1676 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1677 return ((ssize_t) i);
1678 return (-1);
1679 } /* }}} ssize_t find_command_index */
1681 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1682 const char *cmd)
1683 {
1684 ssize_t i;
1686 if (JOURNAL_REPLAY(sock))
1687 return (1);
1689 if (cmd == NULL)
1690 return (-1);
1692 if ((strcasecmp ("QUIT", cmd) == 0)
1693 || (strcasecmp ("HELP", cmd) == 0))
1694 return (1);
1695 else if (strcmp (".", cmd) == 0)
1696 cmd = "BATCH";
1698 i = find_command_index (cmd);
1699 if (i < 0)
1700 return (-1);
1701 assert (i < 32);
1703 if ((sock->permissions & (1 << i)) != 0)
1704 return (1);
1705 return (0);
1706 } /* }}} int socket_permission_check */
1708 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1709 const char *cmd)
1710 {
1711 ssize_t i;
1713 i = find_command_index (cmd);
1714 if (i < 0)
1715 return (-1);
1716 assert (i < 32);
1718 sock->permissions |= (1 << i);
1719 return (0);
1720 } /* }}} int socket_permission_add */
1722 /* check whether commands are received in the expected context */
1723 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1724 {
1725 if (JOURNAL_REPLAY(sock))
1726 return (cmd->context & CMD_CONTEXT_JOURNAL);
1727 else if (sock->batch_start)
1728 return (cmd->context & CMD_CONTEXT_BATCH);
1729 else
1730 return (cmd->context & CMD_CONTEXT_CLIENT);
1732 /* NOTREACHED */
1733 assert(1==0);
1734 }
1736 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1737 {
1738 int status;
1739 char *cmd_str;
1740 char *resp_txt;
1741 command_t *help = NULL;
1743 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1744 if (status == 0)
1745 help = find_command(cmd_str);
1747 if (help && (help->syntax || help->help))
1748 {
1749 char tmp[CMD_MAX];
1751 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1752 resp_txt = tmp;
1754 if (help->syntax)
1755 add_response_info(sock, "Usage: %s\n", help->syntax);
1757 if (help->help)
1758 add_response_info(sock, "%s\n", help->help);
1759 }
1760 else
1761 {
1762 size_t i;
1764 resp_txt = "Command overview\n";
1766 for (i = 0; i < list_of_commands_len; i++)
1767 {
1768 if (list_of_commands[i].syntax == NULL)
1769 continue;
1770 add_response_info (sock, "%s", list_of_commands[i].syntax);
1771 }
1772 }
1774 return send_response(sock, RESP_OK, resp_txt);
1775 } /* }}} int handle_request_help */
1777 static int handle_request (DISPATCH_PROTO) /* {{{ */
1778 {
1779 char *buffer_ptr = buffer;
1780 char *cmd_str = NULL;
1781 command_t *cmd = NULL;
1782 int status;
1784 assert (buffer[buffer_size - 1] == '\0');
1786 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1787 if (status != 0)
1788 {
1789 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1790 return (-1);
1791 }
1793 if (sock != NULL && sock->batch_start)
1794 sock->batch_cmd++;
1796 cmd = find_command(cmd_str);
1797 if (!cmd)
1798 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1800 if (!socket_permission_check (sock, cmd->cmd))
1801 return send_response(sock, RESP_ERR, "Permission denied.\n");
1803 if (!command_check_context(sock, cmd))
1804 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1806 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1807 } /* }}} int handle_request */
1809 static void journal_set_free (journal_set *js) /* {{{ */
1810 {
1811 if (js == NULL)
1812 return;
1814 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1816 free(js);
1817 } /* }}} journal_set_free */
1819 static void journal_set_remove (journal_set *js) /* {{{ */
1820 {
1821 if (js == NULL)
1822 return;
1824 for (uint i=0; i < js->files_num; i++)
1825 {
1826 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1827 unlink(js->files[i]);
1828 }
1829 } /* }}} journal_set_remove */
1831 /* close current journal file handle.
1832 * MUST hold journal_lock before calling */
1833 static void journal_close(void) /* {{{ */
1834 {
1835 if (journal_fh != NULL)
1836 {
1837 if (fclose(journal_fh) != 0)
1838 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1839 }
1841 journal_fh = NULL;
1842 journal_size = 0;
1843 } /* }}} journal_close */
1845 /* MUST hold journal_lock before calling */
1846 static void journal_new_file(void) /* {{{ */
1847 {
1848 struct timeval now;
1849 int new_fd;
1850 char new_file[PATH_MAX + 1];
1852 assert(journal_dir != NULL);
1853 assert(journal_cur != NULL);
1855 journal_close();
1857 gettimeofday(&now, NULL);
1858 /* this format assures that the files sort in strcmp() order */
1859 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1860 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1862 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1863 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1864 if (new_fd < 0)
1865 goto error;
1867 journal_fh = fdopen(new_fd, "a");
1868 if (journal_fh == NULL)
1869 goto error;
1871 journal_size = ftell(journal_fh);
1872 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1874 /* record the file in the journal set */
1875 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1877 return;
1879 error:
1880 RRDD_LOG(LOG_CRIT,
1881 "JOURNALING DISABLED: Error while trying to create %s : %s",
1882 new_file, rrd_strerror(errno));
1883 RRDD_LOG(LOG_CRIT,
1884 "JOURNALING DISABLED: All values will be flushed at shutdown");
1886 close(new_fd);
1887 config_flush_at_shutdown = 1;
1889 } /* }}} journal_new_file */
1891 /* MUST NOT hold journal_lock before calling this */
1892 static void journal_rotate(void) /* {{{ */
1893 {
1894 journal_set *old_js = NULL;
1896 if (journal_dir == NULL)
1897 return;
1899 RRDD_LOG(LOG_DEBUG, "rotating journals");
1901 pthread_mutex_lock(&stats_lock);
1902 ++stats_journal_rotate;
1903 pthread_mutex_unlock(&stats_lock);
1905 pthread_mutex_lock(&journal_lock);
1907 journal_close();
1909 /* rotate the journal sets */
1910 old_js = journal_old;
1911 journal_old = journal_cur;
1912 journal_cur = calloc(1, sizeof(journal_set));
1914 if (journal_cur != NULL)
1915 journal_new_file();
1916 else
1917 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1919 pthread_mutex_unlock(&journal_lock);
1921 journal_set_remove(old_js);
1922 journal_set_free (old_js);
1924 } /* }}} static void journal_rotate */
1926 /* MUST hold journal_lock when calling */
1927 static void journal_done(void) /* {{{ */
1928 {
1929 if (journal_cur == NULL)
1930 return;
1932 journal_close();
1934 if (config_flush_at_shutdown)
1935 {
1936 RRDD_LOG(LOG_INFO, "removing journals");
1937 journal_set_remove(journal_old);
1938 journal_set_remove(journal_cur);
1939 }
1940 else
1941 {
1942 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1943 "journals will be used at next startup");
1944 }
1946 journal_set_free(journal_cur);
1947 journal_set_free(journal_old);
1948 free(journal_dir);
1950 } /* }}} static void journal_done */
1952 static int journal_write(char *cmd, char *args) /* {{{ */
1953 {
1954 int chars;
1956 if (journal_fh == NULL)
1957 return 0;
1959 pthread_mutex_lock(&journal_lock);
1960 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1961 journal_size += chars;
1963 if (journal_size > JOURNAL_MAX)
1964 journal_new_file();
1966 pthread_mutex_unlock(&journal_lock);
1968 if (chars > 0)
1969 {
1970 pthread_mutex_lock(&stats_lock);
1971 stats_journal_bytes += chars;
1972 pthread_mutex_unlock(&stats_lock);
1973 }
1975 return chars;
1976 } /* }}} static int journal_write */
1978 static int journal_replay (const char *file) /* {{{ */
1979 {
1980 FILE *fh;
1981 int entry_cnt = 0;
1982 int fail_cnt = 0;
1983 uint64_t line = 0;
1984 char entry[CMD_MAX];
1985 time_t now;
1987 if (file == NULL) return 0;
1989 {
1990 char *reason = "unknown error";
1991 int status = 0;
1992 struct stat statbuf;
1994 memset(&statbuf, 0, sizeof(statbuf));
1995 if (stat(file, &statbuf) != 0)
1996 {
1997 reason = "stat error";
1998 status = errno;
1999 }
2000 else if (!S_ISREG(statbuf.st_mode))
2001 {
2002 reason = "not a regular file";
2003 status = EPERM;
2004 }
2005 if (statbuf.st_uid != daemon_uid)
2006 {
2007 reason = "not owned by daemon user";
2008 status = EACCES;
2009 }
2010 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2011 {
2012 reason = "must not be user/group writable";
2013 status = EACCES;
2014 }
2016 if (status != 0)
2017 {
2018 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2019 file, rrd_strerror(status), reason);
2020 return 0;
2021 }
2022 }
2024 fh = fopen(file, "r");
2025 if (fh == NULL)
2026 {
2027 if (errno != ENOENT)
2028 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2029 file, rrd_strerror(errno));
2030 return 0;
2031 }
2032 else
2033 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2035 now = time(NULL);
2037 while(!feof(fh))
2038 {
2039 size_t entry_len;
2041 ++line;
2042 if (fgets(entry, sizeof(entry), fh) == NULL)
2043 break;
2044 entry_len = strlen(entry);
2046 /* check \n termination in case journal writing crashed mid-line */
2047 if (entry_len == 0)
2048 continue;
2049 else if (entry[entry_len - 1] != '\n')
2050 {
2051 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2052 ++fail_cnt;
2053 continue;
2054 }
2056 entry[entry_len - 1] = '\0';
2058 if (handle_request(NULL, now, entry, entry_len) == 0)
2059 ++entry_cnt;
2060 else
2061 ++fail_cnt;
2062 }
2064 fclose(fh);
2066 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2067 entry_cnt, fail_cnt);
2069 return entry_cnt > 0 ? 1 : 0;
2070 } /* }}} static int journal_replay */
2072 static int journal_sort(const void *v1, const void *v2)
2073 {
2074 char **jn1 = (char **) v1;
2075 char **jn2 = (char **) v2;
2077 return strcmp(*jn1,*jn2);
2078 }
2080 static void journal_init(void) /* {{{ */
2081 {
2082 int had_journal = 0;
2083 DIR *dir;
2084 struct dirent *dent;
2085 char path[PATH_MAX+1];
2087 if (journal_dir == NULL) return;
2089 pthread_mutex_lock(&journal_lock);
2091 journal_cur = calloc(1, sizeof(journal_set));
2092 if (journal_cur == NULL)
2093 {
2094 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2095 return;
2096 }
2098 RRDD_LOG(LOG_INFO, "checking for journal files");
2100 /* Handle old journal files during transition. This gives them the
2101 * correct sort order. TODO: remove after first release
2102 */
2103 {
2104 char old_path[PATH_MAX+1];
2105 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2106 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2107 rename(old_path, path);
2109 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2110 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2111 rename(old_path, path);
2112 }
2114 dir = opendir(journal_dir);
2115 if (!dir) {
2116 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2117 return;
2118 }
2119 while ((dent = readdir(dir)) != NULL)
2120 {
2121 /* looks like a journal file? */
2122 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2123 continue;
2125 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2127 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2128 {
2129 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2130 dent->d_name);
2131 break;
2132 }
2133 }
2134 closedir(dir);
2136 qsort(journal_cur->files, journal_cur->files_num,
2137 sizeof(journal_cur->files[0]), journal_sort);
2139 for (uint i=0; i < journal_cur->files_num; i++)
2140 had_journal += journal_replay(journal_cur->files[i]);
2142 journal_new_file();
2144 /* it must have been a crash. start a flush */
2145 if (had_journal && config_flush_at_shutdown)
2146 flush_old_values(-1);
2148 pthread_mutex_unlock(&journal_lock);
2150 RRDD_LOG(LOG_INFO, "journal processing complete");
2152 } /* }}} static void journal_init */
2154 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2155 {
2156 assert(sock != NULL);
2158 free(sock->rbuf); sock->rbuf = NULL;
2159 free(sock->wbuf); sock->wbuf = NULL;
2160 free(sock);
2161 } /* }}} void free_listen_socket */
2163 static void close_connection(listen_socket_t *sock) /* {{{ */
2164 {
2165 if (sock->fd >= 0)
2166 {
2167 close(sock->fd);
2168 sock->fd = -1;
2169 }
2171 free_listen_socket(sock);
2173 } /* }}} void close_connection */
2175 static void *connection_thread_main (void *args) /* {{{ */
2176 {
2177 listen_socket_t *sock;
2178 int fd;
2180 sock = (listen_socket_t *) args;
2181 fd = sock->fd;
2183 /* init read buffers */
2184 sock->next_read = sock->next_cmd = 0;
2185 sock->rbuf = malloc(RBUF_SIZE);
2186 if (sock->rbuf == NULL)
2187 {
2188 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2189 close_connection(sock);
2190 return NULL;
2191 }
2193 pthread_mutex_lock (&connection_threads_lock);
2194 connection_threads_num++;
2195 pthread_mutex_unlock (&connection_threads_lock);
2197 while (state == RUNNING)
2198 {
2199 char *cmd;
2200 ssize_t cmd_len;
2201 ssize_t rbytes;
2202 time_t now;
2204 struct pollfd pollfd;
2205 int status;
2207 pollfd.fd = fd;
2208 pollfd.events = POLLIN | POLLPRI;
2209 pollfd.revents = 0;
2211 status = poll (&pollfd, 1, /* timeout = */ 500);
2212 if (state != RUNNING)
2213 break;
2214 else if (status == 0) /* timeout */
2215 continue;
2216 else if (status < 0) /* error */
2217 {
2218 status = errno;
2219 if (status != EINTR)
2220 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2221 continue;
2222 }
2224 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2225 break;
2226 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2227 {
2228 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2229 "poll(2) returned something unexpected: %#04hx",
2230 pollfd.revents);
2231 break;
2232 }
2234 rbytes = read(fd, sock->rbuf + sock->next_read,
2235 RBUF_SIZE - sock->next_read);
2236 if (rbytes < 0)
2237 {
2238 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2239 break;
2240 }
2241 else if (rbytes == 0)
2242 break; /* eof */
2244 sock->next_read += rbytes;
2246 if (sock->batch_start)
2247 now = sock->batch_start;
2248 else
2249 now = time(NULL);
2251 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2252 {
2253 status = handle_request (sock, now, cmd, cmd_len+1);
2254 if (status != 0)
2255 goto out_close;
2256 }
2257 }
2259 out_close:
2260 close_connection(sock);
2262 /* Remove this thread from the connection threads list */
2263 pthread_mutex_lock (&connection_threads_lock);
2264 connection_threads_num--;
2265 if (connection_threads_num <= 0)
2266 pthread_cond_broadcast(&connection_threads_done);
2267 pthread_mutex_unlock (&connection_threads_lock);
2269 return (NULL);
2270 } /* }}} void *connection_thread_main */
2272 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2273 {
2274 int fd;
2275 struct sockaddr_un sa;
2276 listen_socket_t *temp;
2277 int status;
2278 const char *path;
2279 char *path_copy, *dir;
2281 path = sock->addr;
2282 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2283 path += strlen("unix:");
2285 /* dirname may modify its argument */
2286 path_copy = strdup(path);
2287 if (path_copy == NULL)
2288 {
2289 fprintf(stderr, "rrdcached: strdup(): %s\n",
2290 rrd_strerror(errno));
2291 return (-1);
2292 }
2294 dir = dirname(path_copy);
2295 if (rrd_mkdir_p(dir, 0777) != 0)
2296 {
2297 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2298 dir, rrd_strerror(errno));
2299 return (-1);
2300 }
2302 free(path_copy);
2304 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2305 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2306 if (temp == NULL)
2307 {
2308 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2309 return (-1);
2310 }
2311 listen_fds = temp;
2312 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2314 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2315 if (fd < 0)
2316 {
2317 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2318 rrd_strerror(errno));
2319 return (-1);
2320 }
2322 memset (&sa, 0, sizeof (sa));
2323 sa.sun_family = AF_UNIX;
2324 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2326 /* if we've gotten this far, we own the pid file. any daemon started
2327 * with the same args must not be alive. therefore, ensure that we can
2328 * create the socket...
2329 */
2330 unlink(path);
2332 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2333 if (status != 0)
2334 {
2335 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2336 path, rrd_strerror(errno));
2337 close (fd);
2338 return (-1);
2339 }
2341 /* tweak the sockets group ownership */
2342 if (sock->socket_group != (gid_t)-1)
2343 {
2344 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2345 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2346 {
2347 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2348 }
2349 }
2351 if (sock->socket_permissions != (mode_t)-1)
2352 {
2353 if (chmod(path, sock->socket_permissions) != 0)
2354 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2355 (unsigned int)sock->socket_permissions, strerror(errno));
2356 }
2358 status = listen (fd, /* backlog = */ 10);
2359 if (status != 0)
2360 {
2361 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2362 path, rrd_strerror(errno));
2363 close (fd);
2364 unlink (path);
2365 return (-1);
2366 }
2368 listen_fds[listen_fds_num].fd = fd;
2369 listen_fds[listen_fds_num].family = PF_UNIX;
2370 strncpy(listen_fds[listen_fds_num].addr, path,
2371 sizeof (listen_fds[listen_fds_num].addr) - 1);
2372 listen_fds_num++;
2374 return (0);
2375 } /* }}} int open_listen_socket_unix */
2377 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2378 {
2379 struct addrinfo ai_hints;
2380 struct addrinfo *ai_res;
2381 struct addrinfo *ai_ptr;
2382 char addr_copy[NI_MAXHOST];
2383 char *addr;
2384 char *port;
2385 int status;
2387 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2388 addr_copy[sizeof (addr_copy) - 1] = 0;
2389 addr = addr_copy;
2391 memset (&ai_hints, 0, sizeof (ai_hints));
2392 ai_hints.ai_flags = 0;
2393 #ifdef AI_ADDRCONFIG
2394 ai_hints.ai_flags |= AI_ADDRCONFIG;
2395 #endif
2396 ai_hints.ai_family = AF_UNSPEC;
2397 ai_hints.ai_socktype = SOCK_STREAM;
2399 port = NULL;
2400 if (*addr == '[') /* IPv6+port format */
2401 {
2402 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2403 addr++;
2405 port = strchr (addr, ']');
2406 if (port == NULL)
2407 {
2408 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2409 return (-1);
2410 }
2411 *port = 0;
2412 port++;
2414 if (*port == ':')
2415 port++;
2416 else if (*port == 0)
2417 port = NULL;
2418 else
2419 {
2420 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2421 return (-1);
2422 }
2423 } /* if (*addr == '[') */
2424 else
2425 {
2426 port = rindex(addr, ':');
2427 if (port != NULL)
2428 {
2429 *port = 0;
2430 port++;
2431 }
2432 }
2433 ai_res = NULL;
2434 status = getaddrinfo (addr,
2435 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2436 &ai_hints, &ai_res);
2437 if (status != 0)
2438 {
2439 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2440 addr, gai_strerror (status));
2441 return (-1);
2442 }
2444 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2445 {
2446 int fd;
2447 listen_socket_t *temp;
2448 int one = 1;
2450 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2451 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2452 if (temp == NULL)
2453 {
2454 fprintf (stderr,
2455 "rrdcached: open_listen_socket_network: realloc failed.\n");
2456 continue;
2457 }
2458 listen_fds = temp;
2459 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2461 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2462 if (fd < 0)
2463 {
2464 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2465 rrd_strerror(errno));
2466 continue;
2467 }
2469 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2471 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2472 if (status != 0)
2473 {
2474 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2475 sock->addr, rrd_strerror(errno));
2476 close (fd);
2477 continue;
2478 }
2480 status = listen (fd, /* backlog = */ 10);
2481 if (status != 0)
2482 {
2483 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2484 sock->addr, rrd_strerror(errno));
2485 close (fd);
2486 freeaddrinfo(ai_res);
2487 return (-1);
2488 }
2490 listen_fds[listen_fds_num].fd = fd;
2491 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2492 listen_fds_num++;
2493 } /* for (ai_ptr) */
2495 freeaddrinfo(ai_res);
2496 return (0);
2497 } /* }}} static int open_listen_socket_network */
2499 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2500 {
2501 assert(sock != NULL);
2502 assert(sock->addr != NULL);
2504 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2505 || sock->addr[0] == '/')
2506 return (open_listen_socket_unix(sock));
2507 else
2508 return (open_listen_socket_network(sock));
2509 } /* }}} int open_listen_socket */
2511 static int close_listen_sockets (void) /* {{{ */
2512 {
2513 size_t i;
2515 for (i = 0; i < listen_fds_num; i++)
2516 {
2517 close (listen_fds[i].fd);
2519 if (listen_fds[i].family == PF_UNIX)
2520 unlink(listen_fds[i].addr);
2521 }
2523 free (listen_fds);
2524 listen_fds = NULL;
2525 listen_fds_num = 0;
2527 return (0);
2528 } /* }}} int close_listen_sockets */
2530 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2531 {
2532 struct pollfd *pollfds;
2533 int pollfds_num;
2534 int status;
2535 int i;
2537 if (listen_fds_num < 1)
2538 {
2539 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2540 return (NULL);
2541 }
2543 pollfds_num = listen_fds_num;
2544 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2545 if (pollfds == NULL)
2546 {
2547 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2548 return (NULL);
2549 }
2550 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2552 RRDD_LOG(LOG_INFO, "listening for connections");
2554 while (state == RUNNING)
2555 {
2556 for (i = 0; i < pollfds_num; i++)
2557 {
2558 pollfds[i].fd = listen_fds[i].fd;
2559 pollfds[i].events = POLLIN | POLLPRI;
2560 pollfds[i].revents = 0;
2561 }
2563 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2564 if (state != RUNNING)
2565 break;
2566 else if (status == 0) /* timeout */
2567 continue;
2568 else if (status < 0) /* error */
2569 {
2570 status = errno;
2571 if (status != EINTR)
2572 {
2573 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2574 }
2575 continue;
2576 }
2578 for (i = 0; i < pollfds_num; i++)
2579 {
2580 listen_socket_t *client_sock;
2581 struct sockaddr_storage client_sa;
2582 socklen_t client_sa_size;
2583 pthread_t tid;
2584 pthread_attr_t attr;
2586 if (pollfds[i].revents == 0)
2587 continue;
2589 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2590 {
2591 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2592 "poll(2) returned something unexpected for listen FD #%i.",
2593 pollfds[i].fd);
2594 continue;
2595 }
2597 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2598 if (client_sock == NULL)
2599 {
2600 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2601 continue;
2602 }
2603 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2605 client_sa_size = sizeof (client_sa);
2606 client_sock->fd = accept (pollfds[i].fd,
2607 (struct sockaddr *) &client_sa, &client_sa_size);
2608 if (client_sock->fd < 0)
2609 {
2610 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2611 free(client_sock);
2612 continue;
2613 }
2615 pthread_attr_init (&attr);
2616 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2618 status = pthread_create (&tid, &attr, connection_thread_main,
2619 client_sock);
2620 if (status != 0)
2621 {
2622 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2623 close_connection(client_sock);
2624 continue;
2625 }
2626 } /* for (pollfds_num) */
2627 } /* while (state == RUNNING) */
2629 RRDD_LOG(LOG_INFO, "starting shutdown");
2631 close_listen_sockets ();
2633 pthread_mutex_lock (&connection_threads_lock);
2634 while (connection_threads_num > 0)
2635 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2636 pthread_mutex_unlock (&connection_threads_lock);
2638 free(pollfds);
2640 return (NULL);
2641 } /* }}} void *listen_thread_main */
2643 static int daemonize (void) /* {{{ */
2644 {
2645 int pid_fd;
2646 char *base_dir;
2648 daemon_uid = geteuid();
2650 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2651 if (pid_fd < 0)
2652 pid_fd = check_pidfile();
2653 if (pid_fd < 0)
2654 return pid_fd;
2656 /* open all the listen sockets */
2657 if (config_listen_address_list_len > 0)
2658 {
2659 for (size_t i = 0; i < config_listen_address_list_len; i++)
2660 open_listen_socket (config_listen_address_list[i]);
2662 rrd_free_ptrs((void ***) &config_listen_address_list,
2663 &config_listen_address_list_len);
2664 }
2665 else
2666 {
2667 listen_socket_t sock;
2668 memset(&sock, 0, sizeof(sock));
2669 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2670 open_listen_socket (&sock);
2671 }
2673 if (listen_fds_num < 1)
2674 {
2675 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2676 goto error;
2677 }
2679 if (!stay_foreground)
2680 {
2681 pid_t child;
2683 child = fork ();
2684 if (child < 0)
2685 {
2686 fprintf (stderr, "daemonize: fork(2) failed.\n");
2687 goto error;
2688 }
2689 else if (child > 0)
2690 exit(0);
2692 /* Become session leader */
2693 setsid ();
2695 /* Open the first three file descriptors to /dev/null */
2696 close (2);
2697 close (1);
2698 close (0);
2700 open ("/dev/null", O_RDWR);
2701 if (dup(0) == -1 || dup(0) == -1){
2702 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2703 }
2704 } /* if (!stay_foreground) */
2706 /* Change into the /tmp directory. */
2707 base_dir = (config_base_dir != NULL)
2708 ? config_base_dir
2709 : "/tmp";
2711 if (chdir (base_dir) != 0)
2712 {
2713 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2714 goto error;
2715 }
2717 install_signal_handlers();
2719 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2720 RRDD_LOG(LOG_INFO, "starting up");
2722 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2723 (GDestroyNotify) free_cache_item);
2724 if (cache_tree == NULL)
2725 {
2726 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2727 goto error;
2728 }
2730 return write_pidfile (pid_fd);
2732 error:
2733 remove_pidfile();
2734 return -1;
2735 } /* }}} int daemonize */
2737 static int cleanup (void) /* {{{ */
2738 {
2739 pthread_cond_broadcast (&flush_cond);
2740 pthread_join (flush_thread, NULL);
2742 pthread_cond_broadcast (&queue_cond);
2743 for (int i = 0; i < config_queue_threads; i++)
2744 pthread_join (queue_threads[i], NULL);
2746 if (config_flush_at_shutdown)
2747 {
2748 assert(cache_queue_head == NULL);
2749 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2750 }
2752 free(queue_threads);
2753 free(config_base_dir);
2755 pthread_mutex_lock(&cache_lock);
2756 g_tree_destroy(cache_tree);
2758 pthread_mutex_lock(&journal_lock);
2759 journal_done();
2761 RRDD_LOG(LOG_INFO, "goodbye");
2762 closelog ();
2764 remove_pidfile ();
2765 free(config_pid_file);
2767 return (0);
2768 } /* }}} int cleanup */
2770 static int read_options (int argc, char **argv) /* {{{ */
2771 {
2772 int option;
2773 int status = 0;
2775 char **permissions = NULL;
2776 size_t permissions_len = 0;
2778 gid_t socket_group = (gid_t)-1;
2779 mode_t socket_permissions = (mode_t)-1;
2781 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2782 {
2783 switch (option)
2784 {
2785 case 'g':
2786 stay_foreground=1;
2787 break;
2789 case 'l':
2790 {
2791 listen_socket_t *new;
2793 new = malloc(sizeof(listen_socket_t));
2794 if (new == NULL)
2795 {
2796 fprintf(stderr, "read_options: malloc failed.\n");
2797 return(2);
2798 }
2799 memset(new, 0, sizeof(listen_socket_t));
2801 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2803 /* Add permissions to the socket {{{ */
2804 if (permissions_len != 0)
2805 {
2806 size_t i;
2807 for (i = 0; i < permissions_len; i++)
2808 {
2809 status = socket_permission_add (new, permissions[i]);
2810 if (status != 0)
2811 {
2812 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2813 "socket failed. Most likely, this permission doesn't "
2814 "exist. Check your command line.\n", permissions[i]);
2815 status = 4;
2816 }
2817 }
2818 }
2819 else /* if (permissions_len == 0) */
2820 {
2821 /* Add permission for ALL commands to the socket. */
2822 size_t i;
2823 for (i = 0; i < list_of_commands_len; i++)
2824 {
2825 status = socket_permission_add (new, list_of_commands[i].cmd);
2826 if (status != 0)
2827 {
2828 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2829 "socket failed. This should never happen, ever! Sorry.\n",
2830 permissions[i]);
2831 status = 4;
2832 }
2833 }
2834 }
2835 /* }}} Done adding permissions. */
2837 new->socket_group = socket_group;
2838 new->socket_permissions = socket_permissions;
2840 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2841 &config_listen_address_list_len, new))
2842 {
2843 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2844 return (2);
2845 }
2846 }
2847 break;
2849 /* set socket group permissions */
2850 case 's':
2851 {
2852 gid_t group_gid;
2853 struct group *grp;
2855 group_gid = strtoul(optarg, NULL, 10);
2856 if (errno != EINVAL && group_gid>0)
2857 {
2858 /* we were passed a number */
2859 grp = getgrgid(group_gid);
2860 }
2861 else
2862 {
2863 grp = getgrnam(optarg);
2864 }
2866 if (grp)
2867 {
2868 socket_group = grp->gr_gid;
2869 }
2870 else
2871 {
2872 /* no idea what the user wanted... */
2873 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2874 return (5);
2875 }
2876 }
2877 break;
2879 /* set socket file permissions */
2880 case 'm':
2881 {
2882 long tmp;
2883 char *endptr = NULL;
2885 tmp = strtol (optarg, &endptr, 8);
2886 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2887 || (tmp > 07777) || (tmp < 0)) {
2888 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2889 optarg);
2890 return (5);
2891 }
2893 socket_permissions = (mode_t)tmp;
2894 }
2895 break;
2897 case 'P':
2898 {
2899 char *optcopy;
2900 char *saveptr;
2901 char *dummy;
2902 char *ptr;
2904 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2906 optcopy = strdup (optarg);
2907 dummy = optcopy;
2908 saveptr = NULL;
2909 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2910 {
2911 dummy = NULL;
2912 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2913 }
2915 free (optcopy);
2916 }
2917 break;
2919 case 'f':
2920 {
2921 int temp;
2923 temp = atoi (optarg);
2924 if (temp > 0)
2925 config_flush_interval = temp;
2926 else
2927 {
2928 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2929 status = 3;
2930 }
2931 }
2932 break;
2934 case 'w':
2935 {
2936 int temp;
2938 temp = atoi (optarg);
2939 if (temp > 0)
2940 config_write_interval = temp;
2941 else
2942 {
2943 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2944 status = 2;
2945 }
2946 }
2947 break;
2949 case 'z':
2950 {
2951 int temp;
2953 temp = atoi(optarg);
2954 if (temp > 0)
2955 config_write_jitter = temp;
2956 else
2957 {
2958 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2959 status = 2;
2960 }
2962 break;
2963 }
2965 case 't':
2966 {
2967 int threads;
2968 threads = atoi(optarg);
2969 if (threads >= 1)
2970 config_queue_threads = threads;
2971 else
2972 {
2973 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2974 return 1;
2975 }
2976 }
2977 break;
2979 case 'B':
2980 config_write_base_only = 1;
2981 break;
2983 case 'b':
2984 {
2985 size_t len;
2986 char base_realpath[PATH_MAX];
2988 if (config_base_dir != NULL)
2989 free (config_base_dir);
2990 config_base_dir = strdup (optarg);
2991 if (config_base_dir == NULL)
2992 {
2993 fprintf (stderr, "read_options: strdup failed.\n");
2994 return (3);
2995 }
2997 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2998 {
2999 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3000 config_base_dir, rrd_strerror (errno));
3001 return (3);
3002 }
3004 /* make sure that the base directory is not resolved via
3005 * symbolic links. this makes some performance-enhancing
3006 * assumptions possible (we don't have to resolve paths
3007 * that start with a "/")
3008 */
3009 if (realpath(config_base_dir, base_realpath) == NULL)
3010 {
3011 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3012 "%s\n", config_base_dir, rrd_strerror(errno));
3013 return 5;
3014 }
3016 len = strlen (config_base_dir);
3017 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3018 {
3019 config_base_dir[len - 1] = 0;
3020 len--;
3021 }
3023 if (len < 1)
3024 {
3025 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3026 return (4);
3027 }
3029 _config_base_dir_len = len;
3031 len = strlen (base_realpath);
3032 while ((len > 0) && (base_realpath[len - 1] == '/'))
3033 {
3034 base_realpath[len - 1] = '\0';
3035 len--;
3036 }
3038 if (strncmp(config_base_dir,
3039 base_realpath, sizeof(base_realpath)) != 0)
3040 {
3041 fprintf(stderr,
3042 "Base directory (-b) resolved via file system links!\n"
3043 "Please consult rrdcached '-b' documentation!\n"
3044 "Consider specifying the real directory (%s)\n",
3045 base_realpath);
3046 return 5;
3047 }
3048 }
3049 break;
3051 case 'p':
3052 {
3053 if (config_pid_file != NULL)
3054 free (config_pid_file);
3055 config_pid_file = strdup (optarg);
3056 if (config_pid_file == NULL)
3057 {
3058 fprintf (stderr, "read_options: strdup failed.\n");
3059 return (3);
3060 }
3061 }
3062 break;
3064 case 'F':
3065 config_flush_at_shutdown = 1;
3066 break;
3068 case 'j':
3069 {
3070 char journal_dir_actual[PATH_MAX];
3071 const char *dir;
3072 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3074 status = rrd_mkdir_p(dir, 0777);
3075 if (status != 0)
3076 {
3077 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3078 dir, rrd_strerror(errno));
3079 return 6;
3080 }
3082 if (access(dir, R_OK|W_OK|X_OK) != 0)
3083 {
3084 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3085 errno ? rrd_strerror(errno) : "");
3086 return 6;
3087 }
3088 }
3089 break;
3091 case 'h':
3092 case '?':
3093 printf ("RRDCacheD %s\n"
3094 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3095 "\n"
3096 "Usage: rrdcached [options]\n"
3097 "\n"
3098 "Valid options are:\n"
3099 " -l <address> Socket address to listen to.\n"
3100 " -P <perms> Sets the permissions to assign to all following "
3101 "sockets\n"
3102 " -w <seconds> Interval in which to write data.\n"
3103 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3104 " -t <threads> Number of write threads.\n"
3105 " -f <seconds> Interval in which to flush dead data.\n"
3106 " -p <file> Location of the PID-file.\n"
3107 " -b <dir> Base directory to change to.\n"
3108 " -B Restrict file access to paths within -b <dir>\n"
3109 " -g Do not fork and run in the foreground.\n"
3110 " -j <dir> Directory in which to create the journal files.\n"
3111 " -F Always flush all updates at shutdown\n"
3112 " -s <id|name> Group owner of all following UNIX sockets\n"
3113 " (the socket will also have read/write permissions "
3114 "for that group)\n"
3115 " -m <mode> File permissions (octal) of all following UNIX "
3116 "sockets\n"
3117 "\n"
3118 "For more information and a detailed description of all options "
3119 "please refer\n"
3120 "to the rrdcached(1) manual page.\n",
3121 VERSION);
3122 if (option == 'h')
3123 status = -1;
3124 else
3125 status = 1;
3126 break;
3127 } /* switch (option) */
3128 } /* while (getopt) */
3130 /* advise the user when values are not sane */
3131 if (config_flush_interval < 2 * config_write_interval)
3132 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3133 " 2x write interval (-w) !\n");
3134 if (config_write_jitter > config_write_interval)
3135 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3136 " write interval (-w) !\n");
3138 if (config_write_base_only && config_base_dir == NULL)
3139 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3140 " Consult the rrdcached documentation\n");
3142 if (journal_dir == NULL)
3143 config_flush_at_shutdown = 1;
3145 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3147 return (status);
3148 } /* }}} int read_options */
3150 int main (int argc, char **argv)
3151 {
3152 int status;
3154 status = read_options (argc, argv);
3155 if (status != 0)
3156 {
3157 if (status < 0)
3158 status = 0;
3159 return (status);
3160 }
3162 status = daemonize ();
3163 if (status != 0)
3164 {
3165 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3166 return (1);
3167 }
3169 journal_init();
3171 /* start the queue threads */
3172 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3173 if (queue_threads == NULL)
3174 {
3175 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3176 cleanup();
3177 return (1);
3178 }
3179 for (int i = 0; i < config_queue_threads; i++)
3180 {
3181 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3182 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3183 if (status != 0)
3184 {
3185 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3186 cleanup();
3187 return (1);
3188 }
3189 }
3191 /* start the flush thread */
3192 memset(&flush_thread, 0, sizeof(flush_thread));
3193 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3194 if (status != 0)
3195 {
3196 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3197 cleanup();
3198 return (1);
3199 }
3201 listen_thread_main (NULL);
3202 cleanup ();
3204 return (0);
3205 } /* int main */
3207 /*
3208 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3209 */