1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120 do { \
121 if (stay_foreground) { \
122 fprintf(stderr, __VA_ARGS__); \
123 fprintf(stderr, "\n"); } \
124 syslog ((severity), __VA_ARGS__); \
125 } while (0)
127 /*
128 * Types
129 */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
133 {
134 int fd;
135 char addr[PATH_MAX + 1];
136 int family;
138 /* state for BATCH processing */
139 time_t batch_start;
140 int batch_cmd;
142 /* buffered IO */
143 char *rbuf;
144 off_t next_cmd;
145 off_t next_read;
147 char *wbuf;
148 ssize_t wbuf_len;
150 uint32_t permissions;
152 gid_t socket_group;
153 mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
161 time_t UNUSED(now),\
162 char UNUSED(*buffer),\
163 size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO command_t UNUSED(*cmd),\
166 DISPATCH_PROTO
168 struct command_s {
169 char *cmd;
170 int (*handler)(HANDLER_PROTO);
172 char context; /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT (1<<0)
174 #define CMD_CONTEXT_BATCH (1<<1)
175 #define CMD_CONTEXT_JOURNAL (1<<2)
176 #define CMD_CONTEXT_ANY (0x7f)
178 char *syntax;
179 char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
185 {
186 char *file;
187 char **values;
188 size_t values_num;
189 time_t last_flush_time;
190 double last_update_stamp;
191 #define CI_FLAGS_IN_TREE (1<<0)
192 #define CI_FLAGS_IN_QUEUE (1<<1)
193 int flags;
194 pthread_cond_t flushed;
195 cache_item_t *prev;
196 cache_item_t *next;
197 };
199 struct callback_flush_data_s
200 {
201 time_t now;
202 time_t abs_timeout;
203 char **keys;
204 size_t keys_num;
205 };
206 typedef struct callback_flush_data_s callback_flush_data_t;
208 enum queue_side_e
209 {
210 HEAD,
211 TAIL
212 };
213 typedef enum queue_side_e queue_side_t;
215 /* describe a set of journal files */
216 typedef struct {
217 char **files;
218 size_t files_num;
219 } journal_set;
221 /* max length of socket command or response */
222 #define CMD_MAX 4096
223 #define RBUF_SIZE (CMD_MAX*2)
225 /*
226 * Variables
227 */
228 static int stay_foreground = 0;
229 static uid_t daemon_uid;
231 static listen_socket_t *listen_fds = NULL;
232 static size_t listen_fds_num = 0;
234 static listen_socket_t default_socket;
236 enum {
237 RUNNING, /* normal operation */
238 FLUSHING, /* flushing remaining values */
239 SHUTDOWN /* shutting down */
240 } state = RUNNING;
242 static pthread_t *queue_threads;
243 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
244 static int config_queue_threads = 4;
246 static pthread_t flush_thread;
247 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
249 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
250 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
251 static int connection_threads_num = 0;
253 /* Cache stuff */
254 static GTree *cache_tree = NULL;
255 static cache_item_t *cache_queue_head = NULL;
256 static cache_item_t *cache_queue_tail = NULL;
257 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
259 static int config_write_interval = 300;
260 static int config_write_jitter = 0;
261 static int config_flush_interval = 3600;
262 static int config_flush_at_shutdown = 0;
263 static char *config_pid_file = NULL;
264 static char *config_base_dir = NULL;
265 static size_t _config_base_dir_len = 0;
266 static int config_write_base_only = 0;
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
280 /* Journaled updates */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL; /* current journal file handle */
287 static long journal_size = 0; /* current journal size */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
294 /* prototypes for forward refernces */
295 static int handle_request_help (HANDLER_PROTO);
297 /*
298 * Functions
299 */
300 static void sig_common (const char *sig) /* {{{ */
301 {
302 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
303 state = FLUSHING;
304 pthread_cond_broadcast(&flush_cond);
305 pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
309 {
310 sig_common("INT");
311 } /* }}} void sig_int_handler */
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
314 {
315 sig_common("TERM");
316 } /* }}} void sig_term_handler */
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
319 {
320 config_flush_at_shutdown = 1;
321 sig_common("USR1");
322 } /* }}} void sig_usr1_handler */
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
325 {
326 config_flush_at_shutdown = 0;
327 sig_common("USR2");
328 } /* }}} void sig_usr2_handler */
330 static void install_signal_handlers(void) /* {{{ */
331 {
332 /* These structures are static, because `sigaction' behaves weird if the are
333 * overwritten.. */
334 static struct sigaction sa_int;
335 static struct sigaction sa_term;
336 static struct sigaction sa_pipe;
337 static struct sigaction sa_usr1;
338 static struct sigaction sa_usr2;
340 /* Install signal handlers */
341 memset (&sa_int, 0, sizeof (sa_int));
342 sa_int.sa_handler = sig_int_handler;
343 sigaction (SIGINT, &sa_int, NULL);
345 memset (&sa_term, 0, sizeof (sa_term));
346 sa_term.sa_handler = sig_term_handler;
347 sigaction (SIGTERM, &sa_term, NULL);
349 memset (&sa_pipe, 0, sizeof (sa_pipe));
350 sa_pipe.sa_handler = SIG_IGN;
351 sigaction (SIGPIPE, &sa_pipe, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_usr1));
354 sa_usr1.sa_handler = sig_usr1_handler;
355 sigaction (SIGUSR1, &sa_usr1, NULL);
357 memset (&sa_usr2, 0, sizeof (sa_usr2));
358 sa_usr2.sa_handler = sig_usr2_handler;
359 sigaction (SIGUSR2, &sa_usr2, NULL);
361 } /* }}} void install_signal_handlers */
363 static int open_pidfile(char *action, int oflag) /* {{{ */
364 {
365 int fd;
366 const char *file;
367 char *file_copy, *dir;
369 file = (config_pid_file != NULL)
370 ? config_pid_file
371 : LOCALSTATEDIR "/run/rrdcached.pid";
373 /* dirname may modify its argument */
374 file_copy = strdup(file);
375 if (file_copy == NULL)
376 {
377 fprintf(stderr, "rrdcached: strdup(): %s\n",
378 rrd_strerror(errno));
379 return -1;
380 }
382 dir = dirname(file_copy);
383 if (rrd_mkdir_p(dir, 0777) != 0)
384 {
385 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386 dir, rrd_strerror(errno));
387 return -1;
388 }
390 free(file_copy);
392 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
393 if (fd < 0)
394 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395 action, file, rrd_strerror(errno));
397 return(fd);
398 } /* }}} static int open_pidfile */
400 /* check existing pid file to see whether a daemon is running */
401 static int check_pidfile(void)
402 {
403 int pid_fd;
404 pid_t pid;
405 char pid_str[16];
407 pid_fd = open_pidfile("open", O_RDWR);
408 if (pid_fd < 0)
409 return pid_fd;
411 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
412 return -1;
414 pid = atoi(pid_str);
415 if (pid <= 0)
416 return -1;
418 /* another running process that we can signal COULD be
419 * a competing rrdcached */
420 if (pid != getpid() && kill(pid, 0) == 0)
421 {
422 fprintf(stderr,
423 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
424 close(pid_fd);
425 return -1;
426 }
428 lseek(pid_fd, 0, SEEK_SET);
429 if (ftruncate(pid_fd, 0) == -1)
430 {
431 fprintf(stderr,
432 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
433 close(pid_fd);
434 return -1;
435 }
437 fprintf(stderr,
438 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
439 "rrdcached: starting normally.\n", pid);
441 return pid_fd;
442 } /* }}} static int check_pidfile */
444 static int write_pidfile (int fd) /* {{{ */
445 {
446 pid_t pid;
447 FILE *fh;
449 pid = getpid ();
451 fh = fdopen (fd, "w");
452 if (fh == NULL)
453 {
454 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
455 close(fd);
456 return (-1);
457 }
459 fprintf (fh, "%i\n", (int) pid);
460 fclose (fh);
462 return (0);
463 } /* }}} int write_pidfile */
465 static int remove_pidfile (void) /* {{{ */
466 {
467 char *file;
468 int status;
470 file = (config_pid_file != NULL)
471 ? config_pid_file
472 : LOCALSTATEDIR "/run/rrdcached.pid";
474 status = unlink (file);
475 if (status == 0)
476 return (0);
477 return (errno);
478 } /* }}} int remove_pidfile */
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
481 {
482 char *eol;
484 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485 sock->next_read - sock->next_cmd);
487 if (eol == NULL)
488 {
489 /* no commands left, move remainder back to front of rbuf */
490 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491 sock->next_read - sock->next_cmd);
492 sock->next_read -= sock->next_cmd;
493 sock->next_cmd = 0;
494 *len = 0;
495 return NULL;
496 }
497 else
498 {
499 char *cmd = sock->rbuf + sock->next_cmd;
500 *eol = '\0';
502 sock->next_cmd = eol - sock->rbuf + 1;
504 if (eol > sock->rbuf && *(eol-1) == '\r')
505 *(--eol) = '\0'; /* handle "\r\n" EOL */
507 *len = eol - cmd;
509 return cmd;
510 }
512 /* NOTREACHED */
513 assert(1==0);
514 } /* }}} char *next_cmd */
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
518 {
519 char *new_buf;
521 assert(sock != NULL);
523 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
524 if (new_buf == NULL)
525 {
526 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527 return -1;
528 }
530 strncpy(new_buf + sock->wbuf_len, str, len + 1);
532 sock->wbuf = new_buf;
533 sock->wbuf_len += len;
535 return 0;
536 } /* }}} static int add_to_wbuf */
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
540 {
541 va_list argp;
542 char buffer[CMD_MAX];
543 int len;
545 if (JOURNAL_REPLAY(sock)) return 0;
546 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
548 va_start(argp, fmt);
549 #ifdef HAVE_VSNPRINTF
550 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
551 #else
552 len = vsprintf(buffer, fmt, argp);
553 #endif
554 va_end(argp);
555 if (len < 0)
556 {
557 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558 return -1;
559 }
561 return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
564 static int count_lines(char *str) /* {{{ */
565 {
566 int lines = 0;
568 if (str != NULL)
569 {
570 while ((str = strchr(str, '\n')) != NULL)
571 {
572 ++lines;
573 ++str;
574 }
575 }
577 return lines;
578 } /* }}} static int count_lines */
580 /* send the response back to the user.
581 * returns 0 on success, -1 on error
582 * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584 char *fmt, ...) /* {{{ */
585 {
586 va_list argp;
587 char buffer[CMD_MAX];
588 int lines;
589 ssize_t wrote;
590 int rclen, len;
592 if (JOURNAL_REPLAY(sock)) return rc;
594 if (sock->batch_start)
595 {
596 if (rc == RESP_OK)
597 return rc; /* no response on success during BATCH */
598 lines = sock->batch_cmd;
599 }
600 else if (rc == RESP_OK)
601 lines = count_lines(sock->wbuf);
602 else
603 lines = -1;
605 rclen = sprintf(buffer, "%d ", lines);
606 va_start(argp, fmt);
607 #ifdef HAVE_VSNPRINTF
608 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
609 #else
610 len = vsprintf(buffer+rclen, fmt, argp);
611 #endif
612 va_end(argp);
613 if (len < 0)
614 return -1;
616 len += rclen;
618 /* append the result to the wbuf, don't write to the user */
619 if (sock->batch_start)
620 return add_to_wbuf(sock, buffer, len);
622 /* first write must be complete */
623 if (len != write(sock->fd, buffer, len))
624 {
625 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626 return -1;
627 }
629 if (sock->wbuf != NULL && rc == RESP_OK)
630 {
631 wrote = 0;
632 while (wrote < sock->wbuf_len)
633 {
634 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
635 if (wb <= 0)
636 {
637 RRDD_LOG(LOG_INFO, "send_response: could not write results");
638 return -1;
639 }
640 wrote += wb;
641 }
642 }
644 free(sock->wbuf); sock->wbuf = NULL;
645 sock->wbuf_len = 0;
647 return 0;
648 } /* }}} */
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
651 {
652 ci->values = NULL;
653 ci->values_num = 0;
655 ci->last_flush_time = when;
656 if (config_write_jitter > 0)
657 ci->last_flush_time += (rrd_random() % config_write_jitter);
658 }
660 /* remove_from_queue
661 * remove a "cache_item_t" item from the queue.
662 * must hold 'cache_lock' when calling this
663 */
664 static void remove_from_queue(cache_item_t *ci) /* {{{ */
665 {
666 if (ci == NULL) return;
667 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
669 if (ci->prev == NULL)
670 cache_queue_head = ci->next; /* reset head */
671 else
672 ci->prev->next = ci->next;
674 if (ci->next == NULL)
675 cache_queue_tail = ci->prev; /* reset the tail */
676 else
677 ci->next->prev = ci->prev;
679 ci->next = ci->prev = NULL;
680 ci->flags &= ~CI_FLAGS_IN_QUEUE;
682 pthread_mutex_lock (&stats_lock);
683 assert (stats_queue_length > 0);
684 stats_queue_length--;
685 pthread_mutex_unlock (&stats_lock);
687 } /* }}} static void remove_from_queue */
689 /* free the resources associated with the cache_item_t
690 * must hold cache_lock when calling this function
691 */
692 static void *free_cache_item(cache_item_t *ci) /* {{{ */
693 {
694 if (ci == NULL) return NULL;
696 remove_from_queue(ci);
698 for (size_t i=0; i < ci->values_num; i++)
699 free(ci->values[i]);
701 free (ci->values);
702 free (ci->file);
704 /* in case anyone is waiting */
705 pthread_cond_broadcast(&ci->flushed);
706 pthread_cond_destroy(&ci->flushed);
708 free (ci);
710 return NULL;
711 } /* }}} static void *free_cache_item */
713 /*
714 * enqueue_cache_item:
715 * `cache_lock' must be acquired before calling this function!
716 */
717 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
718 queue_side_t side)
719 {
720 if (ci == NULL)
721 return (-1);
723 if (ci->values_num == 0)
724 return (0);
726 if (side == HEAD)
727 {
728 if (cache_queue_head == ci)
729 return 0;
731 /* remove if further down in queue */
732 remove_from_queue(ci);
734 ci->prev = NULL;
735 ci->next = cache_queue_head;
736 if (ci->next != NULL)
737 ci->next->prev = ci;
738 cache_queue_head = ci;
740 if (cache_queue_tail == NULL)
741 cache_queue_tail = cache_queue_head;
742 }
743 else /* (side == TAIL) */
744 {
745 /* We don't move values back in the list.. */
746 if (ci->flags & CI_FLAGS_IN_QUEUE)
747 return (0);
749 assert (ci->next == NULL);
750 assert (ci->prev == NULL);
752 ci->prev = cache_queue_tail;
754 if (cache_queue_tail == NULL)
755 cache_queue_head = ci;
756 else
757 cache_queue_tail->next = ci;
759 cache_queue_tail = ci;
760 }
762 ci->flags |= CI_FLAGS_IN_QUEUE;
764 pthread_cond_signal(&queue_cond);
765 pthread_mutex_lock (&stats_lock);
766 stats_queue_length++;
767 pthread_mutex_unlock (&stats_lock);
769 return (0);
770 } /* }}} int enqueue_cache_item */
772 /*
773 * tree_callback_flush:
774 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
775 * while this is in progress.
776 */
777 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
778 gpointer data)
779 {
780 cache_item_t *ci;
781 callback_flush_data_t *cfd;
783 ci = (cache_item_t *) value;
784 cfd = (callback_flush_data_t *) data;
786 if (ci->flags & CI_FLAGS_IN_QUEUE)
787 return FALSE;
789 if (ci->values_num > 0
790 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
791 {
792 enqueue_cache_item (ci, TAIL);
793 }
794 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
795 && (ci->values_num <= 0))
796 {
797 assert ((char *) key == ci->file);
798 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
799 {
800 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
801 return (FALSE);
802 }
803 }
805 return (FALSE);
806 } /* }}} gboolean tree_callback_flush */
808 static int flush_old_values (int max_age)
809 {
810 callback_flush_data_t cfd;
811 size_t k;
813 memset (&cfd, 0, sizeof (cfd));
814 /* Pass the current time as user data so that we don't need to call
815 * `time' for each node. */
816 cfd.now = time (NULL);
817 cfd.keys = NULL;
818 cfd.keys_num = 0;
820 if (max_age > 0)
821 cfd.abs_timeout = cfd.now - max_age;
822 else
823 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
825 /* `tree_callback_flush' will return the keys of all values that haven't
826 * been touched in the last `config_flush_interval' seconds in `cfd'.
827 * The char*'s in this array point to the same memory as ci->file, so we
828 * don't need to free them separately. */
829 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
831 for (k = 0; k < cfd.keys_num; k++)
832 {
833 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
834 /* should never fail, since we have held the cache_lock
835 * the entire time */
836 assert(status == TRUE);
837 }
839 if (cfd.keys != NULL)
840 {
841 free (cfd.keys);
842 cfd.keys = NULL;
843 }
845 return (0);
846 } /* int flush_old_values */
848 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
849 {
850 struct timeval now;
851 struct timespec next_flush;
852 int status;
854 gettimeofday (&now, NULL);
855 next_flush.tv_sec = now.tv_sec + config_flush_interval;
856 next_flush.tv_nsec = 1000 * now.tv_usec;
858 pthread_mutex_lock(&cache_lock);
860 while (state == RUNNING)
861 {
862 gettimeofday (&now, NULL);
863 if ((now.tv_sec > next_flush.tv_sec)
864 || ((now.tv_sec == next_flush.tv_sec)
865 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
866 {
867 RRDD_LOG(LOG_DEBUG, "flushing old values");
869 /* Determine the time of the next cache flush. */
870 next_flush.tv_sec = now.tv_sec + config_flush_interval;
872 /* Flush all values that haven't been written in the last
873 * `config_write_interval' seconds. */
874 flush_old_values (config_write_interval);
876 /* unlock the cache while we rotate so we don't block incoming
877 * updates if the fsync() blocks on disk I/O */
878 pthread_mutex_unlock(&cache_lock);
879 journal_rotate();
880 pthread_mutex_lock(&cache_lock);
881 }
883 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
884 if (status != 0 && status != ETIMEDOUT)
885 {
886 RRDD_LOG (LOG_ERR, "flush_thread_main: "
887 "pthread_cond_timedwait returned %i.", status);
888 }
889 }
891 if (config_flush_at_shutdown)
892 flush_old_values (-1); /* flush everything */
894 state = SHUTDOWN;
896 pthread_mutex_unlock(&cache_lock);
898 return NULL;
899 } /* void *flush_thread_main */
901 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
902 {
903 pthread_mutex_lock (&cache_lock);
905 while (state != SHUTDOWN
906 || (cache_queue_head != NULL && config_flush_at_shutdown))
907 {
908 cache_item_t *ci;
909 char *file;
910 char **values;
911 size_t values_num;
912 int status;
914 /* Now, check if there's something to store away. If not, wait until
915 * something comes in. */
916 if (cache_queue_head == NULL)
917 {
918 status = pthread_cond_wait (&queue_cond, &cache_lock);
919 if ((status != 0) && (status != ETIMEDOUT))
920 {
921 RRDD_LOG (LOG_ERR, "queue_thread_main: "
922 "pthread_cond_wait returned %i.", status);
923 }
924 }
926 /* Check if a value has arrived. This may be NULL if we timed out or there
927 * was an interrupt such as a signal. */
928 if (cache_queue_head == NULL)
929 continue;
931 ci = cache_queue_head;
933 /* copy the relevant parts */
934 file = strdup (ci->file);
935 if (file == NULL)
936 {
937 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
938 continue;
939 }
941 assert(ci->values != NULL);
942 assert(ci->values_num > 0);
944 values = ci->values;
945 values_num = ci->values_num;
947 wipe_ci_values(ci, time(NULL));
948 remove_from_queue(ci);
950 pthread_mutex_unlock (&cache_lock);
952 rrd_clear_error ();
953 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
954 if (status != 0)
955 {
956 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
957 "rrd_update_r (%s) failed with status %i. (%s)",
958 file, status, rrd_get_error());
959 }
961 journal_write("wrote", file);
963 /* Search again in the tree. It's possible someone issued a "FORGET"
964 * while we were writing the update values. */
965 pthread_mutex_lock(&cache_lock);
966 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
967 if (ci)
968 pthread_cond_broadcast(&ci->flushed);
969 pthread_mutex_unlock(&cache_lock);
971 if (status == 0)
972 {
973 pthread_mutex_lock (&stats_lock);
974 stats_updates_written++;
975 stats_data_sets_written += values_num;
976 pthread_mutex_unlock (&stats_lock);
977 }
979 rrd_free_ptrs((void ***) &values, &values_num);
980 free(file);
982 pthread_mutex_lock (&cache_lock);
983 }
984 pthread_mutex_unlock (&cache_lock);
986 return (NULL);
987 } /* }}} void *queue_thread_main */
989 static int buffer_get_field (char **buffer_ret, /* {{{ */
990 size_t *buffer_size_ret, char **field_ret)
991 {
992 char *buffer;
993 size_t buffer_pos;
994 size_t buffer_size;
995 char *field;
996 size_t field_size;
997 int status;
999 buffer = *buffer_ret;
1000 buffer_pos = 0;
1001 buffer_size = *buffer_size_ret;
1002 field = *buffer_ret;
1003 field_size = 0;
1005 if (buffer_size <= 0)
1006 return (-1);
1008 /* This is ensured by `handle_request'. */
1009 assert (buffer[buffer_size - 1] == '\0');
1011 status = -1;
1012 while (buffer_pos < buffer_size)
1013 {
1014 /* Check for end-of-field or end-of-buffer */
1015 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1016 {
1017 field[field_size] = 0;
1018 field_size++;
1019 buffer_pos++;
1020 status = 0;
1021 break;
1022 }
1023 /* Handle escaped characters. */
1024 else if (buffer[buffer_pos] == '\\')
1025 {
1026 if (buffer_pos >= (buffer_size - 1))
1027 break;
1028 buffer_pos++;
1029 field[field_size] = buffer[buffer_pos];
1030 field_size++;
1031 buffer_pos++;
1032 }
1033 /* Normal operation */
1034 else
1035 {
1036 field[field_size] = buffer[buffer_pos];
1037 field_size++;
1038 buffer_pos++;
1039 }
1040 } /* while (buffer_pos < buffer_size) */
1042 if (status != 0)
1043 return (status);
1045 *buffer_ret = buffer + buffer_pos;
1046 *buffer_size_ret = buffer_size - buffer_pos;
1047 *field_ret = field;
1049 return (0);
1050 } /* }}} int buffer_get_field */
1052 /* if we're restricting writes to the base directory,
1053 * check whether the file falls within the dir
1054 * returns 1 if OK, otherwise 0
1055 */
1056 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1057 {
1058 assert(file != NULL);
1060 if (!config_write_base_only
1061 || JOURNAL_REPLAY(sock)
1062 || config_base_dir == NULL)
1063 return 1;
1065 if (strstr(file, "../") != NULL) goto err;
1067 /* relative paths without "../" are ok */
1068 if (*file != '/') return 1;
1070 /* file must be of the format base + "/" + <1+ char filename> */
1071 if (strlen(file) < _config_base_dir_len + 2) goto err;
1072 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1073 if (*(file + _config_base_dir_len) != '/') goto err;
1075 return 1;
1077 err:
1078 if (sock != NULL && sock->fd >= 0)
1079 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1081 return 0;
1082 } /* }}} static int check_file_access */
1084 /* when using a base dir, convert relative paths to absolute paths.
1085 * if necessary, modifies the "filename" pointer to point
1086 * to the new path created in "tmp". "tmp" is provided
1087 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1088 *
1089 * this allows us to optimize for the expected case (absolute path)
1090 * with a no-op.
1091 */
1092 static void get_abs_path(char **filename, char *tmp)
1093 {
1094 assert(tmp != NULL);
1095 assert(filename != NULL && *filename != NULL);
1097 if (config_base_dir == NULL || **filename == '/')
1098 return;
1100 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1101 *filename = tmp;
1102 } /* }}} static int get_abs_path */
1104 static int flush_file (const char *filename) /* {{{ */
1105 {
1106 cache_item_t *ci;
1108 pthread_mutex_lock (&cache_lock);
1110 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1111 if (ci == NULL)
1112 {
1113 pthread_mutex_unlock (&cache_lock);
1114 return (ENOENT);
1115 }
1117 if (ci->values_num > 0)
1118 {
1119 /* Enqueue at head */
1120 enqueue_cache_item (ci, HEAD);
1121 pthread_cond_wait(&ci->flushed, &cache_lock);
1122 }
1124 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1125 * may have been purged during our cond_wait() */
1127 pthread_mutex_unlock(&cache_lock);
1129 return (0);
1130 } /* }}} int flush_file */
1132 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1133 {
1134 char *err = "Syntax error.\n";
1136 if (cmd && cmd->syntax)
1137 err = cmd->syntax;
1139 return send_response(sock, RESP_ERR, "Usage: %s", err);
1140 } /* }}} static int syntax_error() */
1142 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1143 {
1144 uint64_t copy_queue_length;
1145 uint64_t copy_updates_received;
1146 uint64_t copy_flush_received;
1147 uint64_t copy_updates_written;
1148 uint64_t copy_data_sets_written;
1149 uint64_t copy_journal_bytes;
1150 uint64_t copy_journal_rotate;
1152 uint64_t tree_nodes_number;
1153 uint64_t tree_depth;
1155 pthread_mutex_lock (&stats_lock);
1156 copy_queue_length = stats_queue_length;
1157 copy_updates_received = stats_updates_received;
1158 copy_flush_received = stats_flush_received;
1159 copy_updates_written = stats_updates_written;
1160 copy_data_sets_written = stats_data_sets_written;
1161 copy_journal_bytes = stats_journal_bytes;
1162 copy_journal_rotate = stats_journal_rotate;
1163 pthread_mutex_unlock (&stats_lock);
1165 pthread_mutex_lock (&cache_lock);
1166 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1167 tree_depth = (uint64_t) g_tree_height (cache_tree);
1168 pthread_mutex_unlock (&cache_lock);
1170 add_response_info(sock,
1171 "QueueLength: %"PRIu64"\n", copy_queue_length);
1172 add_response_info(sock,
1173 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1174 add_response_info(sock,
1175 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1176 add_response_info(sock,
1177 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1178 add_response_info(sock,
1179 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1180 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1181 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1182 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1183 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1185 send_response(sock, RESP_OK, "Statistics follow\n");
1187 return (0);
1188 } /* }}} int handle_request_stats */
1190 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1191 {
1192 char *file, file_tmp[PATH_MAX];
1193 int status;
1195 status = buffer_get_field (&buffer, &buffer_size, &file);
1196 if (status != 0)
1197 {
1198 return syntax_error(sock,cmd);
1199 }
1200 else
1201 {
1202 pthread_mutex_lock(&stats_lock);
1203 stats_flush_received++;
1204 pthread_mutex_unlock(&stats_lock);
1206 get_abs_path(&file, file_tmp);
1207 if (!check_file_access(file, sock)) return 0;
1209 status = flush_file (file);
1210 if (status == 0)
1211 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1212 else if (status == ENOENT)
1213 {
1214 /* no file in our tree; see whether it exists at all */
1215 struct stat statbuf;
1217 memset(&statbuf, 0, sizeof(statbuf));
1218 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1219 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1220 else
1221 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1222 }
1223 else if (status < 0)
1224 return send_response(sock, RESP_ERR, "Internal error.\n");
1225 else
1226 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1227 }
1229 /* NOTREACHED */
1230 assert(1==0);
1231 } /* }}} int handle_request_flush */
1233 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1234 {
1235 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1237 pthread_mutex_lock(&cache_lock);
1238 flush_old_values(-1);
1239 pthread_mutex_unlock(&cache_lock);
1241 return send_response(sock, RESP_OK, "Started flush.\n");
1242 } /* }}} static int handle_request_flushall */
1244 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1245 {
1246 int status;
1247 char *file, file_tmp[PATH_MAX];
1248 cache_item_t *ci;
1250 status = buffer_get_field(&buffer, &buffer_size, &file);
1251 if (status != 0)
1252 return syntax_error(sock,cmd);
1254 get_abs_path(&file, file_tmp);
1256 pthread_mutex_lock(&cache_lock);
1257 ci = g_tree_lookup(cache_tree, file);
1258 if (ci == NULL)
1259 {
1260 pthread_mutex_unlock(&cache_lock);
1261 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1262 }
1264 for (size_t i=0; i < ci->values_num; i++)
1265 add_response_info(sock, "%s\n", ci->values[i]);
1267 pthread_mutex_unlock(&cache_lock);
1268 return send_response(sock, RESP_OK, "updates pending\n");
1269 } /* }}} static int handle_request_pending */
1271 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1272 {
1273 int status;
1274 gboolean found;
1275 char *file, file_tmp[PATH_MAX];
1277 status = buffer_get_field(&buffer, &buffer_size, &file);
1278 if (status != 0)
1279 return syntax_error(sock,cmd);
1281 get_abs_path(&file, file_tmp);
1282 if (!check_file_access(file, sock)) return 0;
1284 pthread_mutex_lock(&cache_lock);
1285 found = g_tree_remove(cache_tree, file);
1286 pthread_mutex_unlock(&cache_lock);
1288 if (found == TRUE)
1289 {
1290 if (!JOURNAL_REPLAY(sock))
1291 journal_write("forget", file);
1293 return send_response(sock, RESP_OK, "Gone!\n");
1294 }
1295 else
1296 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1298 /* NOTREACHED */
1299 assert(1==0);
1300 } /* }}} static int handle_request_forget */
1302 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1303 {
1304 cache_item_t *ci;
1306 pthread_mutex_lock(&cache_lock);
1308 ci = cache_queue_head;
1309 while (ci != NULL)
1310 {
1311 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1312 ci = ci->next;
1313 }
1315 pthread_mutex_unlock(&cache_lock);
1317 return send_response(sock, RESP_OK, "in queue.\n");
1318 } /* }}} int handle_request_queue */
1320 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1321 {
1322 char *file, file_tmp[PATH_MAX];
1323 int values_num = 0;
1324 int status;
1325 char orig_buf[CMD_MAX];
1327 cache_item_t *ci;
1329 /* save it for the journal later */
1330 if (!JOURNAL_REPLAY(sock))
1331 strncpy(orig_buf, buffer, min(CMD_MAX,buffer_size));
1333 status = buffer_get_field (&buffer, &buffer_size, &file);
1334 if (status != 0)
1335 return syntax_error(sock,cmd);
1337 pthread_mutex_lock(&stats_lock);
1338 stats_updates_received++;
1339 pthread_mutex_unlock(&stats_lock);
1341 get_abs_path(&file, file_tmp);
1342 if (!check_file_access(file, sock)) return 0;
1344 pthread_mutex_lock (&cache_lock);
1345 ci = g_tree_lookup (cache_tree, file);
1347 if (ci == NULL) /* {{{ */
1348 {
1349 struct stat statbuf;
1350 cache_item_t *tmp;
1352 /* don't hold the lock while we setup; stat(2) might block */
1353 pthread_mutex_unlock(&cache_lock);
1355 memset (&statbuf, 0, sizeof (statbuf));
1356 status = stat (file, &statbuf);
1357 if (status != 0)
1358 {
1359 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1361 status = errno;
1362 if (status == ENOENT)
1363 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1364 else
1365 return send_response(sock, RESP_ERR,
1366 "stat failed with error %i.\n", status);
1367 }
1368 if (!S_ISREG (statbuf.st_mode))
1369 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1371 if (access(file, R_OK|W_OK) != 0)
1372 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1373 file, rrd_strerror(errno));
1375 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1376 if (ci == NULL)
1377 {
1378 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1380 return send_response(sock, RESP_ERR, "malloc failed.\n");
1381 }
1382 memset (ci, 0, sizeof (cache_item_t));
1384 ci->file = strdup (file);
1385 if (ci->file == NULL)
1386 {
1387 free (ci);
1388 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1390 return send_response(sock, RESP_ERR, "strdup failed.\n");
1391 }
1393 wipe_ci_values(ci, now);
1394 ci->flags = CI_FLAGS_IN_TREE;
1395 pthread_cond_init(&ci->flushed, NULL);
1397 pthread_mutex_lock(&cache_lock);
1399 /* another UPDATE might have added this entry in the meantime */
1400 tmp = g_tree_lookup (cache_tree, file);
1401 if (tmp == NULL)
1402 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1403 else
1404 {
1405 free_cache_item (ci);
1406 ci = tmp;
1407 }
1409 /* state may have changed while we were unlocked */
1410 if (state == SHUTDOWN)
1411 return -1;
1412 } /* }}} */
1413 assert (ci != NULL);
1415 /* don't re-write updates in replay mode */
1416 if (!JOURNAL_REPLAY(sock))
1417 journal_write("update", orig_buf);
1419 while (buffer_size > 0)
1420 {
1421 char *value;
1422 double stamp;
1423 char *eostamp;
1425 status = buffer_get_field (&buffer, &buffer_size, &value);
1426 if (status != 0)
1427 {
1428 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1429 break;
1430 }
1432 /* make sure update time is always moving forward. We use double here since
1433 update does support subsecond precision for timestamps ... */
1434 stamp = strtod(value, &eostamp);
1435 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1436 {
1437 pthread_mutex_unlock(&cache_lock);
1438 return send_response(sock, RESP_ERR,
1439 "Cannot find timestamp in '%s'!\n", value);
1440 }
1441 else if (stamp <= ci->last_update_stamp)
1442 {
1443 pthread_mutex_unlock(&cache_lock);
1444 return send_response(sock, RESP_ERR,
1445 "illegal attempt to update using time %lf when last"
1446 " update time is %lf (minimum one second step)\n",
1447 stamp, ci->last_update_stamp);
1448 }
1449 else
1450 ci->last_update_stamp = stamp;
1452 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1453 {
1454 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1455 continue;
1456 }
1458 values_num++;
1459 }
1461 if (((now - ci->last_flush_time) >= config_write_interval)
1462 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1463 && (ci->values_num > 0))
1464 {
1465 enqueue_cache_item (ci, TAIL);
1466 }
1468 pthread_mutex_unlock (&cache_lock);
1470 if (values_num < 1)
1471 return send_response(sock, RESP_ERR, "No values updated.\n");
1472 else
1473 return send_response(sock, RESP_OK,
1474 "errors, enqueued %i value(s).\n", values_num);
1476 /* NOTREACHED */
1477 assert(1==0);
1479 } /* }}} int handle_request_update */
1481 /* we came across a "WROTE" entry during journal replay.
1482 * throw away any values that we have accumulated for this file
1483 */
1484 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1485 {
1486 cache_item_t *ci;
1487 const char *file = buffer;
1489 pthread_mutex_lock(&cache_lock);
1491 ci = g_tree_lookup(cache_tree, file);
1492 if (ci == NULL)
1493 {
1494 pthread_mutex_unlock(&cache_lock);
1495 return (0);
1496 }
1498 if (ci->values)
1499 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1501 wipe_ci_values(ci, now);
1502 remove_from_queue(ci);
1504 pthread_mutex_unlock(&cache_lock);
1505 return (0);
1506 } /* }}} int handle_request_wrote */
1508 /* start "BATCH" processing */
1509 static int batch_start (HANDLER_PROTO) /* {{{ */
1510 {
1511 int status;
1512 if (sock->batch_start)
1513 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1515 status = send_response(sock, RESP_OK,
1516 "Go ahead. End with dot '.' on its own line.\n");
1517 sock->batch_start = time(NULL);
1518 sock->batch_cmd = 0;
1520 return status;
1521 } /* }}} static int batch_start */
1523 /* finish "BATCH" processing and return results to the client */
1524 static int batch_done (HANDLER_PROTO) /* {{{ */
1525 {
1526 assert(sock->batch_start);
1527 sock->batch_start = 0;
1528 sock->batch_cmd = 0;
1529 return send_response(sock, RESP_OK, "errors\n");
1530 } /* }}} static int batch_done */
1532 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1533 {
1534 return -1;
1535 } /* }}} static int handle_request_quit */
1537 static command_t list_of_commands[] = { /* {{{ */
1538 {
1539 "UPDATE",
1540 handle_request_update,
1541 CMD_CONTEXT_ANY,
1542 "UPDATE <filename> <values> [<values> ...]\n"
1543 ,
1544 "Adds the given file to the internal cache if it is not yet known and\n"
1545 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1546 "for details.\n"
1547 "\n"
1548 "Each <values> has the following form:\n"
1549 " <values> = <time>:<value>[:<value>[...]]\n"
1550 "See the rrdupdate(1) manpage for details.\n"
1551 },
1552 {
1553 "WROTE",
1554 handle_request_wrote,
1555 CMD_CONTEXT_JOURNAL,
1556 NULL,
1557 NULL
1558 },
1559 {
1560 "FLUSH",
1561 handle_request_flush,
1562 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1563 "FLUSH <filename>\n"
1564 ,
1565 "Adds the given filename to the head of the update queue and returns\n"
1566 "after it has been dequeued.\n"
1567 },
1568 {
1569 "FLUSHALL",
1570 handle_request_flushall,
1571 CMD_CONTEXT_CLIENT,
1572 "FLUSHALL\n"
1573 ,
1574 "Triggers writing of all pending updates. Returns immediately.\n"
1575 },
1576 {
1577 "PENDING",
1578 handle_request_pending,
1579 CMD_CONTEXT_CLIENT,
1580 "PENDING <filename>\n"
1581 ,
1582 "Shows any 'pending' updates for a file, in order.\n"
1583 "The updates shown have not yet been written to the underlying RRD file.\n"
1584 },
1585 {
1586 "FORGET",
1587 handle_request_forget,
1588 CMD_CONTEXT_ANY,
1589 "FORGET <filename>\n"
1590 ,
1591 "Removes the file completely from the cache.\n"
1592 "Any pending updates for the file will be lost.\n"
1593 },
1594 {
1595 "QUEUE",
1596 handle_request_queue,
1597 CMD_CONTEXT_CLIENT,
1598 "QUEUE\n"
1599 ,
1600 "Shows all files in the output queue.\n"
1601 "The output is zero or more lines in the following format:\n"
1602 "(where <num_vals> is the number of values to be written)\n"
1603 "\n"
1604 "<num_vals> <filename>\n"
1605 },
1606 {
1607 "STATS",
1608 handle_request_stats,
1609 CMD_CONTEXT_CLIENT,
1610 "STATS\n"
1611 ,
1612 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1613 "a description of the values.\n"
1614 },
1615 {
1616 "HELP",
1617 handle_request_help,
1618 CMD_CONTEXT_CLIENT,
1619 "HELP [<command>]\n",
1620 NULL, /* special! */
1621 },
1622 {
1623 "BATCH",
1624 batch_start,
1625 CMD_CONTEXT_CLIENT,
1626 "BATCH\n"
1627 ,
1628 "The 'BATCH' command permits the client to initiate a bulk load\n"
1629 " of commands to rrdcached.\n"
1630 "\n"
1631 "Usage:\n"
1632 "\n"
1633 " client: BATCH\n"
1634 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1635 " client: command #1\n"
1636 " client: command #2\n"
1637 " client: ... and so on\n"
1638 " client: .\n"
1639 " server: 2 errors\n"
1640 " server: 7 message for command #7\n"
1641 " server: 9 message for command #9\n"
1642 "\n"
1643 "For more information, consult the rrdcached(1) documentation.\n"
1644 },
1645 {
1646 ".", /* BATCH terminator */
1647 batch_done,
1648 CMD_CONTEXT_BATCH,
1649 NULL,
1650 NULL
1651 },
1652 {
1653 "QUIT",
1654 handle_request_quit,
1655 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1656 "QUIT\n"
1657 ,
1658 "Disconnect from rrdcached.\n"
1659 }
1660 }; /* }}} command_t list_of_commands[] */
1661 static size_t list_of_commands_len = sizeof (list_of_commands)
1662 / sizeof (list_of_commands[0]);
1664 static command_t *find_command(char *cmd)
1665 {
1666 size_t i;
1668 for (i = 0; i < list_of_commands_len; i++)
1669 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1670 return (&list_of_commands[i]);
1671 return NULL;
1672 }
1674 /* We currently use the index in the `list_of_commands' array as a bit position
1675 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1676 * outside these functions so that switching to a more elegant storage method
1677 * is easily possible. */
1678 static ssize_t find_command_index (const char *cmd) /* {{{ */
1679 {
1680 size_t i;
1682 for (i = 0; i < list_of_commands_len; i++)
1683 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1684 return ((ssize_t) i);
1685 return (-1);
1686 } /* }}} ssize_t find_command_index */
1688 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1689 const char *cmd)
1690 {
1691 ssize_t i;
1693 if (JOURNAL_REPLAY(sock))
1694 return (1);
1696 if (cmd == NULL)
1697 return (-1);
1699 if ((strcasecmp ("QUIT", cmd) == 0)
1700 || (strcasecmp ("HELP", cmd) == 0))
1701 return (1);
1702 else if (strcmp (".", cmd) == 0)
1703 cmd = "BATCH";
1705 i = find_command_index (cmd);
1706 if (i < 0)
1707 return (-1);
1708 assert (i < 32);
1710 if ((sock->permissions & (1 << i)) != 0)
1711 return (1);
1712 return (0);
1713 } /* }}} int socket_permission_check */
1715 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1716 const char *cmd)
1717 {
1718 ssize_t i;
1720 i = find_command_index (cmd);
1721 if (i < 0)
1722 return (-1);
1723 assert (i < 32);
1725 sock->permissions |= (1 << i);
1726 return (0);
1727 } /* }}} int socket_permission_add */
1729 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1730 {
1731 sock->permissions = 0;
1732 } /* }}} socket_permission_clear */
1734 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1735 listen_socket_t *src)
1736 {
1737 dest->permissions = src->permissions;
1738 } /* }}} socket_permission_copy */
1740 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
1741 {
1742 size_t i;
1744 sock->permissions = 0;
1745 for (i = 0; i < list_of_commands_len; i++)
1746 sock->permissions |= (1 << i);
1747 } /* }}} void socket_permission_set_all */
1749 /* check whether commands are received in the expected context */
1750 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1751 {
1752 if (JOURNAL_REPLAY(sock))
1753 return (cmd->context & CMD_CONTEXT_JOURNAL);
1754 else if (sock->batch_start)
1755 return (cmd->context & CMD_CONTEXT_BATCH);
1756 else
1757 return (cmd->context & CMD_CONTEXT_CLIENT);
1759 /* NOTREACHED */
1760 assert(1==0);
1761 }
1763 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1764 {
1765 int status;
1766 char *cmd_str;
1767 char *resp_txt;
1768 command_t *help = NULL;
1770 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1771 if (status == 0)
1772 help = find_command(cmd_str);
1774 if (help && (help->syntax || help->help))
1775 {
1776 char tmp[CMD_MAX];
1778 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1779 resp_txt = tmp;
1781 if (help->syntax)
1782 add_response_info(sock, "Usage: %s\n", help->syntax);
1784 if (help->help)
1785 add_response_info(sock, "%s\n", help->help);
1786 }
1787 else
1788 {
1789 size_t i;
1791 resp_txt = "Command overview\n";
1793 for (i = 0; i < list_of_commands_len; i++)
1794 {
1795 if (list_of_commands[i].syntax == NULL)
1796 continue;
1797 add_response_info (sock, "%s", list_of_commands[i].syntax);
1798 }
1799 }
1801 return send_response(sock, RESP_OK, resp_txt);
1802 } /* }}} int handle_request_help */
1804 static int handle_request (DISPATCH_PROTO) /* {{{ */
1805 {
1806 char *buffer_ptr = buffer;
1807 char *cmd_str = NULL;
1808 command_t *cmd = NULL;
1809 int status;
1811 assert (buffer[buffer_size - 1] == '\0');
1813 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1814 if (status != 0)
1815 {
1816 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1817 return (-1);
1818 }
1820 if (sock != NULL && sock->batch_start)
1821 sock->batch_cmd++;
1823 cmd = find_command(cmd_str);
1824 if (!cmd)
1825 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1827 if (!socket_permission_check (sock, cmd->cmd))
1828 return send_response(sock, RESP_ERR, "Permission denied.\n");
1830 if (!command_check_context(sock, cmd))
1831 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1833 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1834 } /* }}} int handle_request */
1836 static void journal_set_free (journal_set *js) /* {{{ */
1837 {
1838 if (js == NULL)
1839 return;
1841 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1843 free(js);
1844 } /* }}} journal_set_free */
1846 static void journal_set_remove (journal_set *js) /* {{{ */
1847 {
1848 if (js == NULL)
1849 return;
1851 for (uint i=0; i < js->files_num; i++)
1852 {
1853 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1854 unlink(js->files[i]);
1855 }
1856 } /* }}} journal_set_remove */
1858 /* close current journal file handle.
1859 * MUST hold journal_lock before calling */
1860 static void journal_close(void) /* {{{ */
1861 {
1862 if (journal_fh != NULL)
1863 {
1864 if (fclose(journal_fh) != 0)
1865 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1866 }
1868 journal_fh = NULL;
1869 journal_size = 0;
1870 } /* }}} journal_close */
1872 /* MUST hold journal_lock before calling */
1873 static void journal_new_file(void) /* {{{ */
1874 {
1875 struct timeval now;
1876 int new_fd;
1877 char new_file[PATH_MAX + 1];
1879 assert(journal_dir != NULL);
1880 assert(journal_cur != NULL);
1882 journal_close();
1884 gettimeofday(&now, NULL);
1885 /* this format assures that the files sort in strcmp() order */
1886 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1887 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1889 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1890 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1891 if (new_fd < 0)
1892 goto error;
1894 journal_fh = fdopen(new_fd, "a");
1895 if (journal_fh == NULL)
1896 goto error;
1898 journal_size = ftell(journal_fh);
1899 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1901 /* record the file in the journal set */
1902 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1904 return;
1906 error:
1907 RRDD_LOG(LOG_CRIT,
1908 "JOURNALING DISABLED: Error while trying to create %s : %s",
1909 new_file, rrd_strerror(errno));
1910 RRDD_LOG(LOG_CRIT,
1911 "JOURNALING DISABLED: All values will be flushed at shutdown");
1913 close(new_fd);
1914 config_flush_at_shutdown = 1;
1916 } /* }}} journal_new_file */
1918 /* MUST NOT hold journal_lock before calling this */
1919 static void journal_rotate(void) /* {{{ */
1920 {
1921 journal_set *old_js = NULL;
1923 if (journal_dir == NULL)
1924 return;
1926 RRDD_LOG(LOG_DEBUG, "rotating journals");
1928 pthread_mutex_lock(&stats_lock);
1929 ++stats_journal_rotate;
1930 pthread_mutex_unlock(&stats_lock);
1932 pthread_mutex_lock(&journal_lock);
1934 journal_close();
1936 /* rotate the journal sets */
1937 old_js = journal_old;
1938 journal_old = journal_cur;
1939 journal_cur = calloc(1, sizeof(journal_set));
1941 if (journal_cur != NULL)
1942 journal_new_file();
1943 else
1944 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1946 pthread_mutex_unlock(&journal_lock);
1948 journal_set_remove(old_js);
1949 journal_set_free (old_js);
1951 } /* }}} static void journal_rotate */
1953 /* MUST hold journal_lock when calling */
1954 static void journal_done(void) /* {{{ */
1955 {
1956 if (journal_cur == NULL)
1957 return;
1959 journal_close();
1961 if (config_flush_at_shutdown)
1962 {
1963 RRDD_LOG(LOG_INFO, "removing journals");
1964 journal_set_remove(journal_old);
1965 journal_set_remove(journal_cur);
1966 }
1967 else
1968 {
1969 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1970 "journals will be used at next startup");
1971 }
1973 journal_set_free(journal_cur);
1974 journal_set_free(journal_old);
1975 free(journal_dir);
1977 } /* }}} static void journal_done */
1979 static int journal_write(char *cmd, char *args) /* {{{ */
1980 {
1981 int chars;
1983 if (journal_fh == NULL)
1984 return 0;
1986 pthread_mutex_lock(&journal_lock);
1987 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1988 journal_size += chars;
1990 if (journal_size > JOURNAL_MAX)
1991 journal_new_file();
1993 pthread_mutex_unlock(&journal_lock);
1995 if (chars > 0)
1996 {
1997 pthread_mutex_lock(&stats_lock);
1998 stats_journal_bytes += chars;
1999 pthread_mutex_unlock(&stats_lock);
2000 }
2002 return chars;
2003 } /* }}} static int journal_write */
2005 static int journal_replay (const char *file) /* {{{ */
2006 {
2007 FILE *fh;
2008 int entry_cnt = 0;
2009 int fail_cnt = 0;
2010 uint64_t line = 0;
2011 char entry[CMD_MAX];
2012 time_t now;
2014 if (file == NULL) return 0;
2016 {
2017 char *reason = "unknown error";
2018 int status = 0;
2019 struct stat statbuf;
2021 memset(&statbuf, 0, sizeof(statbuf));
2022 if (stat(file, &statbuf) != 0)
2023 {
2024 reason = "stat error";
2025 status = errno;
2026 }
2027 else if (!S_ISREG(statbuf.st_mode))
2028 {
2029 reason = "not a regular file";
2030 status = EPERM;
2031 }
2032 if (statbuf.st_uid != daemon_uid)
2033 {
2034 reason = "not owned by daemon user";
2035 status = EACCES;
2036 }
2037 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2038 {
2039 reason = "must not be user/group writable";
2040 status = EACCES;
2041 }
2043 if (status != 0)
2044 {
2045 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2046 file, rrd_strerror(status), reason);
2047 return 0;
2048 }
2049 }
2051 fh = fopen(file, "r");
2052 if (fh == NULL)
2053 {
2054 if (errno != ENOENT)
2055 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2056 file, rrd_strerror(errno));
2057 return 0;
2058 }
2059 else
2060 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2062 now = time(NULL);
2064 while(!feof(fh))
2065 {
2066 size_t entry_len;
2068 ++line;
2069 if (fgets(entry, sizeof(entry), fh) == NULL)
2070 break;
2071 entry_len = strlen(entry);
2073 /* check \n termination in case journal writing crashed mid-line */
2074 if (entry_len == 0)
2075 continue;
2076 else if (entry[entry_len - 1] != '\n')
2077 {
2078 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2079 ++fail_cnt;
2080 continue;
2081 }
2083 entry[entry_len - 1] = '\0';
2085 if (handle_request(NULL, now, entry, entry_len) == 0)
2086 ++entry_cnt;
2087 else
2088 ++fail_cnt;
2089 }
2091 fclose(fh);
2093 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2094 entry_cnt, fail_cnt);
2096 return entry_cnt > 0 ? 1 : 0;
2097 } /* }}} static int journal_replay */
2099 static int journal_sort(const void *v1, const void *v2)
2100 {
2101 char **jn1 = (char **) v1;
2102 char **jn2 = (char **) v2;
2104 return strcmp(*jn1,*jn2);
2105 }
2107 static void journal_init(void) /* {{{ */
2108 {
2109 int had_journal = 0;
2110 DIR *dir;
2111 struct dirent *dent;
2112 char path[PATH_MAX+1];
2114 if (journal_dir == NULL) return;
2116 pthread_mutex_lock(&journal_lock);
2118 journal_cur = calloc(1, sizeof(journal_set));
2119 if (journal_cur == NULL)
2120 {
2121 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2122 return;
2123 }
2125 RRDD_LOG(LOG_INFO, "checking for journal files");
2127 /* Handle old journal files during transition. This gives them the
2128 * correct sort order. TODO: remove after first release
2129 */
2130 {
2131 char old_path[PATH_MAX+1];
2132 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2133 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2134 rename(old_path, path);
2136 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2137 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2138 rename(old_path, path);
2139 }
2141 dir = opendir(journal_dir);
2142 if (!dir) {
2143 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2144 return;
2145 }
2146 while ((dent = readdir(dir)) != NULL)
2147 {
2148 /* looks like a journal file? */
2149 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2150 continue;
2152 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2154 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2155 {
2156 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2157 dent->d_name);
2158 break;
2159 }
2160 }
2161 closedir(dir);
2163 qsort(journal_cur->files, journal_cur->files_num,
2164 sizeof(journal_cur->files[0]), journal_sort);
2166 for (uint i=0; i < journal_cur->files_num; i++)
2167 had_journal += journal_replay(journal_cur->files[i]);
2169 journal_new_file();
2171 /* it must have been a crash. start a flush */
2172 if (had_journal && config_flush_at_shutdown)
2173 flush_old_values(-1);
2175 pthread_mutex_unlock(&journal_lock);
2177 RRDD_LOG(LOG_INFO, "journal processing complete");
2179 } /* }}} static void journal_init */
2181 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2182 {
2183 assert(sock != NULL);
2185 free(sock->rbuf); sock->rbuf = NULL;
2186 free(sock->wbuf); sock->wbuf = NULL;
2187 free(sock);
2188 } /* }}} void free_listen_socket */
2190 static void close_connection(listen_socket_t *sock) /* {{{ */
2191 {
2192 if (sock->fd >= 0)
2193 {
2194 close(sock->fd);
2195 sock->fd = -1;
2196 }
2198 free_listen_socket(sock);
2200 } /* }}} void close_connection */
2202 static void *connection_thread_main (void *args) /* {{{ */
2203 {
2204 listen_socket_t *sock;
2205 int fd;
2207 sock = (listen_socket_t *) args;
2208 fd = sock->fd;
2210 /* init read buffers */
2211 sock->next_read = sock->next_cmd = 0;
2212 sock->rbuf = malloc(RBUF_SIZE);
2213 if (sock->rbuf == NULL)
2214 {
2215 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2216 close_connection(sock);
2217 return NULL;
2218 }
2220 pthread_mutex_lock (&connection_threads_lock);
2221 #ifdef HAVE_LIBWRAP
2222 /* LIBWRAP does not support multiple threads! By putting this code
2223 inside pthread_mutex_lock we do not have to worry about request_info
2224 getting overwritten by another thread.
2225 */
2226 struct request_info req;
2227 request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2228 fromhost(&req);
2229 if(!hosts_access(&req)) {
2230 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2231 pthread_mutex_unlock (&connection_threads_lock);
2232 close_connection(sock);
2233 return NULL;
2234 }
2235 #endif /* HAVE_LIBWRAP */
2236 connection_threads_num++;
2237 pthread_mutex_unlock (&connection_threads_lock);
2239 while (state == RUNNING)
2240 {
2241 char *cmd;
2242 ssize_t cmd_len;
2243 ssize_t rbytes;
2244 time_t now;
2246 struct pollfd pollfd;
2247 int status;
2249 pollfd.fd = fd;
2250 pollfd.events = POLLIN | POLLPRI;
2251 pollfd.revents = 0;
2253 status = poll (&pollfd, 1, /* timeout = */ 500);
2254 if (state != RUNNING)
2255 break;
2256 else if (status == 0) /* timeout */
2257 continue;
2258 else if (status < 0) /* error */
2259 {
2260 status = errno;
2261 if (status != EINTR)
2262 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2263 continue;
2264 }
2266 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2267 break;
2268 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2269 {
2270 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2271 "poll(2) returned something unexpected: %#04hx",
2272 pollfd.revents);
2273 break;
2274 }
2276 rbytes = read(fd, sock->rbuf + sock->next_read,
2277 RBUF_SIZE - sock->next_read);
2278 if (rbytes < 0)
2279 {
2280 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2281 break;
2282 }
2283 else if (rbytes == 0)
2284 break; /* eof */
2286 sock->next_read += rbytes;
2288 if (sock->batch_start)
2289 now = sock->batch_start;
2290 else
2291 now = time(NULL);
2293 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2294 {
2295 status = handle_request (sock, now, cmd, cmd_len+1);
2296 if (status != 0)
2297 goto out_close;
2298 }
2299 }
2301 out_close:
2302 close_connection(sock);
2304 /* Remove this thread from the connection threads list */
2305 pthread_mutex_lock (&connection_threads_lock);
2306 connection_threads_num--;
2307 if (connection_threads_num <= 0)
2308 pthread_cond_broadcast(&connection_threads_done);
2309 pthread_mutex_unlock (&connection_threads_lock);
2311 return (NULL);
2312 } /* }}} void *connection_thread_main */
2314 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2315 {
2316 int fd;
2317 struct sockaddr_un sa;
2318 listen_socket_t *temp;
2319 int status;
2320 const char *path;
2321 char *path_copy, *dir;
2323 path = sock->addr;
2324 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2325 path += strlen("unix:");
2327 /* dirname may modify its argument */
2328 path_copy = strdup(path);
2329 if (path_copy == NULL)
2330 {
2331 fprintf(stderr, "rrdcached: strdup(): %s\n",
2332 rrd_strerror(errno));
2333 return (-1);
2334 }
2336 dir = dirname(path_copy);
2337 if (rrd_mkdir_p(dir, 0777) != 0)
2338 {
2339 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2340 dir, rrd_strerror(errno));
2341 return (-1);
2342 }
2344 free(path_copy);
2346 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2347 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2348 if (temp == NULL)
2349 {
2350 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2351 return (-1);
2352 }
2353 listen_fds = temp;
2354 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2356 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2357 if (fd < 0)
2358 {
2359 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2360 rrd_strerror(errno));
2361 return (-1);
2362 }
2364 memset (&sa, 0, sizeof (sa));
2365 sa.sun_family = AF_UNIX;
2366 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2368 /* if we've gotten this far, we own the pid file. any daemon started
2369 * with the same args must not be alive. therefore, ensure that we can
2370 * create the socket...
2371 */
2372 unlink(path);
2374 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2375 if (status != 0)
2376 {
2377 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2378 path, rrd_strerror(errno));
2379 close (fd);
2380 return (-1);
2381 }
2383 /* tweak the sockets group ownership */
2384 if (sock->socket_group != (gid_t)-1)
2385 {
2386 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2387 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2388 {
2389 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2390 }
2391 }
2393 if (sock->socket_permissions != (mode_t)-1)
2394 {
2395 if (chmod(path, sock->socket_permissions) != 0)
2396 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2397 (unsigned int)sock->socket_permissions, strerror(errno));
2398 }
2400 status = listen (fd, /* backlog = */ 10);
2401 if (status != 0)
2402 {
2403 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2404 path, rrd_strerror(errno));
2405 close (fd);
2406 unlink (path);
2407 return (-1);
2408 }
2410 listen_fds[listen_fds_num].fd = fd;
2411 listen_fds[listen_fds_num].family = PF_UNIX;
2412 strncpy(listen_fds[listen_fds_num].addr, path,
2413 sizeof (listen_fds[listen_fds_num].addr) - 1);
2414 listen_fds_num++;
2416 return (0);
2417 } /* }}} int open_listen_socket_unix */
2419 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2420 {
2421 struct addrinfo ai_hints;
2422 struct addrinfo *ai_res;
2423 struct addrinfo *ai_ptr;
2424 char addr_copy[NI_MAXHOST];
2425 char *addr;
2426 char *port;
2427 int status;
2429 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2430 addr_copy[sizeof (addr_copy) - 1] = 0;
2431 addr = addr_copy;
2433 memset (&ai_hints, 0, sizeof (ai_hints));
2434 ai_hints.ai_flags = 0;
2435 #ifdef AI_ADDRCONFIG
2436 ai_hints.ai_flags |= AI_ADDRCONFIG;
2437 #endif
2438 ai_hints.ai_family = AF_UNSPEC;
2439 ai_hints.ai_socktype = SOCK_STREAM;
2441 port = NULL;
2442 if (*addr == '[') /* IPv6+port format */
2443 {
2444 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2445 addr++;
2447 port = strchr (addr, ']');
2448 if (port == NULL)
2449 {
2450 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2451 return (-1);
2452 }
2453 *port = 0;
2454 port++;
2456 if (*port == ':')
2457 port++;
2458 else if (*port == 0)
2459 port = NULL;
2460 else
2461 {
2462 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2463 return (-1);
2464 }
2465 } /* if (*addr == '[') */
2466 else
2467 {
2468 port = rindex(addr, ':');
2469 if (port != NULL)
2470 {
2471 *port = 0;
2472 port++;
2473 }
2474 }
2475 ai_res = NULL;
2476 status = getaddrinfo (addr,
2477 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2478 &ai_hints, &ai_res);
2479 if (status != 0)
2480 {
2481 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2482 addr, gai_strerror (status));
2483 return (-1);
2484 }
2486 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2487 {
2488 int fd;
2489 listen_socket_t *temp;
2490 int one = 1;
2492 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2493 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2494 if (temp == NULL)
2495 {
2496 fprintf (stderr,
2497 "rrdcached: open_listen_socket_network: realloc failed.\n");
2498 continue;
2499 }
2500 listen_fds = temp;
2501 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2503 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2504 if (fd < 0)
2505 {
2506 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2507 rrd_strerror(errno));
2508 continue;
2509 }
2511 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2513 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2514 if (status != 0)
2515 {
2516 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2517 sock->addr, rrd_strerror(errno));
2518 close (fd);
2519 continue;
2520 }
2522 status = listen (fd, /* backlog = */ 10);
2523 if (status != 0)
2524 {
2525 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2526 sock->addr, rrd_strerror(errno));
2527 close (fd);
2528 freeaddrinfo(ai_res);
2529 return (-1);
2530 }
2532 listen_fds[listen_fds_num].fd = fd;
2533 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2534 listen_fds_num++;
2535 } /* for (ai_ptr) */
2537 freeaddrinfo(ai_res);
2538 return (0);
2539 } /* }}} static int open_listen_socket_network */
2541 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2542 {
2543 assert(sock != NULL);
2544 assert(sock->addr != NULL);
2546 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2547 || sock->addr[0] == '/')
2548 return (open_listen_socket_unix(sock));
2549 else
2550 return (open_listen_socket_network(sock));
2551 } /* }}} int open_listen_socket */
2553 static int close_listen_sockets (void) /* {{{ */
2554 {
2555 size_t i;
2557 for (i = 0; i < listen_fds_num; i++)
2558 {
2559 close (listen_fds[i].fd);
2561 if (listen_fds[i].family == PF_UNIX)
2562 unlink(listen_fds[i].addr);
2563 }
2565 free (listen_fds);
2566 listen_fds = NULL;
2567 listen_fds_num = 0;
2569 return (0);
2570 } /* }}} int close_listen_sockets */
2572 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2573 {
2574 struct pollfd *pollfds;
2575 int pollfds_num;
2576 int status;
2577 int i;
2579 if (listen_fds_num < 1)
2580 {
2581 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2582 return (NULL);
2583 }
2585 pollfds_num = listen_fds_num;
2586 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2587 if (pollfds == NULL)
2588 {
2589 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2590 return (NULL);
2591 }
2592 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2594 RRDD_LOG(LOG_INFO, "listening for connections");
2596 while (state == RUNNING)
2597 {
2598 for (i = 0; i < pollfds_num; i++)
2599 {
2600 pollfds[i].fd = listen_fds[i].fd;
2601 pollfds[i].events = POLLIN | POLLPRI;
2602 pollfds[i].revents = 0;
2603 }
2605 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2606 if (state != RUNNING)
2607 break;
2608 else if (status == 0) /* timeout */
2609 continue;
2610 else if (status < 0) /* error */
2611 {
2612 status = errno;
2613 if (status != EINTR)
2614 {
2615 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2616 }
2617 continue;
2618 }
2620 for (i = 0; i < pollfds_num; i++)
2621 {
2622 listen_socket_t *client_sock;
2623 struct sockaddr_storage client_sa;
2624 socklen_t client_sa_size;
2625 pthread_t tid;
2626 pthread_attr_t attr;
2628 if (pollfds[i].revents == 0)
2629 continue;
2631 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2632 {
2633 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2634 "poll(2) returned something unexpected for listen FD #%i.",
2635 pollfds[i].fd);
2636 continue;
2637 }
2639 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2640 if (client_sock == NULL)
2641 {
2642 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2643 continue;
2644 }
2645 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2647 client_sa_size = sizeof (client_sa);
2648 client_sock->fd = accept (pollfds[i].fd,
2649 (struct sockaddr *) &client_sa, &client_sa_size);
2650 if (client_sock->fd < 0)
2651 {
2652 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2653 free(client_sock);
2654 continue;
2655 }
2657 pthread_attr_init (&attr);
2658 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2660 status = pthread_create (&tid, &attr, connection_thread_main,
2661 client_sock);
2662 if (status != 0)
2663 {
2664 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2665 close_connection(client_sock);
2666 continue;
2667 }
2668 } /* for (pollfds_num) */
2669 } /* while (state == RUNNING) */
2671 RRDD_LOG(LOG_INFO, "starting shutdown");
2673 close_listen_sockets ();
2675 pthread_mutex_lock (&connection_threads_lock);
2676 while (connection_threads_num > 0)
2677 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2678 pthread_mutex_unlock (&connection_threads_lock);
2680 free(pollfds);
2682 return (NULL);
2683 } /* }}} void *listen_thread_main */
2685 static int daemonize (void) /* {{{ */
2686 {
2687 int pid_fd;
2688 char *base_dir;
2690 daemon_uid = geteuid();
2692 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2693 if (pid_fd < 0)
2694 pid_fd = check_pidfile();
2695 if (pid_fd < 0)
2696 return pid_fd;
2698 /* open all the listen sockets */
2699 if (config_listen_address_list_len > 0)
2700 {
2701 for (size_t i = 0; i < config_listen_address_list_len; i++)
2702 open_listen_socket (config_listen_address_list[i]);
2704 rrd_free_ptrs((void ***) &config_listen_address_list,
2705 &config_listen_address_list_len);
2706 }
2707 else
2708 {
2709 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2710 sizeof(default_socket.addr) - 1);
2711 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2713 if (default_socket.permissions == 0)
2714 socket_permission_set_all (&default_socket);
2716 open_listen_socket (&default_socket);
2717 }
2719 if (listen_fds_num < 1)
2720 {
2721 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2722 goto error;
2723 }
2725 if (!stay_foreground)
2726 {
2727 pid_t child;
2729 child = fork ();
2730 if (child < 0)
2731 {
2732 fprintf (stderr, "daemonize: fork(2) failed.\n");
2733 goto error;
2734 }
2735 else if (child > 0)
2736 exit(0);
2738 /* Become session leader */
2739 setsid ();
2741 /* Open the first three file descriptors to /dev/null */
2742 close (2);
2743 close (1);
2744 close (0);
2746 open ("/dev/null", O_RDWR);
2747 if (dup(0) == -1 || dup(0) == -1){
2748 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2749 }
2750 } /* if (!stay_foreground) */
2752 /* Change into the /tmp directory. */
2753 base_dir = (config_base_dir != NULL)
2754 ? config_base_dir
2755 : "/tmp";
2757 if (chdir (base_dir) != 0)
2758 {
2759 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2760 goto error;
2761 }
2763 install_signal_handlers();
2765 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2766 RRDD_LOG(LOG_INFO, "starting up");
2768 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2769 (GDestroyNotify) free_cache_item);
2770 if (cache_tree == NULL)
2771 {
2772 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2773 goto error;
2774 }
2776 return write_pidfile (pid_fd);
2778 error:
2779 remove_pidfile();
2780 return -1;
2781 } /* }}} int daemonize */
2783 static int cleanup (void) /* {{{ */
2784 {
2785 pthread_cond_broadcast (&flush_cond);
2786 pthread_join (flush_thread, NULL);
2788 pthread_cond_broadcast (&queue_cond);
2789 for (int i = 0; i < config_queue_threads; i++)
2790 pthread_join (queue_threads[i], NULL);
2792 if (config_flush_at_shutdown)
2793 {
2794 assert(cache_queue_head == NULL);
2795 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2796 }
2798 free(queue_threads);
2799 free(config_base_dir);
2801 pthread_mutex_lock(&cache_lock);
2802 g_tree_destroy(cache_tree);
2804 pthread_mutex_lock(&journal_lock);
2805 journal_done();
2807 RRDD_LOG(LOG_INFO, "goodbye");
2808 closelog ();
2810 remove_pidfile ();
2811 free(config_pid_file);
2813 return (0);
2814 } /* }}} int cleanup */
2816 static int read_options (int argc, char **argv) /* {{{ */
2817 {
2818 int option;
2819 int status = 0;
2821 socket_permission_clear (&default_socket);
2823 default_socket.socket_group = (gid_t)-1;
2824 default_socket.socket_permissions = (mode_t)-1;
2826 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2827 {
2828 switch (option)
2829 {
2830 case 'g':
2831 stay_foreground=1;
2832 break;
2834 case 'l':
2835 {
2836 listen_socket_t *new;
2838 new = malloc(sizeof(listen_socket_t));
2839 if (new == NULL)
2840 {
2841 fprintf(stderr, "read_options: malloc failed.\n");
2842 return(2);
2843 }
2844 memset(new, 0, sizeof(listen_socket_t));
2846 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2848 /* Add permissions to the socket {{{ */
2849 if (default_socket.permissions != 0)
2850 {
2851 socket_permission_copy (new, &default_socket);
2852 }
2853 else /* if (default_socket.permissions == 0) */
2854 {
2855 /* Add permission for ALL commands to the socket. */
2856 socket_permission_set_all (new);
2857 }
2858 /* }}} Done adding permissions. */
2860 new->socket_group = default_socket.socket_group;
2861 new->socket_permissions = default_socket.socket_permissions;
2863 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2864 &config_listen_address_list_len, new))
2865 {
2866 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2867 return (2);
2868 }
2869 }
2870 break;
2872 /* set socket group permissions */
2873 case 's':
2874 {
2875 gid_t group_gid;
2876 struct group *grp;
2878 group_gid = strtoul(optarg, NULL, 10);
2879 if (errno != EINVAL && group_gid>0)
2880 {
2881 /* we were passed a number */
2882 grp = getgrgid(group_gid);
2883 }
2884 else
2885 {
2886 grp = getgrnam(optarg);
2887 }
2889 if (grp)
2890 {
2891 default_socket.socket_group = grp->gr_gid;
2892 }
2893 else
2894 {
2895 /* no idea what the user wanted... */
2896 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2897 return (5);
2898 }
2899 }
2900 break;
2902 /* set socket file permissions */
2903 case 'm':
2904 {
2905 long tmp;
2906 char *endptr = NULL;
2908 tmp = strtol (optarg, &endptr, 8);
2909 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2910 || (tmp > 07777) || (tmp < 0)) {
2911 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2912 optarg);
2913 return (5);
2914 }
2916 default_socket.socket_permissions = (mode_t)tmp;
2917 }
2918 break;
2920 case 'P':
2921 {
2922 char *optcopy;
2923 char *saveptr;
2924 char *dummy;
2925 char *ptr;
2927 socket_permission_clear (&default_socket);
2929 optcopy = strdup (optarg);
2930 dummy = optcopy;
2931 saveptr = NULL;
2932 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2933 {
2934 dummy = NULL;
2935 status = socket_permission_add (&default_socket, ptr);
2936 if (status != 0)
2937 {
2938 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2939 "socket failed. Most likely, this permission doesn't "
2940 "exist. Check your command line.\n", ptr);
2941 status = 4;
2942 }
2943 }
2945 free (optcopy);
2946 }
2947 break;
2949 case 'f':
2950 {
2951 int temp;
2953 temp = atoi (optarg);
2954 if (temp > 0)
2955 config_flush_interval = temp;
2956 else
2957 {
2958 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2959 status = 3;
2960 }
2961 }
2962 break;
2964 case 'w':
2965 {
2966 int temp;
2968 temp = atoi (optarg);
2969 if (temp > 0)
2970 config_write_interval = temp;
2971 else
2972 {
2973 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2974 status = 2;
2975 }
2976 }
2977 break;
2979 case 'z':
2980 {
2981 int temp;
2983 temp = atoi(optarg);
2984 if (temp > 0)
2985 config_write_jitter = temp;
2986 else
2987 {
2988 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2989 status = 2;
2990 }
2992 break;
2993 }
2995 case 't':
2996 {
2997 int threads;
2998 threads = atoi(optarg);
2999 if (threads >= 1)
3000 config_queue_threads = threads;
3001 else
3002 {
3003 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3004 return 1;
3005 }
3006 }
3007 break;
3009 case 'B':
3010 config_write_base_only = 1;
3011 break;
3013 case 'b':
3014 {
3015 size_t len;
3016 char base_realpath[PATH_MAX];
3018 if (config_base_dir != NULL)
3019 free (config_base_dir);
3020 config_base_dir = strdup (optarg);
3021 if (config_base_dir == NULL)
3022 {
3023 fprintf (stderr, "read_options: strdup failed.\n");
3024 return (3);
3025 }
3027 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3028 {
3029 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3030 config_base_dir, rrd_strerror (errno));
3031 return (3);
3032 }
3034 /* make sure that the base directory is not resolved via
3035 * symbolic links. this makes some performance-enhancing
3036 * assumptions possible (we don't have to resolve paths
3037 * that start with a "/")
3038 */
3039 if (realpath(config_base_dir, base_realpath) == NULL)
3040 {
3041 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3042 "%s\n", config_base_dir, rrd_strerror(errno));
3043 return 5;
3044 }
3046 len = strlen (config_base_dir);
3047 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3048 {
3049 config_base_dir[len - 1] = 0;
3050 len--;
3051 }
3053 if (len < 1)
3054 {
3055 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3056 return (4);
3057 }
3059 _config_base_dir_len = len;
3061 len = strlen (base_realpath);
3062 while ((len > 0) && (base_realpath[len - 1] == '/'))
3063 {
3064 base_realpath[len - 1] = '\0';
3065 len--;
3066 }
3068 if (strncmp(config_base_dir,
3069 base_realpath, sizeof(base_realpath)) != 0)
3070 {
3071 fprintf(stderr,
3072 "Base directory (-b) resolved via file system links!\n"
3073 "Please consult rrdcached '-b' documentation!\n"
3074 "Consider specifying the real directory (%s)\n",
3075 base_realpath);
3076 return 5;
3077 }
3078 }
3079 break;
3081 case 'p':
3082 {
3083 if (config_pid_file != NULL)
3084 free (config_pid_file);
3085 config_pid_file = strdup (optarg);
3086 if (config_pid_file == NULL)
3087 {
3088 fprintf (stderr, "read_options: strdup failed.\n");
3089 return (3);
3090 }
3091 }
3092 break;
3094 case 'F':
3095 config_flush_at_shutdown = 1;
3096 break;
3098 case 'j':
3099 {
3100 char journal_dir_actual[PATH_MAX];
3101 const char *dir;
3102 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3104 status = rrd_mkdir_p(dir, 0777);
3105 if (status != 0)
3106 {
3107 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3108 dir, rrd_strerror(errno));
3109 return 6;
3110 }
3112 if (access(dir, R_OK|W_OK|X_OK) != 0)
3113 {
3114 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3115 errno ? rrd_strerror(errno) : "");
3116 return 6;
3117 }
3118 }
3119 break;
3121 case 'h':
3122 case '?':
3123 printf ("RRDCacheD %s\n"
3124 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3125 "\n"
3126 "Usage: rrdcached [options]\n"
3127 "\n"
3128 "Valid options are:\n"
3129 " -l <address> Socket address to listen to.\n"
3130 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3131 " -P <perms> Sets the permissions to assign to all following "
3132 "sockets\n"
3133 " -w <seconds> Interval in which to write data.\n"
3134 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3135 " -t <threads> Number of write threads.\n"
3136 " -f <seconds> Interval in which to flush dead data.\n"
3137 " -p <file> Location of the PID-file.\n"
3138 " -b <dir> Base directory to change to.\n"
3139 " -B Restrict file access to paths within -b <dir>\n"
3140 " -g Do not fork and run in the foreground.\n"
3141 " -j <dir> Directory in which to create the journal files.\n"
3142 " -F Always flush all updates at shutdown\n"
3143 " -s <id|name> Group owner of all following UNIX sockets\n"
3144 " (the socket will also have read/write permissions "
3145 "for that group)\n"
3146 " -m <mode> File permissions (octal) of all following UNIX "
3147 "sockets\n"
3148 "\n"
3149 "For more information and a detailed description of all options "
3150 "please refer\n"
3151 "to the rrdcached(1) manual page.\n",
3152 VERSION);
3153 if (option == 'h')
3154 status = -1;
3155 else
3156 status = 1;
3157 break;
3158 } /* switch (option) */
3159 } /* while (getopt) */
3161 /* advise the user when values are not sane */
3162 if (config_flush_interval < 2 * config_write_interval)
3163 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3164 " 2x write interval (-w) !\n");
3165 if (config_write_jitter > config_write_interval)
3166 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3167 " write interval (-w) !\n");
3169 if (config_write_base_only && config_base_dir == NULL)
3170 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3171 " Consult the rrdcached documentation\n");
3173 if (journal_dir == NULL)
3174 config_flush_at_shutdown = 1;
3176 return (status);
3177 } /* }}} int read_options */
3179 int main (int argc, char **argv)
3180 {
3181 int status;
3183 status = read_options (argc, argv);
3184 if (status != 0)
3185 {
3186 if (status < 0)
3187 status = 0;
3188 return (status);
3189 }
3191 status = daemonize ();
3192 if (status != 0)
3193 {
3194 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3195 return (1);
3196 }
3198 journal_init();
3200 /* start the queue threads */
3201 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3202 if (queue_threads == NULL)
3203 {
3204 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3205 cleanup();
3206 return (1);
3207 }
3208 for (int i = 0; i < config_queue_threads; i++)
3209 {
3210 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3211 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3212 if (status != 0)
3213 {
3214 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3215 cleanup();
3216 return (1);
3217 }
3218 }
3220 /* start the flush thread */
3221 memset(&flush_thread, 0, sizeof(flush_thread));
3222 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3223 if (status != 0)
3224 {
3225 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3226 cleanup();
3227 return (1);
3228 }
3230 listen_thread_main (NULL);
3231 cleanup ();
3233 return (0);
3234 } /* int main */
3236 /*
3237 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3238 */