1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120 do { \
121 if (stay_foreground) { \
122 fprintf(stderr, __VA_ARGS__); \
123 fprintf(stderr, "\n"); } \
124 syslog ((severity), __VA_ARGS__); \
125 } while (0)
127 /*
128 * Types
129 */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
133 {
134 int fd;
135 char addr[PATH_MAX + 1];
136 int family;
138 /* state for BATCH processing */
139 time_t batch_start;
140 int batch_cmd;
142 /* buffered IO */
143 char *rbuf;
144 off_t next_cmd;
145 off_t next_read;
147 char *wbuf;
148 ssize_t wbuf_len;
150 uint32_t permissions;
152 gid_t socket_group;
153 mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
161 time_t UNUSED(now),\
162 char UNUSED(*buffer),\
163 size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO command_t UNUSED(*cmd),\
166 DISPATCH_PROTO
168 struct command_s {
169 char *cmd;
170 int (*handler)(HANDLER_PROTO);
172 char context; /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT (1<<0)
174 #define CMD_CONTEXT_BATCH (1<<1)
175 #define CMD_CONTEXT_JOURNAL (1<<2)
176 #define CMD_CONTEXT_ANY (0x7f)
178 char *syntax;
179 char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
185 {
186 char *file;
187 char **values;
188 size_t values_num;
189 time_t last_flush_time;
190 time_t last_update_stamp;
191 #define CI_FLAGS_IN_TREE (1<<0)
192 #define CI_FLAGS_IN_QUEUE (1<<1)
193 int flags;
194 pthread_cond_t flushed;
195 cache_item_t *prev;
196 cache_item_t *next;
197 };
199 struct callback_flush_data_s
200 {
201 time_t now;
202 time_t abs_timeout;
203 char **keys;
204 size_t keys_num;
205 };
206 typedef struct callback_flush_data_s callback_flush_data_t;
208 enum queue_side_e
209 {
210 HEAD,
211 TAIL
212 };
213 typedef enum queue_side_e queue_side_t;
215 /* describe a set of journal files */
216 typedef struct {
217 char **files;
218 size_t files_num;
219 } journal_set;
221 /* max length of socket command or response */
222 #define CMD_MAX 4096
223 #define RBUF_SIZE (CMD_MAX*2)
225 /*
226 * Variables
227 */
228 static int stay_foreground = 0;
229 static uid_t daemon_uid;
231 static listen_socket_t *listen_fds = NULL;
232 static size_t listen_fds_num = 0;
234 static listen_socket_t default_socket;
236 enum {
237 RUNNING, /* normal operation */
238 FLUSHING, /* flushing remaining values */
239 SHUTDOWN /* shutting down */
240 } state = RUNNING;
242 static pthread_t *queue_threads;
243 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
244 static int config_queue_threads = 4;
246 static pthread_t flush_thread;
247 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
249 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
250 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
251 static int connection_threads_num = 0;
253 /* Cache stuff */
254 static GTree *cache_tree = NULL;
255 static cache_item_t *cache_queue_head = NULL;
256 static cache_item_t *cache_queue_tail = NULL;
257 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
259 static int config_write_interval = 300;
260 static int config_write_jitter = 0;
261 static int config_flush_interval = 3600;
262 static int config_flush_at_shutdown = 0;
263 static char *config_pid_file = NULL;
264 static char *config_base_dir = NULL;
265 static size_t _config_base_dir_len = 0;
266 static int config_write_base_only = 0;
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
280 /* Journaled updates */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL; /* current journal file handle */
287 static long journal_size = 0; /* current journal size */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
294 /* prototypes for forward refernces */
295 static int handle_request_help (HANDLER_PROTO);
297 /*
298 * Functions
299 */
300 static void sig_common (const char *sig) /* {{{ */
301 {
302 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
303 state = FLUSHING;
304 pthread_cond_broadcast(&flush_cond);
305 pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
309 {
310 sig_common("INT");
311 } /* }}} void sig_int_handler */
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
314 {
315 sig_common("TERM");
316 } /* }}} void sig_term_handler */
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
319 {
320 config_flush_at_shutdown = 1;
321 sig_common("USR1");
322 } /* }}} void sig_usr1_handler */
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
325 {
326 config_flush_at_shutdown = 0;
327 sig_common("USR2");
328 } /* }}} void sig_usr2_handler */
330 static void install_signal_handlers(void) /* {{{ */
331 {
332 /* These structures are static, because `sigaction' behaves weird if the are
333 * overwritten.. */
334 static struct sigaction sa_int;
335 static struct sigaction sa_term;
336 static struct sigaction sa_pipe;
337 static struct sigaction sa_usr1;
338 static struct sigaction sa_usr2;
340 /* Install signal handlers */
341 memset (&sa_int, 0, sizeof (sa_int));
342 sa_int.sa_handler = sig_int_handler;
343 sigaction (SIGINT, &sa_int, NULL);
345 memset (&sa_term, 0, sizeof (sa_term));
346 sa_term.sa_handler = sig_term_handler;
347 sigaction (SIGTERM, &sa_term, NULL);
349 memset (&sa_pipe, 0, sizeof (sa_pipe));
350 sa_pipe.sa_handler = SIG_IGN;
351 sigaction (SIGPIPE, &sa_pipe, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_usr1));
354 sa_usr1.sa_handler = sig_usr1_handler;
355 sigaction (SIGUSR1, &sa_usr1, NULL);
357 memset (&sa_usr2, 0, sizeof (sa_usr2));
358 sa_usr2.sa_handler = sig_usr2_handler;
359 sigaction (SIGUSR2, &sa_usr2, NULL);
361 } /* }}} void install_signal_handlers */
363 static int open_pidfile(char *action, int oflag) /* {{{ */
364 {
365 int fd;
366 const char *file;
367 char *file_copy, *dir;
369 file = (config_pid_file != NULL)
370 ? config_pid_file
371 : LOCALSTATEDIR "/run/rrdcached.pid";
373 /* dirname may modify its argument */
374 file_copy = strdup(file);
375 if (file_copy == NULL)
376 {
377 fprintf(stderr, "rrdcached: strdup(): %s\n",
378 rrd_strerror(errno));
379 return -1;
380 }
382 dir = dirname(file_copy);
383 if (rrd_mkdir_p(dir, 0777) != 0)
384 {
385 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386 dir, rrd_strerror(errno));
387 return -1;
388 }
390 free(file_copy);
392 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
393 if (fd < 0)
394 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395 action, file, rrd_strerror(errno));
397 return(fd);
398 } /* }}} static int open_pidfile */
400 /* check existing pid file to see whether a daemon is running */
401 static int check_pidfile(void)
402 {
403 int pid_fd;
404 pid_t pid;
405 char pid_str[16];
407 pid_fd = open_pidfile("open", O_RDWR);
408 if (pid_fd < 0)
409 return pid_fd;
411 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
412 return -1;
414 pid = atoi(pid_str);
415 if (pid <= 0)
416 return -1;
418 /* another running process that we can signal COULD be
419 * a competing rrdcached */
420 if (pid != getpid() && kill(pid, 0) == 0)
421 {
422 fprintf(stderr,
423 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
424 close(pid_fd);
425 return -1;
426 }
428 lseek(pid_fd, 0, SEEK_SET);
429 if (ftruncate(pid_fd, 0) == -1)
430 {
431 fprintf(stderr,
432 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
433 close(pid_fd);
434 return -1;
435 }
437 fprintf(stderr,
438 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
439 "rrdcached: starting normally.\n", pid);
441 return pid_fd;
442 } /* }}} static int check_pidfile */
444 static int write_pidfile (int fd) /* {{{ */
445 {
446 pid_t pid;
447 FILE *fh;
449 pid = getpid ();
451 fh = fdopen (fd, "w");
452 if (fh == NULL)
453 {
454 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
455 close(fd);
456 return (-1);
457 }
459 fprintf (fh, "%i\n", (int) pid);
460 fclose (fh);
462 return (0);
463 } /* }}} int write_pidfile */
465 static int remove_pidfile (void) /* {{{ */
466 {
467 char *file;
468 int status;
470 file = (config_pid_file != NULL)
471 ? config_pid_file
472 : LOCALSTATEDIR "/run/rrdcached.pid";
474 status = unlink (file);
475 if (status == 0)
476 return (0);
477 return (errno);
478 } /* }}} int remove_pidfile */
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
481 {
482 char *eol;
484 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485 sock->next_read - sock->next_cmd);
487 if (eol == NULL)
488 {
489 /* no commands left, move remainder back to front of rbuf */
490 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491 sock->next_read - sock->next_cmd);
492 sock->next_read -= sock->next_cmd;
493 sock->next_cmd = 0;
494 *len = 0;
495 return NULL;
496 }
497 else
498 {
499 char *cmd = sock->rbuf + sock->next_cmd;
500 *eol = '\0';
502 sock->next_cmd = eol - sock->rbuf + 1;
504 if (eol > sock->rbuf && *(eol-1) == '\r')
505 *(--eol) = '\0'; /* handle "\r\n" EOL */
507 *len = eol - cmd;
509 return cmd;
510 }
512 /* NOTREACHED */
513 assert(1==0);
514 } /* }}} char *next_cmd */
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
518 {
519 char *new_buf;
521 assert(sock != NULL);
523 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
524 if (new_buf == NULL)
525 {
526 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527 return -1;
528 }
530 strncpy(new_buf + sock->wbuf_len, str, len + 1);
532 sock->wbuf = new_buf;
533 sock->wbuf_len += len;
535 return 0;
536 } /* }}} static int add_to_wbuf */
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
540 {
541 va_list argp;
542 char buffer[CMD_MAX];
543 int len;
545 if (JOURNAL_REPLAY(sock)) return 0;
546 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
548 va_start(argp, fmt);
549 #ifdef HAVE_VSNPRINTF
550 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
551 #else
552 len = vsprintf(buffer, fmt, argp);
553 #endif
554 va_end(argp);
555 if (len < 0)
556 {
557 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558 return -1;
559 }
561 return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
564 static int count_lines(char *str) /* {{{ */
565 {
566 int lines = 0;
568 if (str != NULL)
569 {
570 while ((str = strchr(str, '\n')) != NULL)
571 {
572 ++lines;
573 ++str;
574 }
575 }
577 return lines;
578 } /* }}} static int count_lines */
580 /* send the response back to the user.
581 * returns 0 on success, -1 on error
582 * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584 char *fmt, ...) /* {{{ */
585 {
586 va_list argp;
587 char buffer[CMD_MAX];
588 int lines;
589 ssize_t wrote;
590 int rclen, len;
592 if (JOURNAL_REPLAY(sock)) return rc;
594 if (sock->batch_start)
595 {
596 if (rc == RESP_OK)
597 return rc; /* no response on success during BATCH */
598 lines = sock->batch_cmd;
599 }
600 else if (rc == RESP_OK)
601 lines = count_lines(sock->wbuf);
602 else
603 lines = -1;
605 rclen = sprintf(buffer, "%d ", lines);
606 va_start(argp, fmt);
607 #ifdef HAVE_VSNPRINTF
608 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
609 #else
610 len = vsprintf(buffer+rclen, fmt, argp);
611 #endif
612 va_end(argp);
613 if (len < 0)
614 return -1;
616 len += rclen;
618 /* append the result to the wbuf, don't write to the user */
619 if (sock->batch_start)
620 return add_to_wbuf(sock, buffer, len);
622 /* first write must be complete */
623 if (len != write(sock->fd, buffer, len))
624 {
625 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626 return -1;
627 }
629 if (sock->wbuf != NULL && rc == RESP_OK)
630 {
631 wrote = 0;
632 while (wrote < sock->wbuf_len)
633 {
634 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
635 if (wb <= 0)
636 {
637 RRDD_LOG(LOG_INFO, "send_response: could not write results");
638 return -1;
639 }
640 wrote += wb;
641 }
642 }
644 free(sock->wbuf); sock->wbuf = NULL;
645 sock->wbuf_len = 0;
647 return 0;
648 } /* }}} */
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
651 {
652 ci->values = NULL;
653 ci->values_num = 0;
655 ci->last_flush_time = when;
656 if (config_write_jitter > 0)
657 ci->last_flush_time += (rrd_random() % config_write_jitter);
658 }
660 /* remove_from_queue
661 * remove a "cache_item_t" item from the queue.
662 * must hold 'cache_lock' when calling this
663 */
664 static void remove_from_queue(cache_item_t *ci) /* {{{ */
665 {
666 if (ci == NULL) return;
667 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
669 if (ci->prev == NULL)
670 cache_queue_head = ci->next; /* reset head */
671 else
672 ci->prev->next = ci->next;
674 if (ci->next == NULL)
675 cache_queue_tail = ci->prev; /* reset the tail */
676 else
677 ci->next->prev = ci->prev;
679 ci->next = ci->prev = NULL;
680 ci->flags &= ~CI_FLAGS_IN_QUEUE;
682 pthread_mutex_lock (&stats_lock);
683 assert (stats_queue_length > 0);
684 stats_queue_length--;
685 pthread_mutex_unlock (&stats_lock);
687 } /* }}} static void remove_from_queue */
689 /* free the resources associated with the cache_item_t
690 * must hold cache_lock when calling this function
691 */
692 static void *free_cache_item(cache_item_t *ci) /* {{{ */
693 {
694 if (ci == NULL) return NULL;
696 remove_from_queue(ci);
698 for (size_t i=0; i < ci->values_num; i++)
699 free(ci->values[i]);
701 free (ci->values);
702 free (ci->file);
704 /* in case anyone is waiting */
705 pthread_cond_broadcast(&ci->flushed);
706 pthread_cond_destroy(&ci->flushed);
708 free (ci);
710 return NULL;
711 } /* }}} static void *free_cache_item */
713 /*
714 * enqueue_cache_item:
715 * `cache_lock' must be acquired before calling this function!
716 */
717 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
718 queue_side_t side)
719 {
720 if (ci == NULL)
721 return (-1);
723 if (ci->values_num == 0)
724 return (0);
726 if (side == HEAD)
727 {
728 if (cache_queue_head == ci)
729 return 0;
731 /* remove if further down in queue */
732 remove_from_queue(ci);
734 ci->prev = NULL;
735 ci->next = cache_queue_head;
736 if (ci->next != NULL)
737 ci->next->prev = ci;
738 cache_queue_head = ci;
740 if (cache_queue_tail == NULL)
741 cache_queue_tail = cache_queue_head;
742 }
743 else /* (side == TAIL) */
744 {
745 /* We don't move values back in the list.. */
746 if (ci->flags & CI_FLAGS_IN_QUEUE)
747 return (0);
749 assert (ci->next == NULL);
750 assert (ci->prev == NULL);
752 ci->prev = cache_queue_tail;
754 if (cache_queue_tail == NULL)
755 cache_queue_head = ci;
756 else
757 cache_queue_tail->next = ci;
759 cache_queue_tail = ci;
760 }
762 ci->flags |= CI_FLAGS_IN_QUEUE;
764 pthread_cond_signal(&queue_cond);
765 pthread_mutex_lock (&stats_lock);
766 stats_queue_length++;
767 pthread_mutex_unlock (&stats_lock);
769 return (0);
770 } /* }}} int enqueue_cache_item */
772 /*
773 * tree_callback_flush:
774 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
775 * while this is in progress.
776 */
777 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
778 gpointer data)
779 {
780 cache_item_t *ci;
781 callback_flush_data_t *cfd;
783 ci = (cache_item_t *) value;
784 cfd = (callback_flush_data_t *) data;
786 if (ci->flags & CI_FLAGS_IN_QUEUE)
787 return FALSE;
789 if (ci->values_num > 0
790 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
791 {
792 enqueue_cache_item (ci, TAIL);
793 }
794 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
795 && (ci->values_num <= 0))
796 {
797 assert ((char *) key == ci->file);
798 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
799 {
800 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
801 return (FALSE);
802 }
803 }
805 return (FALSE);
806 } /* }}} gboolean tree_callback_flush */
808 static int flush_old_values (int max_age)
809 {
810 callback_flush_data_t cfd;
811 size_t k;
813 memset (&cfd, 0, sizeof (cfd));
814 /* Pass the current time as user data so that we don't need to call
815 * `time' for each node. */
816 cfd.now = time (NULL);
817 cfd.keys = NULL;
818 cfd.keys_num = 0;
820 if (max_age > 0)
821 cfd.abs_timeout = cfd.now - max_age;
822 else
823 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
825 /* `tree_callback_flush' will return the keys of all values that haven't
826 * been touched in the last `config_flush_interval' seconds in `cfd'.
827 * The char*'s in this array point to the same memory as ci->file, so we
828 * don't need to free them separately. */
829 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
831 for (k = 0; k < cfd.keys_num; k++)
832 {
833 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
834 /* should never fail, since we have held the cache_lock
835 * the entire time */
836 assert(status == TRUE);
837 }
839 if (cfd.keys != NULL)
840 {
841 free (cfd.keys);
842 cfd.keys = NULL;
843 }
845 return (0);
846 } /* int flush_old_values */
848 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
849 {
850 struct timeval now;
851 struct timespec next_flush;
852 int status;
854 gettimeofday (&now, NULL);
855 next_flush.tv_sec = now.tv_sec + config_flush_interval;
856 next_flush.tv_nsec = 1000 * now.tv_usec;
858 pthread_mutex_lock(&cache_lock);
860 while (state == RUNNING)
861 {
862 gettimeofday (&now, NULL);
863 if ((now.tv_sec > next_flush.tv_sec)
864 || ((now.tv_sec == next_flush.tv_sec)
865 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
866 {
867 RRDD_LOG(LOG_DEBUG, "flushing old values");
869 /* Determine the time of the next cache flush. */
870 next_flush.tv_sec = now.tv_sec + config_flush_interval;
872 /* Flush all values that haven't been written in the last
873 * `config_write_interval' seconds. */
874 flush_old_values (config_write_interval);
876 /* unlock the cache while we rotate so we don't block incoming
877 * updates if the fsync() blocks on disk I/O */
878 pthread_mutex_unlock(&cache_lock);
879 journal_rotate();
880 pthread_mutex_lock(&cache_lock);
881 }
883 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
884 if (status != 0 && status != ETIMEDOUT)
885 {
886 RRDD_LOG (LOG_ERR, "flush_thread_main: "
887 "pthread_cond_timedwait returned %i.", status);
888 }
889 }
891 if (config_flush_at_shutdown)
892 flush_old_values (-1); /* flush everything */
894 state = SHUTDOWN;
896 pthread_mutex_unlock(&cache_lock);
898 return NULL;
899 } /* void *flush_thread_main */
901 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
902 {
903 pthread_mutex_lock (&cache_lock);
905 while (state != SHUTDOWN
906 || (cache_queue_head != NULL && config_flush_at_shutdown))
907 {
908 cache_item_t *ci;
909 char *file;
910 char **values;
911 size_t values_num;
912 int status;
914 /* Now, check if there's something to store away. If not, wait until
915 * something comes in. */
916 if (cache_queue_head == NULL)
917 {
918 status = pthread_cond_wait (&queue_cond, &cache_lock);
919 if ((status != 0) && (status != ETIMEDOUT))
920 {
921 RRDD_LOG (LOG_ERR, "queue_thread_main: "
922 "pthread_cond_wait returned %i.", status);
923 }
924 }
926 /* Check if a value has arrived. This may be NULL if we timed out or there
927 * was an interrupt such as a signal. */
928 if (cache_queue_head == NULL)
929 continue;
931 ci = cache_queue_head;
933 /* copy the relevant parts */
934 file = strdup (ci->file);
935 if (file == NULL)
936 {
937 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
938 continue;
939 }
941 assert(ci->values != NULL);
942 assert(ci->values_num > 0);
944 values = ci->values;
945 values_num = ci->values_num;
947 wipe_ci_values(ci, time(NULL));
948 remove_from_queue(ci);
950 pthread_mutex_unlock (&cache_lock);
952 rrd_clear_error ();
953 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
954 if (status != 0)
955 {
956 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
957 "rrd_update_r (%s) failed with status %i. (%s)",
958 file, status, rrd_get_error());
959 }
961 journal_write("wrote", file);
963 /* Search again in the tree. It's possible someone issued a "FORGET"
964 * while we were writing the update values. */
965 pthread_mutex_lock(&cache_lock);
966 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
967 if (ci)
968 pthread_cond_broadcast(&ci->flushed);
969 pthread_mutex_unlock(&cache_lock);
971 if (status == 0)
972 {
973 pthread_mutex_lock (&stats_lock);
974 stats_updates_written++;
975 stats_data_sets_written += values_num;
976 pthread_mutex_unlock (&stats_lock);
977 }
979 rrd_free_ptrs((void ***) &values, &values_num);
980 free(file);
982 pthread_mutex_lock (&cache_lock);
983 }
984 pthread_mutex_unlock (&cache_lock);
986 return (NULL);
987 } /* }}} void *queue_thread_main */
989 static int buffer_get_field (char **buffer_ret, /* {{{ */
990 size_t *buffer_size_ret, char **field_ret)
991 {
992 char *buffer;
993 size_t buffer_pos;
994 size_t buffer_size;
995 char *field;
996 size_t field_size;
997 int status;
999 buffer = *buffer_ret;
1000 buffer_pos = 0;
1001 buffer_size = *buffer_size_ret;
1002 field = *buffer_ret;
1003 field_size = 0;
1005 if (buffer_size <= 0)
1006 return (-1);
1008 /* This is ensured by `handle_request'. */
1009 assert (buffer[buffer_size - 1] == '\0');
1011 status = -1;
1012 while (buffer_pos < buffer_size)
1013 {
1014 /* Check for end-of-field or end-of-buffer */
1015 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1016 {
1017 field[field_size] = 0;
1018 field_size++;
1019 buffer_pos++;
1020 status = 0;
1021 break;
1022 }
1023 /* Handle escaped characters. */
1024 else if (buffer[buffer_pos] == '\\')
1025 {
1026 if (buffer_pos >= (buffer_size - 1))
1027 break;
1028 buffer_pos++;
1029 field[field_size] = buffer[buffer_pos];
1030 field_size++;
1031 buffer_pos++;
1032 }
1033 /* Normal operation */
1034 else
1035 {
1036 field[field_size] = buffer[buffer_pos];
1037 field_size++;
1038 buffer_pos++;
1039 }
1040 } /* while (buffer_pos < buffer_size) */
1042 if (status != 0)
1043 return (status);
1045 *buffer_ret = buffer + buffer_pos;
1046 *buffer_size_ret = buffer_size - buffer_pos;
1047 *field_ret = field;
1049 return (0);
1050 } /* }}} int buffer_get_field */
1052 /* if we're restricting writes to the base directory,
1053 * check whether the file falls within the dir
1054 * returns 1 if OK, otherwise 0
1055 */
1056 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1057 {
1058 assert(file != NULL);
1060 if (!config_write_base_only
1061 || JOURNAL_REPLAY(sock)
1062 || config_base_dir == NULL)
1063 return 1;
1065 if (strstr(file, "../") != NULL) goto err;
1067 /* relative paths without "../" are ok */
1068 if (*file != '/') return 1;
1070 /* file must be of the format base + "/" + <1+ char filename> */
1071 if (strlen(file) < _config_base_dir_len + 2) goto err;
1072 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1073 if (*(file + _config_base_dir_len) != '/') goto err;
1075 return 1;
1077 err:
1078 if (sock != NULL && sock->fd >= 0)
1079 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1081 return 0;
1082 } /* }}} static int check_file_access */
1084 /* when using a base dir, convert relative paths to absolute paths.
1085 * if necessary, modifies the "filename" pointer to point
1086 * to the new path created in "tmp". "tmp" is provided
1087 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1088 *
1089 * this allows us to optimize for the expected case (absolute path)
1090 * with a no-op.
1091 */
1092 static void get_abs_path(char **filename, char *tmp)
1093 {
1094 assert(tmp != NULL);
1095 assert(filename != NULL && *filename != NULL);
1097 if (config_base_dir == NULL || **filename == '/')
1098 return;
1100 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1101 *filename = tmp;
1102 } /* }}} static int get_abs_path */
1104 static int flush_file (const char *filename) /* {{{ */
1105 {
1106 cache_item_t *ci;
1108 pthread_mutex_lock (&cache_lock);
1110 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1111 if (ci == NULL)
1112 {
1113 pthread_mutex_unlock (&cache_lock);
1114 return (ENOENT);
1115 }
1117 if (ci->values_num > 0)
1118 {
1119 /* Enqueue at head */
1120 enqueue_cache_item (ci, HEAD);
1121 pthread_cond_wait(&ci->flushed, &cache_lock);
1122 }
1124 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1125 * may have been purged during our cond_wait() */
1127 pthread_mutex_unlock(&cache_lock);
1129 return (0);
1130 } /* }}} int flush_file */
1132 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1133 {
1134 char *err = "Syntax error.\n";
1136 if (cmd && cmd->syntax)
1137 err = cmd->syntax;
1139 return send_response(sock, RESP_ERR, "Usage: %s", err);
1140 } /* }}} static int syntax_error() */
1142 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1143 {
1144 uint64_t copy_queue_length;
1145 uint64_t copy_updates_received;
1146 uint64_t copy_flush_received;
1147 uint64_t copy_updates_written;
1148 uint64_t copy_data_sets_written;
1149 uint64_t copy_journal_bytes;
1150 uint64_t copy_journal_rotate;
1152 uint64_t tree_nodes_number;
1153 uint64_t tree_depth;
1155 pthread_mutex_lock (&stats_lock);
1156 copy_queue_length = stats_queue_length;
1157 copy_updates_received = stats_updates_received;
1158 copy_flush_received = stats_flush_received;
1159 copy_updates_written = stats_updates_written;
1160 copy_data_sets_written = stats_data_sets_written;
1161 copy_journal_bytes = stats_journal_bytes;
1162 copy_journal_rotate = stats_journal_rotate;
1163 pthread_mutex_unlock (&stats_lock);
1165 pthread_mutex_lock (&cache_lock);
1166 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1167 tree_depth = (uint64_t) g_tree_height (cache_tree);
1168 pthread_mutex_unlock (&cache_lock);
1170 add_response_info(sock,
1171 "QueueLength: %"PRIu64"\n", copy_queue_length);
1172 add_response_info(sock,
1173 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1174 add_response_info(sock,
1175 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1176 add_response_info(sock,
1177 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1178 add_response_info(sock,
1179 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1180 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1181 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1182 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1183 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1185 send_response(sock, RESP_OK, "Statistics follow\n");
1187 return (0);
1188 } /* }}} int handle_request_stats */
1190 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1191 {
1192 char *file, file_tmp[PATH_MAX];
1193 int status;
1195 status = buffer_get_field (&buffer, &buffer_size, &file);
1196 if (status != 0)
1197 {
1198 return syntax_error(sock,cmd);
1199 }
1200 else
1201 {
1202 pthread_mutex_lock(&stats_lock);
1203 stats_flush_received++;
1204 pthread_mutex_unlock(&stats_lock);
1206 get_abs_path(&file, file_tmp);
1207 if (!check_file_access(file, sock)) return 0;
1209 status = flush_file (file);
1210 if (status == 0)
1211 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1212 else if (status == ENOENT)
1213 {
1214 /* no file in our tree; see whether it exists at all */
1215 struct stat statbuf;
1217 memset(&statbuf, 0, sizeof(statbuf));
1218 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1219 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1220 else
1221 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1222 }
1223 else if (status < 0)
1224 return send_response(sock, RESP_ERR, "Internal error.\n");
1225 else
1226 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1227 }
1229 /* NOTREACHED */
1230 assert(1==0);
1231 } /* }}} int handle_request_flush */
1233 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1234 {
1235 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1237 pthread_mutex_lock(&cache_lock);
1238 flush_old_values(-1);
1239 pthread_mutex_unlock(&cache_lock);
1241 return send_response(sock, RESP_OK, "Started flush.\n");
1242 } /* }}} static int handle_request_flushall */
1244 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1245 {
1246 int status;
1247 char *file, file_tmp[PATH_MAX];
1248 cache_item_t *ci;
1250 status = buffer_get_field(&buffer, &buffer_size, &file);
1251 if (status != 0)
1252 return syntax_error(sock,cmd);
1254 get_abs_path(&file, file_tmp);
1256 pthread_mutex_lock(&cache_lock);
1257 ci = g_tree_lookup(cache_tree, file);
1258 if (ci == NULL)
1259 {
1260 pthread_mutex_unlock(&cache_lock);
1261 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1262 }
1264 for (size_t i=0; i < ci->values_num; i++)
1265 add_response_info(sock, "%s\n", ci->values[i]);
1267 pthread_mutex_unlock(&cache_lock);
1268 return send_response(sock, RESP_OK, "updates pending\n");
1269 } /* }}} static int handle_request_pending */
1271 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1272 {
1273 int status;
1274 gboolean found;
1275 char *file, file_tmp[PATH_MAX];
1277 status = buffer_get_field(&buffer, &buffer_size, &file);
1278 if (status != 0)
1279 return syntax_error(sock,cmd);
1281 get_abs_path(&file, file_tmp);
1282 if (!check_file_access(file, sock)) return 0;
1284 pthread_mutex_lock(&cache_lock);
1285 found = g_tree_remove(cache_tree, file);
1286 pthread_mutex_unlock(&cache_lock);
1288 if (found == TRUE)
1289 {
1290 if (!JOURNAL_REPLAY(sock))
1291 journal_write("forget", file);
1293 return send_response(sock, RESP_OK, "Gone!\n");
1294 }
1295 else
1296 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1298 /* NOTREACHED */
1299 assert(1==0);
1300 } /* }}} static int handle_request_forget */
1302 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1303 {
1304 cache_item_t *ci;
1306 pthread_mutex_lock(&cache_lock);
1308 ci = cache_queue_head;
1309 while (ci != NULL)
1310 {
1311 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1312 ci = ci->next;
1313 }
1315 pthread_mutex_unlock(&cache_lock);
1317 return send_response(sock, RESP_OK, "in queue.\n");
1318 } /* }}} int handle_request_queue */
1320 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1321 {
1322 char *file, file_tmp[PATH_MAX];
1323 int values_num = 0;
1324 int status;
1325 char orig_buf[CMD_MAX];
1327 cache_item_t *ci;
1329 /* save it for the journal later */
1330 if (!JOURNAL_REPLAY(sock))
1331 strncpy(orig_buf, buffer, buffer_size);
1333 status = buffer_get_field (&buffer, &buffer_size, &file);
1334 if (status != 0)
1335 return syntax_error(sock,cmd);
1337 pthread_mutex_lock(&stats_lock);
1338 stats_updates_received++;
1339 pthread_mutex_unlock(&stats_lock);
1341 get_abs_path(&file, file_tmp);
1342 if (!check_file_access(file, sock)) return 0;
1344 pthread_mutex_lock (&cache_lock);
1345 ci = g_tree_lookup (cache_tree, file);
1347 if (ci == NULL) /* {{{ */
1348 {
1349 struct stat statbuf;
1350 cache_item_t *tmp;
1352 /* don't hold the lock while we setup; stat(2) might block */
1353 pthread_mutex_unlock(&cache_lock);
1355 memset (&statbuf, 0, sizeof (statbuf));
1356 status = stat (file, &statbuf);
1357 if (status != 0)
1358 {
1359 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1361 status = errno;
1362 if (status == ENOENT)
1363 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1364 else
1365 return send_response(sock, RESP_ERR,
1366 "stat failed with error %i.\n", status);
1367 }
1368 if (!S_ISREG (statbuf.st_mode))
1369 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1371 if (access(file, R_OK|W_OK) != 0)
1372 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1373 file, rrd_strerror(errno));
1375 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1376 if (ci == NULL)
1377 {
1378 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1380 return send_response(sock, RESP_ERR, "malloc failed.\n");
1381 }
1382 memset (ci, 0, sizeof (cache_item_t));
1384 ci->file = strdup (file);
1385 if (ci->file == NULL)
1386 {
1387 free (ci);
1388 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1390 return send_response(sock, RESP_ERR, "strdup failed.\n");
1391 }
1393 wipe_ci_values(ci, now);
1394 ci->flags = CI_FLAGS_IN_TREE;
1395 pthread_cond_init(&ci->flushed, NULL);
1397 pthread_mutex_lock(&cache_lock);
1399 /* another UPDATE might have added this entry in the meantime */
1400 tmp = g_tree_lookup (cache_tree, file);
1401 if (tmp == NULL)
1402 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1403 else
1404 {
1405 free_cache_item (ci);
1406 ci = tmp;
1407 }
1409 /* state may have changed while we were unlocked */
1410 if (state == SHUTDOWN)
1411 return -1;
1412 } /* }}} */
1413 assert (ci != NULL);
1415 /* don't re-write updates in replay mode */
1416 if (!JOURNAL_REPLAY(sock))
1417 journal_write("update", orig_buf);
1419 while (buffer_size > 0)
1420 {
1421 char *value;
1422 time_t stamp;
1423 char *eostamp;
1425 status = buffer_get_field (&buffer, &buffer_size, &value);
1426 if (status != 0)
1427 {
1428 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1429 break;
1430 }
1432 /* make sure update time is always moving forward */
1433 stamp = strtol(value, &eostamp, 10);
1434 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1435 {
1436 pthread_mutex_unlock(&cache_lock);
1437 return send_response(sock, RESP_ERR,
1438 "Cannot find timestamp in '%s'!\n", value);
1439 }
1440 else if (stamp <= ci->last_update_stamp)
1441 {
1442 pthread_mutex_unlock(&cache_lock);
1443 return send_response(sock, RESP_ERR,
1444 "illegal attempt to update using time %ld when last"
1445 " update time is %ld (minimum one second step)\n",
1446 stamp, ci->last_update_stamp);
1447 }
1448 else
1449 ci->last_update_stamp = stamp;
1451 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1452 {
1453 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454 continue;
1455 }
1457 values_num++;
1458 }
1460 if (((now - ci->last_flush_time) >= config_write_interval)
1461 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462 && (ci->values_num > 0))
1463 {
1464 enqueue_cache_item (ci, TAIL);
1465 }
1467 pthread_mutex_unlock (&cache_lock);
1469 if (values_num < 1)
1470 return send_response(sock, RESP_ERR, "No values updated.\n");
1471 else
1472 return send_response(sock, RESP_OK,
1473 "errors, enqueued %i value(s).\n", values_num);
1475 /* NOTREACHED */
1476 assert(1==0);
1478 } /* }}} int handle_request_update */
1480 /* we came across a "WROTE" entry during journal replay.
1481 * throw away any values that we have accumulated for this file
1482 */
1483 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1484 {
1485 cache_item_t *ci;
1486 const char *file = buffer;
1488 pthread_mutex_lock(&cache_lock);
1490 ci = g_tree_lookup(cache_tree, file);
1491 if (ci == NULL)
1492 {
1493 pthread_mutex_unlock(&cache_lock);
1494 return (0);
1495 }
1497 if (ci->values)
1498 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1500 wipe_ci_values(ci, now);
1501 remove_from_queue(ci);
1503 pthread_mutex_unlock(&cache_lock);
1504 return (0);
1505 } /* }}} int handle_request_wrote */
1507 /* start "BATCH" processing */
1508 static int batch_start (HANDLER_PROTO) /* {{{ */
1509 {
1510 int status;
1511 if (sock->batch_start)
1512 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1514 status = send_response(sock, RESP_OK,
1515 "Go ahead. End with dot '.' on its own line.\n");
1516 sock->batch_start = time(NULL);
1517 sock->batch_cmd = 0;
1519 return status;
1520 } /* }}} static int batch_start */
1522 /* finish "BATCH" processing and return results to the client */
1523 static int batch_done (HANDLER_PROTO) /* {{{ */
1524 {
1525 assert(sock->batch_start);
1526 sock->batch_start = 0;
1527 sock->batch_cmd = 0;
1528 return send_response(sock, RESP_OK, "errors\n");
1529 } /* }}} static int batch_done */
1531 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1532 {
1533 return -1;
1534 } /* }}} static int handle_request_quit */
1536 static command_t list_of_commands[] = { /* {{{ */
1537 {
1538 "UPDATE",
1539 handle_request_update,
1540 CMD_CONTEXT_ANY,
1541 "UPDATE <filename> <values> [<values> ...]\n"
1542 ,
1543 "Adds the given file to the internal cache if it is not yet known and\n"
1544 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1545 "for details.\n"
1546 "\n"
1547 "Each <values> has the following form:\n"
1548 " <values> = <time>:<value>[:<value>[...]]\n"
1549 "See the rrdupdate(1) manpage for details.\n"
1550 },
1551 {
1552 "WROTE",
1553 handle_request_wrote,
1554 CMD_CONTEXT_JOURNAL,
1555 NULL,
1556 NULL
1557 },
1558 {
1559 "FLUSH",
1560 handle_request_flush,
1561 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1562 "FLUSH <filename>\n"
1563 ,
1564 "Adds the given filename to the head of the update queue and returns\n"
1565 "after it has been dequeued.\n"
1566 },
1567 {
1568 "FLUSHALL",
1569 handle_request_flushall,
1570 CMD_CONTEXT_CLIENT,
1571 "FLUSHALL\n"
1572 ,
1573 "Triggers writing of all pending updates. Returns immediately.\n"
1574 },
1575 {
1576 "PENDING",
1577 handle_request_pending,
1578 CMD_CONTEXT_CLIENT,
1579 "PENDING <filename>\n"
1580 ,
1581 "Shows any 'pending' updates for a file, in order.\n"
1582 "The updates shown have not yet been written to the underlying RRD file.\n"
1583 },
1584 {
1585 "FORGET",
1586 handle_request_forget,
1587 CMD_CONTEXT_ANY,
1588 "FORGET <filename>\n"
1589 ,
1590 "Removes the file completely from the cache.\n"
1591 "Any pending updates for the file will be lost.\n"
1592 },
1593 {
1594 "QUEUE",
1595 handle_request_queue,
1596 CMD_CONTEXT_CLIENT,
1597 "QUEUE\n"
1598 ,
1599 "Shows all files in the output queue.\n"
1600 "The output is zero or more lines in the following format:\n"
1601 "(where <num_vals> is the number of values to be written)\n"
1602 "\n"
1603 "<num_vals> <filename>\n"
1604 },
1605 {
1606 "STATS",
1607 handle_request_stats,
1608 CMD_CONTEXT_CLIENT,
1609 "STATS\n"
1610 ,
1611 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1612 "a description of the values.\n"
1613 },
1614 {
1615 "HELP",
1616 handle_request_help,
1617 CMD_CONTEXT_CLIENT,
1618 "HELP [<command>]\n",
1619 NULL, /* special! */
1620 },
1621 {
1622 "BATCH",
1623 batch_start,
1624 CMD_CONTEXT_CLIENT,
1625 "BATCH\n"
1626 ,
1627 "The 'BATCH' command permits the client to initiate a bulk load\n"
1628 " of commands to rrdcached.\n"
1629 "\n"
1630 "Usage:\n"
1631 "\n"
1632 " client: BATCH\n"
1633 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1634 " client: command #1\n"
1635 " client: command #2\n"
1636 " client: ... and so on\n"
1637 " client: .\n"
1638 " server: 2 errors\n"
1639 " server: 7 message for command #7\n"
1640 " server: 9 message for command #9\n"
1641 "\n"
1642 "For more information, consult the rrdcached(1) documentation.\n"
1643 },
1644 {
1645 ".", /* BATCH terminator */
1646 batch_done,
1647 CMD_CONTEXT_BATCH,
1648 NULL,
1649 NULL
1650 },
1651 {
1652 "QUIT",
1653 handle_request_quit,
1654 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1655 "QUIT\n"
1656 ,
1657 "Disconnect from rrdcached.\n"
1658 }
1659 }; /* }}} command_t list_of_commands[] */
1660 static size_t list_of_commands_len = sizeof (list_of_commands)
1661 / sizeof (list_of_commands[0]);
1663 static command_t *find_command(char *cmd)
1664 {
1665 size_t i;
1667 for (i = 0; i < list_of_commands_len; i++)
1668 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1669 return (&list_of_commands[i]);
1670 return NULL;
1671 }
1673 /* We currently use the index in the `list_of_commands' array as a bit position
1674 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1675 * outside these functions so that switching to a more elegant storage method
1676 * is easily possible. */
1677 static ssize_t find_command_index (const char *cmd) /* {{{ */
1678 {
1679 size_t i;
1681 for (i = 0; i < list_of_commands_len; i++)
1682 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1683 return ((ssize_t) i);
1684 return (-1);
1685 } /* }}} ssize_t find_command_index */
1687 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1688 const char *cmd)
1689 {
1690 ssize_t i;
1692 if (JOURNAL_REPLAY(sock))
1693 return (1);
1695 if (cmd == NULL)
1696 return (-1);
1698 if ((strcasecmp ("QUIT", cmd) == 0)
1699 || (strcasecmp ("HELP", cmd) == 0))
1700 return (1);
1701 else if (strcmp (".", cmd) == 0)
1702 cmd = "BATCH";
1704 i = find_command_index (cmd);
1705 if (i < 0)
1706 return (-1);
1707 assert (i < 32);
1709 if ((sock->permissions & (1 << i)) != 0)
1710 return (1);
1711 return (0);
1712 } /* }}} int socket_permission_check */
1714 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1715 const char *cmd)
1716 {
1717 ssize_t i;
1719 i = find_command_index (cmd);
1720 if (i < 0)
1721 return (-1);
1722 assert (i < 32);
1724 sock->permissions |= (1 << i);
1725 return (0);
1726 } /* }}} int socket_permission_add */
1728 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1729 {
1730 sock->permissions = 0;
1731 } /* }}} socket_permission_clear */
1733 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1734 listen_socket_t *src)
1735 {
1736 dest->permissions = src->permissions;
1737 } /* }}} socket_permission_copy */
1739 /* check whether commands are received in the expected context */
1740 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1741 {
1742 if (JOURNAL_REPLAY(sock))
1743 return (cmd->context & CMD_CONTEXT_JOURNAL);
1744 else if (sock->batch_start)
1745 return (cmd->context & CMD_CONTEXT_BATCH);
1746 else
1747 return (cmd->context & CMD_CONTEXT_CLIENT);
1749 /* NOTREACHED */
1750 assert(1==0);
1751 }
1753 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1754 {
1755 int status;
1756 char *cmd_str;
1757 char *resp_txt;
1758 command_t *help = NULL;
1760 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1761 if (status == 0)
1762 help = find_command(cmd_str);
1764 if (help && (help->syntax || help->help))
1765 {
1766 char tmp[CMD_MAX];
1768 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1769 resp_txt = tmp;
1771 if (help->syntax)
1772 add_response_info(sock, "Usage: %s\n", help->syntax);
1774 if (help->help)
1775 add_response_info(sock, "%s\n", help->help);
1776 }
1777 else
1778 {
1779 size_t i;
1781 resp_txt = "Command overview\n";
1783 for (i = 0; i < list_of_commands_len; i++)
1784 {
1785 if (list_of_commands[i].syntax == NULL)
1786 continue;
1787 add_response_info (sock, "%s", list_of_commands[i].syntax);
1788 }
1789 }
1791 return send_response(sock, RESP_OK, resp_txt);
1792 } /* }}} int handle_request_help */
1794 static int handle_request (DISPATCH_PROTO) /* {{{ */
1795 {
1796 char *buffer_ptr = buffer;
1797 char *cmd_str = NULL;
1798 command_t *cmd = NULL;
1799 int status;
1801 assert (buffer[buffer_size - 1] == '\0');
1803 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1804 if (status != 0)
1805 {
1806 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1807 return (-1);
1808 }
1810 if (sock != NULL && sock->batch_start)
1811 sock->batch_cmd++;
1813 cmd = find_command(cmd_str);
1814 if (!cmd)
1815 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1817 if (!socket_permission_check (sock, cmd->cmd))
1818 return send_response(sock, RESP_ERR, "Permission denied.\n");
1820 if (!command_check_context(sock, cmd))
1821 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1823 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1824 } /* }}} int handle_request */
1826 static void journal_set_free (journal_set *js) /* {{{ */
1827 {
1828 if (js == NULL)
1829 return;
1831 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1833 free(js);
1834 } /* }}} journal_set_free */
1836 static void journal_set_remove (journal_set *js) /* {{{ */
1837 {
1838 if (js == NULL)
1839 return;
1841 for (uint i=0; i < js->files_num; i++)
1842 {
1843 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1844 unlink(js->files[i]);
1845 }
1846 } /* }}} journal_set_remove */
1848 /* close current journal file handle.
1849 * MUST hold journal_lock before calling */
1850 static void journal_close(void) /* {{{ */
1851 {
1852 if (journal_fh != NULL)
1853 {
1854 if (fclose(journal_fh) != 0)
1855 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1856 }
1858 journal_fh = NULL;
1859 journal_size = 0;
1860 } /* }}} journal_close */
1862 /* MUST hold journal_lock before calling */
1863 static void journal_new_file(void) /* {{{ */
1864 {
1865 struct timeval now;
1866 int new_fd;
1867 char new_file[PATH_MAX + 1];
1869 assert(journal_dir != NULL);
1870 assert(journal_cur != NULL);
1872 journal_close();
1874 gettimeofday(&now, NULL);
1875 /* this format assures that the files sort in strcmp() order */
1876 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1877 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1879 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1880 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1881 if (new_fd < 0)
1882 goto error;
1884 journal_fh = fdopen(new_fd, "a");
1885 if (journal_fh == NULL)
1886 goto error;
1888 journal_size = ftell(journal_fh);
1889 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1891 /* record the file in the journal set */
1892 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1894 return;
1896 error:
1897 RRDD_LOG(LOG_CRIT,
1898 "JOURNALING DISABLED: Error while trying to create %s : %s",
1899 new_file, rrd_strerror(errno));
1900 RRDD_LOG(LOG_CRIT,
1901 "JOURNALING DISABLED: All values will be flushed at shutdown");
1903 close(new_fd);
1904 config_flush_at_shutdown = 1;
1906 } /* }}} journal_new_file */
1908 /* MUST NOT hold journal_lock before calling this */
1909 static void journal_rotate(void) /* {{{ */
1910 {
1911 journal_set *old_js = NULL;
1913 if (journal_dir == NULL)
1914 return;
1916 RRDD_LOG(LOG_DEBUG, "rotating journals");
1918 pthread_mutex_lock(&stats_lock);
1919 ++stats_journal_rotate;
1920 pthread_mutex_unlock(&stats_lock);
1922 pthread_mutex_lock(&journal_lock);
1924 journal_close();
1926 /* rotate the journal sets */
1927 old_js = journal_old;
1928 journal_old = journal_cur;
1929 journal_cur = calloc(1, sizeof(journal_set));
1931 if (journal_cur != NULL)
1932 journal_new_file();
1933 else
1934 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1936 pthread_mutex_unlock(&journal_lock);
1938 journal_set_remove(old_js);
1939 journal_set_free (old_js);
1941 } /* }}} static void journal_rotate */
1943 /* MUST hold journal_lock when calling */
1944 static void journal_done(void) /* {{{ */
1945 {
1946 if (journal_cur == NULL)
1947 return;
1949 journal_close();
1951 if (config_flush_at_shutdown)
1952 {
1953 RRDD_LOG(LOG_INFO, "removing journals");
1954 journal_set_remove(journal_old);
1955 journal_set_remove(journal_cur);
1956 }
1957 else
1958 {
1959 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1960 "journals will be used at next startup");
1961 }
1963 journal_set_free(journal_cur);
1964 journal_set_free(journal_old);
1965 free(journal_dir);
1967 } /* }}} static void journal_done */
1969 static int journal_write(char *cmd, char *args) /* {{{ */
1970 {
1971 int chars;
1973 if (journal_fh == NULL)
1974 return 0;
1976 pthread_mutex_lock(&journal_lock);
1977 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1978 journal_size += chars;
1980 if (journal_size > JOURNAL_MAX)
1981 journal_new_file();
1983 pthread_mutex_unlock(&journal_lock);
1985 if (chars > 0)
1986 {
1987 pthread_mutex_lock(&stats_lock);
1988 stats_journal_bytes += chars;
1989 pthread_mutex_unlock(&stats_lock);
1990 }
1992 return chars;
1993 } /* }}} static int journal_write */
1995 static int journal_replay (const char *file) /* {{{ */
1996 {
1997 FILE *fh;
1998 int entry_cnt = 0;
1999 int fail_cnt = 0;
2000 uint64_t line = 0;
2001 char entry[CMD_MAX];
2002 time_t now;
2004 if (file == NULL) return 0;
2006 {
2007 char *reason = "unknown error";
2008 int status = 0;
2009 struct stat statbuf;
2011 memset(&statbuf, 0, sizeof(statbuf));
2012 if (stat(file, &statbuf) != 0)
2013 {
2014 reason = "stat error";
2015 status = errno;
2016 }
2017 else if (!S_ISREG(statbuf.st_mode))
2018 {
2019 reason = "not a regular file";
2020 status = EPERM;
2021 }
2022 if (statbuf.st_uid != daemon_uid)
2023 {
2024 reason = "not owned by daemon user";
2025 status = EACCES;
2026 }
2027 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2028 {
2029 reason = "must not be user/group writable";
2030 status = EACCES;
2031 }
2033 if (status != 0)
2034 {
2035 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2036 file, rrd_strerror(status), reason);
2037 return 0;
2038 }
2039 }
2041 fh = fopen(file, "r");
2042 if (fh == NULL)
2043 {
2044 if (errno != ENOENT)
2045 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2046 file, rrd_strerror(errno));
2047 return 0;
2048 }
2049 else
2050 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2052 now = time(NULL);
2054 while(!feof(fh))
2055 {
2056 size_t entry_len;
2058 ++line;
2059 if (fgets(entry, sizeof(entry), fh) == NULL)
2060 break;
2061 entry_len = strlen(entry);
2063 /* check \n termination in case journal writing crashed mid-line */
2064 if (entry_len == 0)
2065 continue;
2066 else if (entry[entry_len - 1] != '\n')
2067 {
2068 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2069 ++fail_cnt;
2070 continue;
2071 }
2073 entry[entry_len - 1] = '\0';
2075 if (handle_request(NULL, now, entry, entry_len) == 0)
2076 ++entry_cnt;
2077 else
2078 ++fail_cnt;
2079 }
2081 fclose(fh);
2083 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2084 entry_cnt, fail_cnt);
2086 return entry_cnt > 0 ? 1 : 0;
2087 } /* }}} static int journal_replay */
2089 static int journal_sort(const void *v1, const void *v2)
2090 {
2091 char **jn1 = (char **) v1;
2092 char **jn2 = (char **) v2;
2094 return strcmp(*jn1,*jn2);
2095 }
2097 static void journal_init(void) /* {{{ */
2098 {
2099 int had_journal = 0;
2100 DIR *dir;
2101 struct dirent *dent;
2102 char path[PATH_MAX+1];
2104 if (journal_dir == NULL) return;
2106 pthread_mutex_lock(&journal_lock);
2108 journal_cur = calloc(1, sizeof(journal_set));
2109 if (journal_cur == NULL)
2110 {
2111 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2112 return;
2113 }
2115 RRDD_LOG(LOG_INFO, "checking for journal files");
2117 /* Handle old journal files during transition. This gives them the
2118 * correct sort order. TODO: remove after first release
2119 */
2120 {
2121 char old_path[PATH_MAX+1];
2122 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2123 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2124 rename(old_path, path);
2126 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2127 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2128 rename(old_path, path);
2129 }
2131 dir = opendir(journal_dir);
2132 if (!dir) {
2133 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2134 return;
2135 }
2136 while ((dent = readdir(dir)) != NULL)
2137 {
2138 /* looks like a journal file? */
2139 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2140 continue;
2142 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2144 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2145 {
2146 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2147 dent->d_name);
2148 break;
2149 }
2150 }
2151 closedir(dir);
2153 qsort(journal_cur->files, journal_cur->files_num,
2154 sizeof(journal_cur->files[0]), journal_sort);
2156 for (uint i=0; i < journal_cur->files_num; i++)
2157 had_journal += journal_replay(journal_cur->files[i]);
2159 journal_new_file();
2161 /* it must have been a crash. start a flush */
2162 if (had_journal && config_flush_at_shutdown)
2163 flush_old_values(-1);
2165 pthread_mutex_unlock(&journal_lock);
2167 RRDD_LOG(LOG_INFO, "journal processing complete");
2169 } /* }}} static void journal_init */
2171 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2172 {
2173 assert(sock != NULL);
2175 free(sock->rbuf); sock->rbuf = NULL;
2176 free(sock->wbuf); sock->wbuf = NULL;
2177 free(sock);
2178 } /* }}} void free_listen_socket */
2180 static void close_connection(listen_socket_t *sock) /* {{{ */
2181 {
2182 if (sock->fd >= 0)
2183 {
2184 close(sock->fd);
2185 sock->fd = -1;
2186 }
2188 free_listen_socket(sock);
2190 } /* }}} void close_connection */
2192 static void *connection_thread_main (void *args) /* {{{ */
2193 {
2194 listen_socket_t *sock;
2195 int fd;
2197 sock = (listen_socket_t *) args;
2198 fd = sock->fd;
2200 /* init read buffers */
2201 sock->next_read = sock->next_cmd = 0;
2202 sock->rbuf = malloc(RBUF_SIZE);
2203 if (sock->rbuf == NULL)
2204 {
2205 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2206 close_connection(sock);
2207 return NULL;
2208 }
2210 pthread_mutex_lock (&connection_threads_lock);
2211 #ifdef HAVE_LIBWRAP
2212 /* LIBWRAP does not support multiple threads! By putting this code
2213 inside pthread_mutex_lock we do not have to worry about request_info
2214 getting overwritten by another thread.
2215 */
2216 struct request_info req;
2217 request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2218 fromhost(&req);
2219 if(!hosts_access(&req)) {
2220 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2221 pthread_mutex_unlock (&connection_threads_lock);
2222 close_connection(sock);
2223 return NULL;
2224 }
2225 #endif /* HAVE_LIBWRAP */
2226 connection_threads_num++;
2227 pthread_mutex_unlock (&connection_threads_lock);
2229 while (state == RUNNING)
2230 {
2231 char *cmd;
2232 ssize_t cmd_len;
2233 ssize_t rbytes;
2234 time_t now;
2236 struct pollfd pollfd;
2237 int status;
2239 pollfd.fd = fd;
2240 pollfd.events = POLLIN | POLLPRI;
2241 pollfd.revents = 0;
2243 status = poll (&pollfd, 1, /* timeout = */ 500);
2244 if (state != RUNNING)
2245 break;
2246 else if (status == 0) /* timeout */
2247 continue;
2248 else if (status < 0) /* error */
2249 {
2250 status = errno;
2251 if (status != EINTR)
2252 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2253 continue;
2254 }
2256 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2257 break;
2258 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2259 {
2260 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2261 "poll(2) returned something unexpected: %#04hx",
2262 pollfd.revents);
2263 break;
2264 }
2266 rbytes = read(fd, sock->rbuf + sock->next_read,
2267 RBUF_SIZE - sock->next_read);
2268 if (rbytes < 0)
2269 {
2270 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2271 break;
2272 }
2273 else if (rbytes == 0)
2274 break; /* eof */
2276 sock->next_read += rbytes;
2278 if (sock->batch_start)
2279 now = sock->batch_start;
2280 else
2281 now = time(NULL);
2283 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2284 {
2285 status = handle_request (sock, now, cmd, cmd_len+1);
2286 if (status != 0)
2287 goto out_close;
2288 }
2289 }
2291 out_close:
2292 close_connection(sock);
2294 /* Remove this thread from the connection threads list */
2295 pthread_mutex_lock (&connection_threads_lock);
2296 connection_threads_num--;
2297 if (connection_threads_num <= 0)
2298 pthread_cond_broadcast(&connection_threads_done);
2299 pthread_mutex_unlock (&connection_threads_lock);
2301 return (NULL);
2302 } /* }}} void *connection_thread_main */
2304 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2305 {
2306 int fd;
2307 struct sockaddr_un sa;
2308 listen_socket_t *temp;
2309 int status;
2310 const char *path;
2311 char *path_copy, *dir;
2313 path = sock->addr;
2314 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2315 path += strlen("unix:");
2317 /* dirname may modify its argument */
2318 path_copy = strdup(path);
2319 if (path_copy == NULL)
2320 {
2321 fprintf(stderr, "rrdcached: strdup(): %s\n",
2322 rrd_strerror(errno));
2323 return (-1);
2324 }
2326 dir = dirname(path_copy);
2327 if (rrd_mkdir_p(dir, 0777) != 0)
2328 {
2329 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2330 dir, rrd_strerror(errno));
2331 return (-1);
2332 }
2334 free(path_copy);
2336 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2337 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2338 if (temp == NULL)
2339 {
2340 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2341 return (-1);
2342 }
2343 listen_fds = temp;
2344 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2346 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2347 if (fd < 0)
2348 {
2349 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2350 rrd_strerror(errno));
2351 return (-1);
2352 }
2354 memset (&sa, 0, sizeof (sa));
2355 sa.sun_family = AF_UNIX;
2356 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2358 /* if we've gotten this far, we own the pid file. any daemon started
2359 * with the same args must not be alive. therefore, ensure that we can
2360 * create the socket...
2361 */
2362 unlink(path);
2364 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2365 if (status != 0)
2366 {
2367 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2368 path, rrd_strerror(errno));
2369 close (fd);
2370 return (-1);
2371 }
2373 /* tweak the sockets group ownership */
2374 if (sock->socket_group != (gid_t)-1)
2375 {
2376 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2377 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2378 {
2379 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2380 }
2381 }
2383 if (sock->socket_permissions != (mode_t)-1)
2384 {
2385 if (chmod(path, sock->socket_permissions) != 0)
2386 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2387 (unsigned int)sock->socket_permissions, strerror(errno));
2388 }
2390 status = listen (fd, /* backlog = */ 10);
2391 if (status != 0)
2392 {
2393 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2394 path, rrd_strerror(errno));
2395 close (fd);
2396 unlink (path);
2397 return (-1);
2398 }
2400 listen_fds[listen_fds_num].fd = fd;
2401 listen_fds[listen_fds_num].family = PF_UNIX;
2402 strncpy(listen_fds[listen_fds_num].addr, path,
2403 sizeof (listen_fds[listen_fds_num].addr) - 1);
2404 listen_fds_num++;
2406 return (0);
2407 } /* }}} int open_listen_socket_unix */
2409 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2410 {
2411 struct addrinfo ai_hints;
2412 struct addrinfo *ai_res;
2413 struct addrinfo *ai_ptr;
2414 char addr_copy[NI_MAXHOST];
2415 char *addr;
2416 char *port;
2417 int status;
2419 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2420 addr_copy[sizeof (addr_copy) - 1] = 0;
2421 addr = addr_copy;
2423 memset (&ai_hints, 0, sizeof (ai_hints));
2424 ai_hints.ai_flags = 0;
2425 #ifdef AI_ADDRCONFIG
2426 ai_hints.ai_flags |= AI_ADDRCONFIG;
2427 #endif
2428 ai_hints.ai_family = AF_UNSPEC;
2429 ai_hints.ai_socktype = SOCK_STREAM;
2431 port = NULL;
2432 if (*addr == '[') /* IPv6+port format */
2433 {
2434 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2435 addr++;
2437 port = strchr (addr, ']');
2438 if (port == NULL)
2439 {
2440 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2441 return (-1);
2442 }
2443 *port = 0;
2444 port++;
2446 if (*port == ':')
2447 port++;
2448 else if (*port == 0)
2449 port = NULL;
2450 else
2451 {
2452 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2453 return (-1);
2454 }
2455 } /* if (*addr == '[') */
2456 else
2457 {
2458 port = rindex(addr, ':');
2459 if (port != NULL)
2460 {
2461 *port = 0;
2462 port++;
2463 }
2464 }
2465 ai_res = NULL;
2466 status = getaddrinfo (addr,
2467 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2468 &ai_hints, &ai_res);
2469 if (status != 0)
2470 {
2471 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2472 addr, gai_strerror (status));
2473 return (-1);
2474 }
2476 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2477 {
2478 int fd;
2479 listen_socket_t *temp;
2480 int one = 1;
2482 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2483 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2484 if (temp == NULL)
2485 {
2486 fprintf (stderr,
2487 "rrdcached: open_listen_socket_network: realloc failed.\n");
2488 continue;
2489 }
2490 listen_fds = temp;
2491 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2493 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2494 if (fd < 0)
2495 {
2496 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2497 rrd_strerror(errno));
2498 continue;
2499 }
2501 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2503 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2504 if (status != 0)
2505 {
2506 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2507 sock->addr, rrd_strerror(errno));
2508 close (fd);
2509 continue;
2510 }
2512 status = listen (fd, /* backlog = */ 10);
2513 if (status != 0)
2514 {
2515 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2516 sock->addr, rrd_strerror(errno));
2517 close (fd);
2518 freeaddrinfo(ai_res);
2519 return (-1);
2520 }
2522 listen_fds[listen_fds_num].fd = fd;
2523 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2524 listen_fds_num++;
2525 } /* for (ai_ptr) */
2527 freeaddrinfo(ai_res);
2528 return (0);
2529 } /* }}} static int open_listen_socket_network */
2531 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2532 {
2533 assert(sock != NULL);
2534 assert(sock->addr != NULL);
2536 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2537 || sock->addr[0] == '/')
2538 return (open_listen_socket_unix(sock));
2539 else
2540 return (open_listen_socket_network(sock));
2541 } /* }}} int open_listen_socket */
2543 static int close_listen_sockets (void) /* {{{ */
2544 {
2545 size_t i;
2547 for (i = 0; i < listen_fds_num; i++)
2548 {
2549 close (listen_fds[i].fd);
2551 if (listen_fds[i].family == PF_UNIX)
2552 unlink(listen_fds[i].addr);
2553 }
2555 free (listen_fds);
2556 listen_fds = NULL;
2557 listen_fds_num = 0;
2559 return (0);
2560 } /* }}} int close_listen_sockets */
2562 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2563 {
2564 struct pollfd *pollfds;
2565 int pollfds_num;
2566 int status;
2567 int i;
2569 if (listen_fds_num < 1)
2570 {
2571 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2572 return (NULL);
2573 }
2575 pollfds_num = listen_fds_num;
2576 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2577 if (pollfds == NULL)
2578 {
2579 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2580 return (NULL);
2581 }
2582 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2584 RRDD_LOG(LOG_INFO, "listening for connections");
2586 while (state == RUNNING)
2587 {
2588 for (i = 0; i < pollfds_num; i++)
2589 {
2590 pollfds[i].fd = listen_fds[i].fd;
2591 pollfds[i].events = POLLIN | POLLPRI;
2592 pollfds[i].revents = 0;
2593 }
2595 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2596 if (state != RUNNING)
2597 break;
2598 else if (status == 0) /* timeout */
2599 continue;
2600 else if (status < 0) /* error */
2601 {
2602 status = errno;
2603 if (status != EINTR)
2604 {
2605 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2606 }
2607 continue;
2608 }
2610 for (i = 0; i < pollfds_num; i++)
2611 {
2612 listen_socket_t *client_sock;
2613 struct sockaddr_storage client_sa;
2614 socklen_t client_sa_size;
2615 pthread_t tid;
2616 pthread_attr_t attr;
2618 if (pollfds[i].revents == 0)
2619 continue;
2621 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2622 {
2623 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2624 "poll(2) returned something unexpected for listen FD #%i.",
2625 pollfds[i].fd);
2626 continue;
2627 }
2629 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2630 if (client_sock == NULL)
2631 {
2632 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2633 continue;
2634 }
2635 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2637 client_sa_size = sizeof (client_sa);
2638 client_sock->fd = accept (pollfds[i].fd,
2639 (struct sockaddr *) &client_sa, &client_sa_size);
2640 if (client_sock->fd < 0)
2641 {
2642 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2643 free(client_sock);
2644 continue;
2645 }
2647 pthread_attr_init (&attr);
2648 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2650 status = pthread_create (&tid, &attr, connection_thread_main,
2651 client_sock);
2652 if (status != 0)
2653 {
2654 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2655 close_connection(client_sock);
2656 continue;
2657 }
2658 } /* for (pollfds_num) */
2659 } /* while (state == RUNNING) */
2661 RRDD_LOG(LOG_INFO, "starting shutdown");
2663 close_listen_sockets ();
2665 pthread_mutex_lock (&connection_threads_lock);
2666 while (connection_threads_num > 0)
2667 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2668 pthread_mutex_unlock (&connection_threads_lock);
2670 free(pollfds);
2672 return (NULL);
2673 } /* }}} void *listen_thread_main */
2675 static int daemonize (void) /* {{{ */
2676 {
2677 int pid_fd;
2678 char *base_dir;
2680 daemon_uid = geteuid();
2682 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2683 if (pid_fd < 0)
2684 pid_fd = check_pidfile();
2685 if (pid_fd < 0)
2686 return pid_fd;
2688 /* open all the listen sockets */
2689 if (config_listen_address_list_len > 0)
2690 {
2691 for (size_t i = 0; i < config_listen_address_list_len; i++)
2692 open_listen_socket (config_listen_address_list[i]);
2694 rrd_free_ptrs((void ***) &config_listen_address_list,
2695 &config_listen_address_list_len);
2696 }
2697 else
2698 {
2699 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2700 sizeof(default_socket.addr) - 1);
2701 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2702 open_listen_socket (&default_socket);
2703 }
2705 if (listen_fds_num < 1)
2706 {
2707 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2708 goto error;
2709 }
2711 if (!stay_foreground)
2712 {
2713 pid_t child;
2715 child = fork ();
2716 if (child < 0)
2717 {
2718 fprintf (stderr, "daemonize: fork(2) failed.\n");
2719 goto error;
2720 }
2721 else if (child > 0)
2722 exit(0);
2724 /* Become session leader */
2725 setsid ();
2727 /* Open the first three file descriptors to /dev/null */
2728 close (2);
2729 close (1);
2730 close (0);
2732 open ("/dev/null", O_RDWR);
2733 if (dup(0) == -1 || dup(0) == -1){
2734 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2735 }
2736 } /* if (!stay_foreground) */
2738 /* Change into the /tmp directory. */
2739 base_dir = (config_base_dir != NULL)
2740 ? config_base_dir
2741 : "/tmp";
2743 if (chdir (base_dir) != 0)
2744 {
2745 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2746 goto error;
2747 }
2749 install_signal_handlers();
2751 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2752 RRDD_LOG(LOG_INFO, "starting up");
2754 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2755 (GDestroyNotify) free_cache_item);
2756 if (cache_tree == NULL)
2757 {
2758 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2759 goto error;
2760 }
2762 return write_pidfile (pid_fd);
2764 error:
2765 remove_pidfile();
2766 return -1;
2767 } /* }}} int daemonize */
2769 static int cleanup (void) /* {{{ */
2770 {
2771 pthread_cond_broadcast (&flush_cond);
2772 pthread_join (flush_thread, NULL);
2774 pthread_cond_broadcast (&queue_cond);
2775 for (int i = 0; i < config_queue_threads; i++)
2776 pthread_join (queue_threads[i], NULL);
2778 if (config_flush_at_shutdown)
2779 {
2780 assert(cache_queue_head == NULL);
2781 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2782 }
2784 free(queue_threads);
2785 free(config_base_dir);
2787 pthread_mutex_lock(&cache_lock);
2788 g_tree_destroy(cache_tree);
2790 pthread_mutex_lock(&journal_lock);
2791 journal_done();
2793 RRDD_LOG(LOG_INFO, "goodbye");
2794 closelog ();
2796 remove_pidfile ();
2797 free(config_pid_file);
2799 return (0);
2800 } /* }}} int cleanup */
2802 static int read_options (int argc, char **argv) /* {{{ */
2803 {
2804 int option;
2805 int status = 0;
2807 socket_permission_clear (&default_socket);
2809 default_socket.socket_group = (gid_t)-1;
2810 default_socket.socket_permissions = (mode_t)-1;
2812 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2813 {
2814 switch (option)
2815 {
2816 case 'g':
2817 stay_foreground=1;
2818 break;
2820 case 'l':
2821 {
2822 listen_socket_t *new;
2824 new = malloc(sizeof(listen_socket_t));
2825 if (new == NULL)
2826 {
2827 fprintf(stderr, "read_options: malloc failed.\n");
2828 return(2);
2829 }
2830 memset(new, 0, sizeof(listen_socket_t));
2832 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2834 /* Add permissions to the socket {{{ */
2835 if (default_socket.permissions != 0)
2836 {
2837 socket_permission_copy (new, &default_socket);
2838 }
2839 else /* if (default_socket.permissions == 0) */
2840 {
2841 /* Add permission for ALL commands to the socket. */
2842 size_t i;
2843 for (i = 0; i < list_of_commands_len; i++)
2844 {
2845 status = socket_permission_add (new, list_of_commands[i].cmd);
2846 if (status != 0)
2847 {
2848 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2849 "socket failed. This should never happen, ever! Sorry.\n",
2850 list_of_commands[i].cmd);
2851 status = 4;
2852 }
2853 }
2854 }
2855 /* }}} Done adding permissions. */
2857 new->socket_group = default_socket.socket_group;
2858 new->socket_permissions = default_socket.socket_permissions;
2860 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2861 &config_listen_address_list_len, new))
2862 {
2863 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2864 return (2);
2865 }
2866 }
2867 break;
2869 /* set socket group permissions */
2870 case 's':
2871 {
2872 gid_t group_gid;
2873 struct group *grp;
2875 group_gid = strtoul(optarg, NULL, 10);
2876 if (errno != EINVAL && group_gid>0)
2877 {
2878 /* we were passed a number */
2879 grp = getgrgid(group_gid);
2880 }
2881 else
2882 {
2883 grp = getgrnam(optarg);
2884 }
2886 if (grp)
2887 {
2888 default_socket.socket_group = grp->gr_gid;
2889 }
2890 else
2891 {
2892 /* no idea what the user wanted... */
2893 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2894 return (5);
2895 }
2896 }
2897 break;
2899 /* set socket file permissions */
2900 case 'm':
2901 {
2902 long tmp;
2903 char *endptr = NULL;
2905 tmp = strtol (optarg, &endptr, 8);
2906 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2907 || (tmp > 07777) || (tmp < 0)) {
2908 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2909 optarg);
2910 return (5);
2911 }
2913 default_socket.socket_permissions = (mode_t)tmp;
2914 }
2915 break;
2917 case 'P':
2918 {
2919 char *optcopy;
2920 char *saveptr;
2921 char *dummy;
2922 char *ptr;
2924 socket_permission_clear (&default_socket);
2926 optcopy = strdup (optarg);
2927 dummy = optcopy;
2928 saveptr = NULL;
2929 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2930 {
2931 dummy = NULL;
2932 status = socket_permission_add (&default_socket, ptr);
2933 if (status != 0)
2934 {
2935 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2936 "socket failed. Most likely, this permission doesn't "
2937 "exist. Check your command line.\n", ptr);
2938 status = 4;
2939 }
2940 }
2942 free (optcopy);
2943 }
2944 break;
2946 case 'f':
2947 {
2948 int temp;
2950 temp = atoi (optarg);
2951 if (temp > 0)
2952 config_flush_interval = temp;
2953 else
2954 {
2955 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2956 status = 3;
2957 }
2958 }
2959 break;
2961 case 'w':
2962 {
2963 int temp;
2965 temp = atoi (optarg);
2966 if (temp > 0)
2967 config_write_interval = temp;
2968 else
2969 {
2970 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2971 status = 2;
2972 }
2973 }
2974 break;
2976 case 'z':
2977 {
2978 int temp;
2980 temp = atoi(optarg);
2981 if (temp > 0)
2982 config_write_jitter = temp;
2983 else
2984 {
2985 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2986 status = 2;
2987 }
2989 break;
2990 }
2992 case 't':
2993 {
2994 int threads;
2995 threads = atoi(optarg);
2996 if (threads >= 1)
2997 config_queue_threads = threads;
2998 else
2999 {
3000 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3001 return 1;
3002 }
3003 }
3004 break;
3006 case 'B':
3007 config_write_base_only = 1;
3008 break;
3010 case 'b':
3011 {
3012 size_t len;
3013 char base_realpath[PATH_MAX];
3015 if (config_base_dir != NULL)
3016 free (config_base_dir);
3017 config_base_dir = strdup (optarg);
3018 if (config_base_dir == NULL)
3019 {
3020 fprintf (stderr, "read_options: strdup failed.\n");
3021 return (3);
3022 }
3024 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3025 {
3026 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3027 config_base_dir, rrd_strerror (errno));
3028 return (3);
3029 }
3031 /* make sure that the base directory is not resolved via
3032 * symbolic links. this makes some performance-enhancing
3033 * assumptions possible (we don't have to resolve paths
3034 * that start with a "/")
3035 */
3036 if (realpath(config_base_dir, base_realpath) == NULL)
3037 {
3038 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3039 "%s\n", config_base_dir, rrd_strerror(errno));
3040 return 5;
3041 }
3043 len = strlen (config_base_dir);
3044 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3045 {
3046 config_base_dir[len - 1] = 0;
3047 len--;
3048 }
3050 if (len < 1)
3051 {
3052 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3053 return (4);
3054 }
3056 _config_base_dir_len = len;
3058 len = strlen (base_realpath);
3059 while ((len > 0) && (base_realpath[len - 1] == '/'))
3060 {
3061 base_realpath[len - 1] = '\0';
3062 len--;
3063 }
3065 if (strncmp(config_base_dir,
3066 base_realpath, sizeof(base_realpath)) != 0)
3067 {
3068 fprintf(stderr,
3069 "Base directory (-b) resolved via file system links!\n"
3070 "Please consult rrdcached '-b' documentation!\n"
3071 "Consider specifying the real directory (%s)\n",
3072 base_realpath);
3073 return 5;
3074 }
3075 }
3076 break;
3078 case 'p':
3079 {
3080 if (config_pid_file != NULL)
3081 free (config_pid_file);
3082 config_pid_file = strdup (optarg);
3083 if (config_pid_file == NULL)
3084 {
3085 fprintf (stderr, "read_options: strdup failed.\n");
3086 return (3);
3087 }
3088 }
3089 break;
3091 case 'F':
3092 config_flush_at_shutdown = 1;
3093 break;
3095 case 'j':
3096 {
3097 char journal_dir_actual[PATH_MAX];
3098 const char *dir;
3099 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3101 status = rrd_mkdir_p(dir, 0777);
3102 if (status != 0)
3103 {
3104 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3105 dir, rrd_strerror(errno));
3106 return 6;
3107 }
3109 if (access(dir, R_OK|W_OK|X_OK) != 0)
3110 {
3111 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3112 errno ? rrd_strerror(errno) : "");
3113 return 6;
3114 }
3115 }
3116 break;
3118 case 'h':
3119 case '?':
3120 printf ("RRDCacheD %s\n"
3121 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3122 "\n"
3123 "Usage: rrdcached [options]\n"
3124 "\n"
3125 "Valid options are:\n"
3126 " -l <address> Socket address to listen to.\n"
3127 " -P <perms> Sets the permissions to assign to all following "
3128 "sockets\n"
3129 " -w <seconds> Interval in which to write data.\n"
3130 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3131 " -t <threads> Number of write threads.\n"
3132 " -f <seconds> Interval in which to flush dead data.\n"
3133 " -p <file> Location of the PID-file.\n"
3134 " -b <dir> Base directory to change to.\n"
3135 " -B Restrict file access to paths within -b <dir>\n"
3136 " -g Do not fork and run in the foreground.\n"
3137 " -j <dir> Directory in which to create the journal files.\n"
3138 " -F Always flush all updates at shutdown\n"
3139 " -s <id|name> Group owner of all following UNIX sockets\n"
3140 " (the socket will also have read/write permissions "
3141 "for that group)\n"
3142 " -m <mode> File permissions (octal) of all following UNIX "
3143 "sockets\n"
3144 "\n"
3145 "For more information and a detailed description of all options "
3146 "please refer\n"
3147 "to the rrdcached(1) manual page.\n",
3148 VERSION);
3149 if (option == 'h')
3150 status = -1;
3151 else
3152 status = 1;
3153 break;
3154 } /* switch (option) */
3155 } /* while (getopt) */
3157 /* advise the user when values are not sane */
3158 if (config_flush_interval < 2 * config_write_interval)
3159 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3160 " 2x write interval (-w) !\n");
3161 if (config_write_jitter > config_write_interval)
3162 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3163 " write interval (-w) !\n");
3165 if (config_write_base_only && config_base_dir == NULL)
3166 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3167 " Consult the rrdcached documentation\n");
3169 if (journal_dir == NULL)
3170 config_flush_at_shutdown = 1;
3172 return (status);
3173 } /* }}} int read_options */
3175 int main (int argc, char **argv)
3176 {
3177 int status;
3179 status = read_options (argc, argv);
3180 if (status != 0)
3181 {
3182 if (status < 0)
3183 status = 0;
3184 return (status);
3185 }
3187 status = daemonize ();
3188 if (status != 0)
3189 {
3190 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3191 return (1);
3192 }
3194 journal_init();
3196 /* start the queue threads */
3197 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3198 if (queue_threads == NULL)
3199 {
3200 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3201 cleanup();
3202 return (1);
3203 }
3204 for (int i = 0; i < config_queue_threads; i++)
3205 {
3206 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3207 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3208 if (status != 0)
3209 {
3210 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3211 cleanup();
3212 return (1);
3213 }
3214 }
3216 /* start the flush thread */
3217 memset(&flush_thread, 0, sizeof(flush_thread));
3218 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3219 if (status != 0)
3220 {
3221 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3222 cleanup();
3223 return (1);
3224 }
3226 listen_thread_main (NULL);
3227 cleanup ();
3229 return (0);
3230 } /* int main */
3232 /*
3233 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3234 */