1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116 do { \
117 if (stay_foreground) { \
118 fprintf(stderr, __VA_ARGS__); \
119 fprintf(stderr, "\n"); } \
120 syslog ((severity), __VA_ARGS__); \
121 } while (0)
123 /*
124 * Types
125 */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
128 struct listen_socket_s
129 {
130 int fd;
131 char addr[PATH_MAX + 1];
132 int family;
134 /* state for BATCH processing */
135 time_t batch_start;
136 int batch_cmd;
138 /* buffered IO */
139 char *rbuf;
140 off_t next_cmd;
141 off_t next_read;
143 char *wbuf;
144 ssize_t wbuf_len;
146 uint32_t permissions;
148 gid_t socket_group;
149 mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
157 time_t UNUSED(now),\
158 char UNUSED(*buffer),\
159 size_t UNUSED(buffer_size)
161 #define HANDLER_PROTO command_t UNUSED(*cmd),\
162 DISPATCH_PROTO
164 struct command_s {
165 char *cmd;
166 int (*handler)(HANDLER_PROTO);
168 char context; /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT (1<<0)
170 #define CMD_CONTEXT_BATCH (1<<1)
171 #define CMD_CONTEXT_JOURNAL (1<<2)
172 #define CMD_CONTEXT_ANY (0x7f)
174 char *syntax;
175 char *help;
176 };
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
181 {
182 char *file;
183 char **values;
184 size_t values_num; /* number of valid pointers */
185 size_t values_alloc; /* number of allocated pointers */
186 time_t last_flush_time;
187 time_t last_update_stamp;
188 #define CI_FLAGS_IN_TREE (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
190 int flags;
191 pthread_cond_t flushed;
192 cache_item_t *prev;
193 cache_item_t *next;
194 };
196 struct callback_flush_data_s
197 {
198 time_t now;
199 time_t abs_timeout;
200 char **keys;
201 size_t keys_num;
202 };
203 typedef struct callback_flush_data_s callback_flush_data_t;
205 enum queue_side_e
206 {
207 HEAD,
208 TAIL
209 };
210 typedef enum queue_side_e queue_side_t;
212 /* describe a set of journal files */
213 typedef struct {
214 char **files;
215 size_t files_num;
216 } journal_set;
218 /* max length of socket command or response */
219 #define CMD_MAX 4096
220 #define RBUF_SIZE (CMD_MAX*2)
222 /*
223 * Variables
224 */
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
231 enum {
232 RUNNING, /* normal operation */
233 FLUSHING, /* flushing remaining values */
234 SHUTDOWN /* shutting down */
235 } state = RUNNING;
237 static pthread_t *queue_threads;
238 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
239 static int config_queue_threads = 4;
241 static pthread_t flush_thread;
242 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
244 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
245 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
246 static int connection_threads_num = 0;
248 /* Cache stuff */
249 static GTree *cache_tree = NULL;
250 static cache_item_t *cache_queue_head = NULL;
251 static cache_item_t *cache_queue_tail = NULL;
252 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
254 static int config_write_interval = 300;
255 static int config_write_jitter = 0;
256 static int config_flush_interval = 3600;
257 static int config_flush_at_shutdown = 0;
258 static char *config_pid_file = NULL;
259 static char *config_base_dir = NULL;
260 static size_t _config_base_dir_len = 0;
261 static int config_write_base_only = 0;
262 static size_t config_alloc_chunk = 1;
264 static listen_socket_t **config_listen_address_list = NULL;
265 static size_t config_listen_address_list_len = 0;
267 static uint64_t stats_queue_length = 0;
268 static uint64_t stats_updates_received = 0;
269 static uint64_t stats_flush_received = 0;
270 static uint64_t stats_updates_written = 0;
271 static uint64_t stats_data_sets_written = 0;
272 static uint64_t stats_journal_bytes = 0;
273 static uint64_t stats_journal_rotate = 0;
274 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
276 /* Journaled updates */
277 #define JOURNAL_REPLAY(s) ((s) == NULL)
278 #define JOURNAL_BASE "rrd.journal"
279 static journal_set *journal_cur = NULL;
280 static journal_set *journal_old = NULL;
281 static char *journal_dir = NULL;
282 static FILE *journal_fh = NULL; /* current journal file handle */
283 static long journal_size = 0; /* current journal size */
284 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
285 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
286 static int journal_write(char *cmd, char *args);
287 static void journal_done(void);
288 static void journal_rotate(void);
290 /* prototypes for forward refernces */
291 static int handle_request_help (HANDLER_PROTO);
293 /*
294 * Functions
295 */
296 static void sig_common (const char *sig) /* {{{ */
297 {
298 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
299 state = FLUSHING;
300 pthread_cond_broadcast(&flush_cond);
301 pthread_cond_broadcast(&queue_cond);
302 } /* }}} void sig_common */
304 static void sig_int_handler (int UNUSED(s)) /* {{{ */
305 {
306 sig_common("INT");
307 } /* }}} void sig_int_handler */
309 static void sig_term_handler (int UNUSED(s)) /* {{{ */
310 {
311 sig_common("TERM");
312 } /* }}} void sig_term_handler */
314 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
315 {
316 config_flush_at_shutdown = 1;
317 sig_common("USR1");
318 } /* }}} void sig_usr1_handler */
320 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
321 {
322 config_flush_at_shutdown = 0;
323 sig_common("USR2");
324 } /* }}} void sig_usr2_handler */
326 static void install_signal_handlers(void) /* {{{ */
327 {
328 /* These structures are static, because `sigaction' behaves weird if the are
329 * overwritten.. */
330 static struct sigaction sa_int;
331 static struct sigaction sa_term;
332 static struct sigaction sa_pipe;
333 static struct sigaction sa_usr1;
334 static struct sigaction sa_usr2;
336 /* Install signal handlers */
337 memset (&sa_int, 0, sizeof (sa_int));
338 sa_int.sa_handler = sig_int_handler;
339 sigaction (SIGINT, &sa_int, NULL);
341 memset (&sa_term, 0, sizeof (sa_term));
342 sa_term.sa_handler = sig_term_handler;
343 sigaction (SIGTERM, &sa_term, NULL);
345 memset (&sa_pipe, 0, sizeof (sa_pipe));
346 sa_pipe.sa_handler = SIG_IGN;
347 sigaction (SIGPIPE, &sa_pipe, NULL);
349 memset (&sa_pipe, 0, sizeof (sa_usr1));
350 sa_usr1.sa_handler = sig_usr1_handler;
351 sigaction (SIGUSR1, &sa_usr1, NULL);
353 memset (&sa_usr2, 0, sizeof (sa_usr2));
354 sa_usr2.sa_handler = sig_usr2_handler;
355 sigaction (SIGUSR2, &sa_usr2, NULL);
357 } /* }}} void install_signal_handlers */
359 static int open_pidfile(char *action, int oflag) /* {{{ */
360 {
361 int fd;
362 const char *file;
363 char *file_copy, *dir;
365 file = (config_pid_file != NULL)
366 ? config_pid_file
367 : LOCALSTATEDIR "/run/rrdcached.pid";
369 /* dirname may modify its argument */
370 file_copy = strdup(file);
371 if (file_copy == NULL)
372 {
373 fprintf(stderr, "rrdcached: strdup(): %s\n",
374 rrd_strerror(errno));
375 return -1;
376 }
378 dir = dirname(file_copy);
379 if (rrd_mkdir_p(dir, 0777) != 0)
380 {
381 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
382 dir, rrd_strerror(errno));
383 return -1;
384 }
386 free(file_copy);
388 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
389 if (fd < 0)
390 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
391 action, file, rrd_strerror(errno));
393 return(fd);
394 } /* }}} static int open_pidfile */
396 /* check existing pid file to see whether a daemon is running */
397 static int check_pidfile(void)
398 {
399 int pid_fd;
400 pid_t pid;
401 char pid_str[16];
403 pid_fd = open_pidfile("open", O_RDWR);
404 if (pid_fd < 0)
405 return pid_fd;
407 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
408 return -1;
410 pid = atoi(pid_str);
411 if (pid <= 0)
412 return -1;
414 /* another running process that we can signal COULD be
415 * a competing rrdcached */
416 if (pid != getpid() && kill(pid, 0) == 0)
417 {
418 fprintf(stderr,
419 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
420 close(pid_fd);
421 return -1;
422 }
424 lseek(pid_fd, 0, SEEK_SET);
425 if (ftruncate(pid_fd, 0) == -1)
426 {
427 fprintf(stderr,
428 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
429 close(pid_fd);
430 return -1;
431 }
433 fprintf(stderr,
434 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
435 "rrdcached: starting normally.\n", pid);
437 return pid_fd;
438 } /* }}} static int check_pidfile */
440 static int write_pidfile (int fd) /* {{{ */
441 {
442 pid_t pid;
443 FILE *fh;
445 pid = getpid ();
447 fh = fdopen (fd, "w");
448 if (fh == NULL)
449 {
450 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
451 close(fd);
452 return (-1);
453 }
455 fprintf (fh, "%i\n", (int) pid);
456 fclose (fh);
458 return (0);
459 } /* }}} int write_pidfile */
461 static int remove_pidfile (void) /* {{{ */
462 {
463 char *file;
464 int status;
466 file = (config_pid_file != NULL)
467 ? config_pid_file
468 : LOCALSTATEDIR "/run/rrdcached.pid";
470 status = unlink (file);
471 if (status == 0)
472 return (0);
473 return (errno);
474 } /* }}} int remove_pidfile */
476 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
477 {
478 char *eol;
480 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
481 sock->next_read - sock->next_cmd);
483 if (eol == NULL)
484 {
485 /* no commands left, move remainder back to front of rbuf */
486 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
487 sock->next_read - sock->next_cmd);
488 sock->next_read -= sock->next_cmd;
489 sock->next_cmd = 0;
490 *len = 0;
491 return NULL;
492 }
493 else
494 {
495 char *cmd = sock->rbuf + sock->next_cmd;
496 *eol = '\0';
498 sock->next_cmd = eol - sock->rbuf + 1;
500 if (eol > sock->rbuf && *(eol-1) == '\r')
501 *(--eol) = '\0'; /* handle "\r\n" EOL */
503 *len = eol - cmd;
505 return cmd;
506 }
508 /* NOTREACHED */
509 assert(1==0);
510 } /* }}} char *next_cmd */
512 /* add the characters directly to the write buffer */
513 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
514 {
515 char *new_buf;
517 assert(sock != NULL);
519 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
520 if (new_buf == NULL)
521 {
522 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
523 return -1;
524 }
526 strncpy(new_buf + sock->wbuf_len, str, len + 1);
528 sock->wbuf = new_buf;
529 sock->wbuf_len += len;
531 return 0;
532 } /* }}} static int add_to_wbuf */
534 /* add the text to the "extra" info that's sent after the status line */
535 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
536 {
537 va_list argp;
538 char buffer[CMD_MAX];
539 int len;
541 if (JOURNAL_REPLAY(sock)) return 0;
542 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
544 va_start(argp, fmt);
545 #ifdef HAVE_VSNPRINTF
546 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
547 #else
548 len = vsprintf(buffer, fmt, argp);
549 #endif
550 va_end(argp);
551 if (len < 0)
552 {
553 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
554 return -1;
555 }
557 return add_to_wbuf(sock, buffer, len);
558 } /* }}} static int add_response_info */
560 static int count_lines(char *str) /* {{{ */
561 {
562 int lines = 0;
564 if (str != NULL)
565 {
566 while ((str = strchr(str, '\n')) != NULL)
567 {
568 ++lines;
569 ++str;
570 }
571 }
573 return lines;
574 } /* }}} static int count_lines */
576 /* send the response back to the user.
577 * returns 0 on success, -1 on error
578 * write buffer is always zeroed after this call */
579 static int send_response (listen_socket_t *sock, response_code rc,
580 char *fmt, ...) /* {{{ */
581 {
582 va_list argp;
583 char buffer[CMD_MAX];
584 int lines;
585 ssize_t wrote;
586 int rclen, len;
588 if (JOURNAL_REPLAY(sock)) return rc;
590 if (sock->batch_start)
591 {
592 if (rc == RESP_OK)
593 return rc; /* no response on success during BATCH */
594 lines = sock->batch_cmd;
595 }
596 else if (rc == RESP_OK)
597 lines = count_lines(sock->wbuf);
598 else
599 lines = -1;
601 rclen = sprintf(buffer, "%d ", lines);
602 va_start(argp, fmt);
603 #ifdef HAVE_VSNPRINTF
604 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
605 #else
606 len = vsprintf(buffer+rclen, fmt, argp);
607 #endif
608 va_end(argp);
609 if (len < 0)
610 return -1;
612 len += rclen;
614 /* append the result to the wbuf, don't write to the user */
615 if (sock->batch_start)
616 return add_to_wbuf(sock, buffer, len);
618 /* first write must be complete */
619 if (len != write(sock->fd, buffer, len))
620 {
621 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
622 return -1;
623 }
625 if (sock->wbuf != NULL && rc == RESP_OK)
626 {
627 wrote = 0;
628 while (wrote < sock->wbuf_len)
629 {
630 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
631 if (wb <= 0)
632 {
633 RRDD_LOG(LOG_INFO, "send_response: could not write results");
634 return -1;
635 }
636 wrote += wb;
637 }
638 }
640 free(sock->wbuf); sock->wbuf = NULL;
641 sock->wbuf_len = 0;
643 return 0;
644 } /* }}} */
646 static void wipe_ci_values(cache_item_t *ci, time_t when)
647 {
648 ci->values = NULL;
649 ci->values_num = 0;
650 ci->values_alloc = 0;
652 ci->last_flush_time = when;
653 if (config_write_jitter > 0)
654 ci->last_flush_time += (rrd_random() % config_write_jitter);
655 }
657 /* remove_from_queue
658 * remove a "cache_item_t" item from the queue.
659 * must hold 'cache_lock' when calling this
660 */
661 static void remove_from_queue(cache_item_t *ci) /* {{{ */
662 {
663 if (ci == NULL) return;
664 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
666 if (ci->prev == NULL)
667 cache_queue_head = ci->next; /* reset head */
668 else
669 ci->prev->next = ci->next;
671 if (ci->next == NULL)
672 cache_queue_tail = ci->prev; /* reset the tail */
673 else
674 ci->next->prev = ci->prev;
676 ci->next = ci->prev = NULL;
677 ci->flags &= ~CI_FLAGS_IN_QUEUE;
679 pthread_mutex_lock (&stats_lock);
680 assert (stats_queue_length > 0);
681 stats_queue_length--;
682 pthread_mutex_unlock (&stats_lock);
684 } /* }}} static void remove_from_queue */
686 /* free the resources associated with the cache_item_t
687 * must hold cache_lock when calling this function
688 */
689 static void *free_cache_item(cache_item_t *ci) /* {{{ */
690 {
691 if (ci == NULL) return NULL;
693 remove_from_queue(ci);
695 for (size_t i=0; i < ci->values_num; i++)
696 free(ci->values[i]);
698 free (ci->values);
699 free (ci->file);
701 /* in case anyone is waiting */
702 pthread_cond_broadcast(&ci->flushed);
703 pthread_cond_destroy(&ci->flushed);
705 free (ci);
707 return NULL;
708 } /* }}} static void *free_cache_item */
710 /*
711 * enqueue_cache_item:
712 * `cache_lock' must be acquired before calling this function!
713 */
714 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
715 queue_side_t side)
716 {
717 if (ci == NULL)
718 return (-1);
720 if (ci->values_num == 0)
721 return (0);
723 if (side == HEAD)
724 {
725 if (cache_queue_head == ci)
726 return 0;
728 /* remove if further down in queue */
729 remove_from_queue(ci);
731 ci->prev = NULL;
732 ci->next = cache_queue_head;
733 if (ci->next != NULL)
734 ci->next->prev = ci;
735 cache_queue_head = ci;
737 if (cache_queue_tail == NULL)
738 cache_queue_tail = cache_queue_head;
739 }
740 else /* (side == TAIL) */
741 {
742 /* We don't move values back in the list.. */
743 if (ci->flags & CI_FLAGS_IN_QUEUE)
744 return (0);
746 assert (ci->next == NULL);
747 assert (ci->prev == NULL);
749 ci->prev = cache_queue_tail;
751 if (cache_queue_tail == NULL)
752 cache_queue_head = ci;
753 else
754 cache_queue_tail->next = ci;
756 cache_queue_tail = ci;
757 }
759 ci->flags |= CI_FLAGS_IN_QUEUE;
761 pthread_cond_signal(&queue_cond);
762 pthread_mutex_lock (&stats_lock);
763 stats_queue_length++;
764 pthread_mutex_unlock (&stats_lock);
766 return (0);
767 } /* }}} int enqueue_cache_item */
769 /*
770 * tree_callback_flush:
771 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
772 * while this is in progress.
773 */
774 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
775 gpointer data)
776 {
777 cache_item_t *ci;
778 callback_flush_data_t *cfd;
780 ci = (cache_item_t *) value;
781 cfd = (callback_flush_data_t *) data;
783 if (ci->flags & CI_FLAGS_IN_QUEUE)
784 return FALSE;
786 if (ci->values_num > 0
787 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
788 {
789 enqueue_cache_item (ci, TAIL);
790 }
791 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
792 && (ci->values_num <= 0))
793 {
794 assert ((char *) key == ci->file);
795 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
796 {
797 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
798 return (FALSE);
799 }
800 }
802 return (FALSE);
803 } /* }}} gboolean tree_callback_flush */
805 static int flush_old_values (int max_age)
806 {
807 callback_flush_data_t cfd;
808 size_t k;
810 memset (&cfd, 0, sizeof (cfd));
811 /* Pass the current time as user data so that we don't need to call
812 * `time' for each node. */
813 cfd.now = time (NULL);
814 cfd.keys = NULL;
815 cfd.keys_num = 0;
817 if (max_age > 0)
818 cfd.abs_timeout = cfd.now - max_age;
819 else
820 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
822 /* `tree_callback_flush' will return the keys of all values that haven't
823 * been touched in the last `config_flush_interval' seconds in `cfd'.
824 * The char*'s in this array point to the same memory as ci->file, so we
825 * don't need to free them separately. */
826 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
828 for (k = 0; k < cfd.keys_num; k++)
829 {
830 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
831 /* should never fail, since we have held the cache_lock
832 * the entire time */
833 assert(status == TRUE);
834 }
836 if (cfd.keys != NULL)
837 {
838 free (cfd.keys);
839 cfd.keys = NULL;
840 }
842 return (0);
843 } /* int flush_old_values */
845 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
846 {
847 struct timeval now;
848 struct timespec next_flush;
849 int status;
851 gettimeofday (&now, NULL);
852 next_flush.tv_sec = now.tv_sec + config_flush_interval;
853 next_flush.tv_nsec = 1000 * now.tv_usec;
855 pthread_mutex_lock(&cache_lock);
857 while (state == RUNNING)
858 {
859 gettimeofday (&now, NULL);
860 if ((now.tv_sec > next_flush.tv_sec)
861 || ((now.tv_sec == next_flush.tv_sec)
862 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
863 {
864 RRDD_LOG(LOG_DEBUG, "flushing old values");
866 /* Determine the time of the next cache flush. */
867 next_flush.tv_sec = now.tv_sec + config_flush_interval;
869 /* Flush all values that haven't been written in the last
870 * `config_write_interval' seconds. */
871 flush_old_values (config_write_interval);
873 /* unlock the cache while we rotate so we don't block incoming
874 * updates if the fsync() blocks on disk I/O */
875 pthread_mutex_unlock(&cache_lock);
876 journal_rotate();
877 pthread_mutex_lock(&cache_lock);
878 }
880 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
881 if (status != 0 && status != ETIMEDOUT)
882 {
883 RRDD_LOG (LOG_ERR, "flush_thread_main: "
884 "pthread_cond_timedwait returned %i.", status);
885 }
886 }
888 if (config_flush_at_shutdown)
889 flush_old_values (-1); /* flush everything */
891 state = SHUTDOWN;
893 pthread_mutex_unlock(&cache_lock);
895 return NULL;
896 } /* void *flush_thread_main */
898 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
899 {
900 pthread_mutex_lock (&cache_lock);
902 while (state != SHUTDOWN
903 || (cache_queue_head != NULL && config_flush_at_shutdown))
904 {
905 cache_item_t *ci;
906 char *file;
907 char **values;
908 size_t values_num;
909 int status;
911 /* Now, check if there's something to store away. If not, wait until
912 * something comes in. */
913 if (cache_queue_head == NULL)
914 {
915 status = pthread_cond_wait (&queue_cond, &cache_lock);
916 if ((status != 0) && (status != ETIMEDOUT))
917 {
918 RRDD_LOG (LOG_ERR, "queue_thread_main: "
919 "pthread_cond_wait returned %i.", status);
920 }
921 }
923 /* Check if a value has arrived. This may be NULL if we timed out or there
924 * was an interrupt such as a signal. */
925 if (cache_queue_head == NULL)
926 continue;
928 ci = cache_queue_head;
930 /* copy the relevant parts */
931 file = strdup (ci->file);
932 if (file == NULL)
933 {
934 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
935 continue;
936 }
938 assert(ci->values != NULL);
939 assert(ci->values_num > 0);
941 values = ci->values;
942 values_num = ci->values_num;
944 wipe_ci_values(ci, time(NULL));
945 remove_from_queue(ci);
947 pthread_mutex_unlock (&cache_lock);
949 rrd_clear_error ();
950 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
951 if (status != 0)
952 {
953 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
954 "rrd_update_r (%s) failed with status %i. (%s)",
955 file, status, rrd_get_error());
956 }
958 journal_write("wrote", file);
960 /* Search again in the tree. It's possible someone issued a "FORGET"
961 * while we were writing the update values. */
962 pthread_mutex_lock(&cache_lock);
963 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
964 if (ci)
965 pthread_cond_broadcast(&ci->flushed);
966 pthread_mutex_unlock(&cache_lock);
968 if (status == 0)
969 {
970 pthread_mutex_lock (&stats_lock);
971 stats_updates_written++;
972 stats_data_sets_written += values_num;
973 pthread_mutex_unlock (&stats_lock);
974 }
976 rrd_free_ptrs((void ***) &values, &values_num);
977 free(file);
979 pthread_mutex_lock (&cache_lock);
980 }
981 pthread_mutex_unlock (&cache_lock);
983 return (NULL);
984 } /* }}} void *queue_thread_main */
986 static int buffer_get_field (char **buffer_ret, /* {{{ */
987 size_t *buffer_size_ret, char **field_ret)
988 {
989 char *buffer;
990 size_t buffer_pos;
991 size_t buffer_size;
992 char *field;
993 size_t field_size;
994 int status;
996 buffer = *buffer_ret;
997 buffer_pos = 0;
998 buffer_size = *buffer_size_ret;
999 field = *buffer_ret;
1000 field_size = 0;
1002 if (buffer_size <= 0)
1003 return (-1);
1005 /* This is ensured by `handle_request'. */
1006 assert (buffer[buffer_size - 1] == '\0');
1008 status = -1;
1009 while (buffer_pos < buffer_size)
1010 {
1011 /* Check for end-of-field or end-of-buffer */
1012 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1013 {
1014 field[field_size] = 0;
1015 field_size++;
1016 buffer_pos++;
1017 status = 0;
1018 break;
1019 }
1020 /* Handle escaped characters. */
1021 else if (buffer[buffer_pos] == '\\')
1022 {
1023 if (buffer_pos >= (buffer_size - 1))
1024 break;
1025 buffer_pos++;
1026 field[field_size] = buffer[buffer_pos];
1027 field_size++;
1028 buffer_pos++;
1029 }
1030 /* Normal operation */
1031 else
1032 {
1033 field[field_size] = buffer[buffer_pos];
1034 field_size++;
1035 buffer_pos++;
1036 }
1037 } /* while (buffer_pos < buffer_size) */
1039 if (status != 0)
1040 return (status);
1042 *buffer_ret = buffer + buffer_pos;
1043 *buffer_size_ret = buffer_size - buffer_pos;
1044 *field_ret = field;
1046 return (0);
1047 } /* }}} int buffer_get_field */
1049 /* if we're restricting writes to the base directory,
1050 * check whether the file falls within the dir
1051 * returns 1 if OK, otherwise 0
1052 */
1053 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1054 {
1055 assert(file != NULL);
1057 if (!config_write_base_only
1058 || JOURNAL_REPLAY(sock)
1059 || config_base_dir == NULL)
1060 return 1;
1062 if (strstr(file, "../") != NULL) goto err;
1064 /* relative paths without "../" are ok */
1065 if (*file != '/') return 1;
1067 /* file must be of the format base + "/" + <1+ char filename> */
1068 if (strlen(file) < _config_base_dir_len + 2) goto err;
1069 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1070 if (*(file + _config_base_dir_len) != '/') goto err;
1072 return 1;
1074 err:
1075 if (sock != NULL && sock->fd >= 0)
1076 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1078 return 0;
1079 } /* }}} static int check_file_access */
1081 /* when using a base dir, convert relative paths to absolute paths.
1082 * if necessary, modifies the "filename" pointer to point
1083 * to the new path created in "tmp". "tmp" is provided
1084 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1085 *
1086 * this allows us to optimize for the expected case (absolute path)
1087 * with a no-op.
1088 */
1089 static void get_abs_path(char **filename, char *tmp)
1090 {
1091 assert(tmp != NULL);
1092 assert(filename != NULL && *filename != NULL);
1094 if (config_base_dir == NULL || **filename == '/')
1095 return;
1097 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1098 *filename = tmp;
1099 } /* }}} static int get_abs_path */
1101 static int flush_file (const char *filename) /* {{{ */
1102 {
1103 cache_item_t *ci;
1105 pthread_mutex_lock (&cache_lock);
1107 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1108 if (ci == NULL)
1109 {
1110 pthread_mutex_unlock (&cache_lock);
1111 return (ENOENT);
1112 }
1114 if (ci->values_num > 0)
1115 {
1116 /* Enqueue at head */
1117 enqueue_cache_item (ci, HEAD);
1118 pthread_cond_wait(&ci->flushed, &cache_lock);
1119 }
1121 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1122 * may have been purged during our cond_wait() */
1124 pthread_mutex_unlock(&cache_lock);
1126 return (0);
1127 } /* }}} int flush_file */
1129 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1130 {
1131 char *err = "Syntax error.\n";
1133 if (cmd && cmd->syntax)
1134 err = cmd->syntax;
1136 return send_response(sock, RESP_ERR, "Usage: %s", err);
1137 } /* }}} static int syntax_error() */
1139 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1140 {
1141 uint64_t copy_queue_length;
1142 uint64_t copy_updates_received;
1143 uint64_t copy_flush_received;
1144 uint64_t copy_updates_written;
1145 uint64_t copy_data_sets_written;
1146 uint64_t copy_journal_bytes;
1147 uint64_t copy_journal_rotate;
1149 uint64_t tree_nodes_number;
1150 uint64_t tree_depth;
1152 pthread_mutex_lock (&stats_lock);
1153 copy_queue_length = stats_queue_length;
1154 copy_updates_received = stats_updates_received;
1155 copy_flush_received = stats_flush_received;
1156 copy_updates_written = stats_updates_written;
1157 copy_data_sets_written = stats_data_sets_written;
1158 copy_journal_bytes = stats_journal_bytes;
1159 copy_journal_rotate = stats_journal_rotate;
1160 pthread_mutex_unlock (&stats_lock);
1162 pthread_mutex_lock (&cache_lock);
1163 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1164 tree_depth = (uint64_t) g_tree_height (cache_tree);
1165 pthread_mutex_unlock (&cache_lock);
1167 add_response_info(sock,
1168 "QueueLength: %"PRIu64"\n", copy_queue_length);
1169 add_response_info(sock,
1170 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1171 add_response_info(sock,
1172 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1173 add_response_info(sock,
1174 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1175 add_response_info(sock,
1176 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1177 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1178 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1179 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1180 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1182 send_response(sock, RESP_OK, "Statistics follow\n");
1184 return (0);
1185 } /* }}} int handle_request_stats */
1187 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1188 {
1189 char *file, file_tmp[PATH_MAX];
1190 int status;
1192 status = buffer_get_field (&buffer, &buffer_size, &file);
1193 if (status != 0)
1194 {
1195 return syntax_error(sock,cmd);
1196 }
1197 else
1198 {
1199 pthread_mutex_lock(&stats_lock);
1200 stats_flush_received++;
1201 pthread_mutex_unlock(&stats_lock);
1203 get_abs_path(&file, file_tmp);
1204 if (!check_file_access(file, sock)) return 0;
1206 status = flush_file (file);
1207 if (status == 0)
1208 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1209 else if (status == ENOENT)
1210 {
1211 /* no file in our tree; see whether it exists at all */
1212 struct stat statbuf;
1214 memset(&statbuf, 0, sizeof(statbuf));
1215 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1216 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1217 else
1218 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1219 }
1220 else if (status < 0)
1221 return send_response(sock, RESP_ERR, "Internal error.\n");
1222 else
1223 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1224 }
1226 /* NOTREACHED */
1227 assert(1==0);
1228 } /* }}} int handle_request_flush */
1230 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1231 {
1232 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1234 pthread_mutex_lock(&cache_lock);
1235 flush_old_values(-1);
1236 pthread_mutex_unlock(&cache_lock);
1238 return send_response(sock, RESP_OK, "Started flush.\n");
1239 } /* }}} static int handle_request_flushall */
1241 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1242 {
1243 int status;
1244 char *file, file_tmp[PATH_MAX];
1245 cache_item_t *ci;
1247 status = buffer_get_field(&buffer, &buffer_size, &file);
1248 if (status != 0)
1249 return syntax_error(sock,cmd);
1251 get_abs_path(&file, file_tmp);
1253 pthread_mutex_lock(&cache_lock);
1254 ci = g_tree_lookup(cache_tree, file);
1255 if (ci == NULL)
1256 {
1257 pthread_mutex_unlock(&cache_lock);
1258 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1259 }
1261 for (size_t i=0; i < ci->values_num; i++)
1262 add_response_info(sock, "%s\n", ci->values[i]);
1264 pthread_mutex_unlock(&cache_lock);
1265 return send_response(sock, RESP_OK, "updates pending\n");
1266 } /* }}} static int handle_request_pending */
1268 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1269 {
1270 int status;
1271 gboolean found;
1272 char *file, file_tmp[PATH_MAX];
1274 status = buffer_get_field(&buffer, &buffer_size, &file);
1275 if (status != 0)
1276 return syntax_error(sock,cmd);
1278 get_abs_path(&file, file_tmp);
1279 if (!check_file_access(file, sock)) return 0;
1281 pthread_mutex_lock(&cache_lock);
1282 found = g_tree_remove(cache_tree, file);
1283 pthread_mutex_unlock(&cache_lock);
1285 if (found == TRUE)
1286 {
1287 if (!JOURNAL_REPLAY(sock))
1288 journal_write("forget", file);
1290 return send_response(sock, RESP_OK, "Gone!\n");
1291 }
1292 else
1293 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1295 /* NOTREACHED */
1296 assert(1==0);
1297 } /* }}} static int handle_request_forget */
1299 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1300 {
1301 cache_item_t *ci;
1303 pthread_mutex_lock(&cache_lock);
1305 ci = cache_queue_head;
1306 while (ci != NULL)
1307 {
1308 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1309 ci = ci->next;
1310 }
1312 pthread_mutex_unlock(&cache_lock);
1314 return send_response(sock, RESP_OK, "in queue.\n");
1315 } /* }}} int handle_request_queue */
1317 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1318 {
1319 char *file, file_tmp[PATH_MAX];
1320 int values_num = 0;
1321 int status;
1322 char orig_buf[CMD_MAX];
1324 cache_item_t *ci;
1326 /* save it for the journal later */
1327 if (!JOURNAL_REPLAY(sock))
1328 strncpy(orig_buf, buffer, buffer_size);
1330 status = buffer_get_field (&buffer, &buffer_size, &file);
1331 if (status != 0)
1332 return syntax_error(sock,cmd);
1334 pthread_mutex_lock(&stats_lock);
1335 stats_updates_received++;
1336 pthread_mutex_unlock(&stats_lock);
1338 get_abs_path(&file, file_tmp);
1339 if (!check_file_access(file, sock)) return 0;
1341 pthread_mutex_lock (&cache_lock);
1342 ci = g_tree_lookup (cache_tree, file);
1344 if (ci == NULL) /* {{{ */
1345 {
1346 struct stat statbuf;
1347 cache_item_t *tmp;
1349 /* don't hold the lock while we setup; stat(2) might block */
1350 pthread_mutex_unlock(&cache_lock);
1352 memset (&statbuf, 0, sizeof (statbuf));
1353 status = stat (file, &statbuf);
1354 if (status != 0)
1355 {
1356 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1358 status = errno;
1359 if (status == ENOENT)
1360 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1361 else
1362 return send_response(sock, RESP_ERR,
1363 "stat failed with error %i.\n", status);
1364 }
1365 if (!S_ISREG (statbuf.st_mode))
1366 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1368 if (access(file, R_OK|W_OK) != 0)
1369 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1370 file, rrd_strerror(errno));
1372 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1373 if (ci == NULL)
1374 {
1375 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1377 return send_response(sock, RESP_ERR, "malloc failed.\n");
1378 }
1379 memset (ci, 0, sizeof (cache_item_t));
1381 ci->file = strdup (file);
1382 if (ci->file == NULL)
1383 {
1384 free (ci);
1385 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1387 return send_response(sock, RESP_ERR, "strdup failed.\n");
1388 }
1390 wipe_ci_values(ci, now);
1391 ci->flags = CI_FLAGS_IN_TREE;
1392 pthread_cond_init(&ci->flushed, NULL);
1394 pthread_mutex_lock(&cache_lock);
1396 /* another UPDATE might have added this entry in the meantime */
1397 tmp = g_tree_lookup (cache_tree, file);
1398 if (tmp == NULL)
1399 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1400 else
1401 {
1402 free_cache_item (ci);
1403 ci = tmp;
1404 }
1406 /* state may have changed while we were unlocked */
1407 if (state == SHUTDOWN)
1408 return -1;
1409 } /* }}} */
1410 assert (ci != NULL);
1412 /* don't re-write updates in replay mode */
1413 if (!JOURNAL_REPLAY(sock))
1414 journal_write("update", orig_buf);
1416 while (buffer_size > 0)
1417 {
1418 char *value;
1419 time_t stamp;
1420 char *eostamp;
1422 status = buffer_get_field (&buffer, &buffer_size, &value);
1423 if (status != 0)
1424 {
1425 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1426 break;
1427 }
1429 /* make sure update time is always moving forward */
1430 stamp = strtol(value, &eostamp, 10);
1431 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1432 {
1433 pthread_mutex_unlock(&cache_lock);
1434 return send_response(sock, RESP_ERR,
1435 "Cannot find timestamp in '%s'!\n", value);
1436 }
1437 else if (stamp <= ci->last_update_stamp)
1438 {
1439 pthread_mutex_unlock(&cache_lock);
1440 return send_response(sock, RESP_ERR,
1441 "illegal attempt to update using time %ld when last"
1442 " update time is %ld (minimum one second step)\n",
1443 stamp, ci->last_update_stamp);
1444 }
1445 else
1446 ci->last_update_stamp = stamp;
1448 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1449 &ci->values_alloc, config_alloc_chunk))
1450 {
1451 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1452 continue;
1453 }
1455 values_num++;
1456 }
1458 if (((now - ci->last_flush_time) >= config_write_interval)
1459 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1460 && (ci->values_num > 0))
1461 {
1462 enqueue_cache_item (ci, TAIL);
1463 }
1465 pthread_mutex_unlock (&cache_lock);
1467 if (values_num < 1)
1468 return send_response(sock, RESP_ERR, "No values updated.\n");
1469 else
1470 return send_response(sock, RESP_OK,
1471 "errors, enqueued %i value(s).\n", values_num);
1473 /* NOTREACHED */
1474 assert(1==0);
1476 } /* }}} int handle_request_update */
1478 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1479 {
1480 char *file, file_tmp[PATH_MAX];
1481 char *cf;
1483 char *start_str;
1484 char *end_str;
1485 time_t start_tm;
1486 time_t end_tm;
1488 unsigned long step;
1489 unsigned long ds_cnt;
1490 char **ds_namv;
1491 rrd_value_t *data;
1493 int status;
1494 unsigned long i;
1495 time_t t;
1496 rrd_value_t *data_ptr;
1498 file = NULL;
1499 cf = NULL;
1500 start_str = NULL;
1501 end_str = NULL;
1503 /* Read the arguments */
1504 do /* while (0) */
1505 {
1506 status = buffer_get_field (&buffer, &buffer_size, &file);
1507 if (status != 0)
1508 break;
1510 status = buffer_get_field (&buffer, &buffer_size, &cf);
1511 if (status != 0)
1512 break;
1514 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1515 if (status != 0)
1516 {
1517 start_str = NULL;
1518 status = 0;
1519 break;
1520 }
1522 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1523 if (status != 0)
1524 {
1525 end_str = NULL;
1526 status = 0;
1527 break;
1528 }
1529 } while (0);
1531 if (status != 0)
1532 return (syntax_error(sock,cmd));
1534 get_abs_path(&file, file_tmp);
1535 if (!check_file_access(file, sock)) return 0;
1537 status = flush_file (file);
1538 if ((status != 0) && (status != ENOENT))
1539 return (send_response (sock, RESP_ERR,
1540 "flush_file (%s) failed with status %i.\n", file, status));
1542 t = time (NULL); /* "now" */
1544 /* Parse start time */
1545 if (start_str != NULL)
1546 {
1547 char *endptr;
1548 long value;
1550 endptr = NULL;
1551 errno = 0;
1552 value = strtol (start_str, &endptr, /* base = */ 0);
1553 if ((endptr == start_str) || (errno != 0))
1554 return (send_response(sock, RESP_ERR,
1555 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1556 start_str));
1558 if (value > 0)
1559 start_tm = (time_t) value;
1560 else
1561 start_tm = (time_t) (t + value);
1562 }
1563 else
1564 {
1565 start_tm = t - 86400;
1566 }
1568 /* Parse end time */
1569 if (end_str != NULL)
1570 {
1571 char *endptr;
1572 long value;
1574 endptr = NULL;
1575 errno = 0;
1576 value = strtol (end_str, &endptr, /* base = */ 0);
1577 if ((endptr == end_str) || (errno != 0))
1578 return (send_response(sock, RESP_ERR,
1579 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1580 end_str));
1582 if (value > 0)
1583 end_tm = (time_t) value;
1584 else
1585 end_tm = (time_t) (t + value);
1586 }
1587 else
1588 {
1589 end_tm = t;
1590 }
1592 step = -1;
1593 ds_cnt = 0;
1594 ds_namv = NULL;
1595 data = NULL;
1597 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1598 &ds_cnt, &ds_namv, &data);
1599 if (status != 0)
1600 return (send_response(sock, RESP_ERR,
1601 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1603 add_response_info (sock, "FlushVersion: %lu\n", 1);
1604 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1605 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1606 add_response_info (sock, "Step: %lu\n", step);
1607 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1609 #define SSTRCAT(buffer,str,buffer_fill) do { \
1610 size_t str_len = strlen (str); \
1611 if ((buffer_fill + str_len) > sizeof (buffer)) \
1612 str_len = sizeof (buffer) - buffer_fill; \
1613 if (str_len > 0) { \
1614 strncpy (buffer + buffer_fill, str, str_len); \
1615 buffer_fill += str_len; \
1616 assert (buffer_fill <= sizeof (buffer)); \
1617 if (buffer_fill == sizeof (buffer)) \
1618 buffer[buffer_fill - 1] = 0; \
1619 else \
1620 buffer[buffer_fill] = 0; \
1621 } \
1622 } while (0)
1624 { /* Add list of DS names */
1625 char linebuf[1024];
1626 size_t linebuf_fill;
1628 memset (linebuf, 0, sizeof (linebuf));
1629 linebuf_fill = 0;
1630 for (i = 0; i < ds_cnt; i++)
1631 {
1632 if (i > 0)
1633 SSTRCAT (linebuf, " ", linebuf_fill);
1634 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1635 rrd_freemem(ds_namv[i]);
1636 }
1637 rrd_freemem(ds_namv);
1638 add_response_info (sock, "DSName: %s\n", linebuf);
1639 }
1641 /* Add the actual data */
1642 assert (step > 0);
1643 data_ptr = data;
1644 for (t = start_tm + step; t <= end_tm; t += step)
1645 {
1646 char linebuf[1024];
1647 size_t linebuf_fill;
1648 char tmp[128];
1650 memset (linebuf, 0, sizeof (linebuf));
1651 linebuf_fill = 0;
1652 for (i = 0; i < ds_cnt; i++)
1653 {
1654 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1655 tmp[sizeof (tmp) - 1] = 0;
1656 SSTRCAT (linebuf, tmp, linebuf_fill);
1658 data_ptr++;
1659 }
1661 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1662 } /* for (t) */
1663 rrd_freemem(data);
1665 return (send_response (sock, RESP_OK, "Success\n"));
1666 #undef SSTRCAT
1667 } /* }}} int handle_request_fetch */
1669 /* we came across a "WROTE" entry during journal replay.
1670 * throw away any values that we have accumulated for this file
1671 */
1672 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1673 {
1674 cache_item_t *ci;
1675 const char *file = buffer;
1677 pthread_mutex_lock(&cache_lock);
1679 ci = g_tree_lookup(cache_tree, file);
1680 if (ci == NULL)
1681 {
1682 pthread_mutex_unlock(&cache_lock);
1683 return (0);
1684 }
1686 if (ci->values)
1687 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1689 wipe_ci_values(ci, now);
1690 remove_from_queue(ci);
1692 pthread_mutex_unlock(&cache_lock);
1693 return (0);
1694 } /* }}} int handle_request_wrote */
1696 /* start "BATCH" processing */
1697 static int batch_start (HANDLER_PROTO) /* {{{ */
1698 {
1699 int status;
1700 if (sock->batch_start)
1701 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1703 status = send_response(sock, RESP_OK,
1704 "Go ahead. End with dot '.' on its own line.\n");
1705 sock->batch_start = time(NULL);
1706 sock->batch_cmd = 0;
1708 return status;
1709 } /* }}} static int batch_start */
1711 /* finish "BATCH" processing and return results to the client */
1712 static int batch_done (HANDLER_PROTO) /* {{{ */
1713 {
1714 assert(sock->batch_start);
1715 sock->batch_start = 0;
1716 sock->batch_cmd = 0;
1717 return send_response(sock, RESP_OK, "errors\n");
1718 } /* }}} static int batch_done */
1720 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1721 {
1722 return -1;
1723 } /* }}} static int handle_request_quit */
1725 static command_t list_of_commands[] = { /* {{{ */
1726 {
1727 "UPDATE",
1728 handle_request_update,
1729 CMD_CONTEXT_ANY,
1730 "UPDATE <filename> <values> [<values> ...]\n"
1731 ,
1732 "Adds the given file to the internal cache if it is not yet known and\n"
1733 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1734 "for details.\n"
1735 "\n"
1736 "Each <values> has the following form:\n"
1737 " <values> = <time>:<value>[:<value>[...]]\n"
1738 "See the rrdupdate(1) manpage for details.\n"
1739 },
1740 {
1741 "WROTE",
1742 handle_request_wrote,
1743 CMD_CONTEXT_JOURNAL,
1744 NULL,
1745 NULL
1746 },
1747 {
1748 "FLUSH",
1749 handle_request_flush,
1750 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1751 "FLUSH <filename>\n"
1752 ,
1753 "Adds the given filename to the head of the update queue and returns\n"
1754 "after it has been dequeued.\n"
1755 },
1756 {
1757 "FLUSHALL",
1758 handle_request_flushall,
1759 CMD_CONTEXT_CLIENT,
1760 "FLUSHALL\n"
1761 ,
1762 "Triggers writing of all pending updates. Returns immediately.\n"
1763 },
1764 {
1765 "PENDING",
1766 handle_request_pending,
1767 CMD_CONTEXT_CLIENT,
1768 "PENDING <filename>\n"
1769 ,
1770 "Shows any 'pending' updates for a file, in order.\n"
1771 "The updates shown have not yet been written to the underlying RRD file.\n"
1772 },
1773 {
1774 "FORGET",
1775 handle_request_forget,
1776 CMD_CONTEXT_ANY,
1777 "FORGET <filename>\n"
1778 ,
1779 "Removes the file completely from the cache.\n"
1780 "Any pending updates for the file will be lost.\n"
1781 },
1782 {
1783 "QUEUE",
1784 handle_request_queue,
1785 CMD_CONTEXT_CLIENT,
1786 "QUEUE\n"
1787 ,
1788 "Shows all files in the output queue.\n"
1789 "The output is zero or more lines in the following format:\n"
1790 "(where <num_vals> is the number of values to be written)\n"
1791 "\n"
1792 "<num_vals> <filename>\n"
1793 },
1794 {
1795 "STATS",
1796 handle_request_stats,
1797 CMD_CONTEXT_CLIENT,
1798 "STATS\n"
1799 ,
1800 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1801 "a description of the values.\n"
1802 },
1803 {
1804 "HELP",
1805 handle_request_help,
1806 CMD_CONTEXT_CLIENT,
1807 "HELP [<command>]\n",
1808 NULL, /* special! */
1809 },
1810 {
1811 "BATCH",
1812 batch_start,
1813 CMD_CONTEXT_CLIENT,
1814 "BATCH\n"
1815 ,
1816 "The 'BATCH' command permits the client to initiate a bulk load\n"
1817 " of commands to rrdcached.\n"
1818 "\n"
1819 "Usage:\n"
1820 "\n"
1821 " client: BATCH\n"
1822 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1823 " client: command #1\n"
1824 " client: command #2\n"
1825 " client: ... and so on\n"
1826 " client: .\n"
1827 " server: 2 errors\n"
1828 " server: 7 message for command #7\n"
1829 " server: 9 message for command #9\n"
1830 "\n"
1831 "For more information, consult the rrdcached(1) documentation.\n"
1832 },
1833 {
1834 ".", /* BATCH terminator */
1835 batch_done,
1836 CMD_CONTEXT_BATCH,
1837 NULL,
1838 NULL
1839 },
1840 {
1841 "FETCH",
1842 handle_request_fetch,
1843 CMD_CONTEXT_CLIENT,
1844 "FETCH <file> <CF> [<start> [<end>]]\n"
1845 ,
1846 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1847 },
1848 {
1849 "QUIT",
1850 handle_request_quit,
1851 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1852 "QUIT\n"
1853 ,
1854 "Disconnect from rrdcached.\n"
1855 }
1856 }; /* }}} command_t list_of_commands[] */
1857 static size_t list_of_commands_len = sizeof (list_of_commands)
1858 / sizeof (list_of_commands[0]);
1860 static command_t *find_command(char *cmd)
1861 {
1862 size_t i;
1864 for (i = 0; i < list_of_commands_len; i++)
1865 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1866 return (&list_of_commands[i]);
1867 return NULL;
1868 }
1870 /* We currently use the index in the `list_of_commands' array as a bit position
1871 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1872 * outside these functions so that switching to a more elegant storage method
1873 * is easily possible. */
1874 static ssize_t find_command_index (const char *cmd) /* {{{ */
1875 {
1876 size_t i;
1878 for (i = 0; i < list_of_commands_len; i++)
1879 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1880 return ((ssize_t) i);
1881 return (-1);
1882 } /* }}} ssize_t find_command_index */
1884 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1885 const char *cmd)
1886 {
1887 ssize_t i;
1889 if (JOURNAL_REPLAY(sock))
1890 return (1);
1892 if (cmd == NULL)
1893 return (-1);
1895 if ((strcasecmp ("QUIT", cmd) == 0)
1896 || (strcasecmp ("HELP", cmd) == 0))
1897 return (1);
1898 else if (strcmp (".", cmd) == 0)
1899 cmd = "BATCH";
1901 i = find_command_index (cmd);
1902 if (i < 0)
1903 return (-1);
1904 assert (i < 32);
1906 if ((sock->permissions & (1 << i)) != 0)
1907 return (1);
1908 return (0);
1909 } /* }}} int socket_permission_check */
1911 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1912 const char *cmd)
1913 {
1914 ssize_t i;
1916 i = find_command_index (cmd);
1917 if (i < 0)
1918 return (-1);
1919 assert (i < 32);
1921 sock->permissions |= (1 << i);
1922 return (0);
1923 } /* }}} int socket_permission_add */
1925 /* check whether commands are received in the expected context */
1926 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1927 {
1928 if (JOURNAL_REPLAY(sock))
1929 return (cmd->context & CMD_CONTEXT_JOURNAL);
1930 else if (sock->batch_start)
1931 return (cmd->context & CMD_CONTEXT_BATCH);
1932 else
1933 return (cmd->context & CMD_CONTEXT_CLIENT);
1935 /* NOTREACHED */
1936 assert(1==0);
1937 }
1939 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1940 {
1941 int status;
1942 char *cmd_str;
1943 char *resp_txt;
1944 command_t *help = NULL;
1946 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1947 if (status == 0)
1948 help = find_command(cmd_str);
1950 if (help && (help->syntax || help->help))
1951 {
1952 char tmp[CMD_MAX];
1954 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1955 resp_txt = tmp;
1957 if (help->syntax)
1958 add_response_info(sock, "Usage: %s\n", help->syntax);
1960 if (help->help)
1961 add_response_info(sock, "%s\n", help->help);
1962 }
1963 else
1964 {
1965 size_t i;
1967 resp_txt = "Command overview\n";
1969 for (i = 0; i < list_of_commands_len; i++)
1970 {
1971 if (list_of_commands[i].syntax == NULL)
1972 continue;
1973 add_response_info (sock, "%s", list_of_commands[i].syntax);
1974 }
1975 }
1977 return send_response(sock, RESP_OK, resp_txt);
1978 } /* }}} int handle_request_help */
1980 static int handle_request (DISPATCH_PROTO) /* {{{ */
1981 {
1982 char *buffer_ptr = buffer;
1983 char *cmd_str = NULL;
1984 command_t *cmd = NULL;
1985 int status;
1987 assert (buffer[buffer_size - 1] == '\0');
1989 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1990 if (status != 0)
1991 {
1992 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1993 return (-1);
1994 }
1996 if (sock != NULL && sock->batch_start)
1997 sock->batch_cmd++;
1999 cmd = find_command(cmd_str);
2000 if (!cmd)
2001 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2003 if (!socket_permission_check (sock, cmd->cmd))
2004 return send_response(sock, RESP_ERR, "Permission denied.\n");
2006 if (!command_check_context(sock, cmd))
2007 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2009 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2010 } /* }}} int handle_request */
2012 static void journal_set_free (journal_set *js) /* {{{ */
2013 {
2014 if (js == NULL)
2015 return;
2017 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2019 free(js);
2020 } /* }}} journal_set_free */
2022 static void journal_set_remove (journal_set *js) /* {{{ */
2023 {
2024 if (js == NULL)
2025 return;
2027 for (uint i=0; i < js->files_num; i++)
2028 {
2029 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2030 unlink(js->files[i]);
2031 }
2032 } /* }}} journal_set_remove */
2034 /* close current journal file handle.
2035 * MUST hold journal_lock before calling */
2036 static void journal_close(void) /* {{{ */
2037 {
2038 if (journal_fh != NULL)
2039 {
2040 if (fclose(journal_fh) != 0)
2041 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2042 }
2044 journal_fh = NULL;
2045 journal_size = 0;
2046 } /* }}} journal_close */
2048 /* MUST hold journal_lock before calling */
2049 static void journal_new_file(void) /* {{{ */
2050 {
2051 struct timeval now;
2052 int new_fd;
2053 char new_file[PATH_MAX + 1];
2055 assert(journal_dir != NULL);
2056 assert(journal_cur != NULL);
2058 journal_close();
2060 gettimeofday(&now, NULL);
2061 /* this format assures that the files sort in strcmp() order */
2062 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2063 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2065 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2066 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2067 if (new_fd < 0)
2068 goto error;
2070 journal_fh = fdopen(new_fd, "a");
2071 if (journal_fh == NULL)
2072 goto error;
2074 journal_size = ftell(journal_fh);
2075 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2077 /* record the file in the journal set */
2078 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2080 return;
2082 error:
2083 RRDD_LOG(LOG_CRIT,
2084 "JOURNALING DISABLED: Error while trying to create %s : %s",
2085 new_file, rrd_strerror(errno));
2086 RRDD_LOG(LOG_CRIT,
2087 "JOURNALING DISABLED: All values will be flushed at shutdown");
2089 close(new_fd);
2090 config_flush_at_shutdown = 1;
2092 } /* }}} journal_new_file */
2094 /* MUST NOT hold journal_lock before calling this */
2095 static void journal_rotate(void) /* {{{ */
2096 {
2097 journal_set *old_js = NULL;
2099 if (journal_dir == NULL)
2100 return;
2102 RRDD_LOG(LOG_DEBUG, "rotating journals");
2104 pthread_mutex_lock(&stats_lock);
2105 ++stats_journal_rotate;
2106 pthread_mutex_unlock(&stats_lock);
2108 pthread_mutex_lock(&journal_lock);
2110 journal_close();
2112 /* rotate the journal sets */
2113 old_js = journal_old;
2114 journal_old = journal_cur;
2115 journal_cur = calloc(1, sizeof(journal_set));
2117 if (journal_cur != NULL)
2118 journal_new_file();
2119 else
2120 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2122 pthread_mutex_unlock(&journal_lock);
2124 journal_set_remove(old_js);
2125 journal_set_free (old_js);
2127 } /* }}} static void journal_rotate */
2129 /* MUST hold journal_lock when calling */
2130 static void journal_done(void) /* {{{ */
2131 {
2132 if (journal_cur == NULL)
2133 return;
2135 journal_close();
2137 if (config_flush_at_shutdown)
2138 {
2139 RRDD_LOG(LOG_INFO, "removing journals");
2140 journal_set_remove(journal_old);
2141 journal_set_remove(journal_cur);
2142 }
2143 else
2144 {
2145 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2146 "journals will be used at next startup");
2147 }
2149 journal_set_free(journal_cur);
2150 journal_set_free(journal_old);
2151 free(journal_dir);
2153 } /* }}} static void journal_done */
2155 static int journal_write(char *cmd, char *args) /* {{{ */
2156 {
2157 int chars;
2159 if (journal_fh == NULL)
2160 return 0;
2162 pthread_mutex_lock(&journal_lock);
2163 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2164 journal_size += chars;
2166 if (journal_size > JOURNAL_MAX)
2167 journal_new_file();
2169 pthread_mutex_unlock(&journal_lock);
2171 if (chars > 0)
2172 {
2173 pthread_mutex_lock(&stats_lock);
2174 stats_journal_bytes += chars;
2175 pthread_mutex_unlock(&stats_lock);
2176 }
2178 return chars;
2179 } /* }}} static int journal_write */
2181 static int journal_replay (const char *file) /* {{{ */
2182 {
2183 FILE *fh;
2184 int entry_cnt = 0;
2185 int fail_cnt = 0;
2186 uint64_t line = 0;
2187 char entry[CMD_MAX];
2188 time_t now;
2190 if (file == NULL) return 0;
2192 {
2193 char *reason = "unknown error";
2194 int status = 0;
2195 struct stat statbuf;
2197 memset(&statbuf, 0, sizeof(statbuf));
2198 if (stat(file, &statbuf) != 0)
2199 {
2200 reason = "stat error";
2201 status = errno;
2202 }
2203 else if (!S_ISREG(statbuf.st_mode))
2204 {
2205 reason = "not a regular file";
2206 status = EPERM;
2207 }
2208 if (statbuf.st_uid != daemon_uid)
2209 {
2210 reason = "not owned by daemon user";
2211 status = EACCES;
2212 }
2213 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2214 {
2215 reason = "must not be user/group writable";
2216 status = EACCES;
2217 }
2219 if (status != 0)
2220 {
2221 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2222 file, rrd_strerror(status), reason);
2223 return 0;
2224 }
2225 }
2227 fh = fopen(file, "r");
2228 if (fh == NULL)
2229 {
2230 if (errno != ENOENT)
2231 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2232 file, rrd_strerror(errno));
2233 return 0;
2234 }
2235 else
2236 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2238 now = time(NULL);
2240 while(!feof(fh))
2241 {
2242 size_t entry_len;
2244 ++line;
2245 if (fgets(entry, sizeof(entry), fh) == NULL)
2246 break;
2247 entry_len = strlen(entry);
2249 /* check \n termination in case journal writing crashed mid-line */
2250 if (entry_len == 0)
2251 continue;
2252 else if (entry[entry_len - 1] != '\n')
2253 {
2254 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2255 ++fail_cnt;
2256 continue;
2257 }
2259 entry[entry_len - 1] = '\0';
2261 if (handle_request(NULL, now, entry, entry_len) == 0)
2262 ++entry_cnt;
2263 else
2264 ++fail_cnt;
2265 }
2267 fclose(fh);
2269 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2270 entry_cnt, fail_cnt);
2272 return entry_cnt > 0 ? 1 : 0;
2273 } /* }}} static int journal_replay */
2275 static int journal_sort(const void *v1, const void *v2)
2276 {
2277 char **jn1 = (char **) v1;
2278 char **jn2 = (char **) v2;
2280 return strcmp(*jn1,*jn2);
2281 }
2283 static void journal_init(void) /* {{{ */
2284 {
2285 int had_journal = 0;
2286 DIR *dir;
2287 struct dirent *dent;
2288 char path[PATH_MAX+1];
2290 if (journal_dir == NULL) return;
2292 pthread_mutex_lock(&journal_lock);
2294 journal_cur = calloc(1, sizeof(journal_set));
2295 if (journal_cur == NULL)
2296 {
2297 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2298 return;
2299 }
2301 RRDD_LOG(LOG_INFO, "checking for journal files");
2303 /* Handle old journal files during transition. This gives them the
2304 * correct sort order. TODO: remove after first release
2305 */
2306 {
2307 char old_path[PATH_MAX+1];
2308 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2309 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2310 rename(old_path, path);
2312 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2313 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2314 rename(old_path, path);
2315 }
2317 dir = opendir(journal_dir);
2318 if (!dir) {
2319 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2320 return;
2321 }
2322 while ((dent = readdir(dir)) != NULL)
2323 {
2324 /* looks like a journal file? */
2325 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2326 continue;
2328 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2330 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2331 {
2332 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2333 dent->d_name);
2334 break;
2335 }
2336 }
2337 closedir(dir);
2339 qsort(journal_cur->files, journal_cur->files_num,
2340 sizeof(journal_cur->files[0]), journal_sort);
2342 for (uint i=0; i < journal_cur->files_num; i++)
2343 had_journal += journal_replay(journal_cur->files[i]);
2345 journal_new_file();
2347 /* it must have been a crash. start a flush */
2348 if (had_journal && config_flush_at_shutdown)
2349 flush_old_values(-1);
2351 pthread_mutex_unlock(&journal_lock);
2353 RRDD_LOG(LOG_INFO, "journal processing complete");
2355 } /* }}} static void journal_init */
2357 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2358 {
2359 assert(sock != NULL);
2361 free(sock->rbuf); sock->rbuf = NULL;
2362 free(sock->wbuf); sock->wbuf = NULL;
2363 free(sock);
2364 } /* }}} void free_listen_socket */
2366 static void close_connection(listen_socket_t *sock) /* {{{ */
2367 {
2368 if (sock->fd >= 0)
2369 {
2370 close(sock->fd);
2371 sock->fd = -1;
2372 }
2374 free_listen_socket(sock);
2376 } /* }}} void close_connection */
2378 static void *connection_thread_main (void *args) /* {{{ */
2379 {
2380 listen_socket_t *sock;
2381 int fd;
2383 sock = (listen_socket_t *) args;
2384 fd = sock->fd;
2386 /* init read buffers */
2387 sock->next_read = sock->next_cmd = 0;
2388 sock->rbuf = malloc(RBUF_SIZE);
2389 if (sock->rbuf == NULL)
2390 {
2391 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2392 close_connection(sock);
2393 return NULL;
2394 }
2396 pthread_mutex_lock (&connection_threads_lock);
2397 connection_threads_num++;
2398 pthread_mutex_unlock (&connection_threads_lock);
2400 while (state == RUNNING)
2401 {
2402 char *cmd;
2403 ssize_t cmd_len;
2404 ssize_t rbytes;
2405 time_t now;
2407 struct pollfd pollfd;
2408 int status;
2410 pollfd.fd = fd;
2411 pollfd.events = POLLIN | POLLPRI;
2412 pollfd.revents = 0;
2414 status = poll (&pollfd, 1, /* timeout = */ 500);
2415 if (state != RUNNING)
2416 break;
2417 else if (status == 0) /* timeout */
2418 continue;
2419 else if (status < 0) /* error */
2420 {
2421 status = errno;
2422 if (status != EINTR)
2423 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2424 continue;
2425 }
2427 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2428 break;
2429 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2430 {
2431 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2432 "poll(2) returned something unexpected: %#04hx",
2433 pollfd.revents);
2434 break;
2435 }
2437 rbytes = read(fd, sock->rbuf + sock->next_read,
2438 RBUF_SIZE - sock->next_read);
2439 if (rbytes < 0)
2440 {
2441 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2442 break;
2443 }
2444 else if (rbytes == 0)
2445 break; /* eof */
2447 sock->next_read += rbytes;
2449 if (sock->batch_start)
2450 now = sock->batch_start;
2451 else
2452 now = time(NULL);
2454 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2455 {
2456 status = handle_request (sock, now, cmd, cmd_len+1);
2457 if (status != 0)
2458 goto out_close;
2459 }
2460 }
2462 out_close:
2463 close_connection(sock);
2465 /* Remove this thread from the connection threads list */
2466 pthread_mutex_lock (&connection_threads_lock);
2467 connection_threads_num--;
2468 if (connection_threads_num <= 0)
2469 pthread_cond_broadcast(&connection_threads_done);
2470 pthread_mutex_unlock (&connection_threads_lock);
2472 return (NULL);
2473 } /* }}} void *connection_thread_main */
2475 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2476 {
2477 int fd;
2478 struct sockaddr_un sa;
2479 listen_socket_t *temp;
2480 int status;
2481 const char *path;
2482 char *path_copy, *dir;
2484 path = sock->addr;
2485 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2486 path += strlen("unix:");
2488 /* dirname may modify its argument */
2489 path_copy = strdup(path);
2490 if (path_copy == NULL)
2491 {
2492 fprintf(stderr, "rrdcached: strdup(): %s\n",
2493 rrd_strerror(errno));
2494 return (-1);
2495 }
2497 dir = dirname(path_copy);
2498 if (rrd_mkdir_p(dir, 0777) != 0)
2499 {
2500 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2501 dir, rrd_strerror(errno));
2502 return (-1);
2503 }
2505 free(path_copy);
2507 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2508 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2509 if (temp == NULL)
2510 {
2511 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2512 return (-1);
2513 }
2514 listen_fds = temp;
2515 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2517 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2518 if (fd < 0)
2519 {
2520 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2521 rrd_strerror(errno));
2522 return (-1);
2523 }
2525 memset (&sa, 0, sizeof (sa));
2526 sa.sun_family = AF_UNIX;
2527 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2529 /* if we've gotten this far, we own the pid file. any daemon started
2530 * with the same args must not be alive. therefore, ensure that we can
2531 * create the socket...
2532 */
2533 unlink(path);
2535 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2536 if (status != 0)
2537 {
2538 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2539 path, rrd_strerror(errno));
2540 close (fd);
2541 return (-1);
2542 }
2544 /* tweak the sockets group ownership */
2545 if (sock->socket_group != (gid_t)-1)
2546 {
2547 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2548 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2549 {
2550 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2551 }
2552 }
2554 if (sock->socket_permissions != (mode_t)-1)
2555 {
2556 if (chmod(path, sock->socket_permissions) != 0)
2557 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2558 (unsigned int)sock->socket_permissions, strerror(errno));
2559 }
2561 status = listen (fd, /* backlog = */ 10);
2562 if (status != 0)
2563 {
2564 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2565 path, rrd_strerror(errno));
2566 close (fd);
2567 unlink (path);
2568 return (-1);
2569 }
2571 listen_fds[listen_fds_num].fd = fd;
2572 listen_fds[listen_fds_num].family = PF_UNIX;
2573 strncpy(listen_fds[listen_fds_num].addr, path,
2574 sizeof (listen_fds[listen_fds_num].addr) - 1);
2575 listen_fds_num++;
2577 return (0);
2578 } /* }}} int open_listen_socket_unix */
2580 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2581 {
2582 struct addrinfo ai_hints;
2583 struct addrinfo *ai_res;
2584 struct addrinfo *ai_ptr;
2585 char addr_copy[NI_MAXHOST];
2586 char *addr;
2587 char *port;
2588 int status;
2590 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2591 addr_copy[sizeof (addr_copy) - 1] = 0;
2592 addr = addr_copy;
2594 memset (&ai_hints, 0, sizeof (ai_hints));
2595 ai_hints.ai_flags = 0;
2596 #ifdef AI_ADDRCONFIG
2597 ai_hints.ai_flags |= AI_ADDRCONFIG;
2598 #endif
2599 ai_hints.ai_family = AF_UNSPEC;
2600 ai_hints.ai_socktype = SOCK_STREAM;
2602 port = NULL;
2603 if (*addr == '[') /* IPv6+port format */
2604 {
2605 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2606 addr++;
2608 port = strchr (addr, ']');
2609 if (port == NULL)
2610 {
2611 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2612 return (-1);
2613 }
2614 *port = 0;
2615 port++;
2617 if (*port == ':')
2618 port++;
2619 else if (*port == 0)
2620 port = NULL;
2621 else
2622 {
2623 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2624 return (-1);
2625 }
2626 } /* if (*addr == '[') */
2627 else
2628 {
2629 port = rindex(addr, ':');
2630 if (port != NULL)
2631 {
2632 *port = 0;
2633 port++;
2634 }
2635 }
2636 ai_res = NULL;
2637 status = getaddrinfo (addr,
2638 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2639 &ai_hints, &ai_res);
2640 if (status != 0)
2641 {
2642 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2643 addr, gai_strerror (status));
2644 return (-1);
2645 }
2647 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2648 {
2649 int fd;
2650 listen_socket_t *temp;
2651 int one = 1;
2653 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2654 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2655 if (temp == NULL)
2656 {
2657 fprintf (stderr,
2658 "rrdcached: open_listen_socket_network: realloc failed.\n");
2659 continue;
2660 }
2661 listen_fds = temp;
2662 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2664 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2665 if (fd < 0)
2666 {
2667 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2668 rrd_strerror(errno));
2669 continue;
2670 }
2672 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2674 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2675 if (status != 0)
2676 {
2677 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2678 sock->addr, rrd_strerror(errno));
2679 close (fd);
2680 continue;
2681 }
2683 status = listen (fd, /* backlog = */ 10);
2684 if (status != 0)
2685 {
2686 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2687 sock->addr, rrd_strerror(errno));
2688 close (fd);
2689 freeaddrinfo(ai_res);
2690 return (-1);
2691 }
2693 listen_fds[listen_fds_num].fd = fd;
2694 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2695 listen_fds_num++;
2696 } /* for (ai_ptr) */
2698 freeaddrinfo(ai_res);
2699 return (0);
2700 } /* }}} static int open_listen_socket_network */
2702 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2703 {
2704 assert(sock != NULL);
2705 assert(sock->addr != NULL);
2707 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2708 || sock->addr[0] == '/')
2709 return (open_listen_socket_unix(sock));
2710 else
2711 return (open_listen_socket_network(sock));
2712 } /* }}} int open_listen_socket */
2714 static int close_listen_sockets (void) /* {{{ */
2715 {
2716 size_t i;
2718 for (i = 0; i < listen_fds_num; i++)
2719 {
2720 close (listen_fds[i].fd);
2722 if (listen_fds[i].family == PF_UNIX)
2723 unlink(listen_fds[i].addr);
2724 }
2726 free (listen_fds);
2727 listen_fds = NULL;
2728 listen_fds_num = 0;
2730 return (0);
2731 } /* }}} int close_listen_sockets */
2733 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2734 {
2735 struct pollfd *pollfds;
2736 int pollfds_num;
2737 int status;
2738 int i;
2740 if (listen_fds_num < 1)
2741 {
2742 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2743 return (NULL);
2744 }
2746 pollfds_num = listen_fds_num;
2747 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2748 if (pollfds == NULL)
2749 {
2750 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2751 return (NULL);
2752 }
2753 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2755 RRDD_LOG(LOG_INFO, "listening for connections");
2757 while (state == RUNNING)
2758 {
2759 for (i = 0; i < pollfds_num; i++)
2760 {
2761 pollfds[i].fd = listen_fds[i].fd;
2762 pollfds[i].events = POLLIN | POLLPRI;
2763 pollfds[i].revents = 0;
2764 }
2766 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2767 if (state != RUNNING)
2768 break;
2769 else if (status == 0) /* timeout */
2770 continue;
2771 else if (status < 0) /* error */
2772 {
2773 status = errno;
2774 if (status != EINTR)
2775 {
2776 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2777 }
2778 continue;
2779 }
2781 for (i = 0; i < pollfds_num; i++)
2782 {
2783 listen_socket_t *client_sock;
2784 struct sockaddr_storage client_sa;
2785 socklen_t client_sa_size;
2786 pthread_t tid;
2787 pthread_attr_t attr;
2789 if (pollfds[i].revents == 0)
2790 continue;
2792 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2793 {
2794 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2795 "poll(2) returned something unexpected for listen FD #%i.",
2796 pollfds[i].fd);
2797 continue;
2798 }
2800 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2801 if (client_sock == NULL)
2802 {
2803 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2804 continue;
2805 }
2806 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2808 client_sa_size = sizeof (client_sa);
2809 client_sock->fd = accept (pollfds[i].fd,
2810 (struct sockaddr *) &client_sa, &client_sa_size);
2811 if (client_sock->fd < 0)
2812 {
2813 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2814 free(client_sock);
2815 continue;
2816 }
2818 pthread_attr_init (&attr);
2819 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2821 status = pthread_create (&tid, &attr, connection_thread_main,
2822 client_sock);
2823 if (status != 0)
2824 {
2825 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2826 close_connection(client_sock);
2827 continue;
2828 }
2829 } /* for (pollfds_num) */
2830 } /* while (state == RUNNING) */
2832 RRDD_LOG(LOG_INFO, "starting shutdown");
2834 close_listen_sockets ();
2836 pthread_mutex_lock (&connection_threads_lock);
2837 while (connection_threads_num > 0)
2838 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2839 pthread_mutex_unlock (&connection_threads_lock);
2841 free(pollfds);
2843 return (NULL);
2844 } /* }}} void *listen_thread_main */
2846 static int daemonize (void) /* {{{ */
2847 {
2848 int pid_fd;
2849 char *base_dir;
2851 daemon_uid = geteuid();
2853 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2854 if (pid_fd < 0)
2855 pid_fd = check_pidfile();
2856 if (pid_fd < 0)
2857 return pid_fd;
2859 /* open all the listen sockets */
2860 if (config_listen_address_list_len > 0)
2861 {
2862 for (size_t i = 0; i < config_listen_address_list_len; i++)
2863 open_listen_socket (config_listen_address_list[i]);
2865 rrd_free_ptrs((void ***) &config_listen_address_list,
2866 &config_listen_address_list_len);
2867 }
2868 else
2869 {
2870 listen_socket_t sock;
2871 memset(&sock, 0, sizeof(sock));
2872 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2873 open_listen_socket (&sock);
2874 }
2876 if (listen_fds_num < 1)
2877 {
2878 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2879 goto error;
2880 }
2882 if (!stay_foreground)
2883 {
2884 pid_t child;
2886 child = fork ();
2887 if (child < 0)
2888 {
2889 fprintf (stderr, "daemonize: fork(2) failed.\n");
2890 goto error;
2891 }
2892 else if (child > 0)
2893 exit(0);
2895 /* Become session leader */
2896 setsid ();
2898 /* Open the first three file descriptors to /dev/null */
2899 close (2);
2900 close (1);
2901 close (0);
2903 open ("/dev/null", O_RDWR);
2904 if (dup(0) == -1 || dup(0) == -1){
2905 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2906 }
2907 } /* if (!stay_foreground) */
2909 /* Change into the /tmp directory. */
2910 base_dir = (config_base_dir != NULL)
2911 ? config_base_dir
2912 : "/tmp";
2914 if (chdir (base_dir) != 0)
2915 {
2916 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2917 goto error;
2918 }
2920 install_signal_handlers();
2922 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2923 RRDD_LOG(LOG_INFO, "starting up");
2925 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2926 (GDestroyNotify) free_cache_item);
2927 if (cache_tree == NULL)
2928 {
2929 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2930 goto error;
2931 }
2933 return write_pidfile (pid_fd);
2935 error:
2936 remove_pidfile();
2937 return -1;
2938 } /* }}} int daemonize */
2940 static int cleanup (void) /* {{{ */
2941 {
2942 pthread_cond_broadcast (&flush_cond);
2943 pthread_join (flush_thread, NULL);
2945 pthread_cond_broadcast (&queue_cond);
2946 for (int i = 0; i < config_queue_threads; i++)
2947 pthread_join (queue_threads[i], NULL);
2949 if (config_flush_at_shutdown)
2950 {
2951 assert(cache_queue_head == NULL);
2952 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2953 }
2955 free(queue_threads);
2956 free(config_base_dir);
2958 pthread_mutex_lock(&cache_lock);
2959 g_tree_destroy(cache_tree);
2961 pthread_mutex_lock(&journal_lock);
2962 journal_done();
2964 RRDD_LOG(LOG_INFO, "goodbye");
2965 closelog ();
2967 remove_pidfile ();
2968 free(config_pid_file);
2970 return (0);
2971 } /* }}} int cleanup */
2973 static int read_options (int argc, char **argv) /* {{{ */
2974 {
2975 int option;
2976 int status = 0;
2978 char **permissions = NULL;
2979 size_t permissions_len = 0;
2981 gid_t socket_group = (gid_t)-1;
2982 mode_t socket_permissions = (mode_t)-1;
2984 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2985 {
2986 switch (option)
2987 {
2988 case 'g':
2989 stay_foreground=1;
2990 break;
2992 case 'l':
2993 {
2994 listen_socket_t *new;
2996 new = malloc(sizeof(listen_socket_t));
2997 if (new == NULL)
2998 {
2999 fprintf(stderr, "read_options: malloc failed.\n");
3000 return(2);
3001 }
3002 memset(new, 0, sizeof(listen_socket_t));
3004 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3006 /* Add permissions to the socket {{{ */
3007 if (permissions_len != 0)
3008 {
3009 size_t i;
3010 for (i = 0; i < permissions_len; i++)
3011 {
3012 status = socket_permission_add (new, permissions[i]);
3013 if (status != 0)
3014 {
3015 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3016 "socket failed. Most likely, this permission doesn't "
3017 "exist. Check your command line.\n", permissions[i]);
3018 status = 4;
3019 }
3020 }
3021 }
3022 else /* if (permissions_len == 0) */
3023 {
3024 /* Add permission for ALL commands to the socket. */
3025 size_t i;
3026 for (i = 0; i < list_of_commands_len; i++)
3027 {
3028 status = socket_permission_add (new, list_of_commands[i].cmd);
3029 if (status != 0)
3030 {
3031 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3032 "socket failed. This should never happen, ever! Sorry.\n",
3033 permissions[i]);
3034 status = 4;
3035 }
3036 }
3037 }
3038 /* }}} Done adding permissions. */
3040 new->socket_group = socket_group;
3041 new->socket_permissions = socket_permissions;
3043 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3044 &config_listen_address_list_len, new))
3045 {
3046 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3047 return (2);
3048 }
3049 }
3050 break;
3052 /* set socket group permissions */
3053 case 's':
3054 {
3055 gid_t group_gid;
3056 struct group *grp;
3058 group_gid = strtoul(optarg, NULL, 10);
3059 if (errno != EINVAL && group_gid>0)
3060 {
3061 /* we were passed a number */
3062 grp = getgrgid(group_gid);
3063 }
3064 else
3065 {
3066 grp = getgrnam(optarg);
3067 }
3069 if (grp)
3070 {
3071 socket_group = grp->gr_gid;
3072 }
3073 else
3074 {
3075 /* no idea what the user wanted... */
3076 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3077 return (5);
3078 }
3079 }
3080 break;
3082 /* set socket file permissions */
3083 case 'm':
3084 {
3085 long tmp;
3086 char *endptr = NULL;
3088 tmp = strtol (optarg, &endptr, 8);
3089 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3090 || (tmp > 07777) || (tmp < 0)) {
3091 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3092 optarg);
3093 return (5);
3094 }
3096 socket_permissions = (mode_t)tmp;
3097 }
3098 break;
3100 case 'P':
3101 {
3102 char *optcopy;
3103 char *saveptr;
3104 char *dummy;
3105 char *ptr;
3107 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3109 optcopy = strdup (optarg);
3110 dummy = optcopy;
3111 saveptr = NULL;
3112 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3113 {
3114 dummy = NULL;
3115 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3116 }
3118 free (optcopy);
3119 }
3120 break;
3122 case 'f':
3123 {
3124 int temp;
3126 temp = atoi (optarg);
3127 if (temp > 0)
3128 config_flush_interval = temp;
3129 else
3130 {
3131 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3132 status = 3;
3133 }
3134 }
3135 break;
3137 case 'w':
3138 {
3139 int temp;
3141 temp = atoi (optarg);
3142 if (temp > 0)
3143 config_write_interval = temp;
3144 else
3145 {
3146 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3147 status = 2;
3148 }
3149 }
3150 break;
3152 case 'z':
3153 {
3154 int temp;
3156 temp = atoi(optarg);
3157 if (temp > 0)
3158 config_write_jitter = temp;
3159 else
3160 {
3161 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3162 status = 2;
3163 }
3165 break;
3166 }
3168 case 't':
3169 {
3170 int threads;
3171 threads = atoi(optarg);
3172 if (threads >= 1)
3173 config_queue_threads = threads;
3174 else
3175 {
3176 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3177 return 1;
3178 }
3179 }
3180 break;
3182 case 'B':
3183 config_write_base_only = 1;
3184 break;
3186 case 'b':
3187 {
3188 size_t len;
3189 char base_realpath[PATH_MAX];
3191 if (config_base_dir != NULL)
3192 free (config_base_dir);
3193 config_base_dir = strdup (optarg);
3194 if (config_base_dir == NULL)
3195 {
3196 fprintf (stderr, "read_options: strdup failed.\n");
3197 return (3);
3198 }
3200 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3201 {
3202 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3203 config_base_dir, rrd_strerror (errno));
3204 return (3);
3205 }
3207 /* make sure that the base directory is not resolved via
3208 * symbolic links. this makes some performance-enhancing
3209 * assumptions possible (we don't have to resolve paths
3210 * that start with a "/")
3211 */
3212 if (realpath(config_base_dir, base_realpath) == NULL)
3213 {
3214 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3215 "%s\n", config_base_dir, rrd_strerror(errno));
3216 return 5;
3217 }
3219 len = strlen (config_base_dir);
3220 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3221 {
3222 config_base_dir[len - 1] = 0;
3223 len--;
3224 }
3226 if (len < 1)
3227 {
3228 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3229 return (4);
3230 }
3232 _config_base_dir_len = len;
3234 len = strlen (base_realpath);
3235 while ((len > 0) && (base_realpath[len - 1] == '/'))
3236 {
3237 base_realpath[len - 1] = '\0';
3238 len--;
3239 }
3241 if (strncmp(config_base_dir,
3242 base_realpath, sizeof(base_realpath)) != 0)
3243 {
3244 fprintf(stderr,
3245 "Base directory (-b) resolved via file system links!\n"
3246 "Please consult rrdcached '-b' documentation!\n"
3247 "Consider specifying the real directory (%s)\n",
3248 base_realpath);
3249 return 5;
3250 }
3251 }
3252 break;
3254 case 'p':
3255 {
3256 if (config_pid_file != NULL)
3257 free (config_pid_file);
3258 config_pid_file = strdup (optarg);
3259 if (config_pid_file == NULL)
3260 {
3261 fprintf (stderr, "read_options: strdup failed.\n");
3262 return (3);
3263 }
3264 }
3265 break;
3267 case 'F':
3268 config_flush_at_shutdown = 1;
3269 break;
3271 case 'j':
3272 {
3273 char journal_dir_actual[PATH_MAX];
3274 const char *dir;
3275 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3277 status = rrd_mkdir_p(dir, 0777);
3278 if (status != 0)
3279 {
3280 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3281 dir, rrd_strerror(errno));
3282 return 6;
3283 }
3285 if (access(dir, R_OK|W_OK|X_OK) != 0)
3286 {
3287 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3288 errno ? rrd_strerror(errno) : "");
3289 return 6;
3290 }
3291 }
3292 break;
3294 case 'a':
3295 {
3296 int temp = atoi(optarg);
3297 if (temp > 0)
3298 config_alloc_chunk = temp;
3299 else
3300 {
3301 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3302 return 10;
3303 }
3304 }
3305 break;
3307 case 'h':
3308 case '?':
3309 printf ("RRDCacheD %s\n"
3310 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3311 "\n"
3312 "Usage: rrdcached [options]\n"
3313 "\n"
3314 "Valid options are:\n"
3315 " -l <address> Socket address to listen to.\n"
3316 " -P <perms> Sets the permissions to assign to all following "
3317 "sockets\n"
3318 " -w <seconds> Interval in which to write data.\n"
3319 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3320 " -t <threads> Number of write threads.\n"
3321 " -f <seconds> Interval in which to flush dead data.\n"
3322 " -p <file> Location of the PID-file.\n"
3323 " -b <dir> Base directory to change to.\n"
3324 " -B Restrict file access to paths within -b <dir>\n"
3325 " -g Do not fork and run in the foreground.\n"
3326 " -j <dir> Directory in which to create the journal files.\n"
3327 " -F Always flush all updates at shutdown\n"
3328 " -s <id|name> Group owner of all following UNIX sockets\n"
3329 " (the socket will also have read/write permissions "
3330 "for that group)\n"
3331 " -m <mode> File permissions (octal) of all following UNIX "
3332 "sockets\n"
3333 " -a <size> Memory allocation chunk size. Default is 1."
3334 "\n"
3335 "For more information and a detailed description of all options "
3336 "please refer\n"
3337 "to the rrdcached(1) manual page.\n",
3338 VERSION);
3339 if (option == 'h')
3340 status = -1;
3341 else
3342 status = 1;
3343 break;
3344 } /* switch (option) */
3345 } /* while (getopt) */
3347 /* advise the user when values are not sane */
3348 if (config_flush_interval < 2 * config_write_interval)
3349 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3350 " 2x write interval (-w) !\n");
3351 if (config_write_jitter > config_write_interval)
3352 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3353 " write interval (-w) !\n");
3355 if (config_write_base_only && config_base_dir == NULL)
3356 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3357 " Consult the rrdcached documentation\n");
3359 if (journal_dir == NULL)
3360 config_flush_at_shutdown = 1;
3362 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3364 return (status);
3365 } /* }}} int read_options */
3367 int main (int argc, char **argv)
3368 {
3369 int status;
3371 status = read_options (argc, argv);
3372 if (status != 0)
3373 {
3374 if (status < 0)
3375 status = 0;
3376 return (status);
3377 }
3379 status = daemonize ();
3380 if (status != 0)
3381 {
3382 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3383 return (1);
3384 }
3386 journal_init();
3388 /* start the queue threads */
3389 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3390 if (queue_threads == NULL)
3391 {
3392 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3393 cleanup();
3394 return (1);
3395 }
3396 for (int i = 0; i < config_queue_threads; i++)
3397 {
3398 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3399 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3400 if (status != 0)
3401 {
3402 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3403 cleanup();
3404 return (1);
3405 }
3406 }
3408 /* start the flush thread */
3409 memset(&flush_thread, 0, sizeof(flush_thread));
3410 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3411 if (status != 0)
3412 {
3413 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3414 cleanup();
3415 return (1);
3416 }
3418 listen_thread_main (NULL);
3419 cleanup ();
3421 return (0);
3422 } /* int main */
3424 /*
3425 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3426 */