1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116 do { \
117 if (stay_foreground) \
118 fprintf(stderr, __VA_ARGS__); \
119 syslog ((severity), __VA_ARGS__); \
120 } while (0)
122 /*
123 * Types
124 */
125 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127 struct listen_socket_s
128 {
129 int fd;
130 char addr[PATH_MAX + 1];
131 int family;
133 /* state for BATCH processing */
134 time_t batch_start;
135 int batch_cmd;
137 /* buffered IO */
138 char *rbuf;
139 off_t next_cmd;
140 off_t next_read;
142 char *wbuf;
143 ssize_t wbuf_len;
145 uint32_t permissions;
147 gid_t socket_group;
148 mode_t socket_permissions;
149 };
150 typedef struct listen_socket_s listen_socket_t;
152 struct command_s;
153 typedef struct command_s command_t;
154 /* note: guard against "unused" warnings in the handlers */
155 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
156 time_t UNUSED(now),\
157 char UNUSED(*buffer),\
158 size_t UNUSED(buffer_size)
160 #define HANDLER_PROTO command_t UNUSED(*cmd),\
161 DISPATCH_PROTO
163 struct command_s {
164 char *cmd;
165 int (*handler)(HANDLER_PROTO);
167 char context; /* where we expect to see it */
168 #define CMD_CONTEXT_CLIENT (1<<0)
169 #define CMD_CONTEXT_BATCH (1<<1)
170 #define CMD_CONTEXT_JOURNAL (1<<2)
171 #define CMD_CONTEXT_ANY (0x7f)
173 char *syntax;
174 char *help;
175 };
177 struct cache_item_s;
178 typedef struct cache_item_s cache_item_t;
179 struct cache_item_s
180 {
181 char *file;
182 char **values;
183 size_t values_num;
184 time_t last_flush_time;
185 time_t last_update_stamp;
186 #define CI_FLAGS_IN_TREE (1<<0)
187 #define CI_FLAGS_IN_QUEUE (1<<1)
188 int flags;
189 pthread_cond_t flushed;
190 cache_item_t *prev;
191 cache_item_t *next;
192 };
194 struct callback_flush_data_s
195 {
196 time_t now;
197 time_t abs_timeout;
198 char **keys;
199 size_t keys_num;
200 };
201 typedef struct callback_flush_data_s callback_flush_data_t;
203 enum queue_side_e
204 {
205 HEAD,
206 TAIL
207 };
208 typedef enum queue_side_e queue_side_t;
210 /* describe a set of journal files */
211 typedef struct {
212 char **files;
213 size_t files_num;
214 } journal_set;
216 /* max length of socket command or response */
217 #define CMD_MAX 4096
218 #define RBUF_SIZE (CMD_MAX*2)
220 /*
221 * Variables
222 */
223 static int stay_foreground = 0;
224 static uid_t daemon_uid;
226 static listen_socket_t *listen_fds = NULL;
227 static size_t listen_fds_num = 0;
229 enum {
230 RUNNING, /* normal operation */
231 FLUSHING, /* flushing remaining values */
232 SHUTDOWN /* shutting down */
233 } state = RUNNING;
235 static pthread_t *queue_threads;
236 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
237 static int config_queue_threads = 4;
239 static pthread_t flush_thread;
240 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
242 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
243 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
244 static int connection_threads_num = 0;
246 /* Cache stuff */
247 static GTree *cache_tree = NULL;
248 static cache_item_t *cache_queue_head = NULL;
249 static cache_item_t *cache_queue_tail = NULL;
250 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
252 static int config_write_interval = 300;
253 static int config_write_jitter = 0;
254 static int config_flush_interval = 3600;
255 static int config_flush_at_shutdown = 0;
256 static char *config_pid_file = NULL;
257 static char *config_base_dir = NULL;
258 static size_t _config_base_dir_len = 0;
259 static int config_write_base_only = 0;
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
273 /* Journaled updates */
274 #define JOURNAL_REPLAY(s) ((s) == NULL)
275 #define JOURNAL_BASE "rrd.journal"
276 static journal_set *journal_cur = NULL;
277 static journal_set *journal_old = NULL;
278 static char *journal_dir = NULL;
279 static FILE *journal_fh = NULL; /* current journal file handle */
280 static long journal_size = 0; /* current journal size */
281 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
282 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
283 static int journal_write(char *cmd, char *args);
284 static void journal_done(void);
285 static void journal_rotate(void);
287 /* prototypes for forward refernces */
288 static int handle_request_help (HANDLER_PROTO);
290 /*
291 * Functions
292 */
293 static void sig_common (const char *sig) /* {{{ */
294 {
295 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
296 state = FLUSHING;
297 pthread_cond_broadcast(&flush_cond);
298 pthread_cond_broadcast(&queue_cond);
299 } /* }}} void sig_common */
301 static void sig_int_handler (int UNUSED(s)) /* {{{ */
302 {
303 sig_common("INT");
304 } /* }}} void sig_int_handler */
306 static void sig_term_handler (int UNUSED(s)) /* {{{ */
307 {
308 sig_common("TERM");
309 } /* }}} void sig_term_handler */
311 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
312 {
313 config_flush_at_shutdown = 1;
314 sig_common("USR1");
315 } /* }}} void sig_usr1_handler */
317 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
318 {
319 config_flush_at_shutdown = 0;
320 sig_common("USR2");
321 } /* }}} void sig_usr2_handler */
323 static void install_signal_handlers(void) /* {{{ */
324 {
325 /* These structures are static, because `sigaction' behaves weird if the are
326 * overwritten.. */
327 static struct sigaction sa_int;
328 static struct sigaction sa_term;
329 static struct sigaction sa_pipe;
330 static struct sigaction sa_usr1;
331 static struct sigaction sa_usr2;
333 /* Install signal handlers */
334 memset (&sa_int, 0, sizeof (sa_int));
335 sa_int.sa_handler = sig_int_handler;
336 sigaction (SIGINT, &sa_int, NULL);
338 memset (&sa_term, 0, sizeof (sa_term));
339 sa_term.sa_handler = sig_term_handler;
340 sigaction (SIGTERM, &sa_term, NULL);
342 memset (&sa_pipe, 0, sizeof (sa_pipe));
343 sa_pipe.sa_handler = SIG_IGN;
344 sigaction (SIGPIPE, &sa_pipe, NULL);
346 memset (&sa_pipe, 0, sizeof (sa_usr1));
347 sa_usr1.sa_handler = sig_usr1_handler;
348 sigaction (SIGUSR1, &sa_usr1, NULL);
350 memset (&sa_usr2, 0, sizeof (sa_usr2));
351 sa_usr2.sa_handler = sig_usr2_handler;
352 sigaction (SIGUSR2, &sa_usr2, NULL);
354 } /* }}} void install_signal_handlers */
356 static int open_pidfile(char *action, int oflag) /* {{{ */
357 {
358 int fd;
359 const char *file;
360 char *file_copy, *dir;
362 file = (config_pid_file != NULL)
363 ? config_pid_file
364 : LOCALSTATEDIR "/run/rrdcached.pid";
366 /* dirname may modify its argument */
367 file_copy = strdup(file);
368 if (file_copy == NULL)
369 {
370 fprintf(stderr, "rrdcached: strdup(): %s\n",
371 rrd_strerror(errno));
372 return -1;
373 }
375 dir = dirname(file_copy);
376 if (rrd_mkdir_p(dir, 0777) != 0)
377 {
378 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
379 dir, rrd_strerror(errno));
380 return -1;
381 }
383 free(file_copy);
385 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
386 if (fd < 0)
387 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
388 action, file, rrd_strerror(errno));
390 return(fd);
391 } /* }}} static int open_pidfile */
393 /* check existing pid file to see whether a daemon is running */
394 static int check_pidfile(void)
395 {
396 int pid_fd;
397 pid_t pid;
398 char pid_str[16];
400 pid_fd = open_pidfile("open", O_RDWR);
401 if (pid_fd < 0)
402 return pid_fd;
404 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
405 return -1;
407 pid = atoi(pid_str);
408 if (pid <= 0)
409 return -1;
411 /* another running process that we can signal COULD be
412 * a competing rrdcached */
413 if (pid != getpid() && kill(pid, 0) == 0)
414 {
415 fprintf(stderr,
416 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
417 close(pid_fd);
418 return -1;
419 }
421 lseek(pid_fd, 0, SEEK_SET);
422 if (ftruncate(pid_fd, 0) == -1)
423 {
424 fprintf(stderr,
425 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
426 close(pid_fd);
427 return -1;
428 }
430 fprintf(stderr,
431 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
432 "rrdcached: starting normally.\n", pid);
434 return pid_fd;
435 } /* }}} static int check_pidfile */
437 static int write_pidfile (int fd) /* {{{ */
438 {
439 pid_t pid;
440 FILE *fh;
442 pid = getpid ();
444 fh = fdopen (fd, "w");
445 if (fh == NULL)
446 {
447 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
448 close(fd);
449 return (-1);
450 }
452 fprintf (fh, "%i\n", (int) pid);
453 fclose (fh);
455 return (0);
456 } /* }}} int write_pidfile */
458 static int remove_pidfile (void) /* {{{ */
459 {
460 char *file;
461 int status;
463 file = (config_pid_file != NULL)
464 ? config_pid_file
465 : LOCALSTATEDIR "/run/rrdcached.pid";
467 status = unlink (file);
468 if (status == 0)
469 return (0);
470 return (errno);
471 } /* }}} int remove_pidfile */
473 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
474 {
475 char *eol;
477 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
478 sock->next_read - sock->next_cmd);
480 if (eol == NULL)
481 {
482 /* no commands left, move remainder back to front of rbuf */
483 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
484 sock->next_read - sock->next_cmd);
485 sock->next_read -= sock->next_cmd;
486 sock->next_cmd = 0;
487 *len = 0;
488 return NULL;
489 }
490 else
491 {
492 char *cmd = sock->rbuf + sock->next_cmd;
493 *eol = '\0';
495 sock->next_cmd = eol - sock->rbuf + 1;
497 if (eol > sock->rbuf && *(eol-1) == '\r')
498 *(--eol) = '\0'; /* handle "\r\n" EOL */
500 *len = eol - cmd;
502 return cmd;
503 }
505 /* NOTREACHED */
506 assert(1==0);
507 } /* }}} char *next_cmd */
509 /* add the characters directly to the write buffer */
510 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
511 {
512 char *new_buf;
514 assert(sock != NULL);
516 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
517 if (new_buf == NULL)
518 {
519 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
520 return -1;
521 }
523 strncpy(new_buf + sock->wbuf_len, str, len + 1);
525 sock->wbuf = new_buf;
526 sock->wbuf_len += len;
528 return 0;
529 } /* }}} static int add_to_wbuf */
531 /* add the text to the "extra" info that's sent after the status line */
532 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
533 {
534 va_list argp;
535 char buffer[CMD_MAX];
536 int len;
538 if (JOURNAL_REPLAY(sock)) return 0;
539 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
541 va_start(argp, fmt);
542 #ifdef HAVE_VSNPRINTF
543 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
544 #else
545 len = vsprintf(buffer, fmt, argp);
546 #endif
547 va_end(argp);
548 if (len < 0)
549 {
550 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
551 return -1;
552 }
554 return add_to_wbuf(sock, buffer, len);
555 } /* }}} static int add_response_info */
557 static int count_lines(char *str) /* {{{ */
558 {
559 int lines = 0;
561 if (str != NULL)
562 {
563 while ((str = strchr(str, '\n')) != NULL)
564 {
565 ++lines;
566 ++str;
567 }
568 }
570 return lines;
571 } /* }}} static int count_lines */
573 /* send the response back to the user.
574 * returns 0 on success, -1 on error
575 * write buffer is always zeroed after this call */
576 static int send_response (listen_socket_t *sock, response_code rc,
577 char *fmt, ...) /* {{{ */
578 {
579 va_list argp;
580 char buffer[CMD_MAX];
581 int lines;
582 ssize_t wrote;
583 int rclen, len;
585 if (JOURNAL_REPLAY(sock)) return rc;
587 if (sock->batch_start)
588 {
589 if (rc == RESP_OK)
590 return rc; /* no response on success during BATCH */
591 lines = sock->batch_cmd;
592 }
593 else if (rc == RESP_OK)
594 lines = count_lines(sock->wbuf);
595 else
596 lines = -1;
598 rclen = sprintf(buffer, "%d ", lines);
599 va_start(argp, fmt);
600 #ifdef HAVE_VSNPRINTF
601 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
602 #else
603 len = vsprintf(buffer+rclen, fmt, argp);
604 #endif
605 va_end(argp);
606 if (len < 0)
607 return -1;
609 len += rclen;
611 /* append the result to the wbuf, don't write to the user */
612 if (sock->batch_start)
613 return add_to_wbuf(sock, buffer, len);
615 /* first write must be complete */
616 if (len != write(sock->fd, buffer, len))
617 {
618 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
619 return -1;
620 }
622 if (sock->wbuf != NULL && rc == RESP_OK)
623 {
624 wrote = 0;
625 while (wrote < sock->wbuf_len)
626 {
627 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
628 if (wb <= 0)
629 {
630 RRDD_LOG(LOG_INFO, "send_response: could not write results");
631 return -1;
632 }
633 wrote += wb;
634 }
635 }
637 free(sock->wbuf); sock->wbuf = NULL;
638 sock->wbuf_len = 0;
640 return 0;
641 } /* }}} */
643 static void wipe_ci_values(cache_item_t *ci, time_t when)
644 {
645 ci->values = NULL;
646 ci->values_num = 0;
648 ci->last_flush_time = when;
649 if (config_write_jitter > 0)
650 ci->last_flush_time += (rrd_random() % config_write_jitter);
651 }
653 /* remove_from_queue
654 * remove a "cache_item_t" item from the queue.
655 * must hold 'cache_lock' when calling this
656 */
657 static void remove_from_queue(cache_item_t *ci) /* {{{ */
658 {
659 if (ci == NULL) return;
660 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
662 if (ci->prev == NULL)
663 cache_queue_head = ci->next; /* reset head */
664 else
665 ci->prev->next = ci->next;
667 if (ci->next == NULL)
668 cache_queue_tail = ci->prev; /* reset the tail */
669 else
670 ci->next->prev = ci->prev;
672 ci->next = ci->prev = NULL;
673 ci->flags &= ~CI_FLAGS_IN_QUEUE;
675 pthread_mutex_lock (&stats_lock);
676 assert (stats_queue_length > 0);
677 stats_queue_length--;
678 pthread_mutex_unlock (&stats_lock);
680 } /* }}} static void remove_from_queue */
682 /* free the resources associated with the cache_item_t
683 * must hold cache_lock when calling this function
684 */
685 static void *free_cache_item(cache_item_t *ci) /* {{{ */
686 {
687 if (ci == NULL) return NULL;
689 remove_from_queue(ci);
691 for (size_t i=0; i < ci->values_num; i++)
692 free(ci->values[i]);
694 free (ci->values);
695 free (ci->file);
697 /* in case anyone is waiting */
698 pthread_cond_broadcast(&ci->flushed);
699 pthread_cond_destroy(&ci->flushed);
701 free (ci);
703 return NULL;
704 } /* }}} static void *free_cache_item */
706 /*
707 * enqueue_cache_item:
708 * `cache_lock' must be acquired before calling this function!
709 */
710 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
711 queue_side_t side)
712 {
713 if (ci == NULL)
714 return (-1);
716 if (ci->values_num == 0)
717 return (0);
719 if (side == HEAD)
720 {
721 if (cache_queue_head == ci)
722 return 0;
724 /* remove if further down in queue */
725 remove_from_queue(ci);
727 ci->prev = NULL;
728 ci->next = cache_queue_head;
729 if (ci->next != NULL)
730 ci->next->prev = ci;
731 cache_queue_head = ci;
733 if (cache_queue_tail == NULL)
734 cache_queue_tail = cache_queue_head;
735 }
736 else /* (side == TAIL) */
737 {
738 /* We don't move values back in the list.. */
739 if (ci->flags & CI_FLAGS_IN_QUEUE)
740 return (0);
742 assert (ci->next == NULL);
743 assert (ci->prev == NULL);
745 ci->prev = cache_queue_tail;
747 if (cache_queue_tail == NULL)
748 cache_queue_head = ci;
749 else
750 cache_queue_tail->next = ci;
752 cache_queue_tail = ci;
753 }
755 ci->flags |= CI_FLAGS_IN_QUEUE;
757 pthread_cond_signal(&queue_cond);
758 pthread_mutex_lock (&stats_lock);
759 stats_queue_length++;
760 pthread_mutex_unlock (&stats_lock);
762 return (0);
763 } /* }}} int enqueue_cache_item */
765 /*
766 * tree_callback_flush:
767 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
768 * while this is in progress.
769 */
770 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
771 gpointer data)
772 {
773 cache_item_t *ci;
774 callback_flush_data_t *cfd;
776 ci = (cache_item_t *) value;
777 cfd = (callback_flush_data_t *) data;
779 if (ci->flags & CI_FLAGS_IN_QUEUE)
780 return FALSE;
782 if (ci->values_num > 0
783 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
784 {
785 enqueue_cache_item (ci, TAIL);
786 }
787 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
788 && (ci->values_num <= 0))
789 {
790 assert ((char *) key == ci->file);
791 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
792 {
793 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
794 return (FALSE);
795 }
796 }
798 return (FALSE);
799 } /* }}} gboolean tree_callback_flush */
801 static int flush_old_values (int max_age)
802 {
803 callback_flush_data_t cfd;
804 size_t k;
806 memset (&cfd, 0, sizeof (cfd));
807 /* Pass the current time as user data so that we don't need to call
808 * `time' for each node. */
809 cfd.now = time (NULL);
810 cfd.keys = NULL;
811 cfd.keys_num = 0;
813 if (max_age > 0)
814 cfd.abs_timeout = cfd.now - max_age;
815 else
816 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
818 /* `tree_callback_flush' will return the keys of all values that haven't
819 * been touched in the last `config_flush_interval' seconds in `cfd'.
820 * The char*'s in this array point to the same memory as ci->file, so we
821 * don't need to free them separately. */
822 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
824 for (k = 0; k < cfd.keys_num; k++)
825 {
826 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
827 /* should never fail, since we have held the cache_lock
828 * the entire time */
829 assert(status == TRUE);
830 }
832 if (cfd.keys != NULL)
833 {
834 free (cfd.keys);
835 cfd.keys = NULL;
836 }
838 return (0);
839 } /* int flush_old_values */
841 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
842 {
843 struct timeval now;
844 struct timespec next_flush;
845 int status;
847 gettimeofday (&now, NULL);
848 next_flush.tv_sec = now.tv_sec + config_flush_interval;
849 next_flush.tv_nsec = 1000 * now.tv_usec;
851 pthread_mutex_lock(&cache_lock);
853 while (state == RUNNING)
854 {
855 gettimeofday (&now, NULL);
856 if ((now.tv_sec > next_flush.tv_sec)
857 || ((now.tv_sec == next_flush.tv_sec)
858 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
859 {
860 RRDD_LOG(LOG_DEBUG, "flushing old values");
862 /* Determine the time of the next cache flush. */
863 next_flush.tv_sec = now.tv_sec + config_flush_interval;
865 /* Flush all values that haven't been written in the last
866 * `config_write_interval' seconds. */
867 flush_old_values (config_write_interval);
869 /* unlock the cache while we rotate so we don't block incoming
870 * updates if the fsync() blocks on disk I/O */
871 pthread_mutex_unlock(&cache_lock);
872 journal_rotate();
873 pthread_mutex_lock(&cache_lock);
874 }
876 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
877 if (status != 0 && status != ETIMEDOUT)
878 {
879 RRDD_LOG (LOG_ERR, "flush_thread_main: "
880 "pthread_cond_timedwait returned %i.", status);
881 }
882 }
884 if (config_flush_at_shutdown)
885 flush_old_values (-1); /* flush everything */
887 state = SHUTDOWN;
889 pthread_mutex_unlock(&cache_lock);
891 return NULL;
892 } /* void *flush_thread_main */
894 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
895 {
896 pthread_mutex_lock (&cache_lock);
898 while (state != SHUTDOWN
899 || (cache_queue_head != NULL && config_flush_at_shutdown))
900 {
901 cache_item_t *ci;
902 char *file;
903 char **values;
904 size_t values_num;
905 int status;
907 /* Now, check if there's something to store away. If not, wait until
908 * something comes in. */
909 if (cache_queue_head == NULL)
910 {
911 status = pthread_cond_wait (&queue_cond, &cache_lock);
912 if ((status != 0) && (status != ETIMEDOUT))
913 {
914 RRDD_LOG (LOG_ERR, "queue_thread_main: "
915 "pthread_cond_wait returned %i.", status);
916 }
917 }
919 /* Check if a value has arrived. This may be NULL if we timed out or there
920 * was an interrupt such as a signal. */
921 if (cache_queue_head == NULL)
922 continue;
924 ci = cache_queue_head;
926 /* copy the relevant parts */
927 file = strdup (ci->file);
928 if (file == NULL)
929 {
930 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
931 continue;
932 }
934 assert(ci->values != NULL);
935 assert(ci->values_num > 0);
937 values = ci->values;
938 values_num = ci->values_num;
940 wipe_ci_values(ci, time(NULL));
941 remove_from_queue(ci);
943 pthread_mutex_unlock (&cache_lock);
945 rrd_clear_error ();
946 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
947 if (status != 0)
948 {
949 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
950 "rrd_update_r (%s) failed with status %i. (%s)",
951 file, status, rrd_get_error());
952 }
954 journal_write("wrote", file);
956 /* Search again in the tree. It's possible someone issued a "FORGET"
957 * while we were writing the update values. */
958 pthread_mutex_lock(&cache_lock);
959 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
960 if (ci)
961 pthread_cond_broadcast(&ci->flushed);
962 pthread_mutex_unlock(&cache_lock);
964 if (status == 0)
965 {
966 pthread_mutex_lock (&stats_lock);
967 stats_updates_written++;
968 stats_data_sets_written += values_num;
969 pthread_mutex_unlock (&stats_lock);
970 }
972 rrd_free_ptrs((void ***) &values, &values_num);
973 free(file);
975 pthread_mutex_lock (&cache_lock);
976 }
977 pthread_mutex_unlock (&cache_lock);
979 return (NULL);
980 } /* }}} void *queue_thread_main */
982 static int buffer_get_field (char **buffer_ret, /* {{{ */
983 size_t *buffer_size_ret, char **field_ret)
984 {
985 char *buffer;
986 size_t buffer_pos;
987 size_t buffer_size;
988 char *field;
989 size_t field_size;
990 int status;
992 buffer = *buffer_ret;
993 buffer_pos = 0;
994 buffer_size = *buffer_size_ret;
995 field = *buffer_ret;
996 field_size = 0;
998 if (buffer_size <= 0)
999 return (-1);
1001 /* This is ensured by `handle_request'. */
1002 assert (buffer[buffer_size - 1] == '\0');
1004 status = -1;
1005 while (buffer_pos < buffer_size)
1006 {
1007 /* Check for end-of-field or end-of-buffer */
1008 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1009 {
1010 field[field_size] = 0;
1011 field_size++;
1012 buffer_pos++;
1013 status = 0;
1014 break;
1015 }
1016 /* Handle escaped characters. */
1017 else if (buffer[buffer_pos] == '\\')
1018 {
1019 if (buffer_pos >= (buffer_size - 1))
1020 break;
1021 buffer_pos++;
1022 field[field_size] = buffer[buffer_pos];
1023 field_size++;
1024 buffer_pos++;
1025 }
1026 /* Normal operation */
1027 else
1028 {
1029 field[field_size] = buffer[buffer_pos];
1030 field_size++;
1031 buffer_pos++;
1032 }
1033 } /* while (buffer_pos < buffer_size) */
1035 if (status != 0)
1036 return (status);
1038 *buffer_ret = buffer + buffer_pos;
1039 *buffer_size_ret = buffer_size - buffer_pos;
1040 *field_ret = field;
1042 return (0);
1043 } /* }}} int buffer_get_field */
1045 /* if we're restricting writes to the base directory,
1046 * check whether the file falls within the dir
1047 * returns 1 if OK, otherwise 0
1048 */
1049 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1050 {
1051 assert(file != NULL);
1053 if (!config_write_base_only
1054 || JOURNAL_REPLAY(sock)
1055 || config_base_dir == NULL)
1056 return 1;
1058 if (strstr(file, "../") != NULL) goto err;
1060 /* relative paths without "../" are ok */
1061 if (*file != '/') return 1;
1063 /* file must be of the format base + "/" + <1+ char filename> */
1064 if (strlen(file) < _config_base_dir_len + 2) goto err;
1065 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1066 if (*(file + _config_base_dir_len) != '/') goto err;
1068 return 1;
1070 err:
1071 if (sock != NULL && sock->fd >= 0)
1072 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1074 return 0;
1075 } /* }}} static int check_file_access */
1077 /* when using a base dir, convert relative paths to absolute paths.
1078 * if necessary, modifies the "filename" pointer to point
1079 * to the new path created in "tmp". "tmp" is provided
1080 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1081 *
1082 * this allows us to optimize for the expected case (absolute path)
1083 * with a no-op.
1084 */
1085 static void get_abs_path(char **filename, char *tmp)
1086 {
1087 assert(tmp != NULL);
1088 assert(filename != NULL && *filename != NULL);
1090 if (config_base_dir == NULL || **filename == '/')
1091 return;
1093 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1094 *filename = tmp;
1095 } /* }}} static int get_abs_path */
1097 static int flush_file (const char *filename) /* {{{ */
1098 {
1099 cache_item_t *ci;
1101 pthread_mutex_lock (&cache_lock);
1103 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1104 if (ci == NULL)
1105 {
1106 pthread_mutex_unlock (&cache_lock);
1107 return (ENOENT);
1108 }
1110 if (ci->values_num > 0)
1111 {
1112 /* Enqueue at head */
1113 enqueue_cache_item (ci, HEAD);
1114 pthread_cond_wait(&ci->flushed, &cache_lock);
1115 }
1117 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1118 * may have been purged during our cond_wait() */
1120 pthread_mutex_unlock(&cache_lock);
1122 return (0);
1123 } /* }}} int flush_file */
1125 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1126 {
1127 char *err = "Syntax error.\n";
1129 if (cmd && cmd->syntax)
1130 err = cmd->syntax;
1132 return send_response(sock, RESP_ERR, "Usage: %s", err);
1133 } /* }}} static int syntax_error() */
1135 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1136 {
1137 uint64_t copy_queue_length;
1138 uint64_t copy_updates_received;
1139 uint64_t copy_flush_received;
1140 uint64_t copy_updates_written;
1141 uint64_t copy_data_sets_written;
1142 uint64_t copy_journal_bytes;
1143 uint64_t copy_journal_rotate;
1145 uint64_t tree_nodes_number;
1146 uint64_t tree_depth;
1148 pthread_mutex_lock (&stats_lock);
1149 copy_queue_length = stats_queue_length;
1150 copy_updates_received = stats_updates_received;
1151 copy_flush_received = stats_flush_received;
1152 copy_updates_written = stats_updates_written;
1153 copy_data_sets_written = stats_data_sets_written;
1154 copy_journal_bytes = stats_journal_bytes;
1155 copy_journal_rotate = stats_journal_rotate;
1156 pthread_mutex_unlock (&stats_lock);
1158 pthread_mutex_lock (&cache_lock);
1159 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160 tree_depth = (uint64_t) g_tree_height (cache_tree);
1161 pthread_mutex_unlock (&cache_lock);
1163 add_response_info(sock,
1164 "QueueLength: %"PRIu64"\n", copy_queue_length);
1165 add_response_info(sock,
1166 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167 add_response_info(sock,
1168 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169 add_response_info(sock,
1170 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171 add_response_info(sock,
1172 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1178 send_response(sock, RESP_OK, "Statistics follow\n");
1180 return (0);
1181 } /* }}} int handle_request_stats */
1183 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1184 {
1185 char *file, file_tmp[PATH_MAX];
1186 int status;
1188 status = buffer_get_field (&buffer, &buffer_size, &file);
1189 if (status != 0)
1190 {
1191 return syntax_error(sock,cmd);
1192 }
1193 else
1194 {
1195 pthread_mutex_lock(&stats_lock);
1196 stats_flush_received++;
1197 pthread_mutex_unlock(&stats_lock);
1199 get_abs_path(&file, file_tmp);
1200 if (!check_file_access(file, sock)) return 0;
1202 status = flush_file (file);
1203 if (status == 0)
1204 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205 else if (status == ENOENT)
1206 {
1207 /* no file in our tree; see whether it exists at all */
1208 struct stat statbuf;
1210 memset(&statbuf, 0, sizeof(statbuf));
1211 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1213 else
1214 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1215 }
1216 else if (status < 0)
1217 return send_response(sock, RESP_ERR, "Internal error.\n");
1218 else
1219 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1220 }
1222 /* NOTREACHED */
1223 assert(1==0);
1224 } /* }}} int handle_request_flush */
1226 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1227 {
1228 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1230 pthread_mutex_lock(&cache_lock);
1231 flush_old_values(-1);
1232 pthread_mutex_unlock(&cache_lock);
1234 return send_response(sock, RESP_OK, "Started flush.\n");
1235 } /* }}} static int handle_request_flushall */
1237 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1238 {
1239 int status;
1240 char *file, file_tmp[PATH_MAX];
1241 cache_item_t *ci;
1243 status = buffer_get_field(&buffer, &buffer_size, &file);
1244 if (status != 0)
1245 return syntax_error(sock,cmd);
1247 get_abs_path(&file, file_tmp);
1249 pthread_mutex_lock(&cache_lock);
1250 ci = g_tree_lookup(cache_tree, file);
1251 if (ci == NULL)
1252 {
1253 pthread_mutex_unlock(&cache_lock);
1254 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1255 }
1257 for (size_t i=0; i < ci->values_num; i++)
1258 add_response_info(sock, "%s\n", ci->values[i]);
1260 pthread_mutex_unlock(&cache_lock);
1261 return send_response(sock, RESP_OK, "updates pending\n");
1262 } /* }}} static int handle_request_pending */
1264 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1265 {
1266 int status;
1267 gboolean found;
1268 char *file, file_tmp[PATH_MAX];
1270 status = buffer_get_field(&buffer, &buffer_size, &file);
1271 if (status != 0)
1272 return syntax_error(sock,cmd);
1274 get_abs_path(&file, file_tmp);
1275 if (!check_file_access(file, sock)) return 0;
1277 pthread_mutex_lock(&cache_lock);
1278 found = g_tree_remove(cache_tree, file);
1279 pthread_mutex_unlock(&cache_lock);
1281 if (found == TRUE)
1282 {
1283 if (!JOURNAL_REPLAY(sock))
1284 journal_write("forget", file);
1286 return send_response(sock, RESP_OK, "Gone!\n");
1287 }
1288 else
1289 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1291 /* NOTREACHED */
1292 assert(1==0);
1293 } /* }}} static int handle_request_forget */
1295 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1296 {
1297 cache_item_t *ci;
1299 pthread_mutex_lock(&cache_lock);
1301 ci = cache_queue_head;
1302 while (ci != NULL)
1303 {
1304 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1305 ci = ci->next;
1306 }
1308 pthread_mutex_unlock(&cache_lock);
1310 return send_response(sock, RESP_OK, "in queue.\n");
1311 } /* }}} int handle_request_queue */
1313 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1314 {
1315 char *file, file_tmp[PATH_MAX];
1316 int values_num = 0;
1317 int status;
1318 char orig_buf[CMD_MAX];
1320 cache_item_t *ci;
1322 /* save it for the journal later */
1323 if (!JOURNAL_REPLAY(sock))
1324 strncpy(orig_buf, buffer, buffer_size);
1326 status = buffer_get_field (&buffer, &buffer_size, &file);
1327 if (status != 0)
1328 return syntax_error(sock,cmd);
1330 pthread_mutex_lock(&stats_lock);
1331 stats_updates_received++;
1332 pthread_mutex_unlock(&stats_lock);
1334 get_abs_path(&file, file_tmp);
1335 if (!check_file_access(file, sock)) return 0;
1337 pthread_mutex_lock (&cache_lock);
1338 ci = g_tree_lookup (cache_tree, file);
1340 if (ci == NULL) /* {{{ */
1341 {
1342 struct stat statbuf;
1343 cache_item_t *tmp;
1345 /* don't hold the lock while we setup; stat(2) might block */
1346 pthread_mutex_unlock(&cache_lock);
1348 memset (&statbuf, 0, sizeof (statbuf));
1349 status = stat (file, &statbuf);
1350 if (status != 0)
1351 {
1352 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1354 status = errno;
1355 if (status == ENOENT)
1356 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1357 else
1358 return send_response(sock, RESP_ERR,
1359 "stat failed with error %i.\n", status);
1360 }
1361 if (!S_ISREG (statbuf.st_mode))
1362 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1364 if (access(file, R_OK|W_OK) != 0)
1365 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1366 file, rrd_strerror(errno));
1368 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1369 if (ci == NULL)
1370 {
1371 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1373 return send_response(sock, RESP_ERR, "malloc failed.\n");
1374 }
1375 memset (ci, 0, sizeof (cache_item_t));
1377 ci->file = strdup (file);
1378 if (ci->file == NULL)
1379 {
1380 free (ci);
1381 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1383 return send_response(sock, RESP_ERR, "strdup failed.\n");
1384 }
1386 wipe_ci_values(ci, now);
1387 ci->flags = CI_FLAGS_IN_TREE;
1388 pthread_cond_init(&ci->flushed, NULL);
1390 pthread_mutex_lock(&cache_lock);
1392 /* another UPDATE might have added this entry in the meantime */
1393 tmp = g_tree_lookup (cache_tree, file);
1394 if (tmp == NULL)
1395 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1396 else
1397 {
1398 free_cache_item (ci);
1399 ci = tmp;
1400 }
1402 /* state may have changed while we were unlocked */
1403 if (state == SHUTDOWN)
1404 return -1;
1405 } /* }}} */
1406 assert (ci != NULL);
1408 /* don't re-write updates in replay mode */
1409 if (!JOURNAL_REPLAY(sock))
1410 journal_write("update", orig_buf);
1412 while (buffer_size > 0)
1413 {
1414 char *value;
1415 time_t stamp;
1416 char *eostamp;
1418 status = buffer_get_field (&buffer, &buffer_size, &value);
1419 if (status != 0)
1420 {
1421 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1422 break;
1423 }
1425 /* make sure update time is always moving forward */
1426 stamp = strtol(value, &eostamp, 10);
1427 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1428 {
1429 pthread_mutex_unlock(&cache_lock);
1430 return send_response(sock, RESP_ERR,
1431 "Cannot find timestamp in '%s'!\n", value);
1432 }
1433 else if (stamp <= ci->last_update_stamp)
1434 {
1435 pthread_mutex_unlock(&cache_lock);
1436 return send_response(sock, RESP_ERR,
1437 "illegal attempt to update using time %ld when last"
1438 " update time is %ld (minimum one second step)\n",
1439 stamp, ci->last_update_stamp);
1440 }
1441 else
1442 ci->last_update_stamp = stamp;
1444 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1445 {
1446 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1447 continue;
1448 }
1450 values_num++;
1451 }
1453 if (((now - ci->last_flush_time) >= config_write_interval)
1454 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1455 && (ci->values_num > 0))
1456 {
1457 enqueue_cache_item (ci, TAIL);
1458 }
1460 pthread_mutex_unlock (&cache_lock);
1462 if (values_num < 1)
1463 return send_response(sock, RESP_ERR, "No values updated.\n");
1464 else
1465 return send_response(sock, RESP_OK,
1466 "errors, enqueued %i value(s).\n", values_num);
1468 /* NOTREACHED */
1469 assert(1==0);
1471 } /* }}} int handle_request_update */
1473 /* we came across a "WROTE" entry during journal replay.
1474 * throw away any values that we have accumulated for this file
1475 */
1476 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1477 {
1478 cache_item_t *ci;
1479 const char *file = buffer;
1481 pthread_mutex_lock(&cache_lock);
1483 ci = g_tree_lookup(cache_tree, file);
1484 if (ci == NULL)
1485 {
1486 pthread_mutex_unlock(&cache_lock);
1487 return (0);
1488 }
1490 if (ci->values)
1491 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1493 wipe_ci_values(ci, now);
1494 remove_from_queue(ci);
1496 pthread_mutex_unlock(&cache_lock);
1497 return (0);
1498 } /* }}} int handle_request_wrote */
1500 /* start "BATCH" processing */
1501 static int batch_start (HANDLER_PROTO) /* {{{ */
1502 {
1503 int status;
1504 if (sock->batch_start)
1505 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1507 status = send_response(sock, RESP_OK,
1508 "Go ahead. End with dot '.' on its own line.\n");
1509 sock->batch_start = time(NULL);
1510 sock->batch_cmd = 0;
1512 return status;
1513 } /* }}} static int batch_start */
1515 /* finish "BATCH" processing and return results to the client */
1516 static int batch_done (HANDLER_PROTO) /* {{{ */
1517 {
1518 assert(sock->batch_start);
1519 sock->batch_start = 0;
1520 sock->batch_cmd = 0;
1521 return send_response(sock, RESP_OK, "errors\n");
1522 } /* }}} static int batch_done */
1524 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1525 {
1526 return -1;
1527 } /* }}} static int handle_request_quit */
1529 static command_t list_of_commands[] = { /* {{{ */
1530 {
1531 "UPDATE",
1532 handle_request_update,
1533 CMD_CONTEXT_ANY,
1534 "UPDATE <filename> <values> [<values> ...]\n"
1535 ,
1536 "Adds the given file to the internal cache if it is not yet known and\n"
1537 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1538 "for details.\n"
1539 "\n"
1540 "Each <values> has the following form:\n"
1541 " <values> = <time>:<value>[:<value>[...]]\n"
1542 "See the rrdupdate(1) manpage for details.\n"
1543 },
1544 {
1545 "WROTE",
1546 handle_request_wrote,
1547 CMD_CONTEXT_JOURNAL,
1548 NULL,
1549 NULL
1550 },
1551 {
1552 "FLUSH",
1553 handle_request_flush,
1554 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1555 "FLUSH <filename>\n"
1556 ,
1557 "Adds the given filename to the head of the update queue and returns\n"
1558 "after it has been dequeued.\n"
1559 },
1560 {
1561 "FLUSHALL",
1562 handle_request_flushall,
1563 CMD_CONTEXT_CLIENT,
1564 "FLUSHALL\n"
1565 ,
1566 "Triggers writing of all pending updates. Returns immediately.\n"
1567 },
1568 {
1569 "PENDING",
1570 handle_request_pending,
1571 CMD_CONTEXT_CLIENT,
1572 "PENDING <filename>\n"
1573 ,
1574 "Shows any 'pending' updates for a file, in order.\n"
1575 "The updates shown have not yet been written to the underlying RRD file.\n"
1576 },
1577 {
1578 "FORGET",
1579 handle_request_forget,
1580 CMD_CONTEXT_ANY,
1581 "FORGET <filename>\n"
1582 ,
1583 "Removes the file completely from the cache.\n"
1584 "Any pending updates for the file will be lost.\n"
1585 },
1586 {
1587 "QUEUE",
1588 handle_request_queue,
1589 CMD_CONTEXT_CLIENT,
1590 "QUEUE\n"
1591 ,
1592 "Shows all files in the output queue.\n"
1593 "The output is zero or more lines in the following format:\n"
1594 "(where <num_vals> is the number of values to be written)\n"
1595 "\n"
1596 "<num_vals> <filename>\n"
1597 },
1598 {
1599 "STATS",
1600 handle_request_stats,
1601 CMD_CONTEXT_CLIENT,
1602 "STATS\n"
1603 ,
1604 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1605 "a description of the values.\n"
1606 },
1607 {
1608 "HELP",
1609 handle_request_help,
1610 CMD_CONTEXT_CLIENT,
1611 "HELP [<command>]\n",
1612 NULL, /* special! */
1613 },
1614 {
1615 "BATCH",
1616 batch_start,
1617 CMD_CONTEXT_CLIENT,
1618 "BATCH\n"
1619 ,
1620 "The 'BATCH' command permits the client to initiate a bulk load\n"
1621 " of commands to rrdcached.\n"
1622 "\n"
1623 "Usage:\n"
1624 "\n"
1625 " client: BATCH\n"
1626 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1627 " client: command #1\n"
1628 " client: command #2\n"
1629 " client: ... and so on\n"
1630 " client: .\n"
1631 " server: 2 errors\n"
1632 " server: 7 message for command #7\n"
1633 " server: 9 message for command #9\n"
1634 "\n"
1635 "For more information, consult the rrdcached(1) documentation.\n"
1636 },
1637 {
1638 ".", /* BATCH terminator */
1639 batch_done,
1640 CMD_CONTEXT_BATCH,
1641 NULL,
1642 NULL
1643 },
1644 {
1645 "QUIT",
1646 handle_request_quit,
1647 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1648 "QUIT\n"
1649 ,
1650 "Disconnect from rrdcached.\n"
1651 }
1652 }; /* }}} command_t list_of_commands[] */
1653 static size_t list_of_commands_len = sizeof (list_of_commands)
1654 / sizeof (list_of_commands[0]);
1656 static command_t *find_command(char *cmd)
1657 {
1658 size_t i;
1660 for (i = 0; i < list_of_commands_len; i++)
1661 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1662 return (&list_of_commands[i]);
1663 return NULL;
1664 }
1666 /* We currently use the index in the `list_of_commands' array as a bit position
1667 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1668 * outside these functions so that switching to a more elegant storage method
1669 * is easily possible. */
1670 static ssize_t find_command_index (const char *cmd) /* {{{ */
1671 {
1672 size_t i;
1674 for (i = 0; i < list_of_commands_len; i++)
1675 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1676 return ((ssize_t) i);
1677 return (-1);
1678 } /* }}} ssize_t find_command_index */
1680 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1681 const char *cmd)
1682 {
1683 ssize_t i;
1685 if (JOURNAL_REPLAY(sock))
1686 return (1);
1688 if (cmd == NULL)
1689 return (-1);
1691 if ((strcasecmp ("QUIT", cmd) == 0)
1692 || (strcasecmp ("HELP", cmd) == 0))
1693 return (1);
1694 else if (strcmp (".", cmd) == 0)
1695 cmd = "BATCH";
1697 i = find_command_index (cmd);
1698 if (i < 0)
1699 return (-1);
1700 assert (i < 32);
1702 if ((sock->permissions & (1 << i)) != 0)
1703 return (1);
1704 return (0);
1705 } /* }}} int socket_permission_check */
1707 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1708 const char *cmd)
1709 {
1710 ssize_t i;
1712 i = find_command_index (cmd);
1713 if (i < 0)
1714 return (-1);
1715 assert (i < 32);
1717 sock->permissions |= (1 << i);
1718 return (0);
1719 } /* }}} int socket_permission_add */
1721 /* check whether commands are received in the expected context */
1722 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1723 {
1724 if (JOURNAL_REPLAY(sock))
1725 return (cmd->context & CMD_CONTEXT_JOURNAL);
1726 else if (sock->batch_start)
1727 return (cmd->context & CMD_CONTEXT_BATCH);
1728 else
1729 return (cmd->context & CMD_CONTEXT_CLIENT);
1731 /* NOTREACHED */
1732 assert(1==0);
1733 }
1735 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1736 {
1737 int status;
1738 char *cmd_str;
1739 char *resp_txt;
1740 command_t *help = NULL;
1742 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1743 if (status == 0)
1744 help = find_command(cmd_str);
1746 if (help && (help->syntax || help->help))
1747 {
1748 char tmp[CMD_MAX];
1750 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1751 resp_txt = tmp;
1753 if (help->syntax)
1754 add_response_info(sock, "Usage: %s\n", help->syntax);
1756 if (help->help)
1757 add_response_info(sock, "%s\n", help->help);
1758 }
1759 else
1760 {
1761 size_t i;
1763 resp_txt = "Command overview\n";
1765 for (i = 0; i < list_of_commands_len; i++)
1766 {
1767 if (list_of_commands[i].syntax == NULL)
1768 continue;
1769 add_response_info (sock, "%s", list_of_commands[i].syntax);
1770 }
1771 }
1773 return send_response(sock, RESP_OK, resp_txt);
1774 } /* }}} int handle_request_help */
1776 static int handle_request (DISPATCH_PROTO) /* {{{ */
1777 {
1778 char *buffer_ptr = buffer;
1779 char *cmd_str = NULL;
1780 command_t *cmd = NULL;
1781 int status;
1783 assert (buffer[buffer_size - 1] == '\0');
1785 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1786 if (status != 0)
1787 {
1788 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1789 return (-1);
1790 }
1792 if (sock != NULL && sock->batch_start)
1793 sock->batch_cmd++;
1795 cmd = find_command(cmd_str);
1796 if (!cmd)
1797 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1799 if (!socket_permission_check (sock, cmd->cmd))
1800 return send_response(sock, RESP_ERR, "Permission denied.\n");
1802 if (!command_check_context(sock, cmd))
1803 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1805 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1806 } /* }}} int handle_request */
1808 static void journal_set_free (journal_set *js) /* {{{ */
1809 {
1810 if (js == NULL)
1811 return;
1813 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1815 free(js);
1816 } /* }}} journal_set_free */
1818 static void journal_set_remove (journal_set *js) /* {{{ */
1819 {
1820 if (js == NULL)
1821 return;
1823 for (uint i=0; i < js->files_num; i++)
1824 {
1825 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1826 unlink(js->files[i]);
1827 }
1828 } /* }}} journal_set_remove */
1830 /* close current journal file handle.
1831 * MUST hold journal_lock before calling */
1832 static void journal_close(void) /* {{{ */
1833 {
1834 if (journal_fh != NULL)
1835 {
1836 if (fclose(journal_fh) != 0)
1837 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1838 }
1840 journal_fh = NULL;
1841 journal_size = 0;
1842 } /* }}} journal_close */
1844 /* MUST hold journal_lock before calling */
1845 static void journal_new_file(void) /* {{{ */
1846 {
1847 struct timeval now;
1848 int new_fd;
1849 char new_file[PATH_MAX + 1];
1851 assert(journal_dir != NULL);
1852 assert(journal_cur != NULL);
1854 journal_close();
1856 gettimeofday(&now, NULL);
1857 /* this format assures that the files sort in strcmp() order */
1858 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1859 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1861 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1862 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1863 if (new_fd < 0)
1864 goto error;
1866 journal_fh = fdopen(new_fd, "a");
1867 if (journal_fh == NULL)
1868 goto error;
1870 journal_size = ftell(journal_fh);
1871 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1873 /* record the file in the journal set */
1874 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1876 return;
1878 error:
1879 RRDD_LOG(LOG_CRIT,
1880 "JOURNALING DISABLED: Error while trying to create %s : %s",
1881 new_file, rrd_strerror(errno));
1882 RRDD_LOG(LOG_CRIT,
1883 "JOURNALING DISABLED: All values will be flushed at shutdown");
1885 close(new_fd);
1886 config_flush_at_shutdown = 1;
1888 } /* }}} journal_new_file */
1890 /* MUST NOT hold journal_lock before calling this */
1891 static void journal_rotate(void) /* {{{ */
1892 {
1893 journal_set *old_js = NULL;
1895 if (journal_dir == NULL)
1896 return;
1898 RRDD_LOG(LOG_DEBUG, "rotating journals");
1900 pthread_mutex_lock(&stats_lock);
1901 ++stats_journal_rotate;
1902 pthread_mutex_unlock(&stats_lock);
1904 pthread_mutex_lock(&journal_lock);
1906 journal_close();
1908 /* rotate the journal sets */
1909 old_js = journal_old;
1910 journal_old = journal_cur;
1911 journal_cur = calloc(1, sizeof(journal_set));
1913 if (journal_cur != NULL)
1914 journal_new_file();
1915 else
1916 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1918 pthread_mutex_unlock(&journal_lock);
1920 journal_set_remove(old_js);
1921 journal_set_free (old_js);
1923 } /* }}} static void journal_rotate */
1925 /* MUST hold journal_lock when calling */
1926 static void journal_done(void) /* {{{ */
1927 {
1928 if (journal_cur == NULL)
1929 return;
1931 journal_close();
1933 if (config_flush_at_shutdown)
1934 {
1935 RRDD_LOG(LOG_INFO, "removing journals");
1936 journal_set_remove(journal_old);
1937 journal_set_remove(journal_cur);
1938 }
1939 else
1940 {
1941 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1942 "journals will be used at next startup");
1943 }
1945 journal_set_free(journal_cur);
1946 journal_set_free(journal_old);
1947 free(journal_dir);
1949 } /* }}} static void journal_done */
1951 static int journal_write(char *cmd, char *args) /* {{{ */
1952 {
1953 int chars;
1955 if (journal_fh == NULL)
1956 return 0;
1958 pthread_mutex_lock(&journal_lock);
1959 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1960 journal_size += chars;
1962 if (journal_size > JOURNAL_MAX)
1963 journal_new_file();
1965 pthread_mutex_unlock(&journal_lock);
1967 if (chars > 0)
1968 {
1969 pthread_mutex_lock(&stats_lock);
1970 stats_journal_bytes += chars;
1971 pthread_mutex_unlock(&stats_lock);
1972 }
1974 return chars;
1975 } /* }}} static int journal_write */
1977 static int journal_replay (const char *file) /* {{{ */
1978 {
1979 FILE *fh;
1980 int entry_cnt = 0;
1981 int fail_cnt = 0;
1982 uint64_t line = 0;
1983 char entry[CMD_MAX];
1984 time_t now;
1986 if (file == NULL) return 0;
1988 {
1989 char *reason = "unknown error";
1990 int status = 0;
1991 struct stat statbuf;
1993 memset(&statbuf, 0, sizeof(statbuf));
1994 if (stat(file, &statbuf) != 0)
1995 {
1996 reason = "stat error";
1997 status = errno;
1998 }
1999 else if (!S_ISREG(statbuf.st_mode))
2000 {
2001 reason = "not a regular file";
2002 status = EPERM;
2003 }
2004 if (statbuf.st_uid != daemon_uid)
2005 {
2006 reason = "not owned by daemon user";
2007 status = EACCES;
2008 }
2009 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2010 {
2011 reason = "must not be user/group writable";
2012 status = EACCES;
2013 }
2015 if (status != 0)
2016 {
2017 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2018 file, rrd_strerror(status), reason);
2019 return 0;
2020 }
2021 }
2023 fh = fopen(file, "r");
2024 if (fh == NULL)
2025 {
2026 if (errno != ENOENT)
2027 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2028 file, rrd_strerror(errno));
2029 return 0;
2030 }
2031 else
2032 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2034 now = time(NULL);
2036 while(!feof(fh))
2037 {
2038 size_t entry_len;
2040 ++line;
2041 if (fgets(entry, sizeof(entry), fh) == NULL)
2042 break;
2043 entry_len = strlen(entry);
2045 /* check \n termination in case journal writing crashed mid-line */
2046 if (entry_len == 0)
2047 continue;
2048 else if (entry[entry_len - 1] != '\n')
2049 {
2050 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2051 ++fail_cnt;
2052 continue;
2053 }
2055 entry[entry_len - 1] = '\0';
2057 if (handle_request(NULL, now, entry, entry_len) == 0)
2058 ++entry_cnt;
2059 else
2060 ++fail_cnt;
2061 }
2063 fclose(fh);
2065 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2066 entry_cnt, fail_cnt);
2068 return entry_cnt > 0 ? 1 : 0;
2069 } /* }}} static int journal_replay */
2071 static int journal_sort(const void *v1, const void *v2)
2072 {
2073 char **jn1 = (char **) v1;
2074 char **jn2 = (char **) v2;
2076 return strcmp(*jn1,*jn2);
2077 }
2079 static void journal_init(void) /* {{{ */
2080 {
2081 int had_journal = 0;
2082 DIR *dir;
2083 struct dirent *dent;
2084 char path[PATH_MAX+1];
2086 if (journal_dir == NULL) return;
2088 pthread_mutex_lock(&journal_lock);
2090 journal_cur = calloc(1, sizeof(journal_set));
2091 if (journal_cur == NULL)
2092 {
2093 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2094 return;
2095 }
2097 RRDD_LOG(LOG_INFO, "checking for journal files");
2099 /* Handle old journal files during transition. This gives them the
2100 * correct sort order. TODO: remove after first release
2101 */
2102 {
2103 char old_path[PATH_MAX+1];
2104 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2105 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2106 rename(old_path, path);
2108 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2109 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2110 rename(old_path, path);
2111 }
2113 dir = opendir(journal_dir);
2114 while ((dent = readdir(dir)) != NULL)
2115 {
2116 /* looks like a journal file? */
2117 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2118 continue;
2120 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2122 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2123 {
2124 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2125 dent->d_name);
2126 break;
2127 }
2128 }
2129 closedir(dir);
2131 qsort(journal_cur->files, journal_cur->files_num,
2132 sizeof(journal_cur->files[0]), journal_sort);
2134 for (uint i=0; i < journal_cur->files_num; i++)
2135 had_journal += journal_replay(journal_cur->files[i]);
2137 journal_new_file();
2139 /* it must have been a crash. start a flush */
2140 if (had_journal && config_flush_at_shutdown)
2141 flush_old_values(-1);
2143 pthread_mutex_unlock(&journal_lock);
2145 RRDD_LOG(LOG_INFO, "journal processing complete");
2147 } /* }}} static void journal_init */
2149 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2150 {
2151 assert(sock != NULL);
2153 free(sock->rbuf); sock->rbuf = NULL;
2154 free(sock->wbuf); sock->wbuf = NULL;
2155 free(sock);
2156 } /* }}} void free_listen_socket */
2158 static void close_connection(listen_socket_t *sock) /* {{{ */
2159 {
2160 if (sock->fd >= 0)
2161 {
2162 close(sock->fd);
2163 sock->fd = -1;
2164 }
2166 free_listen_socket(sock);
2168 } /* }}} void close_connection */
2170 static void *connection_thread_main (void *args) /* {{{ */
2171 {
2172 listen_socket_t *sock;
2173 int fd;
2175 sock = (listen_socket_t *) args;
2176 fd = sock->fd;
2178 /* init read buffers */
2179 sock->next_read = sock->next_cmd = 0;
2180 sock->rbuf = malloc(RBUF_SIZE);
2181 if (sock->rbuf == NULL)
2182 {
2183 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2184 close_connection(sock);
2185 return NULL;
2186 }
2188 pthread_mutex_lock (&connection_threads_lock);
2189 connection_threads_num++;
2190 pthread_mutex_unlock (&connection_threads_lock);
2192 while (state == RUNNING)
2193 {
2194 char *cmd;
2195 ssize_t cmd_len;
2196 ssize_t rbytes;
2197 time_t now;
2199 struct pollfd pollfd;
2200 int status;
2202 pollfd.fd = fd;
2203 pollfd.events = POLLIN | POLLPRI;
2204 pollfd.revents = 0;
2206 status = poll (&pollfd, 1, /* timeout = */ 500);
2207 if (state != RUNNING)
2208 break;
2209 else if (status == 0) /* timeout */
2210 continue;
2211 else if (status < 0) /* error */
2212 {
2213 status = errno;
2214 if (status != EINTR)
2215 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2216 continue;
2217 }
2219 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2220 break;
2221 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2222 {
2223 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2224 "poll(2) returned something unexpected: %#04hx",
2225 pollfd.revents);
2226 break;
2227 }
2229 rbytes = read(fd, sock->rbuf + sock->next_read,
2230 RBUF_SIZE - sock->next_read);
2231 if (rbytes < 0)
2232 {
2233 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2234 break;
2235 }
2236 else if (rbytes == 0)
2237 break; /* eof */
2239 sock->next_read += rbytes;
2241 if (sock->batch_start)
2242 now = sock->batch_start;
2243 else
2244 now = time(NULL);
2246 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2247 {
2248 status = handle_request (sock, now, cmd, cmd_len+1);
2249 if (status != 0)
2250 goto out_close;
2251 }
2252 }
2254 out_close:
2255 close_connection(sock);
2257 /* Remove this thread from the connection threads list */
2258 pthread_mutex_lock (&connection_threads_lock);
2259 connection_threads_num--;
2260 if (connection_threads_num <= 0)
2261 pthread_cond_broadcast(&connection_threads_done);
2262 pthread_mutex_unlock (&connection_threads_lock);
2264 return (NULL);
2265 } /* }}} void *connection_thread_main */
2267 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2268 {
2269 int fd;
2270 struct sockaddr_un sa;
2271 listen_socket_t *temp;
2272 int status;
2273 const char *path;
2274 char *path_copy, *dir;
2276 path = sock->addr;
2277 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2278 path += strlen("unix:");
2280 /* dirname may modify its argument */
2281 path_copy = strdup(path);
2282 if (path_copy == NULL)
2283 {
2284 fprintf(stderr, "rrdcached: strdup(): %s\n",
2285 rrd_strerror(errno));
2286 return (-1);
2287 }
2289 dir = dirname(path_copy);
2290 if (rrd_mkdir_p(dir, 0777) != 0)
2291 {
2292 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2293 dir, rrd_strerror(errno));
2294 return (-1);
2295 }
2297 free(path_copy);
2299 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2300 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2301 if (temp == NULL)
2302 {
2303 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2304 return (-1);
2305 }
2306 listen_fds = temp;
2307 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2309 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2310 if (fd < 0)
2311 {
2312 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2313 rrd_strerror(errno));
2314 return (-1);
2315 }
2317 memset (&sa, 0, sizeof (sa));
2318 sa.sun_family = AF_UNIX;
2319 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2321 /* if we've gotten this far, we own the pid file. any daemon started
2322 * with the same args must not be alive. therefore, ensure that we can
2323 * create the socket...
2324 */
2325 unlink(path);
2327 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2328 if (status != 0)
2329 {
2330 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2331 path, rrd_strerror(errno));
2332 close (fd);
2333 return (-1);
2334 }
2336 /* tweak the sockets group ownership */
2337 if (sock->socket_group != (gid_t)-1)
2338 {
2339 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2340 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2341 {
2342 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2343 }
2344 }
2346 if (sock->socket_permissions != (mode_t)-1)
2347 {
2348 if (chmod(path, sock->socket_permissions) != 0)
2349 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2350 (unsigned int)sock->socket_permissions, strerror(errno));
2351 }
2353 status = listen (fd, /* backlog = */ 10);
2354 if (status != 0)
2355 {
2356 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2357 path, rrd_strerror(errno));
2358 close (fd);
2359 unlink (path);
2360 return (-1);
2361 }
2363 listen_fds[listen_fds_num].fd = fd;
2364 listen_fds[listen_fds_num].family = PF_UNIX;
2365 strncpy(listen_fds[listen_fds_num].addr, path,
2366 sizeof (listen_fds[listen_fds_num].addr) - 1);
2367 listen_fds_num++;
2369 return (0);
2370 } /* }}} int open_listen_socket_unix */
2372 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2373 {
2374 struct addrinfo ai_hints;
2375 struct addrinfo *ai_res;
2376 struct addrinfo *ai_ptr;
2377 char addr_copy[NI_MAXHOST];
2378 char *addr;
2379 char *port;
2380 int status;
2382 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2383 addr_copy[sizeof (addr_copy) - 1] = 0;
2384 addr = addr_copy;
2386 memset (&ai_hints, 0, sizeof (ai_hints));
2387 ai_hints.ai_flags = 0;
2388 #ifdef AI_ADDRCONFIG
2389 ai_hints.ai_flags |= AI_ADDRCONFIG;
2390 #endif
2391 ai_hints.ai_family = AF_UNSPEC;
2392 ai_hints.ai_socktype = SOCK_STREAM;
2394 port = NULL;
2395 if (*addr == '[') /* IPv6+port format */
2396 {
2397 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2398 addr++;
2400 port = strchr (addr, ']');
2401 if (port == NULL)
2402 {
2403 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2404 return (-1);
2405 }
2406 *port = 0;
2407 port++;
2409 if (*port == ':')
2410 port++;
2411 else if (*port == 0)
2412 port = NULL;
2413 else
2414 {
2415 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2416 return (-1);
2417 }
2418 } /* if (*addr == '[') */
2419 else
2420 {
2421 port = rindex(addr, ':');
2422 if (port != NULL)
2423 {
2424 *port = 0;
2425 port++;
2426 }
2427 }
2428 ai_res = NULL;
2429 status = getaddrinfo (addr,
2430 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2431 &ai_hints, &ai_res);
2432 if (status != 0)
2433 {
2434 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2435 addr, gai_strerror (status));
2436 return (-1);
2437 }
2439 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2440 {
2441 int fd;
2442 listen_socket_t *temp;
2443 int one = 1;
2445 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2446 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2447 if (temp == NULL)
2448 {
2449 fprintf (stderr,
2450 "rrdcached: open_listen_socket_network: realloc failed.\n");
2451 continue;
2452 }
2453 listen_fds = temp;
2454 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2456 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2457 if (fd < 0)
2458 {
2459 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2460 rrd_strerror(errno));
2461 continue;
2462 }
2464 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2466 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2467 if (status != 0)
2468 {
2469 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2470 sock->addr, rrd_strerror(errno));
2471 close (fd);
2472 continue;
2473 }
2475 status = listen (fd, /* backlog = */ 10);
2476 if (status != 0)
2477 {
2478 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2479 sock->addr, rrd_strerror(errno));
2480 close (fd);
2481 freeaddrinfo(ai_res);
2482 return (-1);
2483 }
2485 listen_fds[listen_fds_num].fd = fd;
2486 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2487 listen_fds_num++;
2488 } /* for (ai_ptr) */
2490 freeaddrinfo(ai_res);
2491 return (0);
2492 } /* }}} static int open_listen_socket_network */
2494 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2495 {
2496 assert(sock != NULL);
2497 assert(sock->addr != NULL);
2499 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2500 || sock->addr[0] == '/')
2501 return (open_listen_socket_unix(sock));
2502 else
2503 return (open_listen_socket_network(sock));
2504 } /* }}} int open_listen_socket */
2506 static int close_listen_sockets (void) /* {{{ */
2507 {
2508 size_t i;
2510 for (i = 0; i < listen_fds_num; i++)
2511 {
2512 close (listen_fds[i].fd);
2514 if (listen_fds[i].family == PF_UNIX)
2515 unlink(listen_fds[i].addr);
2516 }
2518 free (listen_fds);
2519 listen_fds = NULL;
2520 listen_fds_num = 0;
2522 return (0);
2523 } /* }}} int close_listen_sockets */
2525 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2526 {
2527 struct pollfd *pollfds;
2528 int pollfds_num;
2529 int status;
2530 int i;
2532 if (listen_fds_num < 1)
2533 {
2534 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2535 return (NULL);
2536 }
2538 pollfds_num = listen_fds_num;
2539 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2540 if (pollfds == NULL)
2541 {
2542 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2543 return (NULL);
2544 }
2545 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2547 RRDD_LOG(LOG_INFO, "listening for connections");
2549 while (state == RUNNING)
2550 {
2551 for (i = 0; i < pollfds_num; i++)
2552 {
2553 pollfds[i].fd = listen_fds[i].fd;
2554 pollfds[i].events = POLLIN | POLLPRI;
2555 pollfds[i].revents = 0;
2556 }
2558 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2559 if (state != RUNNING)
2560 break;
2561 else if (status == 0) /* timeout */
2562 continue;
2563 else if (status < 0) /* error */
2564 {
2565 status = errno;
2566 if (status != EINTR)
2567 {
2568 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2569 }
2570 continue;
2571 }
2573 for (i = 0; i < pollfds_num; i++)
2574 {
2575 listen_socket_t *client_sock;
2576 struct sockaddr_storage client_sa;
2577 socklen_t client_sa_size;
2578 pthread_t tid;
2579 pthread_attr_t attr;
2581 if (pollfds[i].revents == 0)
2582 continue;
2584 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2585 {
2586 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2587 "poll(2) returned something unexpected for listen FD #%i.",
2588 pollfds[i].fd);
2589 continue;
2590 }
2592 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2593 if (client_sock == NULL)
2594 {
2595 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2596 continue;
2597 }
2598 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2600 client_sa_size = sizeof (client_sa);
2601 client_sock->fd = accept (pollfds[i].fd,
2602 (struct sockaddr *) &client_sa, &client_sa_size);
2603 if (client_sock->fd < 0)
2604 {
2605 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2606 free(client_sock);
2607 continue;
2608 }
2610 pthread_attr_init (&attr);
2611 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2613 status = pthread_create (&tid, &attr, connection_thread_main,
2614 client_sock);
2615 if (status != 0)
2616 {
2617 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2618 close_connection(client_sock);
2619 continue;
2620 }
2621 } /* for (pollfds_num) */
2622 } /* while (state == RUNNING) */
2624 RRDD_LOG(LOG_INFO, "starting shutdown");
2626 close_listen_sockets ();
2628 pthread_mutex_lock (&connection_threads_lock);
2629 while (connection_threads_num > 0)
2630 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2631 pthread_mutex_unlock (&connection_threads_lock);
2633 free(pollfds);
2635 return (NULL);
2636 } /* }}} void *listen_thread_main */
2638 static int daemonize (void) /* {{{ */
2639 {
2640 int pid_fd;
2641 char *base_dir;
2643 daemon_uid = geteuid();
2645 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2646 if (pid_fd < 0)
2647 pid_fd = check_pidfile();
2648 if (pid_fd < 0)
2649 return pid_fd;
2651 /* open all the listen sockets */
2652 if (config_listen_address_list_len > 0)
2653 {
2654 for (size_t i = 0; i < config_listen_address_list_len; i++)
2655 open_listen_socket (config_listen_address_list[i]);
2657 rrd_free_ptrs((void ***) &config_listen_address_list,
2658 &config_listen_address_list_len);
2659 }
2660 else
2661 {
2662 listen_socket_t sock;
2663 memset(&sock, 0, sizeof(sock));
2664 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2665 open_listen_socket (&sock);
2666 }
2668 if (listen_fds_num < 1)
2669 {
2670 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2671 goto error;
2672 }
2674 if (!stay_foreground)
2675 {
2676 pid_t child;
2678 child = fork ();
2679 if (child < 0)
2680 {
2681 fprintf (stderr, "daemonize: fork(2) failed.\n");
2682 goto error;
2683 }
2684 else if (child > 0)
2685 exit(0);
2687 /* Become session leader */
2688 setsid ();
2690 /* Open the first three file descriptors to /dev/null */
2691 close (2);
2692 close (1);
2693 close (0);
2695 open ("/dev/null", O_RDWR);
2696 if (dup(0) == -1 || dup(0) == -1){
2697 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2698 }
2699 } /* if (!stay_foreground) */
2701 /* Change into the /tmp directory. */
2702 base_dir = (config_base_dir != NULL)
2703 ? config_base_dir
2704 : "/tmp";
2706 if (chdir (base_dir) != 0)
2707 {
2708 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2709 goto error;
2710 }
2712 install_signal_handlers();
2714 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2715 RRDD_LOG(LOG_INFO, "starting up");
2717 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2718 (GDestroyNotify) free_cache_item);
2719 if (cache_tree == NULL)
2720 {
2721 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2722 goto error;
2723 }
2725 return write_pidfile (pid_fd);
2727 error:
2728 remove_pidfile();
2729 return -1;
2730 } /* }}} int daemonize */
2732 static int cleanup (void) /* {{{ */
2733 {
2734 pthread_cond_broadcast (&flush_cond);
2735 pthread_join (flush_thread, NULL);
2737 pthread_cond_broadcast (&queue_cond);
2738 for (int i = 0; i < config_queue_threads; i++)
2739 pthread_join (queue_threads[i], NULL);
2741 if (config_flush_at_shutdown)
2742 {
2743 assert(cache_queue_head == NULL);
2744 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2745 }
2747 free(queue_threads);
2748 free(config_base_dir);
2750 pthread_mutex_lock(&cache_lock);
2751 g_tree_destroy(cache_tree);
2753 pthread_mutex_lock(&journal_lock);
2754 journal_done();
2756 RRDD_LOG(LOG_INFO, "goodbye");
2757 closelog ();
2759 remove_pidfile ();
2760 free(config_pid_file);
2762 return (0);
2763 } /* }}} int cleanup */
2765 static int read_options (int argc, char **argv) /* {{{ */
2766 {
2767 int option;
2768 int status = 0;
2770 char **permissions = NULL;
2771 size_t permissions_len = 0;
2773 gid_t socket_group = (gid_t)-1;
2774 mode_t socket_permissions = (mode_t)-1;
2776 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2777 {
2778 switch (option)
2779 {
2780 case 'g':
2781 stay_foreground=1;
2782 break;
2784 case 'l':
2785 {
2786 listen_socket_t *new;
2788 new = malloc(sizeof(listen_socket_t));
2789 if (new == NULL)
2790 {
2791 fprintf(stderr, "read_options: malloc failed.\n");
2792 return(2);
2793 }
2794 memset(new, 0, sizeof(listen_socket_t));
2796 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2798 /* Add permissions to the socket {{{ */
2799 if (permissions_len != 0)
2800 {
2801 size_t i;
2802 for (i = 0; i < permissions_len; i++)
2803 {
2804 status = socket_permission_add (new, permissions[i]);
2805 if (status != 0)
2806 {
2807 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2808 "socket failed. Most likely, this permission doesn't "
2809 "exist. Check your command line.\n", permissions[i]);
2810 status = 4;
2811 }
2812 }
2813 }
2814 else /* if (permissions_len == 0) */
2815 {
2816 /* Add permission for ALL commands to the socket. */
2817 size_t i;
2818 for (i = 0; i < list_of_commands_len; i++)
2819 {
2820 status = socket_permission_add (new, list_of_commands[i].cmd);
2821 if (status != 0)
2822 {
2823 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2824 "socket failed. This should never happen, ever! Sorry.\n",
2825 permissions[i]);
2826 status = 4;
2827 }
2828 }
2829 }
2830 /* }}} Done adding permissions. */
2832 new->socket_group = socket_group;
2833 new->socket_permissions = socket_permissions;
2835 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2836 &config_listen_address_list_len, new))
2837 {
2838 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2839 return (2);
2840 }
2841 }
2842 break;
2844 /* set socket group permissions */
2845 case 's':
2846 {
2847 gid_t group_gid;
2848 struct group *grp;
2850 group_gid = strtoul(optarg, NULL, 10);
2851 if (errno != EINVAL && group_gid>0)
2852 {
2853 /* we were passed a number */
2854 grp = getgrgid(group_gid);
2855 }
2856 else
2857 {
2858 grp = getgrnam(optarg);
2859 }
2861 if (grp)
2862 {
2863 socket_group = grp->gr_gid;
2864 }
2865 else
2866 {
2867 /* no idea what the user wanted... */
2868 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2869 return (5);
2870 }
2871 }
2872 break;
2874 /* set socket file permissions */
2875 case 'm':
2876 {
2877 long tmp;
2878 char *endptr = NULL;
2880 tmp = strtol (optarg, &endptr, 8);
2881 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2882 || (tmp > 07777) || (tmp < 0)) {
2883 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2884 optarg);
2885 return (5);
2886 }
2888 socket_permissions = (mode_t)tmp;
2889 }
2890 break;
2892 case 'P':
2893 {
2894 char *optcopy;
2895 char *saveptr;
2896 char *dummy;
2897 char *ptr;
2899 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2901 optcopy = strdup (optarg);
2902 dummy = optcopy;
2903 saveptr = NULL;
2904 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2905 {
2906 dummy = NULL;
2907 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2908 }
2910 free (optcopy);
2911 }
2912 break;
2914 case 'f':
2915 {
2916 int temp;
2918 temp = atoi (optarg);
2919 if (temp > 0)
2920 config_flush_interval = temp;
2921 else
2922 {
2923 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2924 status = 3;
2925 }
2926 }
2927 break;
2929 case 'w':
2930 {
2931 int temp;
2933 temp = atoi (optarg);
2934 if (temp > 0)
2935 config_write_interval = temp;
2936 else
2937 {
2938 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2939 status = 2;
2940 }
2941 }
2942 break;
2944 case 'z':
2945 {
2946 int temp;
2948 temp = atoi(optarg);
2949 if (temp > 0)
2950 config_write_jitter = temp;
2951 else
2952 {
2953 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2954 status = 2;
2955 }
2957 break;
2958 }
2960 case 't':
2961 {
2962 int threads;
2963 threads = atoi(optarg);
2964 if (threads >= 1)
2965 config_queue_threads = threads;
2966 else
2967 {
2968 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2969 return 1;
2970 }
2971 }
2972 break;
2974 case 'B':
2975 config_write_base_only = 1;
2976 break;
2978 case 'b':
2979 {
2980 size_t len;
2981 char base_realpath[PATH_MAX];
2983 if (config_base_dir != NULL)
2984 free (config_base_dir);
2985 config_base_dir = strdup (optarg);
2986 if (config_base_dir == NULL)
2987 {
2988 fprintf (stderr, "read_options: strdup failed.\n");
2989 return (3);
2990 }
2992 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2993 {
2994 fprintf (stderr, "Failed to create base directory '%s': %s\n",
2995 config_base_dir, rrd_strerror (errno));
2996 return (3);
2997 }
2999 /* make sure that the base directory is not resolved via
3000 * symbolic links. this makes some performance-enhancing
3001 * assumptions possible (we don't have to resolve paths
3002 * that start with a "/")
3003 */
3004 if (realpath(config_base_dir, base_realpath) == NULL)
3005 {
3006 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3007 "%s\n", config_base_dir, rrd_strerror(errno));
3008 return 5;
3009 }
3011 len = strlen (config_base_dir);
3012 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3013 {
3014 config_base_dir[len - 1] = 0;
3015 len--;
3016 }
3018 if (len < 1)
3019 {
3020 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3021 return (4);
3022 }
3024 _config_base_dir_len = len;
3026 len = strlen (base_realpath);
3027 while ((len > 0) && (base_realpath[len - 1] == '/'))
3028 {
3029 base_realpath[len - 1] = '\0';
3030 len--;
3031 }
3033 if (strncmp(config_base_dir,
3034 base_realpath, sizeof(base_realpath)) != 0)
3035 {
3036 fprintf(stderr,
3037 "Base directory (-b) resolved via file system links!\n"
3038 "Please consult rrdcached '-b' documentation!\n"
3039 "Consider specifying the real directory (%s)\n",
3040 base_realpath);
3041 return 5;
3042 }
3043 }
3044 break;
3046 case 'p':
3047 {
3048 if (config_pid_file != NULL)
3049 free (config_pid_file);
3050 config_pid_file = strdup (optarg);
3051 if (config_pid_file == NULL)
3052 {
3053 fprintf (stderr, "read_options: strdup failed.\n");
3054 return (3);
3055 }
3056 }
3057 break;
3059 case 'F':
3060 config_flush_at_shutdown = 1;
3061 break;
3063 case 'j':
3064 {
3065 const char *dir = journal_dir = strdup(optarg);
3067 status = rrd_mkdir_p(dir, 0777);
3068 if (status != 0)
3069 {
3070 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3071 dir, rrd_strerror(errno));
3072 return 6;
3073 }
3075 if (access(dir, R_OK|W_OK|X_OK) != 0)
3076 {
3077 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3078 errno ? rrd_strerror(errno) : "");
3079 return 6;
3080 }
3081 }
3082 break;
3084 case 'h':
3085 case '?':
3086 printf ("RRDCacheD %s\n"
3087 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3088 "\n"
3089 "Usage: rrdcached [options]\n"
3090 "\n"
3091 "Valid options are:\n"
3092 " -l <address> Socket address to listen to.\n"
3093 " -P <perms> Sets the permissions to assign to all following "
3094 "sockets\n"
3095 " -w <seconds> Interval in which to write data.\n"
3096 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3097 " -t <threads> Number of write threads.\n"
3098 " -f <seconds> Interval in which to flush dead data.\n"
3099 " -p <file> Location of the PID-file.\n"
3100 " -b <dir> Base directory to change to.\n"
3101 " -B Restrict file access to paths within -b <dir>\n"
3102 " -g Do not fork and run in the foreground.\n"
3103 " -j <dir> Directory in which to create the journal files.\n"
3104 " -F Always flush all updates at shutdown\n"
3105 " -s <id|name> Group owner of all following UNIX sockets\n"
3106 " (the socket will also have read/write permissions "
3107 "for that group)\n"
3108 " -m <mode> File permissions (octal) of all following UNIX "
3109 "sockets\n"
3110 "\n"
3111 "For more information and a detailed description of all options "
3112 "please refer\n"
3113 "to the rrdcached(1) manual page.\n",
3114 VERSION);
3115 if (option == 'h')
3116 status = -1;
3117 else
3118 status = 1;
3119 break;
3120 } /* switch (option) */
3121 } /* while (getopt) */
3123 /* advise the user when values are not sane */
3124 if (config_flush_interval < 2 * config_write_interval)
3125 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3126 " 2x write interval (-w) !\n");
3127 if (config_write_jitter > config_write_interval)
3128 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3129 " write interval (-w) !\n");
3131 if (config_write_base_only && config_base_dir == NULL)
3132 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3133 " Consult the rrdcached documentation\n");
3135 if (journal_dir == NULL)
3136 config_flush_at_shutdown = 1;
3138 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3140 return (status);
3141 } /* }}} int read_options */
3143 int main (int argc, char **argv)
3144 {
3145 int status;
3147 status = read_options (argc, argv);
3148 if (status != 0)
3149 {
3150 if (status < 0)
3151 status = 0;
3152 return (status);
3153 }
3155 status = daemonize ();
3156 if (status != 0)
3157 {
3158 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3159 return (1);
3160 }
3162 journal_init();
3164 /* start the queue threads */
3165 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3166 if (queue_threads == NULL)
3167 {
3168 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3169 cleanup();
3170 return (1);
3171 }
3172 for (int i = 0; i < config_queue_threads; i++)
3173 {
3174 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3175 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3176 if (status != 0)
3177 {
3178 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3179 cleanup();
3180 return (1);
3181 }
3182 }
3184 /* start the flush thread */
3185 memset(&flush_thread, 0, sizeof(flush_thread));
3186 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3187 if (status != 0)
3188 {
3189 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3190 cleanup();
3191 return (1);
3192 }
3194 listen_thread_main (NULL);
3195 cleanup ();
3197 return (0);
3198 } /* int main */
3200 /*
3201 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3202 */