1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
/*
 * RRDD_LOG: emit a printf-style message at the given syslog severity.
 * When `stay_foreground' is non-zero the same message, followed by a
 * newline, is also printed on stderr.  Note that the variadic arguments
 * are expanded (and therefore evaluated) more than once.
 */
#define RRDD_LOG(severity, ...)             \
  do {                                      \
    if (stay_foreground)                    \
    {                                       \
      fprintf (stderr, __VA_ARGS__);        \
      fputc ('\n', stderr);                 \
    }                                       \
    syslog ((severity), __VA_ARGS__);       \
  } while (0)
123 /*
124 * Types
125 */
/* Result class passed to send_response(): error or success. */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK  = 0
} response_code;
128 struct listen_socket_s
129 {
130 int fd;
131 char addr[PATH_MAX + 1];
132 int family;
134 /* state for BATCH processing */
135 time_t batch_start;
136 int batch_cmd;
138 /* buffered IO */
139 char *rbuf;
140 off_t next_cmd;
141 off_t next_read;
143 char *wbuf;
144 ssize_t wbuf_len;
146 uint32_t permissions;
148 gid_t socket_group;
149 mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
/* `command_t' is needed by the handler prototype before the struct body
 * is defined; the typedef doubles as the forward declaration. */
typedef struct command_s command_t;

/* note: guard against "unused" warnings in the handlers */

/* Parameter list shared by everything that dispatches a request. */
#define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),  \
                        time_t UNUSED(now),             \
                        char UNUSED(*buffer),           \
                        size_t UNUSED(buffer_size)

/* Parameter list of a command handler: the command entry itself,
 * followed by the dispatch parameters. */
#define HANDLER_PROTO   command_t UNUSED(*cmd),         \
                        DISPATCH_PROTO
/* Bit values for command_s.context: where a command may be seen. */
#define CMD_CONTEXT_CLIENT  (1<<0)
#define CMD_CONTEXT_BATCH   (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY     (0x7f)

/* One entry describing a protocol command and the function handling it. */
struct command_s
{
  char *cmd;
  int (*handler)(HANDLER_PROTO);

  char  context;        /* where we expect to see it (CMD_CONTEXT_* bits) */

  char *syntax;         /* usage text; syntax_error() reports it when set */
  char *help;
};
/* Bits stored in cache_item_s.flags. */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

typedef struct cache_item_s cache_item_t;

/*
 * Cached, not yet written updates for one file.  Items are linked into
 * the write queue through `prev' / `next' while CI_FLAGS_IN_QUEUE is set.
 */
struct cache_item_s
{
  char          *file;              /* file the values belong to */
  char         **values;            /* array of pending update strings */
  size_t         values_num;        /* number of entries in `values' */
  time_t         last_flush_time;
  time_t         last_update_stamp;
  int            flags;             /* CI_FLAGS_* */
  pthread_cond_t flushed;           /* broadcast after the item was written */
  cache_item_t  *prev;
  cache_item_t  *next;
};
/*
 * User data handed to tree_callback_flush() by flush_old_values():
 * the reference times for the sweep plus the list of tree keys the
 * callback collected for removal.
 */
typedef struct callback_flush_data_s
{
  time_t  now;          /* time the sweep started */
  time_t  abs_timeout;  /* items last flushed at or before this get queued */
  char  **keys;         /* keys of idle, empty items */
  size_t  keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should insert at. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* describe a set of journal files */
typedef struct
{
  char  **files;        /* array of file names */
  size_t  files_num;    /* number of entries in `files' */
} journal_set;
/* max length of socket command or response */
#define CMD_MAX   4096
/* size used for the read buffer: room for two maximum-length commands */
#define RBUF_SIZE (CMD_MAX*2)
221 /*
222 * Variables
223 */
/* When non-zero, RRDD_LOG also prints every message on stderr. */
224 static int stay_foreground = 0;
225 static uid_t daemon_uid;
/* Array of sockets and its length (filled in outside this part of the file). */
227 static listen_socket_t *listen_fds = NULL;
228 static size_t listen_fds_num = 0;
230 static listen_socket_t default_socket;
/* Daemon life-cycle state.  The signal handlers switch it to FLUSHING;
 * flush_thread_main() sets SHUTDOWN once it has left its main loop. */
232 enum {
233 RUNNING, /* normal operation */
234 FLUSHING, /* flushing remaining values */
235 SHUTDOWN /* shutting down */
236 } state = RUNNING;
/* queue_cond is signalled by enqueue_cache_item() and waited on (together
 * with cache_lock) by queue_thread_main(). */
238 static pthread_t *queue_threads;
239 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
240 static int config_queue_threads = 4;
/* flush_cond is what flush_thread_main() sleeps on between sweeps; the
 * signal handlers broadcast it to wake the thread early. */
242 static pthread_t flush_thread;
243 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
245 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
246 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
247 static int connection_threads_num = 0;
/* cache_tree maps a file name to its cache_item_t; cache_queue_head/tail
 * are the ends of the doubly linked write queue.  All three are accessed
 * with cache_lock held. */
249 /* Cache stuff */
250 static GTree *cache_tree = NULL;
251 static cache_item_t *cache_queue_head = NULL;
252 static cache_item_t *cache_queue_tail = NULL;
253 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
/* config_write_interval: age passed to flush_old_values() by the flush thread.
 * config_write_jitter:   upper bound of the random offset added in
 *                        wipe_ci_values().
 * config_flush_interval: period of the flush thread, and idle time after
 *                        which empty items are dropped from the tree.
 * config_flush_at_shutdown: set by SIGUSR1, cleared by SIGUSR2.
 * config_pid_file:       PID file path; a built-in default is used if NULL.
 * config_base_dir / _config_base_dir_len / config_write_base_only:
 *                        used by get_abs_path() and check_file_access(). */
255 static int config_write_interval = 300;
256 static int config_write_jitter = 0;
257 static int config_flush_interval = 3600;
258 static int config_flush_at_shutdown = 0;
259 static char *config_pid_file = NULL;
260 static char *config_base_dir = NULL;
261 static size_t _config_base_dir_len = 0;
262 static int config_write_base_only = 0;
264 static listen_socket_t **config_listen_address_list = NULL;
265 static size_t config_listen_address_list_len = 0;
/* Counters reported by the STATS handler; updated and read under stats_lock. */
267 static uint64_t stats_queue_length = 0;
268 static uint64_t stats_updates_received = 0;
269 static uint64_t stats_flush_received = 0;
270 static uint64_t stats_updates_written = 0;
271 static uint64_t stats_data_sets_written = 0;
272 static uint64_t stats_journal_bytes = 0;
273 static uint64_t stats_journal_rotate = 0;
274 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
276 /* Journaled updates */
/* JOURNAL_REPLAY(s) is true when the socket pointer is NULL; the response
 * helpers and check_file_access() use it to skip client interaction.
 * NOTE(review): presumably handlers are invoked with a NULL socket while
 * the journal is replayed -- confirm against the replay code. */
277 #define JOURNAL_REPLAY(s) ((s) == NULL)
278 #define JOURNAL_BASE "rrd.journal"
279 static journal_set *journal_cur = NULL;
280 static journal_set *journal_old = NULL;
281 static char *journal_dir = NULL;
282 static FILE *journal_fh = NULL; /* current journal file handle */
283 static long journal_size = 0; /* current journal size */
284 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
285 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
/* Journal functions defined further down but called earlier in the file. */
286 static int journal_write(char *cmd, char *args);
287 static void journal_done(void);
288 static void journal_rotate(void);
290 /* prototypes for forward references */
291 static int handle_request_help (HANDLER_PROTO);
293 /*
294 * Functions
295 */
296 static void sig_common (const char *sig) /* {{{ */
297 {
298 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
299 state = FLUSHING;
300 pthread_cond_broadcast(&flush_cond);
301 pthread_cond_broadcast(&queue_cond);
302 } /* }}} void sig_common */
/* SIGINT handler: start the common shutdown sequence. */
static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM handler: start the common shutdown sequence. */
static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
314 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
315 {
316 config_flush_at_shutdown = 1;
317 sig_common("USR1");
318 } /* }}} void sig_usr1_handler */
320 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
321 {
322 config_flush_at_shutdown = 0;
323 sig_common("USR2");
324 } /* }}} void sig_usr2_handler */
326 static void install_signal_handlers(void) /* {{{ */
327 {
328 /* These structures are static, because `sigaction' behaves weird if the are
329 * overwritten.. */
330 static struct sigaction sa_int;
331 static struct sigaction sa_term;
332 static struct sigaction sa_pipe;
333 static struct sigaction sa_usr1;
334 static struct sigaction sa_usr2;
336 /* Install signal handlers */
337 memset (&sa_int, 0, sizeof (sa_int));
338 sa_int.sa_handler = sig_int_handler;
339 sigaction (SIGINT, &sa_int, NULL);
341 memset (&sa_term, 0, sizeof (sa_term));
342 sa_term.sa_handler = sig_term_handler;
343 sigaction (SIGTERM, &sa_term, NULL);
345 memset (&sa_pipe, 0, sizeof (sa_pipe));
346 sa_pipe.sa_handler = SIG_IGN;
347 sigaction (SIGPIPE, &sa_pipe, NULL);
349 memset (&sa_pipe, 0, sizeof (sa_usr1));
350 sa_usr1.sa_handler = sig_usr1_handler;
351 sigaction (SIGUSR1, &sa_usr1, NULL);
353 memset (&sa_usr2, 0, sizeof (sa_usr2));
354 sa_usr2.sa_handler = sig_usr2_handler;
355 sigaction (SIGUSR2, &sa_usr2, NULL);
357 } /* }}} void install_signal_handlers */
359 static int open_pidfile(char *action, int oflag) /* {{{ */
360 {
361 int fd;
362 const char *file;
363 char *file_copy, *dir;
365 file = (config_pid_file != NULL)
366 ? config_pid_file
367 : LOCALSTATEDIR "/run/rrdcached.pid";
369 /* dirname may modify its argument */
370 file_copy = strdup(file);
371 if (file_copy == NULL)
372 {
373 fprintf(stderr, "rrdcached: strdup(): %s\n",
374 rrd_strerror(errno));
375 return -1;
376 }
378 dir = dirname(file_copy);
379 if (rrd_mkdir_p(dir, 0777) != 0)
380 {
381 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
382 dir, rrd_strerror(errno));
383 return -1;
384 }
386 free(file_copy);
388 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
389 if (fd < 0)
390 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
391 action, file, rrd_strerror(errno));
393 return(fd);
394 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns an open, truncated descriptor for the PID file when the file
 * is stale (no other process with the recorded pid can be signalled),
 * or a negative value when the file cannot be opened or parsed or when
 * another process with that pid exists.  The descriptor is closed on
 * every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not add one and atoi()
   * needs a proper string */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile: write our process id, followed by a newline, to the
 * already opened descriptor `fd'.  The descriptor is closed in every
 * case.  Returns 0 on success, -1 if no stream could be attached to `fd'.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream != NULL)
  {
    fprintf (stream, "%i\n", (int) getpid ());
    fclose (stream);
    return (0);
  }

  RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
  close(fd);

  return (-1);
} /* }}} int write_pidfile */
461 static int remove_pidfile (void) /* {{{ */
462 {
463 char *file;
464 int status;
466 file = (config_pid_file != NULL)
467 ? config_pid_file
468 : LOCALSTATEDIR "/run/rrdcached.pid";
470 status = unlink (file);
471 if (status == 0)
472 return (0);
473 return (errno);
474 } /* }}} int remove_pidfile */
476 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
477 {
478 char *eol;
480 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
481 sock->next_read - sock->next_cmd);
483 if (eol == NULL)
484 {
485 /* no commands left, move remainder back to front of rbuf */
486 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
487 sock->next_read - sock->next_cmd);
488 sock->next_read -= sock->next_cmd;
489 sock->next_cmd = 0;
490 *len = 0;
491 return NULL;
492 }
493 else
494 {
495 char *cmd = sock->rbuf + sock->next_cmd;
496 *eol = '\0';
498 sock->next_cmd = eol - sock->rbuf + 1;
500 if (eol > sock->rbuf && *(eol-1) == '\r')
501 *(--eol) = '\0'; /* handle "\r\n" EOL */
503 *len = eol - cmd;
505 return cmd;
506 }
508 /* NOTREACHED */
509 assert(1==0);
510 } /* }}} char *next_cmd */
512 /* add the characters directly to the write buffer */
513 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
514 {
515 char *new_buf;
517 assert(sock != NULL);
519 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
520 if (new_buf == NULL)
521 {
522 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
523 return -1;
524 }
526 strncpy(new_buf + sock->wbuf_len, str, len + 1);
528 sock->wbuf = new_buf;
529 sock->wbuf_len += len;
531 return 0;
532 } /* }}} static int add_to_wbuf */
534 /* add the text to the "extra" info that's sent after the status line */
535 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
536 {
537 va_list argp;
538 char buffer[CMD_MAX];
539 int len;
541 if (JOURNAL_REPLAY(sock)) return 0;
542 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
544 va_start(argp, fmt);
545 #ifdef HAVE_VSNPRINTF
546 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
547 #else
548 len = vsprintf(buffer, fmt, argp);
549 #endif
550 va_end(argp);
551 if (len < 0)
552 {
553 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
554 return -1;
555 }
557 return add_to_wbuf(sock, buffer, len);
558 } /* }}} static int add_response_info */
/*
 * count_lines: return the number of '\n' characters in `str'.
 * A NULL pointer counts as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
576 /* send the response back to the user.
577 * returns 0 on success, -1 on error
578 * write buffer is always zeroed after this call */
579 static int send_response (listen_socket_t *sock, response_code rc,
580 char *fmt, ...) /* {{{ */
581 {
582 va_list argp;
583 char buffer[CMD_MAX];
584 int lines;
585 ssize_t wrote;
586 int rclen, len;
588 if (JOURNAL_REPLAY(sock)) return rc;
590 if (sock->batch_start)
591 {
592 if (rc == RESP_OK)
593 return rc; /* no response on success during BATCH */
594 lines = sock->batch_cmd;
595 }
596 else if (rc == RESP_OK)
597 lines = count_lines(sock->wbuf);
598 else
599 lines = -1;
601 rclen = sprintf(buffer, "%d ", lines);
602 va_start(argp, fmt);
603 #ifdef HAVE_VSNPRINTF
604 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
605 #else
606 len = vsprintf(buffer+rclen, fmt, argp);
607 #endif
608 va_end(argp);
609 if (len < 0)
610 return -1;
612 len += rclen;
614 /* append the result to the wbuf, don't write to the user */
615 if (sock->batch_start)
616 return add_to_wbuf(sock, buffer, len);
618 /* first write must be complete */
619 if (len != write(sock->fd, buffer, len))
620 {
621 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
622 return -1;
623 }
625 if (sock->wbuf != NULL && rc == RESP_OK)
626 {
627 wrote = 0;
628 while (wrote < sock->wbuf_len)
629 {
630 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
631 if (wb <= 0)
632 {
633 RRDD_LOG(LOG_INFO, "send_response: could not write results");
634 return -1;
635 }
636 wrote += wb;
637 }
638 }
640 free(sock->wbuf); sock->wbuf = NULL;
641 sock->wbuf_len = 0;
643 return 0;
644 } /* }}} */
646 static void wipe_ci_values(cache_item_t *ci, time_t when)
647 {
648 ci->values = NULL;
649 ci->values_num = 0;
651 ci->last_flush_time = when;
652 if (config_write_jitter > 0)
653 ci->last_flush_time += (rrd_random() % config_write_jitter);
654 }
656 /* remove_from_queue
657 * remove a "cache_item_t" item from the queue.
658 * must hold 'cache_lock' when calling this
659 */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
661 {
662 if (ci == NULL) return;
663 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
665 if (ci->prev == NULL)
666 cache_queue_head = ci->next; /* reset head */
667 else
668 ci->prev->next = ci->next;
670 if (ci->next == NULL)
671 cache_queue_tail = ci->prev; /* reset the tail */
672 else
673 ci->next->prev = ci->prev;
675 ci->next = ci->prev = NULL;
676 ci->flags &= ~CI_FLAGS_IN_QUEUE;
678 pthread_mutex_lock (&stats_lock);
679 assert (stats_queue_length > 0);
680 stats_queue_length--;
681 pthread_mutex_unlock (&stats_lock);
683 } /* }}} static void remove_from_queue */
685 /* free the resources associated with the cache_item_t
686 * must hold cache_lock when calling this function
687 */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
689 {
690 if (ci == NULL) return NULL;
692 remove_from_queue(ci);
694 for (size_t i=0; i < ci->values_num; i++)
695 free(ci->values[i]);
697 free (ci->values);
698 free (ci->file);
700 /* in case anyone is waiting */
701 pthread_cond_broadcast(&ci->flushed);
702 pthread_cond_destroy(&ci->flushed);
704 free (ci);
706 return NULL;
707 } /* }}} static void *free_cache_item */
709 /*
710 * enqueue_cache_item:
711 * `cache_lock' must be acquired before calling this function!
712 */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714 queue_side_t side)
715 {
716 if (ci == NULL)
717 return (-1);
719 if (ci->values_num == 0)
720 return (0);
722 if (side == HEAD)
723 {
724 if (cache_queue_head == ci)
725 return 0;
727 /* remove if further down in queue */
728 remove_from_queue(ci);
730 ci->prev = NULL;
731 ci->next = cache_queue_head;
732 if (ci->next != NULL)
733 ci->next->prev = ci;
734 cache_queue_head = ci;
736 if (cache_queue_tail == NULL)
737 cache_queue_tail = cache_queue_head;
738 }
739 else /* (side == TAIL) */
740 {
741 /* We don't move values back in the list.. */
742 if (ci->flags & CI_FLAGS_IN_QUEUE)
743 return (0);
745 assert (ci->next == NULL);
746 assert (ci->prev == NULL);
748 ci->prev = cache_queue_tail;
750 if (cache_queue_tail == NULL)
751 cache_queue_head = ci;
752 else
753 cache_queue_tail->next = ci;
755 cache_queue_tail = ci;
756 }
758 ci->flags |= CI_FLAGS_IN_QUEUE;
760 pthread_cond_signal(&queue_cond);
761 pthread_mutex_lock (&stats_lock);
762 stats_queue_length++;
763 pthread_mutex_unlock (&stats_lock);
765 return (0);
766 } /* }}} int enqueue_cache_item */
768 /*
769 * tree_callback_flush:
770 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771 * while this is in progress.
772 */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774 gpointer data)
775 {
776 cache_item_t *ci;
777 callback_flush_data_t *cfd;
779 ci = (cache_item_t *) value;
780 cfd = (callback_flush_data_t *) data;
782 if (ci->flags & CI_FLAGS_IN_QUEUE)
783 return FALSE;
785 if (ci->values_num > 0
786 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787 {
788 enqueue_cache_item (ci, TAIL);
789 }
790 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791 && (ci->values_num <= 0))
792 {
793 assert ((char *) key == ci->file);
794 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795 {
796 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797 return (FALSE);
798 }
799 }
801 return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
804 static int flush_old_values (int max_age)
805 {
806 callback_flush_data_t cfd;
807 size_t k;
809 memset (&cfd, 0, sizeof (cfd));
810 /* Pass the current time as user data so that we don't need to call
811 * `time' for each node. */
812 cfd.now = time (NULL);
813 cfd.keys = NULL;
814 cfd.keys_num = 0;
816 if (max_age > 0)
817 cfd.abs_timeout = cfd.now - max_age;
818 else
819 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
821 /* `tree_callback_flush' will return the keys of all values that haven't
822 * been touched in the last `config_flush_interval' seconds in `cfd'.
823 * The char*'s in this array point to the same memory as ci->file, so we
824 * don't need to free them separately. */
825 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
827 for (k = 0; k < cfd.keys_num; k++)
828 {
829 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830 /* should never fail, since we have held the cache_lock
831 * the entire time */
832 assert(status == TRUE);
833 }
835 if (cfd.keys != NULL)
836 {
837 free (cfd.keys);
838 cfd.keys = NULL;
839 }
841 return (0);
842 } /* int flush_old_values */
/*
 * flush_thread_main: body of the periodic flush thread.
 *
 * While the daemon is RUNNING it sleeps on `flush_cond' until the next
 * deadline; once the deadline has passed it queues old values via
 * flush_old_values(config_write_interval) and rotates the journal.  When
 * the state changes it optionally queues everything that is left and then
 * sets the state to SHUTDOWN.  `cache_lock' is held throughout, except
 * during the journal rotation and while waiting on the condition.
 * The argument is unused; always returns NULL.
 */
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
845 {
846 struct timeval now;
847 struct timespec next_flush;
848 int status;
/* first deadline: one flush interval from now (usec converted to nsec) */
850 gettimeofday (&now, NULL);
851 next_flush.tv_sec = now.tv_sec + config_flush_interval;
852 next_flush.tv_nsec = 1000 * now.tv_usec;
854 pthread_mutex_lock(&cache_lock);
856 while (state == RUNNING)
857 {
/* has the deadline passed?  (the wait below may also return early) */
858 gettimeofday (&now, NULL);
859 if ((now.tv_sec > next_flush.tv_sec)
860 || ((now.tv_sec == next_flush.tv_sec)
861 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862 {
863 RRDD_LOG(LOG_DEBUG, "flushing old values");
865 /* Determine the time of the next cache flush. */
866 next_flush.tv_sec = now.tv_sec + config_flush_interval;
868 /* Flush all values that haven't been written in the last
869 * `config_write_interval' seconds. */
870 flush_old_values (config_write_interval);
872 /* unlock the cache while we rotate so we don't block incoming
873 * updates if the fsync() blocks on disk I/O */
874 pthread_mutex_unlock(&cache_lock);
875 journal_rotate();
876 pthread_mutex_lock(&cache_lock);
877 }
/* sleep until the deadline or until flush_cond is broadcast
 * (the signal handlers do that); a timeout is the normal case */
879 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880 if (status != 0 && status != ETIMEDOUT)
881 {
882 RRDD_LOG (LOG_ERR, "flush_thread_main: "
883 "pthread_cond_timedwait returned %i.", status);
884 }
885 }
887 if (config_flush_at_shutdown)
888 flush_old_values (-1); /* flush everything */
/* queue_thread_main() loops until it sees this state */
890 state = SHUTDOWN;
892 pthread_mutex_unlock(&cache_lock);
894 return NULL;
895 } /* void *flush_thread_main */
/*
 * queue_thread_main: body of a write-queue worker thread.
 *
 * Repeatedly takes the item at the head of the write queue, detaches its
 * values, and writes them with rrd_update_r() while `cache_lock' is
 * released.  Afterwards a "wrote" record is added to the journal, waiters
 * on the item's `flushed' condition are woken and the statistics are
 * updated.  The loop ends once the state is SHUTDOWN and, if flushing at
 * shutdown was requested, the queue has been drained.
 * The argument is unused; always returns NULL.
 */
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
898 {
899 pthread_mutex_lock (&cache_lock);
901 while (state != SHUTDOWN
902 || (cache_queue_head != NULL && config_flush_at_shutdown))
903 {
904 cache_item_t *ci;
905 char *file;
906 char **values;
907 size_t values_num;
908 int status;
910 /* Now, check if there's something to store away. If not, wait until
911 * something comes in. */
912 if (cache_queue_head == NULL)
913 {
914 status = pthread_cond_wait (&queue_cond, &cache_lock);
915 if ((status != 0) && (status != ETIMEDOUT))
916 {
917 RRDD_LOG (LOG_ERR, "queue_thread_main: "
918 "pthread_cond_wait returned %i.", status);
919 }
920 }
922 /* Check if a value has arrived. This may be NULL if we timed out or there
923 * was an interrupt such as a signal. */
924 if (cache_queue_head == NULL)
925 continue;
927 ci = cache_queue_head;
929 /* copy the relevant parts */
/* the name is duplicated because `ci' must not be touched once the lock
 * has been dropped */
930 file = strdup (ci->file);
931 if (file == NULL)
932 {
933 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
/* NOTE(review): the item stays at the head of the queue and the lock is
 * still held, so the loop retries immediately */
934 continue;
935 }
937 assert(ci->values != NULL);
938 assert(ci->values_num > 0);
/* take ownership of the value array; wipe_ci_values() only clears the
 * item's pointer and count, the array is freed further down */
940 values = ci->values;
941 values_num = ci->values_num;
943 wipe_ci_values(ci, time(NULL));
944 remove_from_queue(ci);
946 pthread_mutex_unlock (&cache_lock);
/* the actual write happens without cache_lock held */
948 rrd_clear_error ();
949 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950 if (status != 0)
951 {
952 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953 "rrd_update_r (%s) failed with status %i. (%s)",
954 file, status, rrd_get_error());
955 }
/* journalled regardless of the update status */
957 journal_write("wrote", file);
959 /* Search again in the tree. It's possible someone issued a "FORGET"
960 * while we were writing the update values. */
961 pthread_mutex_lock(&cache_lock);
962 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963 if (ci)
964 pthread_cond_broadcast(&ci->flushed);
965 pthread_mutex_unlock(&cache_lock);
967 if (status == 0)
968 {
969 pthread_mutex_lock (&stats_lock);
970 stats_updates_written++;
971 stats_data_sets_written += values_num;
972 pthread_mutex_unlock (&stats_lock);
973 }
975 rrd_free_ptrs((void ***) &values, &values_num);
976 free(file);
/* re-acquire the lock for the loop condition and the next iteration */
978 pthread_mutex_lock (&cache_lock);
979 }
980 pthread_mutex_unlock (&cache_lock);
982 return (NULL);
983 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field: split the next space-delimited field off a
 * NUL-terminated command buffer.
 *
 * The field is unescaped in place (a backslash makes the following
 * character literal) and NUL-terminated inside the buffer.  On success
 * `*field_ret' points to the field, `*buffer_ret' / `*buffer_size_ret'
 * are advanced past the delimiter and 0 is returned.  Returns -1, leaving
 * the three outputs untouched, if the buffer is empty or ends before a
 * delimiter is found.  `*buffer_size_ret' counts the terminating NUL.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *text = *buffer_ret;
  size_t avail = *buffer_size_ret;
  size_t rd = 0; /* next character to read */
  size_t wr = 0; /* next position of the unescaped field */

  if (avail <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (text[avail - 1] == '\0');

  for (;;)
  {
    char ch;

    /* ran out of input without seeing a delimiter */
    if (rd >= avail)
      return (-1);

    ch = text[rd];

    /* Check for end-of-field or end-of-buffer */
    if (ch == ' ' || ch == '\0')
    {
      text[wr] = 0;
      rd++;
      break;
    }

    /* Handle escaped characters. */
    if (ch == '\\')
    {
      if (rd >= (avail - 1))
        return (-1);
      rd++;
      ch = text[rd];
    }

    text[wr] = ch;
    wr++;
    rd++;
  }

  *buffer_ret = text + rd;
  *buffer_size_ret = avail - rd;
  *field_ret = text;

  return (0);
} /* }}} int buffer_get_field */
1048 /* if we're restricting writes to the base directory,
1049 * check whether the file falls within the dir
1050 * returns 1 if OK, otherwise 0
1051 */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1053 {
1054 assert(file != NULL);
1056 if (!config_write_base_only
1057 || JOURNAL_REPLAY(sock)
1058 || config_base_dir == NULL)
1059 return 1;
1061 if (strstr(file, "../") != NULL) goto err;
1063 /* relative paths without "../" are ok */
1064 if (*file != '/') return 1;
1066 /* file must be of the format base + "/" + <1+ char filename> */
1067 if (strlen(file) < _config_base_dir_len + 2) goto err;
1068 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069 if (*(file + _config_base_dir_len) != '/') goto err;
1071 return 1;
1073 err:
1074 if (sock != NULL && sock->fd >= 0)
1075 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1077 return 0;
1078 } /* }}} static int check_file_access */
1080 /* when using a base dir, convert relative paths to absolute paths.
1081 * if necessary, modifies the "filename" pointer to point
1082 * to the new path created in "tmp". "tmp" is provided
1083 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084 *
1085 * this allows us to optimize for the expected case (absolute path)
1086 * with a no-op.
1087 */
1088 static void get_abs_path(char **filename, char *tmp)
1089 {
1090 assert(tmp != NULL);
1091 assert(filename != NULL && *filename != NULL);
1093 if (config_base_dir == NULL || **filename == '/')
1094 return;
1096 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097 *filename = tmp;
1098 } /* }}} static int get_abs_path */
1100 static int flush_file (const char *filename) /* {{{ */
1101 {
1102 cache_item_t *ci;
1104 pthread_mutex_lock (&cache_lock);
1106 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107 if (ci == NULL)
1108 {
1109 pthread_mutex_unlock (&cache_lock);
1110 return (ENOENT);
1111 }
1113 if (ci->values_num > 0)
1114 {
1115 /* Enqueue at head */
1116 enqueue_cache_item (ci, HEAD);
1117 pthread_cond_wait(&ci->flushed, &cache_lock);
1118 }
1120 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1121 * may have been purged during our cond_wait() */
1123 pthread_mutex_unlock(&cache_lock);
1125 return (0);
1126 } /* }}} int flush_file */
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1129 {
1130 char *err = "Syntax error.\n";
1132 if (cmd && cmd->syntax)
1133 err = cmd->syntax;
1135 return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1139 {
1140 uint64_t copy_queue_length;
1141 uint64_t copy_updates_received;
1142 uint64_t copy_flush_received;
1143 uint64_t copy_updates_written;
1144 uint64_t copy_data_sets_written;
1145 uint64_t copy_journal_bytes;
1146 uint64_t copy_journal_rotate;
1148 uint64_t tree_nodes_number;
1149 uint64_t tree_depth;
1151 pthread_mutex_lock (&stats_lock);
1152 copy_queue_length = stats_queue_length;
1153 copy_updates_received = stats_updates_received;
1154 copy_flush_received = stats_flush_received;
1155 copy_updates_written = stats_updates_written;
1156 copy_data_sets_written = stats_data_sets_written;
1157 copy_journal_bytes = stats_journal_bytes;
1158 copy_journal_rotate = stats_journal_rotate;
1159 pthread_mutex_unlock (&stats_lock);
1161 pthread_mutex_lock (&cache_lock);
1162 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163 tree_depth = (uint64_t) g_tree_height (cache_tree);
1164 pthread_mutex_unlock (&cache_lock);
1166 add_response_info(sock,
1167 "QueueLength: %"PRIu64"\n", copy_queue_length);
1168 add_response_info(sock,
1169 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170 add_response_info(sock,
1171 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172 add_response_info(sock,
1173 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174 add_response_info(sock,
1175 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1181 send_response(sock, RESP_OK, "Statistics follow\n");
1183 return (0);
1184 } /* }}} int handle_request_stats */
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1187 {
1188 char *file, file_tmp[PATH_MAX];
1189 int status;
1191 status = buffer_get_field (&buffer, &buffer_size, &file);
1192 if (status != 0)
1193 {
1194 return syntax_error(sock,cmd);
1195 }
1196 else
1197 {
1198 pthread_mutex_lock(&stats_lock);
1199 stats_flush_received++;
1200 pthread_mutex_unlock(&stats_lock);
1202 get_abs_path(&file, file_tmp);
1203 if (!check_file_access(file, sock)) return 0;
1205 status = flush_file (file);
1206 if (status == 0)
1207 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208 else if (status == ENOENT)
1209 {
1210 /* no file in our tree; see whether it exists at all */
1211 struct stat statbuf;
1213 memset(&statbuf, 0, sizeof(statbuf));
1214 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216 else
1217 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218 }
1219 else if (status < 0)
1220 return send_response(sock, RESP_ERR, "Internal error.\n");
1221 else
1222 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223 }
1225 /* NOTREACHED */
1226 assert(1==0);
1227 } /* }}} int handle_request_flush */
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1230 {
1231 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1233 pthread_mutex_lock(&cache_lock);
1234 flush_old_values(-1);
1235 pthread_mutex_unlock(&cache_lock);
1237 return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1241 {
1242 int status;
1243 char *file, file_tmp[PATH_MAX];
1244 cache_item_t *ci;
1246 status = buffer_get_field(&buffer, &buffer_size, &file);
1247 if (status != 0)
1248 return syntax_error(sock,cmd);
1250 get_abs_path(&file, file_tmp);
1252 pthread_mutex_lock(&cache_lock);
1253 ci = g_tree_lookup(cache_tree, file);
1254 if (ci == NULL)
1255 {
1256 pthread_mutex_unlock(&cache_lock);
1257 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258 }
1260 for (size_t i=0; i < ci->values_num; i++)
1261 add_response_info(sock, "%s\n", ci->values[i]);
1263 pthread_mutex_unlock(&cache_lock);
1264 return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1268 {
1269 int status;
1270 gboolean found;
1271 char *file, file_tmp[PATH_MAX];
1273 status = buffer_get_field(&buffer, &buffer_size, &file);
1274 if (status != 0)
1275 return syntax_error(sock,cmd);
1277 get_abs_path(&file, file_tmp);
1278 if (!check_file_access(file, sock)) return 0;
1280 pthread_mutex_lock(&cache_lock);
1281 found = g_tree_remove(cache_tree, file);
1282 pthread_mutex_unlock(&cache_lock);
1284 if (found == TRUE)
1285 {
1286 if (!JOURNAL_REPLAY(sock))
1287 journal_write("forget", file);
1289 return send_response(sock, RESP_OK, "Gone!\n");
1290 }
1291 else
1292 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1294 /* NOTREACHED */
1295 assert(1==0);
1296 } /* }}} static int handle_request_forget */
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1299 {
1300 cache_item_t *ci;
1302 pthread_mutex_lock(&cache_lock);
1304 ci = cache_queue_head;
1305 while (ci != NULL)
1306 {
1307 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308 ci = ci->next;
1309 }
1311 pthread_mutex_unlock(&cache_lock);
1313 return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1317 {
1318 char *file, file_tmp[PATH_MAX];
1319 int values_num = 0;
1320 int status;
1321 char orig_buf[CMD_MAX];
1323 cache_item_t *ci;
1325 /* save it for the journal later */
1326 if (!JOURNAL_REPLAY(sock))
1327 strncpy(orig_buf, buffer, buffer_size);
1329 status = buffer_get_field (&buffer, &buffer_size, &file);
1330 if (status != 0)
1331 return syntax_error(sock,cmd);
1333 pthread_mutex_lock(&stats_lock);
1334 stats_updates_received++;
1335 pthread_mutex_unlock(&stats_lock);
1337 get_abs_path(&file, file_tmp);
1338 if (!check_file_access(file, sock)) return 0;
1340 pthread_mutex_lock (&cache_lock);
1341 ci = g_tree_lookup (cache_tree, file);
1343 if (ci == NULL) /* {{{ */
1344 {
1345 struct stat statbuf;
1346 cache_item_t *tmp;
1348 /* don't hold the lock while we setup; stat(2) might block */
1349 pthread_mutex_unlock(&cache_lock);
1351 memset (&statbuf, 0, sizeof (statbuf));
1352 status = stat (file, &statbuf);
1353 if (status != 0)
1354 {
1355 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357 status = errno;
1358 if (status == ENOENT)
1359 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360 else
1361 return send_response(sock, RESP_ERR,
1362 "stat failed with error %i.\n", status);
1363 }
1364 if (!S_ISREG (statbuf.st_mode))
1365 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367 if (access(file, R_OK|W_OK) != 0)
1368 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369 file, rrd_strerror(errno));
1371 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372 if (ci == NULL)
1373 {
1374 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376 return send_response(sock, RESP_ERR, "malloc failed.\n");
1377 }
1378 memset (ci, 0, sizeof (cache_item_t));
1380 ci->file = strdup (file);
1381 if (ci->file == NULL)
1382 {
1383 free (ci);
1384 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386 return send_response(sock, RESP_ERR, "strdup failed.\n");
1387 }
1389 wipe_ci_values(ci, now);
1390 ci->flags = CI_FLAGS_IN_TREE;
1391 pthread_cond_init(&ci->flushed, NULL);
1393 pthread_mutex_lock(&cache_lock);
1395 /* another UPDATE might have added this entry in the meantime */
1396 tmp = g_tree_lookup (cache_tree, file);
1397 if (tmp == NULL)
1398 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399 else
1400 {
1401 free_cache_item (ci);
1402 ci = tmp;
1403 }
1405 /* state may have changed while we were unlocked */
1406 if (state == SHUTDOWN)
1407 return -1;
1408 } /* }}} */
1409 assert (ci != NULL);
1411 /* don't re-write updates in replay mode */
1412 if (!JOURNAL_REPLAY(sock))
1413 journal_write("update", orig_buf);
1415 while (buffer_size > 0)
1416 {
1417 char *value;
1418 time_t stamp;
1419 char *eostamp;
1421 status = buffer_get_field (&buffer, &buffer_size, &value);
1422 if (status != 0)
1423 {
1424 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425 break;
1426 }
1428 /* make sure update time is always moving forward */
1429 stamp = strtol(value, &eostamp, 10);
1430 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1431 {
1432 pthread_mutex_unlock(&cache_lock);
1433 return send_response(sock, RESP_ERR,
1434 "Cannot find timestamp in '%s'!\n", value);
1435 }
1436 else if (stamp <= ci->last_update_stamp)
1437 {
1438 pthread_mutex_unlock(&cache_lock);
1439 return send_response(sock, RESP_ERR,
1440 "illegal attempt to update using time %ld when last"
1441 " update time is %ld (minimum one second step)\n",
1442 stamp, ci->last_update_stamp);
1443 }
1444 else
1445 ci->last_update_stamp = stamp;
1447 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1448 {
1449 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1450 continue;
1451 }
1453 values_num++;
1454 }
1456 if (((now - ci->last_flush_time) >= config_write_interval)
1457 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1458 && (ci->values_num > 0))
1459 {
1460 enqueue_cache_item (ci, TAIL);
1461 }
1463 pthread_mutex_unlock (&cache_lock);
1465 if (values_num < 1)
1466 return send_response(sock, RESP_ERR, "No values updated.\n");
1467 else
1468 return send_response(sock, RESP_OK,
1469 "errors, enqueued %i value(s).\n", values_num);
1471 /* NOTREACHED */
1472 assert(1==0);
1474 } /* }}} int handle_request_update */
1476 /* we came across a "WROTE" entry during journal replay.
1477 * throw away any values that we have accumulated for this file
1478 */
1479 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1480 {
1481 cache_item_t *ci;
1482 const char *file = buffer;
1484 pthread_mutex_lock(&cache_lock);
1486 ci = g_tree_lookup(cache_tree, file);
1487 if (ci == NULL)
1488 {
1489 pthread_mutex_unlock(&cache_lock);
1490 return (0);
1491 }
1493 if (ci->values)
1494 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1496 wipe_ci_values(ci, now);
1497 remove_from_queue(ci);
1499 pthread_mutex_unlock(&cache_lock);
1500 return (0);
1501 } /* }}} int handle_request_wrote */
1503 /* start "BATCH" processing */
1504 static int batch_start (HANDLER_PROTO) /* {{{ */
1505 {
1506 int status;
1507 if (sock->batch_start)
1508 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1510 status = send_response(sock, RESP_OK,
1511 "Go ahead. End with dot '.' on its own line.\n");
1512 sock->batch_start = time(NULL);
1513 sock->batch_cmd = 0;
1515 return status;
1516 } /* }}} static int batch_start */
1518 /* finish "BATCH" processing and return results to the client */
1519 static int batch_done (HANDLER_PROTO) /* {{{ */
1520 {
1521 assert(sock->batch_start);
1522 sock->batch_start = 0;
1523 sock->batch_cmd = 0;
1524 return send_response(sock, RESP_OK, "errors\n");
1525 } /* }}} static int batch_done */
/*
 * QUIT
 *
 * Does nothing but return a non-zero status; connection_thread_main()
 * closes the connection when a handler returns non-zero.
 */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
/*
 * Table of all commands understood by the daemon.
 *
 * Each entry lists, in order: the command name (matched case-insensitively
 * by find_command()), the handler function, a mask of CMD_CONTEXT_* flags
 * checked by command_check_context(), a one-line syntax string and a longer
 * help text (both shown by handle_request_help(); entries whose syntax is
 * NULL are left out of the command overview).
 *
 * NOTE: the position of an entry in this array is used as a bit position
 * in `listen_socket_t.permissions' (see find_command_index()), so the order
 * of the entries matters and the table must stay below 32 entries.
 */
static command_t list_of_commands[] = { /* {{{ */
  { /* index 0 */
    "UPDATE",
    handle_request_update,
    CMD_CONTEXT_ANY,
    "UPDATE <filename> <values> [<values> ...]\n"
    ,
    "Adds the given file to the internal cache if it is not yet known and\n"
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
    "for details.\n"
    "\n"
    "Each <values> has the following form:\n"
    " <values> = <time>:<value>[:<value>[...]]\n"
    "See the rrdupdate(1) manpage for details.\n"
  },
  { /* index 1: journal-only command, hence no syntax/help text */
    "WROTE",
    handle_request_wrote,
    CMD_CONTEXT_JOURNAL,
    NULL,
    NULL
  },
  { /* index 2 */
    "FLUSH",
    handle_request_flush,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "FLUSH <filename>\n"
    ,
    "Adds the given filename to the head of the update queue and returns\n"
    "after it has been dequeued.\n"
  },
  { /* index 3 */
    "FLUSHALL",
    handle_request_flushall,
    CMD_CONTEXT_CLIENT,
    "FLUSHALL\n"
    ,
    "Triggers writing of all pending updates. Returns immediately.\n"
  },
  { /* index 4 */
    "PENDING",
    handle_request_pending,
    CMD_CONTEXT_CLIENT,
    "PENDING <filename>\n"
    ,
    "Shows any 'pending' updates for a file, in order.\n"
    "The updates shown have not yet been written to the underlying RRD file.\n"
  },
  { /* index 5 */
    "FORGET",
    handle_request_forget,
    CMD_CONTEXT_ANY,
    "FORGET <filename>\n"
    ,
    "Removes the file completely from the cache.\n"
    "Any pending updates for the file will be lost.\n"
  },
  { /* index 6 */
    "QUEUE",
    handle_request_queue,
    CMD_CONTEXT_CLIENT,
    "QUEUE\n"
    ,
    "Shows all files in the output queue.\n"
    "The output is zero or more lines in the following format:\n"
    "(where <num_vals> is the number of values to be written)\n"
    "\n"
    "<num_vals> <filename>\n"
  },
  { /* index 7 */
    "STATS",
    handle_request_stats,
    CMD_CONTEXT_CLIENT,
    "STATS\n"
    ,
    "Returns some performance counters, see the rrdcached(1) manpage for\n"
    "a description of the values.\n"
  },
  { /* index 8: always permitted by socket_permission_check() */
    "HELP",
    handle_request_help,
    CMD_CONTEXT_CLIENT,
    "HELP [<command>]\n",
    NULL, /* special! */
  },
  { /* index 9 */
    "BATCH",
    batch_start,
    CMD_CONTEXT_CLIENT,
    "BATCH\n"
    ,
    "The 'BATCH' command permits the client to initiate a bulk load\n"
    " of commands to rrdcached.\n"
    "\n"
    "Usage:\n"
    "\n"
    " client: BATCH\n"
    " server: 0 Go ahead. End with dot '.' on its own line.\n"
    " client: command #1\n"
    " client: command #2\n"
    " client: ... and so on\n"
    " client: .\n"
    " server: 2 errors\n"
    " server: 7 message for command #7\n"
    " server: 9 message for command #9\n"
    "\n"
    "For more information, consult the rrdcached(1) documentation.\n"
  },
  { /* index 10: permission-checked as "BATCH" by socket_permission_check() */
    ".", /* BATCH terminator */
    batch_done,
    CMD_CONTEXT_BATCH,
    NULL,
    NULL
  },
  { /* index 11: always permitted by socket_permission_check() */
    "QUIT",
    handle_request_quit,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "QUIT\n"
    ,
    "Disconnect from rrdcached.\n"
  }
}; /* }}} command_t list_of_commands[] */
/* number of entries in list_of_commands[] */
static size_t list_of_commands_len = sizeof (list_of_commands)
  / sizeof (list_of_commands[0]);
1659 static command_t *find_command(char *cmd)
1660 {
1661 size_t i;
1663 for (i = 0; i < list_of_commands_len; i++)
1664 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1665 return (&list_of_commands[i]);
1666 return NULL;
1667 }
1669 /* We currently use the index in the `list_of_commands' array as a bit position
1670 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1671 * outside these functions so that switching to a more elegant storage method
1672 * is easily possible. */
1673 static ssize_t find_command_index (const char *cmd) /* {{{ */
1674 {
1675 size_t i;
1677 for (i = 0; i < list_of_commands_len; i++)
1678 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1679 return ((ssize_t) i);
1680 return (-1);
1681 } /* }}} ssize_t find_command_index */
1683 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1684 const char *cmd)
1685 {
1686 ssize_t i;
1688 if (JOURNAL_REPLAY(sock))
1689 return (1);
1691 if (cmd == NULL)
1692 return (-1);
1694 if ((strcasecmp ("QUIT", cmd) == 0)
1695 || (strcasecmp ("HELP", cmd) == 0))
1696 return (1);
1697 else if (strcmp (".", cmd) == 0)
1698 cmd = "BATCH";
1700 i = find_command_index (cmd);
1701 if (i < 0)
1702 return (-1);
1703 assert (i < 32);
1705 if ((sock->permissions & (1 << i)) != 0)
1706 return (1);
1707 return (0);
1708 } /* }}} int socket_permission_check */
1710 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1711 const char *cmd)
1712 {
1713 ssize_t i;
1715 i = find_command_index (cmd);
1716 if (i < 0)
1717 return (-1);
1718 assert (i < 32);
1720 sock->permissions |= (1 << i);
1721 return (0);
1722 } /* }}} int socket_permission_add */
1724 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1725 {
1726 sock->permissions = 0;
1727 } /* }}} socket_permission_clear */
1729 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1730 listen_socket_t *src)
1731 {
1732 dest->permissions = src->permissions;
1733 } /* }}} socket_permission_copy */
1735 /* check whether commands are received in the expected context */
1736 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1737 {
1738 if (JOURNAL_REPLAY(sock))
1739 return (cmd->context & CMD_CONTEXT_JOURNAL);
1740 else if (sock->batch_start)
1741 return (cmd->context & CMD_CONTEXT_BATCH);
1742 else
1743 return (cmd->context & CMD_CONTEXT_CLIENT);
1745 /* NOTREACHED */
1746 assert(1==0);
1747 }
1749 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1750 {
1751 int status;
1752 char *cmd_str;
1753 char *resp_txt;
1754 command_t *help = NULL;
1756 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1757 if (status == 0)
1758 help = find_command(cmd_str);
1760 if (help && (help->syntax || help->help))
1761 {
1762 char tmp[CMD_MAX];
1764 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1765 resp_txt = tmp;
1767 if (help->syntax)
1768 add_response_info(sock, "Usage: %s\n", help->syntax);
1770 if (help->help)
1771 add_response_info(sock, "%s\n", help->help);
1772 }
1773 else
1774 {
1775 size_t i;
1777 resp_txt = "Command overview\n";
1779 for (i = 0; i < list_of_commands_len; i++)
1780 {
1781 if (list_of_commands[i].syntax == NULL)
1782 continue;
1783 add_response_info (sock, "%s", list_of_commands[i].syntax);
1784 }
1785 }
1787 return send_response(sock, RESP_OK, resp_txt);
1788 } /* }}} int handle_request_help */
1790 static int handle_request (DISPATCH_PROTO) /* {{{ */
1791 {
1792 char *buffer_ptr = buffer;
1793 char *cmd_str = NULL;
1794 command_t *cmd = NULL;
1795 int status;
1797 assert (buffer[buffer_size - 1] == '\0');
1799 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1800 if (status != 0)
1801 {
1802 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1803 return (-1);
1804 }
1806 if (sock != NULL && sock->batch_start)
1807 sock->batch_cmd++;
1809 cmd = find_command(cmd_str);
1810 if (!cmd)
1811 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1813 if (!socket_permission_check (sock, cmd->cmd))
1814 return send_response(sock, RESP_ERR, "Permission denied.\n");
1816 if (!command_check_context(sock, cmd))
1817 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1819 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1820 } /* }}} int handle_request */
1822 static void journal_set_free (journal_set *js) /* {{{ */
1823 {
1824 if (js == NULL)
1825 return;
1827 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1829 free(js);
1830 } /* }}} journal_set_free */
1832 static void journal_set_remove (journal_set *js) /* {{{ */
1833 {
1834 if (js == NULL)
1835 return;
1837 for (uint i=0; i < js->files_num; i++)
1838 {
1839 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1840 unlink(js->files[i]);
1841 }
1842 } /* }}} journal_set_remove */
1844 /* close current journal file handle.
1845 * MUST hold journal_lock before calling */
1846 static void journal_close(void) /* {{{ */
1847 {
1848 if (journal_fh != NULL)
1849 {
1850 if (fclose(journal_fh) != 0)
1851 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1852 }
1854 journal_fh = NULL;
1855 journal_size = 0;
1856 } /* }}} journal_close */
1858 /* MUST hold journal_lock before calling */
1859 static void journal_new_file(void) /* {{{ */
1860 {
1861 struct timeval now;
1862 int new_fd;
1863 char new_file[PATH_MAX + 1];
1865 assert(journal_dir != NULL);
1866 assert(journal_cur != NULL);
1868 journal_close();
1870 gettimeofday(&now, NULL);
1871 /* this format assures that the files sort in strcmp() order */
1872 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1873 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1875 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1876 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1877 if (new_fd < 0)
1878 goto error;
1880 journal_fh = fdopen(new_fd, "a");
1881 if (journal_fh == NULL)
1882 goto error;
1884 journal_size = ftell(journal_fh);
1885 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1887 /* record the file in the journal set */
1888 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1890 return;
1892 error:
1893 RRDD_LOG(LOG_CRIT,
1894 "JOURNALING DISABLED: Error while trying to create %s : %s",
1895 new_file, rrd_strerror(errno));
1896 RRDD_LOG(LOG_CRIT,
1897 "JOURNALING DISABLED: All values will be flushed at shutdown");
1899 close(new_fd);
1900 config_flush_at_shutdown = 1;
1902 } /* }}} journal_new_file */
1904 /* MUST NOT hold journal_lock before calling this */
1905 static void journal_rotate(void) /* {{{ */
1906 {
1907 journal_set *old_js = NULL;
1909 if (journal_dir == NULL)
1910 return;
1912 RRDD_LOG(LOG_DEBUG, "rotating journals");
1914 pthread_mutex_lock(&stats_lock);
1915 ++stats_journal_rotate;
1916 pthread_mutex_unlock(&stats_lock);
1918 pthread_mutex_lock(&journal_lock);
1920 journal_close();
1922 /* rotate the journal sets */
1923 old_js = journal_old;
1924 journal_old = journal_cur;
1925 journal_cur = calloc(1, sizeof(journal_set));
1927 if (journal_cur != NULL)
1928 journal_new_file();
1929 else
1930 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1932 pthread_mutex_unlock(&journal_lock);
1934 journal_set_remove(old_js);
1935 journal_set_free (old_js);
1937 } /* }}} static void journal_rotate */
1939 /* MUST hold journal_lock when calling */
1940 static void journal_done(void) /* {{{ */
1941 {
1942 if (journal_cur == NULL)
1943 return;
1945 journal_close();
1947 if (config_flush_at_shutdown)
1948 {
1949 RRDD_LOG(LOG_INFO, "removing journals");
1950 journal_set_remove(journal_old);
1951 journal_set_remove(journal_cur);
1952 }
1953 else
1954 {
1955 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1956 "journals will be used at next startup");
1957 }
1959 journal_set_free(journal_cur);
1960 journal_set_free(journal_old);
1961 free(journal_dir);
1963 } /* }}} static void journal_done */
1965 static int journal_write(char *cmd, char *args) /* {{{ */
1966 {
1967 int chars;
1969 if (journal_fh == NULL)
1970 return 0;
1972 pthread_mutex_lock(&journal_lock);
1973 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1974 journal_size += chars;
1976 if (journal_size > JOURNAL_MAX)
1977 journal_new_file();
1979 pthread_mutex_unlock(&journal_lock);
1981 if (chars > 0)
1982 {
1983 pthread_mutex_lock(&stats_lock);
1984 stats_journal_bytes += chars;
1985 pthread_mutex_unlock(&stats_lock);
1986 }
1988 return chars;
1989 } /* }}} static int journal_write */
1991 static int journal_replay (const char *file) /* {{{ */
1992 {
1993 FILE *fh;
1994 int entry_cnt = 0;
1995 int fail_cnt = 0;
1996 uint64_t line = 0;
1997 char entry[CMD_MAX];
1998 time_t now;
2000 if (file == NULL) return 0;
2002 {
2003 char *reason = "unknown error";
2004 int status = 0;
2005 struct stat statbuf;
2007 memset(&statbuf, 0, sizeof(statbuf));
2008 if (stat(file, &statbuf) != 0)
2009 {
2010 reason = "stat error";
2011 status = errno;
2012 }
2013 else if (!S_ISREG(statbuf.st_mode))
2014 {
2015 reason = "not a regular file";
2016 status = EPERM;
2017 }
2018 if (statbuf.st_uid != daemon_uid)
2019 {
2020 reason = "not owned by daemon user";
2021 status = EACCES;
2022 }
2023 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2024 {
2025 reason = "must not be user/group writable";
2026 status = EACCES;
2027 }
2029 if (status != 0)
2030 {
2031 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2032 file, rrd_strerror(status), reason);
2033 return 0;
2034 }
2035 }
2037 fh = fopen(file, "r");
2038 if (fh == NULL)
2039 {
2040 if (errno != ENOENT)
2041 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2042 file, rrd_strerror(errno));
2043 return 0;
2044 }
2045 else
2046 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2048 now = time(NULL);
2050 while(!feof(fh))
2051 {
2052 size_t entry_len;
2054 ++line;
2055 if (fgets(entry, sizeof(entry), fh) == NULL)
2056 break;
2057 entry_len = strlen(entry);
2059 /* check \n termination in case journal writing crashed mid-line */
2060 if (entry_len == 0)
2061 continue;
2062 else if (entry[entry_len - 1] != '\n')
2063 {
2064 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2065 ++fail_cnt;
2066 continue;
2067 }
2069 entry[entry_len - 1] = '\0';
2071 if (handle_request(NULL, now, entry, entry_len) == 0)
2072 ++entry_cnt;
2073 else
2074 ++fail_cnt;
2075 }
2077 fclose(fh);
2079 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2080 entry_cnt, fail_cnt);
2082 return entry_cnt > 0 ? 1 : 0;
2083 } /* }}} static int journal_replay */
/* qsort() comparison callback for an array of C strings: both arguments
 * point at array elements (i.e. at `char *' values), which are compared
 * with strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *lhs = v1;
  const char *const *rhs = v2;

  return strcmp(*lhs, *rhs);
}
2093 static void journal_init(void) /* {{{ */
2094 {
2095 int had_journal = 0;
2096 DIR *dir;
2097 struct dirent *dent;
2098 char path[PATH_MAX+1];
2100 if (journal_dir == NULL) return;
2102 pthread_mutex_lock(&journal_lock);
2104 journal_cur = calloc(1, sizeof(journal_set));
2105 if (journal_cur == NULL)
2106 {
2107 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2108 return;
2109 }
2111 RRDD_LOG(LOG_INFO, "checking for journal files");
2113 /* Handle old journal files during transition. This gives them the
2114 * correct sort order. TODO: remove after first release
2115 */
2116 {
2117 char old_path[PATH_MAX+1];
2118 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2119 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2120 rename(old_path, path);
2122 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2123 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2124 rename(old_path, path);
2125 }
2127 dir = opendir(journal_dir);
2128 if (!dir) {
2129 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2130 return;
2131 }
2132 while ((dent = readdir(dir)) != NULL)
2133 {
2134 /* looks like a journal file? */
2135 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2136 continue;
2138 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2140 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2141 {
2142 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2143 dent->d_name);
2144 break;
2145 }
2146 }
2147 closedir(dir);
2149 qsort(journal_cur->files, journal_cur->files_num,
2150 sizeof(journal_cur->files[0]), journal_sort);
2152 for (uint i=0; i < journal_cur->files_num; i++)
2153 had_journal += journal_replay(journal_cur->files[i]);
2155 journal_new_file();
2157 /* it must have been a crash. start a flush */
2158 if (had_journal && config_flush_at_shutdown)
2159 flush_old_values(-1);
2161 pthread_mutex_unlock(&journal_lock);
2163 RRDD_LOG(LOG_INFO, "journal processing complete");
2165 } /* }}} static void journal_init */
2167 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2168 {
2169 assert(sock != NULL);
2171 free(sock->rbuf); sock->rbuf = NULL;
2172 free(sock->wbuf); sock->wbuf = NULL;
2173 free(sock);
2174 } /* }}} void free_listen_socket */
2176 static void close_connection(listen_socket_t *sock) /* {{{ */
2177 {
2178 if (sock->fd >= 0)
2179 {
2180 close(sock->fd);
2181 sock->fd = -1;
2182 }
2184 free_listen_socket(sock);
2186 } /* }}} void close_connection */
2188 static void *connection_thread_main (void *args) /* {{{ */
2189 {
2190 listen_socket_t *sock;
2191 int fd;
2193 sock = (listen_socket_t *) args;
2194 fd = sock->fd;
2196 /* init read buffers */
2197 sock->next_read = sock->next_cmd = 0;
2198 sock->rbuf = malloc(RBUF_SIZE);
2199 if (sock->rbuf == NULL)
2200 {
2201 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2202 close_connection(sock);
2203 return NULL;
2204 }
2206 pthread_mutex_lock (&connection_threads_lock);
2207 connection_threads_num++;
2208 pthread_mutex_unlock (&connection_threads_lock);
2210 while (state == RUNNING)
2211 {
2212 char *cmd;
2213 ssize_t cmd_len;
2214 ssize_t rbytes;
2215 time_t now;
2217 struct pollfd pollfd;
2218 int status;
2220 pollfd.fd = fd;
2221 pollfd.events = POLLIN | POLLPRI;
2222 pollfd.revents = 0;
2224 status = poll (&pollfd, 1, /* timeout = */ 500);
2225 if (state != RUNNING)
2226 break;
2227 else if (status == 0) /* timeout */
2228 continue;
2229 else if (status < 0) /* error */
2230 {
2231 status = errno;
2232 if (status != EINTR)
2233 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2234 continue;
2235 }
2237 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2238 break;
2239 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2240 {
2241 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2242 "poll(2) returned something unexpected: %#04hx",
2243 pollfd.revents);
2244 break;
2245 }
2247 rbytes = read(fd, sock->rbuf + sock->next_read,
2248 RBUF_SIZE - sock->next_read);
2249 if (rbytes < 0)
2250 {
2251 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2252 break;
2253 }
2254 else if (rbytes == 0)
2255 break; /* eof */
2257 sock->next_read += rbytes;
2259 if (sock->batch_start)
2260 now = sock->batch_start;
2261 else
2262 now = time(NULL);
2264 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2265 {
2266 status = handle_request (sock, now, cmd, cmd_len+1);
2267 if (status != 0)
2268 goto out_close;
2269 }
2270 }
2272 out_close:
2273 close_connection(sock);
2275 /* Remove this thread from the connection threads list */
2276 pthread_mutex_lock (&connection_threads_lock);
2277 connection_threads_num--;
2278 if (connection_threads_num <= 0)
2279 pthread_cond_broadcast(&connection_threads_done);
2280 pthread_mutex_unlock (&connection_threads_lock);
2282 return (NULL);
2283 } /* }}} void *connection_thread_main */
2285 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2286 {
2287 int fd;
2288 struct sockaddr_un sa;
2289 listen_socket_t *temp;
2290 int status;
2291 const char *path;
2292 char *path_copy, *dir;
2294 path = sock->addr;
2295 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2296 path += strlen("unix:");
2298 /* dirname may modify its argument */
2299 path_copy = strdup(path);
2300 if (path_copy == NULL)
2301 {
2302 fprintf(stderr, "rrdcached: strdup(): %s\n",
2303 rrd_strerror(errno));
2304 return (-1);
2305 }
2307 dir = dirname(path_copy);
2308 if (rrd_mkdir_p(dir, 0777) != 0)
2309 {
2310 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2311 dir, rrd_strerror(errno));
2312 return (-1);
2313 }
2315 free(path_copy);
2317 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2318 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2319 if (temp == NULL)
2320 {
2321 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2322 return (-1);
2323 }
2324 listen_fds = temp;
2325 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2327 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2328 if (fd < 0)
2329 {
2330 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2331 rrd_strerror(errno));
2332 return (-1);
2333 }
2335 memset (&sa, 0, sizeof (sa));
2336 sa.sun_family = AF_UNIX;
2337 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2339 /* if we've gotten this far, we own the pid file. any daemon started
2340 * with the same args must not be alive. therefore, ensure that we can
2341 * create the socket...
2342 */
2343 unlink(path);
2345 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2346 if (status != 0)
2347 {
2348 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2349 path, rrd_strerror(errno));
2350 close (fd);
2351 return (-1);
2352 }
2354 /* tweak the sockets group ownership */
2355 if (sock->socket_group != (gid_t)-1)
2356 {
2357 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2358 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2359 {
2360 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2361 }
2362 }
2364 if (sock->socket_permissions != (mode_t)-1)
2365 {
2366 if (chmod(path, sock->socket_permissions) != 0)
2367 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2368 (unsigned int)sock->socket_permissions, strerror(errno));
2369 }
2371 status = listen (fd, /* backlog = */ 10);
2372 if (status != 0)
2373 {
2374 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2375 path, rrd_strerror(errno));
2376 close (fd);
2377 unlink (path);
2378 return (-1);
2379 }
2381 listen_fds[listen_fds_num].fd = fd;
2382 listen_fds[listen_fds_num].family = PF_UNIX;
2383 strncpy(listen_fds[listen_fds_num].addr, path,
2384 sizeof (listen_fds[listen_fds_num].addr) - 1);
2385 listen_fds_num++;
2387 return (0);
2388 } /* }}} int open_listen_socket_unix */
/* Create listening TCP sockets for a network address and append them to
 * `listen_fds'.
 *
 * `sock->addr' is either "[<ipv6-address>]" with an optional ":<port>"
 * suffix, or "<host>" with an optional ":<port>" suffix (split at the last
 * colon). Without a port, RRDCACHED_DEFAULT_PORT is used. One socket is
 * opened per address returned by getaddrinfo(); addresses for which
 * realloc, socket() or bind() fail are reported and skipped.
 *
 * Returns 0 unless the address is malformed, getaddrinfo() fails, or
 * listen() fails on a bound socket, in which case -1 is returned. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* strrchr() is the standard equivalent of the legacy rindex() */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    /* the slot only becomes part of the list once the socket listens */
    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2512 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2513 {
2514 assert(sock != NULL);
2515 assert(sock->addr != NULL);
2517 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2518 || sock->addr[0] == '/')
2519 return (open_listen_socket_unix(sock));
2520 else
2521 return (open_listen_socket_network(sock));
2522 } /* }}} int open_listen_socket */
2524 static int close_listen_sockets (void) /* {{{ */
2525 {
2526 size_t i;
2528 for (i = 0; i < listen_fds_num; i++)
2529 {
2530 close (listen_fds[i].fd);
2532 if (listen_fds[i].family == PF_UNIX)
2533 unlink(listen_fds[i].addr);
2534 }
2536 free (listen_fds);
2537 listen_fds = NULL;
2538 listen_fds_num = 0;
2540 return (0);
2541 } /* }}} int close_listen_sockets */
2543 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2544 {
2545 struct pollfd *pollfds;
2546 int pollfds_num;
2547 int status;
2548 int i;
2550 if (listen_fds_num < 1)
2551 {
2552 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2553 return (NULL);
2554 }
2556 pollfds_num = listen_fds_num;
2557 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2558 if (pollfds == NULL)
2559 {
2560 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2561 return (NULL);
2562 }
2563 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2565 RRDD_LOG(LOG_INFO, "listening for connections");
2567 while (state == RUNNING)
2568 {
2569 for (i = 0; i < pollfds_num; i++)
2570 {
2571 pollfds[i].fd = listen_fds[i].fd;
2572 pollfds[i].events = POLLIN | POLLPRI;
2573 pollfds[i].revents = 0;
2574 }
2576 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2577 if (state != RUNNING)
2578 break;
2579 else if (status == 0) /* timeout */
2580 continue;
2581 else if (status < 0) /* error */
2582 {
2583 status = errno;
2584 if (status != EINTR)
2585 {
2586 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2587 }
2588 continue;
2589 }
2591 for (i = 0; i < pollfds_num; i++)
2592 {
2593 listen_socket_t *client_sock;
2594 struct sockaddr_storage client_sa;
2595 socklen_t client_sa_size;
2596 pthread_t tid;
2597 pthread_attr_t attr;
2599 if (pollfds[i].revents == 0)
2600 continue;
2602 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2603 {
2604 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2605 "poll(2) returned something unexpected for listen FD #%i.",
2606 pollfds[i].fd);
2607 continue;
2608 }
2610 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2611 if (client_sock == NULL)
2612 {
2613 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2614 continue;
2615 }
2616 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2618 client_sa_size = sizeof (client_sa);
2619 client_sock->fd = accept (pollfds[i].fd,
2620 (struct sockaddr *) &client_sa, &client_sa_size);
2621 if (client_sock->fd < 0)
2622 {
2623 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2624 free(client_sock);
2625 continue;
2626 }
2628 pthread_attr_init (&attr);
2629 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2631 status = pthread_create (&tid, &attr, connection_thread_main,
2632 client_sock);
2633 if (status != 0)
2634 {
2635 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2636 close_connection(client_sock);
2637 continue;
2638 }
2639 } /* for (pollfds_num) */
2640 } /* while (state == RUNNING) */
2642 RRDD_LOG(LOG_INFO, "starting shutdown");
2644 close_listen_sockets ();
2646 pthread_mutex_lock (&connection_threads_lock);
2647 while (connection_threads_num > 0)
2648 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2649 pthread_mutex_unlock (&connection_threads_lock);
2651 free(pollfds);
2653 return (NULL);
2654 } /* }}} void *listen_thread_main */
2656 static int daemonize (void) /* {{{ */
2657 {
2658 int pid_fd;
2659 char *base_dir;
2661 daemon_uid = geteuid();
2663 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2664 if (pid_fd < 0)
2665 pid_fd = check_pidfile();
2666 if (pid_fd < 0)
2667 return pid_fd;
2669 /* open all the listen sockets */
2670 if (config_listen_address_list_len > 0)
2671 {
2672 for (size_t i = 0; i < config_listen_address_list_len; i++)
2673 open_listen_socket (config_listen_address_list[i]);
2675 rrd_free_ptrs((void ***) &config_listen_address_list,
2676 &config_listen_address_list_len);
2677 }
2678 else
2679 {
2680 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2681 sizeof(default_socket.addr) - 1);
2682 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2683 open_listen_socket (&default_socket);
2684 }
2686 if (listen_fds_num < 1)
2687 {
2688 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2689 goto error;
2690 }
2692 if (!stay_foreground)
2693 {
2694 pid_t child;
2696 child = fork ();
2697 if (child < 0)
2698 {
2699 fprintf (stderr, "daemonize: fork(2) failed.\n");
2700 goto error;
2701 }
2702 else if (child > 0)
2703 exit(0);
2705 /* Become session leader */
2706 setsid ();
2708 /* Open the first three file descriptors to /dev/null */
2709 close (2);
2710 close (1);
2711 close (0);
2713 open ("/dev/null", O_RDWR);
2714 if (dup(0) == -1 || dup(0) == -1){
2715 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2716 }
2717 } /* if (!stay_foreground) */
2719 /* Change into the /tmp directory. */
2720 base_dir = (config_base_dir != NULL)
2721 ? config_base_dir
2722 : "/tmp";
2724 if (chdir (base_dir) != 0)
2725 {
2726 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2727 goto error;
2728 }
2730 install_signal_handlers();
2732 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2733 RRDD_LOG(LOG_INFO, "starting up");
2735 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2736 (GDestroyNotify) free_cache_item);
2737 if (cache_tree == NULL)
2738 {
2739 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2740 goto error;
2741 }
2743 return write_pidfile (pid_fd);
2745 error:
2746 remove_pidfile();
2747 return -1;
2748 } /* }}} int daemonize */
2750 static int cleanup (void) /* {{{ */
2751 {
2752 pthread_cond_broadcast (&flush_cond);
2753 pthread_join (flush_thread, NULL);
2755 pthread_cond_broadcast (&queue_cond);
2756 for (int i = 0; i < config_queue_threads; i++)
2757 pthread_join (queue_threads[i], NULL);
2759 if (config_flush_at_shutdown)
2760 {
2761 assert(cache_queue_head == NULL);
2762 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2763 }
2765 free(queue_threads);
2766 free(config_base_dir);
2768 pthread_mutex_lock(&cache_lock);
2769 g_tree_destroy(cache_tree);
2771 pthread_mutex_lock(&journal_lock);
2772 journal_done();
2774 RRDD_LOG(LOG_INFO, "goodbye");
2775 closelog ();
2777 remove_pidfile ();
2778 free(config_pid_file);
2780 return (0);
2781 } /* }}} int cleanup */
2783 static int read_options (int argc, char **argv) /* {{{ */
2784 {
2785 int option;
2786 int status = 0;
2788 socket_permission_clear (&default_socket);
2790 default_socket.socket_group = (gid_t)-1;
2791 default_socket.socket_permissions = (mode_t)-1;
2793 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2794 {
2795 switch (option)
2796 {
2797 case 'g':
2798 stay_foreground=1;
2799 break;
2801 case 'l':
2802 {
2803 listen_socket_t *new;
2805 new = malloc(sizeof(listen_socket_t));
2806 if (new == NULL)
2807 {
2808 fprintf(stderr, "read_options: malloc failed.\n");
2809 return(2);
2810 }
2811 memset(new, 0, sizeof(listen_socket_t));
2813 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2815 /* Add permissions to the socket {{{ */
2816 if (default_socket.permissions != 0)
2817 {
2818 socket_permission_copy (new, &default_socket);
2819 }
2820 else /* if (default_socket.permissions == 0) */
2821 {
2822 /* Add permission for ALL commands to the socket. */
2823 size_t i;
2824 for (i = 0; i < list_of_commands_len; i++)
2825 {
2826 status = socket_permission_add (new, list_of_commands[i].cmd);
2827 if (status != 0)
2828 {
2829 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2830 "socket failed. This should never happen, ever! Sorry.\n",
2831 list_of_commands[i].cmd);
2832 status = 4;
2833 }
2834 }
2835 }
2836 /* }}} Done adding permissions. */
2838 new->socket_group = default_socket.socket_group;
2839 new->socket_permissions = default_socket.socket_permissions;
2841 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2842 &config_listen_address_list_len, new))
2843 {
2844 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2845 return (2);
2846 }
2847 }
2848 break;
2850 /* set socket group permissions */
2851 case 's':
2852 {
2853 gid_t group_gid;
2854 struct group *grp;
2856 group_gid = strtoul(optarg, NULL, 10);
2857 if (errno != EINVAL && group_gid>0)
2858 {
2859 /* we were passed a number */
2860 grp = getgrgid(group_gid);
2861 }
2862 else
2863 {
2864 grp = getgrnam(optarg);
2865 }
2867 if (grp)
2868 {
2869 default_socket.socket_group = grp->gr_gid;
2870 }
2871 else
2872 {
2873 /* no idea what the user wanted... */
2874 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2875 return (5);
2876 }
2877 }
2878 break;
2880 /* set socket file permissions */
2881 case 'm':
2882 {
2883 long tmp;
2884 char *endptr = NULL;
2886 tmp = strtol (optarg, &endptr, 8);
2887 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2888 || (tmp > 07777) || (tmp < 0)) {
2889 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2890 optarg);
2891 return (5);
2892 }
2894 default_socket.socket_permissions = (mode_t)tmp;
2895 }
2896 break;
2898 case 'P':
2899 {
2900 char *optcopy;
2901 char *saveptr;
2902 char *dummy;
2903 char *ptr;
2905 socket_permission_clear (&default_socket);
2907 optcopy = strdup (optarg);
2908 dummy = optcopy;
2909 saveptr = NULL;
2910 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2911 {
2912 dummy = NULL;
2913 status = socket_permission_add (&default_socket, ptr);
2914 if (status != 0)
2915 {
2916 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2917 "socket failed. Most likely, this permission doesn't "
2918 "exist. Check your command line.\n", ptr);
2919 status = 4;
2920 }
2921 }
2923 free (optcopy);
2924 }
2925 break;
2927 case 'f':
2928 {
2929 int temp;
2931 temp = atoi (optarg);
2932 if (temp > 0)
2933 config_flush_interval = temp;
2934 else
2935 {
2936 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2937 status = 3;
2938 }
2939 }
2940 break;
2942 case 'w':
2943 {
2944 int temp;
2946 temp = atoi (optarg);
2947 if (temp > 0)
2948 config_write_interval = temp;
2949 else
2950 {
2951 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2952 status = 2;
2953 }
2954 }
2955 break;
2957 case 'z':
2958 {
2959 int temp;
2961 temp = atoi(optarg);
2962 if (temp > 0)
2963 config_write_jitter = temp;
2964 else
2965 {
2966 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2967 status = 2;
2968 }
2970 break;
2971 }
2973 case 't':
2974 {
2975 int threads;
2976 threads = atoi(optarg);
2977 if (threads >= 1)
2978 config_queue_threads = threads;
2979 else
2980 {
2981 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2982 return 1;
2983 }
2984 }
2985 break;
2987 case 'B':
2988 config_write_base_only = 1;
2989 break;
2991 case 'b':
2992 {
2993 size_t len;
2994 char base_realpath[PATH_MAX];
2996 if (config_base_dir != NULL)
2997 free (config_base_dir);
2998 config_base_dir = strdup (optarg);
2999 if (config_base_dir == NULL)
3000 {
3001 fprintf (stderr, "read_options: strdup failed.\n");
3002 return (3);
3003 }
3005 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3006 {
3007 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3008 config_base_dir, rrd_strerror (errno));
3009 return (3);
3010 }
3012 /* make sure that the base directory is not resolved via
3013 * symbolic links. this makes some performance-enhancing
3014 * assumptions possible (we don't have to resolve paths
3015 * that start with a "/")
3016 */
3017 if (realpath(config_base_dir, base_realpath) == NULL)
3018 {
3019 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3020 "%s\n", config_base_dir, rrd_strerror(errno));
3021 return 5;
3022 }
3024 len = strlen (config_base_dir);
3025 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3026 {
3027 config_base_dir[len - 1] = 0;
3028 len--;
3029 }
3031 if (len < 1)
3032 {
3033 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3034 return (4);
3035 }
3037 _config_base_dir_len = len;
3039 len = strlen (base_realpath);
3040 while ((len > 0) && (base_realpath[len - 1] == '/'))
3041 {
3042 base_realpath[len - 1] = '\0';
3043 len--;
3044 }
3046 if (strncmp(config_base_dir,
3047 base_realpath, sizeof(base_realpath)) != 0)
3048 {
3049 fprintf(stderr,
3050 "Base directory (-b) resolved via file system links!\n"
3051 "Please consult rrdcached '-b' documentation!\n"
3052 "Consider specifying the real directory (%s)\n",
3053 base_realpath);
3054 return 5;
3055 }
3056 }
3057 break;
3059 case 'p':
3060 {
3061 if (config_pid_file != NULL)
3062 free (config_pid_file);
3063 config_pid_file = strdup (optarg);
3064 if (config_pid_file == NULL)
3065 {
3066 fprintf (stderr, "read_options: strdup failed.\n");
3067 return (3);
3068 }
3069 }
3070 break;
3072 case 'F':
3073 config_flush_at_shutdown = 1;
3074 break;
3076 case 'j':
3077 {
3078 char journal_dir_actual[PATH_MAX];
3079 const char *dir;
3080 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3082 status = rrd_mkdir_p(dir, 0777);
3083 if (status != 0)
3084 {
3085 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3086 dir, rrd_strerror(errno));
3087 return 6;
3088 }
3090 if (access(dir, R_OK|W_OK|X_OK) != 0)
3091 {
3092 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3093 errno ? rrd_strerror(errno) : "");
3094 return 6;
3095 }
3096 }
3097 break;
3099 case 'h':
3100 case '?':
3101 printf ("RRDCacheD %s\n"
3102 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3103 "\n"
3104 "Usage: rrdcached [options]\n"
3105 "\n"
3106 "Valid options are:\n"
3107 " -l <address> Socket address to listen to.\n"
3108 " -P <perms> Sets the permissions to assign to all following "
3109 "sockets\n"
3110 " -w <seconds> Interval in which to write data.\n"
3111 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3112 " -t <threads> Number of write threads.\n"
3113 " -f <seconds> Interval in which to flush dead data.\n"
3114 " -p <file> Location of the PID-file.\n"
3115 " -b <dir> Base directory to change to.\n"
3116 " -B Restrict file access to paths within -b <dir>\n"
3117 " -g Do not fork and run in the foreground.\n"
3118 " -j <dir> Directory in which to create the journal files.\n"
3119 " -F Always flush all updates at shutdown\n"
3120 " -s <id|name> Group owner of all following UNIX sockets\n"
3121 " (the socket will also have read/write permissions "
3122 "for that group)\n"
3123 " -m <mode> File permissions (octal) of all following UNIX "
3124 "sockets\n"
3125 "\n"
3126 "For more information and a detailed description of all options "
3127 "please refer\n"
3128 "to the rrdcached(1) manual page.\n",
3129 VERSION);
3130 if (option == 'h')
3131 status = -1;
3132 else
3133 status = 1;
3134 break;
3135 } /* switch (option) */
3136 } /* while (getopt) */
3138 /* advise the user when values are not sane */
3139 if (config_flush_interval < 2 * config_write_interval)
3140 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3141 " 2x write interval (-w) !\n");
3142 if (config_write_jitter > config_write_interval)
3143 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3144 " write interval (-w) !\n");
3146 if (config_write_base_only && config_base_dir == NULL)
3147 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3148 " Consult the rrdcached documentation\n");
3150 if (journal_dir == NULL)
3151 config_flush_at_shutdown = 1;
3153 return (status);
3154 } /* }}} int read_options */
3156 int main (int argc, char **argv)
3157 {
3158 int status;
3160 status = read_options (argc, argv);
3161 if (status != 0)
3162 {
3163 if (status < 0)
3164 status = 0;
3165 return (status);
3166 }
3168 status = daemonize ();
3169 if (status != 0)
3170 {
3171 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3172 return (1);
3173 }
3175 journal_init();
3177 /* start the queue threads */
3178 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3179 if (queue_threads == NULL)
3180 {
3181 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3182 cleanup();
3183 return (1);
3184 }
3185 for (int i = 0; i < config_queue_threads; i++)
3186 {
3187 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3188 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3189 if (status != 0)
3190 {
3191 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3192 cleanup();
3193 return (1);
3194 }
3195 }
3197 /* start the flush thread */
3198 memset(&flush_thread, 0, sizeof(flush_thread));
3199 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3200 if (status != 0)
3201 {
3202 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3203 cleanup();
3204 return (1);
3205 }
3207 listen_thread_main (NULL);
3208 cleanup ();
3210 return (0);
3211 } /* int main */
3213 /*
3214 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3215 */