1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
/*
 * RRDD_LOG:
 * Log a printf-style message to syslog with the given severity. When
 * `stay_foreground' is non-zero the same message is additionally printed to
 * stderr (no newline is appended). Note that the variadic arguments are
 * expanded twice, so they must not have side effects.
 */
#define RRDD_LOG(severity, ...) \
  do { \
    if (stay_foreground) \
      fprintf(stderr, __VA_ARGS__); \
    syslog ((severity), __VA_ARGS__); \
  } while (0)
122 /*
123 * Types
124 */
/* Result of a command as passed to send_response(): RESP_OK (0) or
 * RESP_ERR (-1). */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/*
 * State kept for one socket. The response helpers (add_to_wbuf(),
 * add_response_info(), send_response()) and the command reader (next_cmd())
 * all operate on this structure.
 */
struct listen_socket_s
{
  int fd;                   /* descriptor that send_response() writes to */
  char addr[PATH_MAX + 1];
  int family;

  /* state for BATCH processing */
  time_t batch_start;       /* non-zero while the socket is in BATCH mode */
  int batch_cmd;            /* reported in place of the line count when a
                             * command fails during BATCH */

  /* buffered IO */
  char *rbuf;               /* input buffer scanned by next_cmd() */
  off_t next_cmd;           /* offset in rbuf where the next command starts */
  off_t next_read;          /* offset in rbuf just past the buffered input */

  char *wbuf;               /* pending response text, grown by add_to_wbuf() */
  ssize_t wbuf_len;         /* number of characters stored in wbuf */

  /* NOTE(review): the three fields below are not used in the visible part
   * of the file; presumably access control for the socket -- confirm. */
  uint32_t permissions;

  gid_t socket_group;
  mode_t socket_permissions;
};
typedef struct listen_socket_s listen_socket_t;
struct command_s;
typedef struct command_s command_t;
/* note: guard against "unused" warnings in the handlers */
/* DISPATCH_PROTO: parameter list (sock, now, buffer, buffer_size) shared by
 * the request handling functions. A NULL `sock' denotes journal replay (see
 * JOURNAL_REPLAY). */
#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
                       time_t UNUSED(now),\
                       char UNUSED(*buffer),\
                       size_t UNUSED(buffer_size)

/* HANDLER_PROTO: DISPATCH_PROTO preceded by the command descriptor `cmd'. */
#define HANDLER_PROTO command_t UNUSED(*cmd),\
                      DISPATCH_PROTO

/* Describes one protocol command. */
struct command_s {
  char *cmd;
  int (*handler)(HANDLER_PROTO);  /* function invoked for this command */

  char context;   /* where we expect to see it */
  /* bit values for `context' */
#define CMD_CONTEXT_CLIENT  (1<<0)
#define CMD_CONTEXT_BATCH   (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY     (0x7f)

  char *syntax;   /* usage text sent by syntax_error(), if not NULL */
  char *help;
};
/*
 * One cached RRD file: the update strings waiting to be written plus the
 * bookkeeping needed to keep the item in `cache_tree' and in the doubly
 * linked write queue (`cache_queue_head' / `cache_queue_tail').
 */
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;               /* file name; also used as the key in cache_tree */
  char **values;            /* pending update strings */
  size_t values_num;        /* number of entries in `values' */
  time_t last_flush_time;   /* set by wipe_ci_values() when values are taken */
  time_t last_update_stamp; /* NOTE(review): not used in the visible part of
                             * the file -- confirm meaning */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;                /* combination of CI_FLAGS_* */
  pthread_cond_t flushed;   /* broadcast after the item's values were written */
  cache_item_t *prev;       /* queue neighbours; both NULL when not queued */
  cache_item_t *next;
};
/* User data handed to tree_callback_flush() by flush_old_values(). */
struct callback_flush_data_s
{
  time_t now;          /* time at which the tree walk started */
  time_t abs_timeout;  /* items with pending values and last_flush_time <=
                        * this are enqueued */
  char **keys;         /* keys collected for removal from the tree; they
                        * alias ci->file and are not freed separately */
  size_t keys_num;     /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Selects the end of the write queue used by enqueue_cache_item(). */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;
/* describe a set of journal files */
typedef struct {
  char **files;      /* array of `files_num' file names */
  size_t files_num;
} journal_set;
/* max length of socket command or response */
#define CMD_MAX 4096
/* NOTE(review): presumably the size of listen_socket_t.rbuf -- confirm */
#define RBUF_SIZE (CMD_MAX*2)
/*
 * Variables
 */
static int stay_foreground = 0;  /* non-zero: RRDD_LOG also prints to stderr */
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Life-cycle state of the daemon. The signal handlers switch it to FLUSHING;
 * flush_thread_main() switches it to SHUTDOWN once it has left its loop. */
enum {
  RUNNING,   /* normal operation */
  FLUSHING,  /* flushing remaining values */
  SHUTDOWN   /* shutting down */
} state = RUNNING;

static pthread_t *queue_threads;
/* signalled when an item is enqueued; waited on together with cache_lock */
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

static pthread_t flush_thread;
/* wakes flush_thread_main(); waited on together with cache_lock */
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
static GTree *cache_tree = NULL;                /* file name -> cache_item_t */
static cache_item_t *cache_queue_head = NULL;   /* next item to be written */
static cache_item_t *cache_queue_tail = NULL;
/* protects cache_tree, the queue and the cache items */
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Statistics counters; access is serialized with stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
/* A NULL socket means the command comes from journal replay. */
#define JOURNAL_REPLAY(s) ((s) == NULL)
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL;  /* current journal file handle */
static long journal_size = 0;    /* current journal size */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
290 /*
291 * Functions
292 */
293 static void sig_common (const char *sig) /* {{{ */
294 {
295 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
296 state = FLUSHING;
297 pthread_cond_broadcast(&flush_cond);
298 pthread_cond_broadcast(&queue_cond);
299 } /* }}} void sig_common */
/* SIGINT: begin shutdown; `config_flush_at_shutdown' is left as configured. */
static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: begin shutdown; `config_flush_at_shutdown' is left as configured. */
static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
311 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
312 {
313 config_flush_at_shutdown = 1;
314 sig_common("USR1");
315 } /* }}} void sig_usr1_handler */
317 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
318 {
319 config_flush_at_shutdown = 0;
320 sig_common("USR2");
321 } /* }}} void sig_usr2_handler */
323 static void install_signal_handlers(void) /* {{{ */
324 {
325 /* These structures are static, because `sigaction' behaves weird if the are
326 * overwritten.. */
327 static struct sigaction sa_int;
328 static struct sigaction sa_term;
329 static struct sigaction sa_pipe;
330 static struct sigaction sa_usr1;
331 static struct sigaction sa_usr2;
333 /* Install signal handlers */
334 memset (&sa_int, 0, sizeof (sa_int));
335 sa_int.sa_handler = sig_int_handler;
336 sigaction (SIGINT, &sa_int, NULL);
338 memset (&sa_term, 0, sizeof (sa_term));
339 sa_term.sa_handler = sig_term_handler;
340 sigaction (SIGTERM, &sa_term, NULL);
342 memset (&sa_pipe, 0, sizeof (sa_pipe));
343 sa_pipe.sa_handler = SIG_IGN;
344 sigaction (SIGPIPE, &sa_pipe, NULL);
346 memset (&sa_pipe, 0, sizeof (sa_usr1));
347 sa_usr1.sa_handler = sig_usr1_handler;
348 sigaction (SIGUSR1, &sa_usr1, NULL);
350 memset (&sa_usr2, 0, sizeof (sa_usr2));
351 sa_usr2.sa_handler = sig_usr2_handler;
352 sigaction (SIGUSR2, &sa_usr2, NULL);
354 } /* }}} void install_signal_handlers */
356 static int open_pidfile(char *action, int oflag) /* {{{ */
357 {
358 int fd;
359 const char *file;
360 char *file_copy, *dir;
362 file = (config_pid_file != NULL)
363 ? config_pid_file
364 : LOCALSTATEDIR "/run/rrdcached.pid";
366 /* dirname may modify its argument */
367 file_copy = strdup(file);
368 if (file_copy == NULL)
369 {
370 fprintf(stderr, "rrdcached: strdup(): %s\n",
371 rrd_strerror(errno));
372 return -1;
373 }
375 dir = dirname(file_copy);
376 if (rrd_mkdir_p(dir, 0777) != 0)
377 {
378 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
379 dir, rrd_strerror(errno));
380 return -1;
381 }
383 free(file_copy);
385 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
386 if (fd < 0)
387 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
388 action, file, rrd_strerror(errno));
390 return(fd);
391 } /* }}} static int open_pidfile */
/*
 * check_pidfile:
 * Check an existing pid file to see whether a daemon is running.
 *
 * Opens the PID file read/write and parses the process id stored in it. If
 * that process is not this one and can be signalled, another daemon is
 * assumed to be running and -1 is returned. Otherwise the stale file is
 * truncated and its descriptor is returned so the caller can write the new
 * PID. Returns a negative value on every error; the descriptor is closed in
 * that case.
 */
static int check_pidfile(void)
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t pid_len;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator that atoi() relies on */
  pid_len = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (pid_len <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[pid_len] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile:
 * Write the PID of the calling process, followed by a newline, to the
 * already opened descriptor `fd'. The descriptor is closed in every case.
 * Returns 0 on success and -1 if no stdio stream could be attached to `fd'.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream != NULL)
  {
    fprintf (stream, "%i\n", (int) getpid ());
    fclose (stream);  /* also closes `fd' */
    return (0);
  }

  RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
  close(fd);
  return (-1);
} /* }}} int write_pidfile */
458 static int remove_pidfile (void) /* {{{ */
459 {
460 char *file;
461 int status;
463 file = (config_pid_file != NULL)
464 ? config_pid_file
465 : LOCALSTATEDIR "/run/rrdcached.pid";
467 status = unlink (file);
468 if (status == 0)
469 return (0);
470 return (errno);
471 } /* }}} int remove_pidfile */
473 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
474 {
475 char *eol;
477 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
478 sock->next_read - sock->next_cmd);
480 if (eol == NULL)
481 {
482 /* no commands left, move remainder back to front of rbuf */
483 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
484 sock->next_read - sock->next_cmd);
485 sock->next_read -= sock->next_cmd;
486 sock->next_cmd = 0;
487 *len = 0;
488 return NULL;
489 }
490 else
491 {
492 char *cmd = sock->rbuf + sock->next_cmd;
493 *eol = '\0';
495 sock->next_cmd = eol - sock->rbuf + 1;
497 if (eol > sock->rbuf && *(eol-1) == '\r')
498 *(--eol) = '\0'; /* handle "\r\n" EOL */
500 *len = eol - cmd;
502 return cmd;
503 }
505 /* NOTREACHED */
506 assert(1==0);
507 } /* }}} char *next_cmd */
509 /* add the characters directly to the write buffer */
510 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
511 {
512 char *new_buf;
514 assert(sock != NULL);
516 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
517 if (new_buf == NULL)
518 {
519 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
520 return -1;
521 }
523 strncpy(new_buf + sock->wbuf_len, str, len + 1);
525 sock->wbuf = new_buf;
526 sock->wbuf_len += len;
528 return 0;
529 } /* }}} static int add_to_wbuf */
531 /* add the text to the "extra" info that's sent after the status line */
532 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
533 {
534 va_list argp;
535 char buffer[CMD_MAX];
536 int len;
538 if (JOURNAL_REPLAY(sock)) return 0;
539 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
541 va_start(argp, fmt);
542 #ifdef HAVE_VSNPRINTF
543 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
544 #else
545 len = vsprintf(buffer, fmt, argp);
546 #endif
547 va_end(argp);
548 if (len < 0)
549 {
550 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
551 return -1;
552 }
554 return add_to_wbuf(sock, buffer, len);
555 } /* }}} static int add_response_info */
/*
 * count_lines:
 * Return the number of newline characters in the NUL-terminated string
 * `str'. A NULL pointer counts as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *newline;

  if (str == NULL)
    return 0;

  for (newline = strchr(str, '\n');
      newline != NULL;
      newline = strchr(newline + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
573 /* send the response back to the user.
574 * returns 0 on success, -1 on error
575 * write buffer is always zeroed after this call */
576 static int send_response (listen_socket_t *sock, response_code rc,
577 char *fmt, ...) /* {{{ */
578 {
579 va_list argp;
580 char buffer[CMD_MAX];
581 int lines;
582 ssize_t wrote;
583 int rclen, len;
585 if (JOURNAL_REPLAY(sock)) return rc;
587 if (sock->batch_start)
588 {
589 if (rc == RESP_OK)
590 return rc; /* no response on success during BATCH */
591 lines = sock->batch_cmd;
592 }
593 else if (rc == RESP_OK)
594 lines = count_lines(sock->wbuf);
595 else
596 lines = -1;
598 rclen = sprintf(buffer, "%d ", lines);
599 va_start(argp, fmt);
600 #ifdef HAVE_VSNPRINTF
601 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
602 #else
603 len = vsprintf(buffer+rclen, fmt, argp);
604 #endif
605 va_end(argp);
606 if (len < 0)
607 return -1;
609 len += rclen;
611 /* append the result to the wbuf, don't write to the user */
612 if (sock->batch_start)
613 return add_to_wbuf(sock, buffer, len);
615 /* first write must be complete */
616 if (len != write(sock->fd, buffer, len))
617 {
618 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
619 return -1;
620 }
622 if (sock->wbuf != NULL && rc == RESP_OK)
623 {
624 wrote = 0;
625 while (wrote < sock->wbuf_len)
626 {
627 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
628 if (wb <= 0)
629 {
630 RRDD_LOG(LOG_INFO, "send_response: could not write results");
631 return -1;
632 }
633 wrote += wb;
634 }
635 }
637 free(sock->wbuf); sock->wbuf = NULL;
638 sock->wbuf_len = 0;
640 return 0;
641 } /* }}} */
643 static void wipe_ci_values(cache_item_t *ci, time_t when)
644 {
645 ci->values = NULL;
646 ci->values_num = 0;
648 ci->last_flush_time = when;
649 if (config_write_jitter > 0)
650 ci->last_flush_time += (rrd_random() % config_write_jitter);
651 }
653 /* remove_from_queue
654 * remove a "cache_item_t" item from the queue.
655 * must hold 'cache_lock' when calling this
656 */
657 static void remove_from_queue(cache_item_t *ci) /* {{{ */
658 {
659 if (ci == NULL) return;
660 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
662 if (ci->prev == NULL)
663 cache_queue_head = ci->next; /* reset head */
664 else
665 ci->prev->next = ci->next;
667 if (ci->next == NULL)
668 cache_queue_tail = ci->prev; /* reset the tail */
669 else
670 ci->next->prev = ci->prev;
672 ci->next = ci->prev = NULL;
673 ci->flags &= ~CI_FLAGS_IN_QUEUE;
675 pthread_mutex_lock (&stats_lock);
676 assert (stats_queue_length > 0);
677 stats_queue_length--;
678 pthread_mutex_unlock (&stats_lock);
680 } /* }}} static void remove_from_queue */
682 /* free the resources associated with the cache_item_t
683 * must hold cache_lock when calling this function
684 */
685 static void *free_cache_item(cache_item_t *ci) /* {{{ */
686 {
687 if (ci == NULL) return NULL;
689 remove_from_queue(ci);
691 for (size_t i=0; i < ci->values_num; i++)
692 free(ci->values[i]);
694 free (ci->values);
695 free (ci->file);
697 /* in case anyone is waiting */
698 pthread_cond_broadcast(&ci->flushed);
699 pthread_cond_destroy(&ci->flushed);
701 free (ci);
703 return NULL;
704 } /* }}} static void *free_cache_item */
706 /*
707 * enqueue_cache_item:
708 * `cache_lock' must be acquired before calling this function!
709 */
710 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
711 queue_side_t side)
712 {
713 if (ci == NULL)
714 return (-1);
716 if (ci->values_num == 0)
717 return (0);
719 if (side == HEAD)
720 {
721 if (cache_queue_head == ci)
722 return 0;
724 /* remove if further down in queue */
725 remove_from_queue(ci);
727 ci->prev = NULL;
728 ci->next = cache_queue_head;
729 if (ci->next != NULL)
730 ci->next->prev = ci;
731 cache_queue_head = ci;
733 if (cache_queue_tail == NULL)
734 cache_queue_tail = cache_queue_head;
735 }
736 else /* (side == TAIL) */
737 {
738 /* We don't move values back in the list.. */
739 if (ci->flags & CI_FLAGS_IN_QUEUE)
740 return (0);
742 assert (ci->next == NULL);
743 assert (ci->prev == NULL);
745 ci->prev = cache_queue_tail;
747 if (cache_queue_tail == NULL)
748 cache_queue_head = ci;
749 else
750 cache_queue_tail->next = ci;
752 cache_queue_tail = ci;
753 }
755 ci->flags |= CI_FLAGS_IN_QUEUE;
757 pthread_cond_signal(&queue_cond);
758 pthread_mutex_lock (&stats_lock);
759 stats_queue_length++;
760 pthread_mutex_unlock (&stats_lock);
762 return (0);
763 } /* }}} int enqueue_cache_item */
765 /*
766 * tree_callback_flush:
767 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
768 * while this is in progress.
769 */
770 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
771 gpointer data)
772 {
773 cache_item_t *ci;
774 callback_flush_data_t *cfd;
776 ci = (cache_item_t *) value;
777 cfd = (callback_flush_data_t *) data;
779 if (ci->flags & CI_FLAGS_IN_QUEUE)
780 return FALSE;
782 if (ci->values_num > 0
783 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
784 {
785 enqueue_cache_item (ci, TAIL);
786 }
787 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
788 && (ci->values_num <= 0))
789 {
790 assert ((char *) key == ci->file);
791 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
792 {
793 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
794 return (FALSE);
795 }
796 }
798 return (FALSE);
799 } /* }}} gboolean tree_callback_flush */
801 static int flush_old_values (int max_age)
802 {
803 callback_flush_data_t cfd;
804 size_t k;
806 memset (&cfd, 0, sizeof (cfd));
807 /* Pass the current time as user data so that we don't need to call
808 * `time' for each node. */
809 cfd.now = time (NULL);
810 cfd.keys = NULL;
811 cfd.keys_num = 0;
813 if (max_age > 0)
814 cfd.abs_timeout = cfd.now - max_age;
815 else
816 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
818 /* `tree_callback_flush' will return the keys of all values that haven't
819 * been touched in the last `config_flush_interval' seconds in `cfd'.
820 * The char*'s in this array point to the same memory as ci->file, so we
821 * don't need to free them separately. */
822 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
824 for (k = 0; k < cfd.keys_num; k++)
825 {
826 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
827 /* should never fail, since we have held the cache_lock
828 * the entire time */
829 assert(status == TRUE);
830 }
832 if (cfd.keys != NULL)
833 {
834 free (cfd.keys);
835 cfd.keys = NULL;
836 }
838 return (0);
839 } /* int flush_old_values */
/*
 * flush_thread_main:
 * Body of the flush thread. While the daemon is RUNNING it wakes up every
 * `config_flush_interval' seconds (or earlier when `flush_cond' is
 * signalled), queues old values for writing and rotates the journal. Once
 * the state is no longer RUNNING it optionally flushes everything
 * (`config_flush_at_shutdown') and moves the state to SHUTDOWN.
 *
 * `cache_lock' is held for the whole function, except while the thread
 * sleeps in pthread_cond_timedwait() and while the journal is rotated.
 * The argument is unused; always returns NULL.
 */
static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int status;

  /* absolute time of the first flush */
  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock(&cache_lock);

  while (state == RUNNING)
  {
    gettimeofday (&now, NULL);
    /* has the deadline passed? (compare seconds first, then the sub-second
     * part, both in nanoseconds) */
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      RRDD_LOG(LOG_DEBUG, "flushing old values");

      /* Determine the time of the next cache flush. */
      next_flush.tv_sec = now.tv_sec + config_flush_interval;

      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* sleep until the next deadline; releases cache_lock while waiting */
    status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
    if (status != 0 && status != ETIMEDOUT)
    {
      RRDD_LOG (LOG_ERR, "flush_thread_main: "
          "pthread_cond_timedwait returned %i.", status);
    }
  }

  if (config_flush_at_shutdown)
    flush_old_values (-1); /* flush everything */

  /* tells the queue threads that no further values will be queued here */
  state = SHUTDOWN;

  pthread_mutex_unlock(&cache_lock);

  return NULL;
} /* void *flush_thread_main */
/*
 * queue_thread_main:
 * Body of a queue (writer) thread. Repeatedly takes the item at the head of
 * the write queue, writes its values with rrd_update_r(), records the write
 * in the journal, wakes anyone waiting on the item's `flushed' condition
 * and updates the statistics.
 *
 * The loop ends once the state is SHUTDOWN and either the queue is empty or
 * `config_flush_at_shutdown' is not set. `cache_lock' is held except while
 * waiting for work and while writing to disk.
 * The argument is unused; always returns NULL.
 */
static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
{
  pthread_mutex_lock (&cache_lock);

  while (state != SHUTDOWN
      || (cache_queue_head != NULL && config_flush_at_shutdown))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    size_t values_num;
    int status;

    /* Now, check if there's something to store away. If not, wait until
     * something comes in. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_wait (&queue_cond, &cache_lock);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_wait returned %i.", status);
      }
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      /* NOTE(review): the item stays at the head of the queue and
       * cache_lock stays held, so the loop retries immediately. */
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* take ownership of the pending values; wipe_ci_values() only resets
     * the item's pointer and counter, the array is freed further down */
    values = ci->values;
    values_num = ci->values_num;

    wipe_ci_values(ci, time(NULL));
    remove_from_queue(ci);

    /* do the disk write without holding the cache lock */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    journal_write("wrote", file);

    /* Search again in the tree. It's possible someone issued a "FORGET"
     * while we were writing the update values. */
    pthread_mutex_lock(&cache_lock);
    ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
    if (ci)
      pthread_cond_broadcast(&ci->flushed);
    pthread_mutex_unlock(&cache_lock);

    /* only successful writes are counted */
    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    rrd_free_ptrs((void ***) &values, &values_num);
    free(file);

    /* re-acquire the lock for the loop condition and the next iteration */
    pthread_mutex_lock (&cache_lock);
  }
  pthread_mutex_unlock (&cache_lock);

  return (NULL);
} /* }}} void *queue_thread_main */
982 static int buffer_get_field (char **buffer_ret, /* {{{ */
983 size_t *buffer_size_ret, char **field_ret)
984 {
985 char *buffer;
986 size_t buffer_pos;
987 size_t buffer_size;
988 char *field;
989 size_t field_size;
990 int status;
992 buffer = *buffer_ret;
993 buffer_pos = 0;
994 buffer_size = *buffer_size_ret;
995 field = *buffer_ret;
996 field_size = 0;
998 if (buffer_size <= 0)
999 return (-1);
1001 /* This is ensured by `handle_request'. */
1002 assert (buffer[buffer_size - 1] == '\0');
1004 status = -1;
1005 while (buffer_pos < buffer_size)
1006 {
1007 /* Check for end-of-field or end-of-buffer */
1008 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1009 {
1010 field[field_size] = 0;
1011 field_size++;
1012 buffer_pos++;
1013 status = 0;
1014 break;
1015 }
1016 /* Handle escaped characters. */
1017 else if (buffer[buffer_pos] == '\\')
1018 {
1019 if (buffer_pos >= (buffer_size - 1))
1020 break;
1021 buffer_pos++;
1022 field[field_size] = buffer[buffer_pos];
1023 field_size++;
1024 buffer_pos++;
1025 }
1026 /* Normal operation */
1027 else
1028 {
1029 field[field_size] = buffer[buffer_pos];
1030 field_size++;
1031 buffer_pos++;
1032 }
1033 } /* while (buffer_pos < buffer_size) */
1035 if (status != 0)
1036 return (status);
1038 *buffer_ret = buffer + buffer_pos;
1039 *buffer_size_ret = buffer_size - buffer_pos;
1040 *field_ret = field;
1042 return (0);
1043 } /* }}} int buffer_get_field */
1045 /* if we're restricting writes to the base directory,
1046 * check whether the file falls within the dir
1047 * returns 1 if OK, otherwise 0
1048 */
1049 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1050 {
1051 assert(file != NULL);
1053 if (!config_write_base_only
1054 || JOURNAL_REPLAY(sock)
1055 || config_base_dir == NULL)
1056 return 1;
1058 if (strstr(file, "../") != NULL) goto err;
1060 /* relative paths without "../" are ok */
1061 if (*file != '/') return 1;
1063 /* file must be of the format base + "/" + <1+ char filename> */
1064 if (strlen(file) < _config_base_dir_len + 2) goto err;
1065 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1066 if (*(file + _config_base_dir_len) != '/') goto err;
1068 return 1;
1070 err:
1071 if (sock != NULL && sock->fd >= 0)
1072 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1074 return 0;
1075 } /* }}} static int check_file_access */
1077 /* when using a base dir, convert relative paths to absolute paths.
1078 * if necessary, modifies the "filename" pointer to point
1079 * to the new path created in "tmp". "tmp" is provided
1080 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1081 *
1082 * this allows us to optimize for the expected case (absolute path)
1083 * with a no-op.
1084 */
1085 static void get_abs_path(char **filename, char *tmp)
1086 {
1087 assert(tmp != NULL);
1088 assert(filename != NULL && *filename != NULL);
1090 if (config_base_dir == NULL || **filename == '/')
1091 return;
1093 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1094 *filename = tmp;
1095 } /* }}} static int get_abs_path */
1097 static int flush_file (const char *filename) /* {{{ */
1098 {
1099 cache_item_t *ci;
1101 pthread_mutex_lock (&cache_lock);
1103 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1104 if (ci == NULL)
1105 {
1106 pthread_mutex_unlock (&cache_lock);
1107 return (ENOENT);
1108 }
1110 if (ci->values_num > 0)
1111 {
1112 /* Enqueue at head */
1113 enqueue_cache_item (ci, HEAD);
1114 pthread_cond_wait(&ci->flushed, &cache_lock);
1115 }
1117 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1118 * may have been purged during our cond_wait() */
1120 pthread_mutex_unlock(&cache_lock);
1122 return (0);
1123 } /* }}} int flush_file */
1125 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1126 {
1127 char *err = "Syntax error.\n";
1129 if (cmd && cmd->syntax)
1130 err = cmd->syntax;
1132 return send_response(sock, RESP_ERR, "Usage: %s", err);
1133 } /* }}} static int syntax_error() */
1135 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1136 {
1137 uint64_t copy_queue_length;
1138 uint64_t copy_updates_received;
1139 uint64_t copy_flush_received;
1140 uint64_t copy_updates_written;
1141 uint64_t copy_data_sets_written;
1142 uint64_t copy_journal_bytes;
1143 uint64_t copy_journal_rotate;
1145 uint64_t tree_nodes_number;
1146 uint64_t tree_depth;
1148 pthread_mutex_lock (&stats_lock);
1149 copy_queue_length = stats_queue_length;
1150 copy_updates_received = stats_updates_received;
1151 copy_flush_received = stats_flush_received;
1152 copy_updates_written = stats_updates_written;
1153 copy_data_sets_written = stats_data_sets_written;
1154 copy_journal_bytes = stats_journal_bytes;
1155 copy_journal_rotate = stats_journal_rotate;
1156 pthread_mutex_unlock (&stats_lock);
1158 pthread_mutex_lock (&cache_lock);
1159 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160 tree_depth = (uint64_t) g_tree_height (cache_tree);
1161 pthread_mutex_unlock (&cache_lock);
1163 add_response_info(sock,
1164 "QueueLength: %"PRIu64"\n", copy_queue_length);
1165 add_response_info(sock,
1166 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167 add_response_info(sock,
1168 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169 add_response_info(sock,
1170 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171 add_response_info(sock,
1172 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1178 send_response(sock, RESP_OK, "Statistics follow\n");
1180 return (0);
1181 } /* }}} int handle_request_stats */
1183 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1184 {
1185 char *file, file_tmp[PATH_MAX];
1186 int status;
1188 status = buffer_get_field (&buffer, &buffer_size, &file);
1189 if (status != 0)
1190 {
1191 return syntax_error(sock,cmd);
1192 }
1193 else
1194 {
1195 pthread_mutex_lock(&stats_lock);
1196 stats_flush_received++;
1197 pthread_mutex_unlock(&stats_lock);
1199 get_abs_path(&file, file_tmp);
1200 if (!check_file_access(file, sock)) return 0;
1202 status = flush_file (file);
1203 if (status == 0)
1204 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205 else if (status == ENOENT)
1206 {
1207 /* no file in our tree; see whether it exists at all */
1208 struct stat statbuf;
1210 memset(&statbuf, 0, sizeof(statbuf));
1211 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1213 else
1214 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1215 }
1216 else if (status < 0)
1217 return send_response(sock, RESP_ERR, "Internal error.\n");
1218 else
1219 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1220 }
1222 /* NOTREACHED */
1223 assert(1==0);
1224 } /* }}} int handle_request_flush */
1226 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1227 {
1228 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1230 pthread_mutex_lock(&cache_lock);
1231 flush_old_values(-1);
1232 pthread_mutex_unlock(&cache_lock);
1234 return send_response(sock, RESP_OK, "Started flush.\n");
1235 } /* }}} static int handle_request_flushall */
1237 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1238 {
1239 int status;
1240 char *file, file_tmp[PATH_MAX];
1241 cache_item_t *ci;
1243 status = buffer_get_field(&buffer, &buffer_size, &file);
1244 if (status != 0)
1245 return syntax_error(sock,cmd);
1247 get_abs_path(&file, file_tmp);
1249 pthread_mutex_lock(&cache_lock);
1250 ci = g_tree_lookup(cache_tree, file);
1251 if (ci == NULL)
1252 {
1253 pthread_mutex_unlock(&cache_lock);
1254 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1255 }
1257 for (size_t i=0; i < ci->values_num; i++)
1258 add_response_info(sock, "%s\n", ci->values[i]);
1260 pthread_mutex_unlock(&cache_lock);
1261 return send_response(sock, RESP_OK, "updates pending\n");
1262 } /* }}} static int handle_request_pending */
1264 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1265 {
1266 int status;
1267 gboolean found;
1268 char *file, file_tmp[PATH_MAX];
1270 status = buffer_get_field(&buffer, &buffer_size, &file);
1271 if (status != 0)
1272 return syntax_error(sock,cmd);
1274 get_abs_path(&file, file_tmp);
1275 if (!check_file_access(file, sock)) return 0;
1277 pthread_mutex_lock(&cache_lock);
1278 found = g_tree_remove(cache_tree, file);
1279 pthread_mutex_unlock(&cache_lock);
1281 if (found == TRUE)
1282 {
1283 if (!JOURNAL_REPLAY(sock))
1284 journal_write("forget", file);
1286 return send_response(sock, RESP_OK, "Gone!\n");
1287 }
1288 else
1289 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1291 /* NOTREACHED */
1292 assert(1==0);
1293 } /* }}} static int handle_request_forget */
1295 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1296 {
1297 cache_item_t *ci;
1299 pthread_mutex_lock(&cache_lock);
1301 ci = cache_queue_head;
1302 while (ci != NULL)
1303 {
1304 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1305 ci = ci->next;
1306 }
1308 pthread_mutex_unlock(&cache_lock);
1310 return send_response(sock, RESP_OK, "in queue.\n");
1311 } /* }}} int handle_request_queue */
1313 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1314 {
1315 char *file, file_tmp[PATH_MAX];
1316 int values_num = 0;
1317 int status;
1318 char orig_buf[CMD_MAX];
1320 cache_item_t *ci;
1322 /* save it for the journal later */
1323 if (!JOURNAL_REPLAY(sock))
1324 strncpy(orig_buf, buffer, buffer_size);
1326 status = buffer_get_field (&buffer, &buffer_size, &file);
1327 if (status != 0)
1328 return syntax_error(sock,cmd);
1330 pthread_mutex_lock(&stats_lock);
1331 stats_updates_received++;
1332 pthread_mutex_unlock(&stats_lock);
1334 get_abs_path(&file, file_tmp);
1335 if (!check_file_access(file, sock)) return 0;
1337 pthread_mutex_lock (&cache_lock);
1338 ci = g_tree_lookup (cache_tree, file);
1340 if (ci == NULL) /* {{{ */
1341 {
1342 struct stat statbuf;
1343 cache_item_t *tmp;
1345 /* don't hold the lock while we setup; stat(2) might block */
1346 pthread_mutex_unlock(&cache_lock);
1348 memset (&statbuf, 0, sizeof (statbuf));
1349 status = stat (file, &statbuf);
1350 if (status != 0)
1351 {
1352 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1354 status = errno;
1355 if (status == ENOENT)
1356 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1357 else
1358 return send_response(sock, RESP_ERR,
1359 "stat failed with error %i.\n", status);
1360 }
1361 if (!S_ISREG (statbuf.st_mode))
1362 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1364 if (access(file, R_OK|W_OK) != 0)
1365 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1366 file, rrd_strerror(errno));
1368 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1369 if (ci == NULL)
1370 {
1371 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1373 return send_response(sock, RESP_ERR, "malloc failed.\n");
1374 }
1375 memset (ci, 0, sizeof (cache_item_t));
1377 ci->file = strdup (file);
1378 if (ci->file == NULL)
1379 {
1380 free (ci);
1381 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1383 return send_response(sock, RESP_ERR, "strdup failed.\n");
1384 }
1386 wipe_ci_values(ci, now);
1387 ci->flags = CI_FLAGS_IN_TREE;
1388 pthread_cond_init(&ci->flushed, NULL);
1390 pthread_mutex_lock(&cache_lock);
1392 /* another UPDATE might have added this entry in the meantime */
1393 tmp = g_tree_lookup (cache_tree, file);
1394 if (tmp == NULL)
1395 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1396 else
1397 {
1398 free_cache_item (ci);
1399 ci = tmp;
1400 }
1402 /* state may have changed while we were unlocked */
1403 if (state == SHUTDOWN)
1404 return -1;
1405 } /* }}} */
1406 assert (ci != NULL);
1408 /* don't re-write updates in replay mode */
1409 if (!JOURNAL_REPLAY(sock))
1410 journal_write("update", orig_buf);
1412 while (buffer_size > 0)
1413 {
1414 char *value;
1415 time_t stamp;
1416 char *eostamp;
1418 status = buffer_get_field (&buffer, &buffer_size, &value);
1419 if (status != 0)
1420 {
1421 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1422 break;
1423 }
1425 /* make sure update time is always moving forward */
1426 stamp = strtol(value, &eostamp, 10);
1427 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1428 {
1429 pthread_mutex_unlock(&cache_lock);
1430 return send_response(sock, RESP_ERR,
1431 "Cannot find timestamp in '%s'!\n", value);
1432 }
1433 else if (stamp <= ci->last_update_stamp)
1434 {
1435 pthread_mutex_unlock(&cache_lock);
1436 return send_response(sock, RESP_ERR,
1437 "illegal attempt to update using time %ld when last"
1438 " update time is %ld (minimum one second step)\n",
1439 stamp, ci->last_update_stamp);
1440 }
1441 else
1442 ci->last_update_stamp = stamp;
1444 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1445 {
1446 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1447 continue;
1448 }
1450 values_num++;
1451 }
1453 if (((now - ci->last_flush_time) >= config_write_interval)
1454 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1455 && (ci->values_num > 0))
1456 {
1457 enqueue_cache_item (ci, TAIL);
1458 }
1460 pthread_mutex_unlock (&cache_lock);
1462 if (values_num < 1)
1463 return send_response(sock, RESP_ERR, "No values updated.\n");
1464 else
1465 return send_response(sock, RESP_OK,
1466 "errors, enqueued %i value(s).\n", values_num);
1468 /* NOTREACHED */
1469 assert(1==0);
1471 } /* }}} int handle_request_update */
1473 /* we came across a "WROTE" entry during journal replay.
1474 * throw away any values that we have accumulated for this file
1475 */
1476 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1477 {
1478 cache_item_t *ci;
1479 const char *file = buffer;
1481 pthread_mutex_lock(&cache_lock);
1483 ci = g_tree_lookup(cache_tree, file);
1484 if (ci == NULL)
1485 {
1486 pthread_mutex_unlock(&cache_lock);
1487 return (0);
1488 }
1490 if (ci->values)
1491 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1493 wipe_ci_values(ci, now);
1494 remove_from_queue(ci);
1496 pthread_mutex_unlock(&cache_lock);
1497 return (0);
1498 } /* }}} int handle_request_wrote */
1500 /* start "BATCH" processing */
1501 static int batch_start (HANDLER_PROTO) /* {{{ */
1502 {
1503 int status;
1504 if (sock->batch_start)
1505 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1507 status = send_response(sock, RESP_OK,
1508 "Go ahead. End with dot '.' on its own line.\n");
1509 sock->batch_start = time(NULL);
1510 sock->batch_cmd = 0;
1512 return status;
1513 } /* }}} static int batch_start */
1515 /* finish "BATCH" processing and return results to the client */
1516 static int batch_done (HANDLER_PROTO) /* {{{ */
1517 {
1518 assert(sock->batch_start);
1519 sock->batch_start = 0;
1520 sock->batch_cmd = 0;
1521 return send_response(sock, RESP_OK, "errors\n");
1522 } /* }}} static int batch_done */
/* QUIT: nothing to do except report a non-zero status; the connection
 * thread closes the connection when a handler returns non-zero. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
1529 static command_t list_of_commands[] = { /* {{{ */
1530 {
1531 "UPDATE",
1532 handle_request_update,
1533 CMD_CONTEXT_ANY,
1534 "UPDATE <filename> <values> [<values> ...]\n"
1535 ,
1536 "Adds the given file to the internal cache if it is not yet known and\n"
1537 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1538 "for details.\n"
1539 "\n"
1540 "Each <values> has the following form:\n"
1541 " <values> = <time>:<value>[:<value>[...]]\n"
1542 "See the rrdupdate(1) manpage for details.\n"
1543 },
1544 {
1545 "WROTE",
1546 handle_request_wrote,
1547 CMD_CONTEXT_JOURNAL,
1548 NULL,
1549 NULL
1550 },
1551 {
1552 "FLUSH",
1553 handle_request_flush,
1554 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1555 "FLUSH <filename>\n"
1556 ,
1557 "Adds the given filename to the head of the update queue and returns\n"
1558 "after it has been dequeued.\n"
1559 },
1560 {
1561 "FLUSHALL",
1562 handle_request_flushall,
1563 CMD_CONTEXT_CLIENT,
1564 "FLUSHALL\n"
1565 ,
1566 "Triggers writing of all pending updates. Returns immediately.\n"
1567 },
1568 {
1569 "PENDING",
1570 handle_request_pending,
1571 CMD_CONTEXT_CLIENT,
1572 "PENDING <filename>\n"
1573 ,
1574 "Shows any 'pending' updates for a file, in order.\n"
1575 "The updates shown have not yet been written to the underlying RRD file.\n"
1576 },
1577 {
1578 "FORGET",
1579 handle_request_forget,
1580 CMD_CONTEXT_ANY,
1581 "FORGET <filename>\n"
1582 ,
1583 "Removes the file completely from the cache.\n"
1584 "Any pending updates for the file will be lost.\n"
1585 },
1586 {
1587 "QUEUE",
1588 handle_request_queue,
1589 CMD_CONTEXT_CLIENT,
1590 "QUEUE\n"
1591 ,
1592 "Shows all files in the output queue.\n"
1593 "The output is zero or more lines in the following format:\n"
1594 "(where <num_vals> is the number of values to be written)\n"
1595 "\n"
1596 "<num_vals> <filename>\n"
1597 },
1598 {
1599 "STATS",
1600 handle_request_stats,
1601 CMD_CONTEXT_CLIENT,
1602 "STATS\n"
1603 ,
1604 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1605 "a description of the values.\n"
1606 },
1607 {
1608 "HELP",
1609 handle_request_help,
1610 CMD_CONTEXT_CLIENT,
1611 "HELP [<command>]\n",
1612 NULL, /* special! */
1613 },
1614 {
1615 "BATCH",
1616 batch_start,
1617 CMD_CONTEXT_CLIENT,
1618 "BATCH\n"
1619 ,
1620 "The 'BATCH' command permits the client to initiate a bulk load\n"
1621 " of commands to rrdcached.\n"
1622 "\n"
1623 "Usage:\n"
1624 "\n"
1625 " client: BATCH\n"
1626 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1627 " client: command #1\n"
1628 " client: command #2\n"
1629 " client: ... and so on\n"
1630 " client: .\n"
1631 " server: 2 errors\n"
1632 " server: 7 message for command #7\n"
1633 " server: 9 message for command #9\n"
1634 "\n"
1635 "For more information, consult the rrdcached(1) documentation.\n"
1636 },
1637 {
1638 ".", /* BATCH terminator */
1639 batch_done,
1640 CMD_CONTEXT_BATCH,
1641 NULL,
1642 NULL
1643 },
1644 {
1645 "QUIT",
1646 handle_request_quit,
1647 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1648 "QUIT\n"
1649 ,
1650 "Disconnect from rrdcached.\n"
1651 }
1652 }; /* }}} command_t list_of_commands[] */
1653 static size_t list_of_commands_len = sizeof (list_of_commands)
1654 / sizeof (list_of_commands[0]);
1656 static command_t *find_command(char *cmd)
1657 {
1658 size_t i;
1660 for (i = 0; i < list_of_commands_len; i++)
1661 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1662 return (&list_of_commands[i]);
1663 return NULL;
1664 }
1666 /* We currently use the index in the `list_of_commands' array as a bit position
1667 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1668 * outside these functions so that switching to a more elegant storage method
1669 * is easily possible. */
1670 static ssize_t find_command_index (const char *cmd) /* {{{ */
1671 {
1672 size_t i;
1674 for (i = 0; i < list_of_commands_len; i++)
1675 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1676 return ((ssize_t) i);
1677 return (-1);
1678 } /* }}} ssize_t find_command_index */
1680 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1681 const char *cmd)
1682 {
1683 ssize_t i;
1685 if (JOURNAL_REPLAY(sock))
1686 return (1);
1688 if (cmd == NULL)
1689 return (-1);
1691 if ((strcasecmp ("QUIT", cmd) == 0)
1692 || (strcasecmp ("HELP", cmd) == 0))
1693 return (1);
1694 else if (strcmp (".", cmd) == 0)
1695 cmd = "BATCH";
1697 i = find_command_index (cmd);
1698 if (i < 0)
1699 return (-1);
1700 assert (i < 32);
1702 if ((sock->permissions & (1 << i)) != 0)
1703 return (1);
1704 return (0);
1705 } /* }}} int socket_permission_check */
1707 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1708 const char *cmd)
1709 {
1710 ssize_t i;
1712 i = find_command_index (cmd);
1713 if (i < 0)
1714 return (-1);
1715 assert (i < 32);
1717 sock->permissions |= (1 << i);
1718 return (0);
1719 } /* }}} int socket_permission_add */
1721 /* check whether commands are received in the expected context */
1722 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1723 {
1724 if (JOURNAL_REPLAY(sock))
1725 return (cmd->context & CMD_CONTEXT_JOURNAL);
1726 else if (sock->batch_start)
1727 return (cmd->context & CMD_CONTEXT_BATCH);
1728 else
1729 return (cmd->context & CMD_CONTEXT_CLIENT);
1731 /* NOTREACHED */
1732 assert(1==0);
1733 }
1735 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1736 {
1737 int status;
1738 char *cmd_str;
1739 char *resp_txt;
1740 command_t *help = NULL;
1742 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1743 if (status == 0)
1744 help = find_command(cmd_str);
1746 if (help && (help->syntax || help->help))
1747 {
1748 char tmp[CMD_MAX];
1750 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1751 resp_txt = tmp;
1753 if (help->syntax)
1754 add_response_info(sock, "Usage: %s\n", help->syntax);
1756 if (help->help)
1757 add_response_info(sock, "%s\n", help->help);
1758 }
1759 else
1760 {
1761 size_t i;
1763 resp_txt = "Command overview\n";
1765 for (i = 0; i < list_of_commands_len; i++)
1766 {
1767 if (list_of_commands[i].syntax == NULL)
1768 continue;
1769 add_response_info (sock, "%s", list_of_commands[i].syntax);
1770 }
1771 }
1773 return send_response(sock, RESP_OK, resp_txt);
1774 } /* }}} int handle_request_help */
1776 static int handle_request (DISPATCH_PROTO) /* {{{ */
1777 {
1778 char *buffer_ptr = buffer;
1779 char *cmd_str = NULL;
1780 command_t *cmd = NULL;
1781 int status;
1783 assert (buffer[buffer_size - 1] == '\0');
1785 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1786 if (status != 0)
1787 {
1788 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1789 return (-1);
1790 }
1792 if (sock != NULL && sock->batch_start)
1793 sock->batch_cmd++;
1795 cmd = find_command(cmd_str);
1796 if (!cmd)
1797 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1799 if (!socket_permission_check (sock, cmd->cmd))
1800 return send_response(sock, RESP_ERR, "Permission denied.\n");
1802 if (!command_check_context(sock, cmd))
1803 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1805 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1806 } /* }}} int handle_request */
1808 static void journal_set_free (journal_set *js) /* {{{ */
1809 {
1810 if (js == NULL)
1811 return;
1813 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1815 free(js);
1816 } /* }}} journal_set_free */
1818 static void journal_set_remove (journal_set *js) /* {{{ */
1819 {
1820 if (js == NULL)
1821 return;
1823 for (uint i=0; i < js->files_num; i++)
1824 {
1825 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1826 unlink(js->files[i]);
1827 }
1828 } /* }}} journal_set_remove */
1830 /* close current journal file handle.
1831 * MUST hold journal_lock before calling */
1832 static void journal_close(void) /* {{{ */
1833 {
1834 if (journal_fh != NULL)
1835 {
1836 if (fclose(journal_fh) != 0)
1837 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1838 }
1840 journal_fh = NULL;
1841 journal_size = 0;
1842 } /* }}} journal_close */
1844 /* MUST hold journal_lock before calling */
1845 static void journal_new_file(void) /* {{{ */
1846 {
1847 struct timeval now;
1848 int new_fd;
1849 char new_file[PATH_MAX + 1];
1851 assert(journal_dir != NULL);
1852 assert(journal_cur != NULL);
1854 journal_close();
1856 gettimeofday(&now, NULL);
1857 /* this format assures that the files sort in strcmp() order */
1858 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1859 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1861 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1862 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1863 if (new_fd < 0)
1864 goto error;
1866 journal_fh = fdopen(new_fd, "a");
1867 if (journal_fh == NULL)
1868 goto error;
1870 journal_size = ftell(journal_fh);
1871 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1873 /* record the file in the journal set */
1874 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1876 return;
1878 error:
1879 RRDD_LOG(LOG_CRIT,
1880 "JOURNALING DISABLED: Error while trying to create %s : %s",
1881 new_file, rrd_strerror(errno));
1882 RRDD_LOG(LOG_CRIT,
1883 "JOURNALING DISABLED: All values will be flushed at shutdown");
1885 close(new_fd);
1886 config_flush_at_shutdown = 1;
1888 } /* }}} journal_new_file */
1890 /* MUST NOT hold journal_lock before calling this */
1891 static void journal_rotate(void) /* {{{ */
1892 {
1893 journal_set *old_js = NULL;
1895 if (journal_dir == NULL)
1896 return;
1898 RRDD_LOG(LOG_DEBUG, "rotating journals");
1900 pthread_mutex_lock(&stats_lock);
1901 ++stats_journal_rotate;
1902 pthread_mutex_unlock(&stats_lock);
1904 pthread_mutex_lock(&journal_lock);
1906 journal_close();
1908 /* rotate the journal sets */
1909 old_js = journal_old;
1910 journal_old = journal_cur;
1911 journal_cur = calloc(1, sizeof(journal_set));
1913 if (journal_cur != NULL)
1914 journal_new_file();
1915 else
1916 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1918 pthread_mutex_unlock(&journal_lock);
1920 journal_set_remove(old_js);
1921 journal_set_free (old_js);
1923 } /* }}} static void journal_rotate */
1925 /* MUST hold journal_lock when calling */
1926 static void journal_done(void) /* {{{ */
1927 {
1928 if (journal_cur == NULL)
1929 return;
1931 journal_close();
1933 if (config_flush_at_shutdown)
1934 {
1935 RRDD_LOG(LOG_INFO, "removing journals");
1936 journal_set_remove(journal_old);
1937 journal_set_remove(journal_cur);
1938 }
1939 else
1940 {
1941 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1942 "journals will be used at next startup");
1943 }
1945 journal_set_free(journal_cur);
1946 journal_set_free(journal_old);
1947 free(journal_dir);
1949 } /* }}} static void journal_done */
1951 static int journal_write(char *cmd, char *args) /* {{{ */
1952 {
1953 int chars;
1955 if (journal_fh == NULL)
1956 return 0;
1958 pthread_mutex_lock(&journal_lock);
1959 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1960 journal_size += chars;
1962 if (journal_size > JOURNAL_MAX)
1963 journal_new_file();
1965 pthread_mutex_unlock(&journal_lock);
1967 if (chars > 0)
1968 {
1969 pthread_mutex_lock(&stats_lock);
1970 stats_journal_bytes += chars;
1971 pthread_mutex_unlock(&stats_lock);
1972 }
1974 return chars;
1975 } /* }}} static int journal_write */
1977 static int journal_replay (const char *file) /* {{{ */
1978 {
1979 FILE *fh;
1980 int entry_cnt = 0;
1981 int fail_cnt = 0;
1982 uint64_t line = 0;
1983 char entry[CMD_MAX];
1984 time_t now;
1986 if (file == NULL) return 0;
1988 {
1989 char *reason = "unknown error";
1990 int status = 0;
1991 struct stat statbuf;
1993 memset(&statbuf, 0, sizeof(statbuf));
1994 if (stat(file, &statbuf) != 0)
1995 {
1996 reason = "stat error";
1997 status = errno;
1998 }
1999 else if (!S_ISREG(statbuf.st_mode))
2000 {
2001 reason = "not a regular file";
2002 status = EPERM;
2003 }
2004 if (statbuf.st_uid != daemon_uid)
2005 {
2006 reason = "not owned by daemon user";
2007 status = EACCES;
2008 }
2009 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2010 {
2011 reason = "must not be user/group writable";
2012 status = EACCES;
2013 }
2015 if (status != 0)
2016 {
2017 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2018 file, rrd_strerror(status), reason);
2019 return 0;
2020 }
2021 }
2023 fh = fopen(file, "r");
2024 if (fh == NULL)
2025 {
2026 if (errno != ENOENT)
2027 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2028 file, rrd_strerror(errno));
2029 return 0;
2030 }
2031 else
2032 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2034 now = time(NULL);
2036 while(!feof(fh))
2037 {
2038 size_t entry_len;
2040 ++line;
2041 if (fgets(entry, sizeof(entry), fh) == NULL)
2042 break;
2043 entry_len = strlen(entry);
2045 /* check \n termination in case journal writing crashed mid-line */
2046 if (entry_len == 0)
2047 continue;
2048 else if (entry[entry_len - 1] != '\n')
2049 {
2050 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2051 ++fail_cnt;
2052 continue;
2053 }
2055 entry[entry_len - 1] = '\0';
2057 if (handle_request(NULL, now, entry, entry_len) == 0)
2058 ++entry_cnt;
2059 else
2060 ++fail_cnt;
2061 }
2063 fclose(fh);
2065 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2066 entry_cnt, fail_cnt);
2068 return entry_cnt > 0 ? 1 : 0;
2069 } /* }}} static int journal_replay */
/* qsort() comparison callback for an array of C strings: both arguments
 * point at array elements (char *), which are ordered with strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *lhs = v1;
  const char *const *rhs = v2;

  return strcmp(*lhs, *rhs);
}
2079 static void journal_init(void) /* {{{ */
2080 {
2081 int had_journal = 0;
2082 DIR *dir;
2083 struct dirent *dent;
2084 char path[PATH_MAX+1];
2086 if (journal_dir == NULL) return;
2088 pthread_mutex_lock(&journal_lock);
2090 journal_cur = calloc(1, sizeof(journal_set));
2091 if (journal_cur == NULL)
2092 {
2093 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2094 return;
2095 }
2097 RRDD_LOG(LOG_INFO, "checking for journal files");
2099 /* Handle old journal files during transition. This gives them the
2100 * correct sort order. TODO: remove after first release
2101 */
2102 {
2103 char old_path[PATH_MAX+1];
2104 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2105 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2106 rename(old_path, path);
2108 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2109 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2110 rename(old_path, path);
2111 }
2113 dir = opendir(journal_dir);
2114 if (!dir) {
2115 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2116 return;
2117 }
2118 while ((dent = readdir(dir)) != NULL)
2119 {
2120 /* looks like a journal file? */
2121 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2122 continue;
2124 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2126 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2127 {
2128 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2129 dent->d_name);
2130 break;
2131 }
2132 }
2133 closedir(dir);
2135 qsort(journal_cur->files, journal_cur->files_num,
2136 sizeof(journal_cur->files[0]), journal_sort);
2138 for (uint i=0; i < journal_cur->files_num; i++)
2139 had_journal += journal_replay(journal_cur->files[i]);
2141 journal_new_file();
2143 /* it must have been a crash. start a flush */
2144 if (had_journal && config_flush_at_shutdown)
2145 flush_old_values(-1);
2147 pthread_mutex_unlock(&journal_lock);
2149 RRDD_LOG(LOG_INFO, "journal processing complete");
2151 } /* }}} static void journal_init */
2153 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2154 {
2155 assert(sock != NULL);
2157 free(sock->rbuf); sock->rbuf = NULL;
2158 free(sock->wbuf); sock->wbuf = NULL;
2159 free(sock);
2160 } /* }}} void free_listen_socket */
2162 static void close_connection(listen_socket_t *sock) /* {{{ */
2163 {
2164 if (sock->fd >= 0)
2165 {
2166 close(sock->fd);
2167 sock->fd = -1;
2168 }
2170 free_listen_socket(sock);
2172 } /* }}} void close_connection */
2174 static void *connection_thread_main (void *args) /* {{{ */
2175 {
2176 listen_socket_t *sock;
2177 int fd;
2179 sock = (listen_socket_t *) args;
2180 fd = sock->fd;
2182 /* init read buffers */
2183 sock->next_read = sock->next_cmd = 0;
2184 sock->rbuf = malloc(RBUF_SIZE);
2185 if (sock->rbuf == NULL)
2186 {
2187 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2188 close_connection(sock);
2189 return NULL;
2190 }
2192 pthread_mutex_lock (&connection_threads_lock);
2193 connection_threads_num++;
2194 pthread_mutex_unlock (&connection_threads_lock);
2196 while (state == RUNNING)
2197 {
2198 char *cmd;
2199 ssize_t cmd_len;
2200 ssize_t rbytes;
2201 time_t now;
2203 struct pollfd pollfd;
2204 int status;
2206 pollfd.fd = fd;
2207 pollfd.events = POLLIN | POLLPRI;
2208 pollfd.revents = 0;
2210 status = poll (&pollfd, 1, /* timeout = */ 500);
2211 if (state != RUNNING)
2212 break;
2213 else if (status == 0) /* timeout */
2214 continue;
2215 else if (status < 0) /* error */
2216 {
2217 status = errno;
2218 if (status != EINTR)
2219 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2220 continue;
2221 }
2223 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2224 break;
2225 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2226 {
2227 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2228 "poll(2) returned something unexpected: %#04hx",
2229 pollfd.revents);
2230 break;
2231 }
2233 rbytes = read(fd, sock->rbuf + sock->next_read,
2234 RBUF_SIZE - sock->next_read);
2235 if (rbytes < 0)
2236 {
2237 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2238 break;
2239 }
2240 else if (rbytes == 0)
2241 break; /* eof */
2243 sock->next_read += rbytes;
2245 if (sock->batch_start)
2246 now = sock->batch_start;
2247 else
2248 now = time(NULL);
2250 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2251 {
2252 status = handle_request (sock, now, cmd, cmd_len+1);
2253 if (status != 0)
2254 goto out_close;
2255 }
2256 }
2258 out_close:
2259 close_connection(sock);
2261 /* Remove this thread from the connection threads list */
2262 pthread_mutex_lock (&connection_threads_lock);
2263 connection_threads_num--;
2264 if (connection_threads_num <= 0)
2265 pthread_cond_broadcast(&connection_threads_done);
2266 pthread_mutex_unlock (&connection_threads_lock);
2268 return (NULL);
2269 } /* }}} void *connection_thread_main */
2271 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2272 {
2273 int fd;
2274 struct sockaddr_un sa;
2275 listen_socket_t *temp;
2276 int status;
2277 const char *path;
2278 char *path_copy, *dir;
2280 path = sock->addr;
2281 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2282 path += strlen("unix:");
2284 /* dirname may modify its argument */
2285 path_copy = strdup(path);
2286 if (path_copy == NULL)
2287 {
2288 fprintf(stderr, "rrdcached: strdup(): %s\n",
2289 rrd_strerror(errno));
2290 return (-1);
2291 }
2293 dir = dirname(path_copy);
2294 if (rrd_mkdir_p(dir, 0777) != 0)
2295 {
2296 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2297 dir, rrd_strerror(errno));
2298 return (-1);
2299 }
2301 free(path_copy);
2303 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2304 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2305 if (temp == NULL)
2306 {
2307 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2308 return (-1);
2309 }
2310 listen_fds = temp;
2311 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2313 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2314 if (fd < 0)
2315 {
2316 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2317 rrd_strerror(errno));
2318 return (-1);
2319 }
2321 memset (&sa, 0, sizeof (sa));
2322 sa.sun_family = AF_UNIX;
2323 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2325 /* if we've gotten this far, we own the pid file. any daemon started
2326 * with the same args must not be alive. therefore, ensure that we can
2327 * create the socket...
2328 */
2329 unlink(path);
2331 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2332 if (status != 0)
2333 {
2334 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2335 path, rrd_strerror(errno));
2336 close (fd);
2337 return (-1);
2338 }
2340 /* tweak the sockets group ownership */
2341 if (sock->socket_group != (gid_t)-1)
2342 {
2343 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2344 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2345 {
2346 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2347 }
2348 }
2350 if (sock->socket_permissions != (mode_t)-1)
2351 {
2352 if (chmod(path, sock->socket_permissions) != 0)
2353 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2354 (unsigned int)sock->socket_permissions, strerror(errno));
2355 }
2357 status = listen (fd, /* backlog = */ 10);
2358 if (status != 0)
2359 {
2360 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2361 path, rrd_strerror(errno));
2362 close (fd);
2363 unlink (path);
2364 return (-1);
2365 }
2367 listen_fds[listen_fds_num].fd = fd;
2368 listen_fds[listen_fds_num].family = PF_UNIX;
2369 strncpy(listen_fds[listen_fds_num].addr, path,
2370 sizeof (listen_fds[listen_fds_num].addr) - 1);
2371 listen_fds_num++;
2373 return (0);
2374 } /* }}} int open_listen_socket_unix */
/*
 * Create listening TCP sockets for the network address in `sock->addr'.
 *
 * Accepted forms are "host", "host:port" and "[ipv6-address]" with an
 * optional ":port"; without a port RRDCACHED_DEFAULT_PORT is used.  One
 * socket is opened per address returned by getaddrinfo() and appended to
 * listen_fds; addresses for which socket() or bind() fails are skipped.
 *
 * Returns 0 when the address list was processed, -1 on a malformed
 * address, a resolver error or a listen() failure.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy: the string is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* strrchr() is the standard equivalent of the legacy rindex() */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
      port == NULL ? RRDCACHED_DEFAULT_PORT : port,
      &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
        addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
          "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* start from a copy of the configuration; fd and family are filled in
     * below once the socket is listening */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
          rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
          sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
          sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2498 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2499 {
2500 assert(sock != NULL);
2501 assert(sock->addr != NULL);
2503 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2504 || sock->addr[0] == '/')
2505 return (open_listen_socket_unix(sock));
2506 else
2507 return (open_listen_socket_network(sock));
2508 } /* }}} int open_listen_socket */
2510 static int close_listen_sockets (void) /* {{{ */
2511 {
2512 size_t i;
2514 for (i = 0; i < listen_fds_num; i++)
2515 {
2516 close (listen_fds[i].fd);
2518 if (listen_fds[i].family == PF_UNIX)
2519 unlink(listen_fds[i].addr);
2520 }
2522 free (listen_fds);
2523 listen_fds = NULL;
2524 listen_fds_num = 0;
2526 return (0);
2527 } /* }}} int close_listen_sockets */
2529 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2530 {
2531 struct pollfd *pollfds;
2532 int pollfds_num;
2533 int status;
2534 int i;
2536 if (listen_fds_num < 1)
2537 {
2538 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2539 return (NULL);
2540 }
2542 pollfds_num = listen_fds_num;
2543 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2544 if (pollfds == NULL)
2545 {
2546 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2547 return (NULL);
2548 }
2549 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2551 RRDD_LOG(LOG_INFO, "listening for connections");
2553 while (state == RUNNING)
2554 {
2555 for (i = 0; i < pollfds_num; i++)
2556 {
2557 pollfds[i].fd = listen_fds[i].fd;
2558 pollfds[i].events = POLLIN | POLLPRI;
2559 pollfds[i].revents = 0;
2560 }
2562 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2563 if (state != RUNNING)
2564 break;
2565 else if (status == 0) /* timeout */
2566 continue;
2567 else if (status < 0) /* error */
2568 {
2569 status = errno;
2570 if (status != EINTR)
2571 {
2572 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2573 }
2574 continue;
2575 }
2577 for (i = 0; i < pollfds_num; i++)
2578 {
2579 listen_socket_t *client_sock;
2580 struct sockaddr_storage client_sa;
2581 socklen_t client_sa_size;
2582 pthread_t tid;
2583 pthread_attr_t attr;
2585 if (pollfds[i].revents == 0)
2586 continue;
2588 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2589 {
2590 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2591 "poll(2) returned something unexpected for listen FD #%i.",
2592 pollfds[i].fd);
2593 continue;
2594 }
2596 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2597 if (client_sock == NULL)
2598 {
2599 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2600 continue;
2601 }
2602 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2604 client_sa_size = sizeof (client_sa);
2605 client_sock->fd = accept (pollfds[i].fd,
2606 (struct sockaddr *) &client_sa, &client_sa_size);
2607 if (client_sock->fd < 0)
2608 {
2609 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2610 free(client_sock);
2611 continue;
2612 }
2614 pthread_attr_init (&attr);
2615 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2617 status = pthread_create (&tid, &attr, connection_thread_main,
2618 client_sock);
2619 if (status != 0)
2620 {
2621 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2622 close_connection(client_sock);
2623 continue;
2624 }
2625 } /* for (pollfds_num) */
2626 } /* while (state == RUNNING) */
2628 RRDD_LOG(LOG_INFO, "starting shutdown");
2630 close_listen_sockets ();
2632 pthread_mutex_lock (&connection_threads_lock);
2633 while (connection_threads_num > 0)
2634 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2635 pthread_mutex_unlock (&connection_threads_lock);
2637 free(pollfds);
2639 return (NULL);
2640 } /* }}} void *listen_thread_main */
2642 static int daemonize (void) /* {{{ */
2643 {
2644 int pid_fd;
2645 char *base_dir;
2647 daemon_uid = geteuid();
2649 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2650 if (pid_fd < 0)
2651 pid_fd = check_pidfile();
2652 if (pid_fd < 0)
2653 return pid_fd;
2655 /* open all the listen sockets */
2656 if (config_listen_address_list_len > 0)
2657 {
2658 for (size_t i = 0; i < config_listen_address_list_len; i++)
2659 open_listen_socket (config_listen_address_list[i]);
2661 rrd_free_ptrs((void ***) &config_listen_address_list,
2662 &config_listen_address_list_len);
2663 }
2664 else
2665 {
2666 listen_socket_t sock;
2667 memset(&sock, 0, sizeof(sock));
2668 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2669 open_listen_socket (&sock);
2670 }
2672 if (listen_fds_num < 1)
2673 {
2674 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2675 goto error;
2676 }
2678 if (!stay_foreground)
2679 {
2680 pid_t child;
2682 child = fork ();
2683 if (child < 0)
2684 {
2685 fprintf (stderr, "daemonize: fork(2) failed.\n");
2686 goto error;
2687 }
2688 else if (child > 0)
2689 exit(0);
2691 /* Become session leader */
2692 setsid ();
2694 /* Open the first three file descriptors to /dev/null */
2695 close (2);
2696 close (1);
2697 close (0);
2699 open ("/dev/null", O_RDWR);
2700 if (dup(0) == -1 || dup(0) == -1){
2701 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2702 }
2703 } /* if (!stay_foreground) */
2705 /* Change into the /tmp directory. */
2706 base_dir = (config_base_dir != NULL)
2707 ? config_base_dir
2708 : "/tmp";
2710 if (chdir (base_dir) != 0)
2711 {
2712 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2713 goto error;
2714 }
2716 install_signal_handlers();
2718 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2719 RRDD_LOG(LOG_INFO, "starting up");
2721 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2722 (GDestroyNotify) free_cache_item);
2723 if (cache_tree == NULL)
2724 {
2725 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2726 goto error;
2727 }
2729 return write_pidfile (pid_fd);
2731 error:
2732 remove_pidfile();
2733 return -1;
2734 } /* }}} int daemonize */
2736 static int cleanup (void) /* {{{ */
2737 {
2738 pthread_cond_broadcast (&flush_cond);
2739 pthread_join (flush_thread, NULL);
2741 pthread_cond_broadcast (&queue_cond);
2742 for (int i = 0; i < config_queue_threads; i++)
2743 pthread_join (queue_threads[i], NULL);
2745 if (config_flush_at_shutdown)
2746 {
2747 assert(cache_queue_head == NULL);
2748 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2749 }
2751 free(queue_threads);
2752 free(config_base_dir);
2754 pthread_mutex_lock(&cache_lock);
2755 g_tree_destroy(cache_tree);
2757 pthread_mutex_lock(&journal_lock);
2758 journal_done();
2760 RRDD_LOG(LOG_INFO, "goodbye");
2761 closelog ();
2763 remove_pidfile ();
2764 free(config_pid_file);
2766 return (0);
2767 } /* }}} int cleanup */
2769 static int read_options (int argc, char **argv) /* {{{ */
2770 {
2771 int option;
2772 int status = 0;
2774 char **permissions = NULL;
2775 size_t permissions_len = 0;
2777 gid_t socket_group = (gid_t)-1;
2778 mode_t socket_permissions = (mode_t)-1;
2780 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2781 {
2782 switch (option)
2783 {
2784 case 'g':
2785 stay_foreground=1;
2786 break;
2788 case 'l':
2789 {
2790 listen_socket_t *new;
2792 new = malloc(sizeof(listen_socket_t));
2793 if (new == NULL)
2794 {
2795 fprintf(stderr, "read_options: malloc failed.\n");
2796 return(2);
2797 }
2798 memset(new, 0, sizeof(listen_socket_t));
2800 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2802 /* Add permissions to the socket {{{ */
2803 if (permissions_len != 0)
2804 {
2805 size_t i;
2806 for (i = 0; i < permissions_len; i++)
2807 {
2808 status = socket_permission_add (new, permissions[i]);
2809 if (status != 0)
2810 {
2811 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2812 "socket failed. Most likely, this permission doesn't "
2813 "exist. Check your command line.\n", permissions[i]);
2814 status = 4;
2815 }
2816 }
2817 }
2818 else /* if (permissions_len == 0) */
2819 {
2820 /* Add permission for ALL commands to the socket. */
2821 size_t i;
2822 for (i = 0; i < list_of_commands_len; i++)
2823 {
2824 status = socket_permission_add (new, list_of_commands[i].cmd);
2825 if (status != 0)
2826 {
2827 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2828 "socket failed. This should never happen, ever! Sorry.\n",
2829 permissions[i]);
2830 status = 4;
2831 }
2832 }
2833 }
2834 /* }}} Done adding permissions. */
2836 new->socket_group = socket_group;
2837 new->socket_permissions = socket_permissions;
2839 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2840 &config_listen_address_list_len, new))
2841 {
2842 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2843 return (2);
2844 }
2845 }
2846 break;
2848 /* set socket group permissions */
2849 case 's':
2850 {
2851 gid_t group_gid;
2852 struct group *grp;
2854 group_gid = strtoul(optarg, NULL, 10);
2855 if (errno != EINVAL && group_gid>0)
2856 {
2857 /* we were passed a number */
2858 grp = getgrgid(group_gid);
2859 }
2860 else
2861 {
2862 grp = getgrnam(optarg);
2863 }
2865 if (grp)
2866 {
2867 socket_group = grp->gr_gid;
2868 }
2869 else
2870 {
2871 /* no idea what the user wanted... */
2872 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2873 return (5);
2874 }
2875 }
2876 break;
2878 /* set socket file permissions */
2879 case 'm':
2880 {
2881 long tmp;
2882 char *endptr = NULL;
2884 tmp = strtol (optarg, &endptr, 8);
2885 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2886 || (tmp > 07777) || (tmp < 0)) {
2887 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2888 optarg);
2889 return (5);
2890 }
2892 socket_permissions = (mode_t)tmp;
2893 }
2894 break;
2896 case 'P':
2897 {
2898 char *optcopy;
2899 char *saveptr;
2900 char *dummy;
2901 char *ptr;
2903 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2905 optcopy = strdup (optarg);
2906 dummy = optcopy;
2907 saveptr = NULL;
2908 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2909 {
2910 dummy = NULL;
2911 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2912 }
2914 free (optcopy);
2915 }
2916 break;
2918 case 'f':
2919 {
2920 int temp;
2922 temp = atoi (optarg);
2923 if (temp > 0)
2924 config_flush_interval = temp;
2925 else
2926 {
2927 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2928 status = 3;
2929 }
2930 }
2931 break;
2933 case 'w':
2934 {
2935 int temp;
2937 temp = atoi (optarg);
2938 if (temp > 0)
2939 config_write_interval = temp;
2940 else
2941 {
2942 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2943 status = 2;
2944 }
2945 }
2946 break;
2948 case 'z':
2949 {
2950 int temp;
2952 temp = atoi(optarg);
2953 if (temp > 0)
2954 config_write_jitter = temp;
2955 else
2956 {
2957 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2958 status = 2;
2959 }
2961 break;
2962 }
2964 case 't':
2965 {
2966 int threads;
2967 threads = atoi(optarg);
2968 if (threads >= 1)
2969 config_queue_threads = threads;
2970 else
2971 {
2972 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2973 return 1;
2974 }
2975 }
2976 break;
2978 case 'B':
2979 config_write_base_only = 1;
2980 break;
2982 case 'b':
2983 {
2984 size_t len;
2985 char base_realpath[PATH_MAX];
2987 if (config_base_dir != NULL)
2988 free (config_base_dir);
2989 config_base_dir = strdup (optarg);
2990 if (config_base_dir == NULL)
2991 {
2992 fprintf (stderr, "read_options: strdup failed.\n");
2993 return (3);
2994 }
2996 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2997 {
2998 fprintf (stderr, "Failed to create base directory '%s': %s\n",
2999 config_base_dir, rrd_strerror (errno));
3000 return (3);
3001 }
3003 /* make sure that the base directory is not resolved via
3004 * symbolic links. this makes some performance-enhancing
3005 * assumptions possible (we don't have to resolve paths
3006 * that start with a "/")
3007 */
3008 if (realpath(config_base_dir, base_realpath) == NULL)
3009 {
3010 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3011 "%s\n", config_base_dir, rrd_strerror(errno));
3012 return 5;
3013 }
3015 len = strlen (config_base_dir);
3016 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3017 {
3018 config_base_dir[len - 1] = 0;
3019 len--;
3020 }
3022 if (len < 1)
3023 {
3024 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3025 return (4);
3026 }
3028 _config_base_dir_len = len;
3030 len = strlen (base_realpath);
3031 while ((len > 0) && (base_realpath[len - 1] == '/'))
3032 {
3033 base_realpath[len - 1] = '\0';
3034 len--;
3035 }
3037 if (strncmp(config_base_dir,
3038 base_realpath, sizeof(base_realpath)) != 0)
3039 {
3040 fprintf(stderr,
3041 "Base directory (-b) resolved via file system links!\n"
3042 "Please consult rrdcached '-b' documentation!\n"
3043 "Consider specifying the real directory (%s)\n",
3044 base_realpath);
3045 return 5;
3046 }
3047 }
3048 break;
3050 case 'p':
3051 {
3052 if (config_pid_file != NULL)
3053 free (config_pid_file);
3054 config_pid_file = strdup (optarg);
3055 if (config_pid_file == NULL)
3056 {
3057 fprintf (stderr, "read_options: strdup failed.\n");
3058 return (3);
3059 }
3060 }
3061 break;
3063 case 'F':
3064 config_flush_at_shutdown = 1;
3065 break;
3067 case 'j':
3068 {
3069 char journal_dir_actual[PATH_MAX];
3070 const char *dir;
3071 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3073 status = rrd_mkdir_p(dir, 0777);
3074 if (status != 0)
3075 {
3076 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3077 dir, rrd_strerror(errno));
3078 return 6;
3079 }
3081 if (access(dir, R_OK|W_OK|X_OK) != 0)
3082 {
3083 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3084 errno ? rrd_strerror(errno) : "");
3085 return 6;
3086 }
3087 }
3088 break;
3090 case 'h':
3091 case '?':
3092 printf ("RRDCacheD %s\n"
3093 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3094 "\n"
3095 "Usage: rrdcached [options]\n"
3096 "\n"
3097 "Valid options are:\n"
3098 " -l <address> Socket address to listen to.\n"
3099 " -P <perms> Sets the permissions to assign to all following "
3100 "sockets\n"
3101 " -w <seconds> Interval in which to write data.\n"
3102 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3103 " -t <threads> Number of write threads.\n"
3104 " -f <seconds> Interval in which to flush dead data.\n"
3105 " -p <file> Location of the PID-file.\n"
3106 " -b <dir> Base directory to change to.\n"
3107 " -B Restrict file access to paths within -b <dir>\n"
3108 " -g Do not fork and run in the foreground.\n"
3109 " -j <dir> Directory in which to create the journal files.\n"
3110 " -F Always flush all updates at shutdown\n"
3111 " -s <id|name> Group owner of all following UNIX sockets\n"
3112 " (the socket will also have read/write permissions "
3113 "for that group)\n"
3114 " -m <mode> File permissions (octal) of all following UNIX "
3115 "sockets\n"
3116 "\n"
3117 "For more information and a detailed description of all options "
3118 "please refer\n"
3119 "to the rrdcached(1) manual page.\n",
3120 VERSION);
3121 if (option == 'h')
3122 status = -1;
3123 else
3124 status = 1;
3125 break;
3126 } /* switch (option) */
3127 } /* while (getopt) */
3129 /* advise the user when values are not sane */
3130 if (config_flush_interval < 2 * config_write_interval)
3131 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3132 " 2x write interval (-w) !\n");
3133 if (config_write_jitter > config_write_interval)
3134 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3135 " write interval (-w) !\n");
3137 if (config_write_base_only && config_base_dir == NULL)
3138 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3139 " Consult the rrdcached documentation\n");
3141 if (journal_dir == NULL)
3142 config_flush_at_shutdown = 1;
3144 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3146 return (status);
3147 } /* }}} int read_options */
3149 int main (int argc, char **argv)
3150 {
3151 int status;
3153 status = read_options (argc, argv);
3154 if (status != 0)
3155 {
3156 if (status < 0)
3157 status = 0;
3158 return (status);
3159 }
3161 status = daemonize ();
3162 if (status != 0)
3163 {
3164 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3165 return (1);
3166 }
3168 journal_init();
3170 /* start the queue threads */
3171 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3172 if (queue_threads == NULL)
3173 {
3174 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3175 cleanup();
3176 return (1);
3177 }
3178 for (int i = 0; i < config_queue_threads; i++)
3179 {
3180 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3181 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3182 if (status != 0)
3183 {
3184 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3185 cleanup();
3186 return (1);
3187 }
3188 }
3190 /* start the flush thread */
3191 memset(&flush_thread, 0, sizeof(flush_thread));
3192 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3193 if (status != 0)
3194 {
3195 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3196 cleanup();
3197 return (1);
3198 }
3200 listen_thread_main (NULL);
3201 cleanup ();
3203 return (0);
3204 } /* int main */
3206 /*
3207 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3208 */