1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116 do { \
117 if (stay_foreground) \
118 fprintf(stderr, __VA_ARGS__); \
119 syslog ((severity), __VA_ARGS__); \
120 } while (0)
122 /*
123 * Types
124 */
125 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127 struct listen_socket_s
128 {
129 int fd;
130 char addr[PATH_MAX + 1];
131 int family;
133 /* state for BATCH processing */
134 time_t batch_start;
135 int batch_cmd;
137 /* buffered IO */
138 char *rbuf;
139 off_t next_cmd;
140 off_t next_read;
142 char *wbuf;
143 ssize_t wbuf_len;
145 uint32_t permissions;
147 gid_t socket_group;
148 mode_t socket_permissions;
149 };
150 typedef struct listen_socket_s listen_socket_t;
152 struct command_s;
153 typedef struct command_s command_t;
154 /* note: guard against "unused" warnings in the handlers */
155 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
156 time_t UNUSED(now),\
157 char UNUSED(*buffer),\
158 size_t UNUSED(buffer_size)
160 #define HANDLER_PROTO command_t UNUSED(*cmd),\
161 DISPATCH_PROTO
163 struct command_s {
164 char *cmd;
165 int (*handler)(HANDLER_PROTO);
167 char context; /* where we expect to see it */
168 #define CMD_CONTEXT_CLIENT (1<<0)
169 #define CMD_CONTEXT_BATCH (1<<1)
170 #define CMD_CONTEXT_JOURNAL (1<<2)
171 #define CMD_CONTEXT_ANY (0x7f)
173 char *syntax;
174 char *help;
175 };
177 struct cache_item_s;
178 typedef struct cache_item_s cache_item_t;
179 struct cache_item_s
180 {
181 char *file;
182 char **values;
183 size_t values_num; /* number of valid pointers */
184 size_t values_alloc; /* number of allocated pointers */
185 time_t last_flush_time;
186 time_t last_update_stamp;
187 #define CI_FLAGS_IN_TREE (1<<0)
188 #define CI_FLAGS_IN_QUEUE (1<<1)
189 int flags;
190 pthread_cond_t flushed;
191 cache_item_t *prev;
192 cache_item_t *next;
193 };
195 struct callback_flush_data_s
196 {
197 time_t now;
198 time_t abs_timeout;
199 char **keys;
200 size_t keys_num;
201 };
202 typedef struct callback_flush_data_s callback_flush_data_t;
204 enum queue_side_e
205 {
206 HEAD,
207 TAIL
208 };
209 typedef enum queue_side_e queue_side_t;
211 /* describe a set of journal files */
212 typedef struct {
213 char **files;
214 size_t files_num;
215 } journal_set;
217 /* max length of socket command or response */
218 #define CMD_MAX 4096
219 #define RBUF_SIZE (CMD_MAX*2)
221 /*
222 * Variables
223 */
224 static int stay_foreground = 0;
225 static uid_t daemon_uid;
227 static listen_socket_t *listen_fds = NULL;
228 static size_t listen_fds_num = 0;
230 enum {
231 RUNNING, /* normal operation */
232 FLUSHING, /* flushing remaining values */
233 SHUTDOWN /* shutting down */
234 } state = RUNNING;
236 static pthread_t *queue_threads;
237 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
238 static int config_queue_threads = 4;
240 static pthread_t flush_thread;
241 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
243 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
244 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
245 static int connection_threads_num = 0;
247 /* Cache stuff */
248 static GTree *cache_tree = NULL;
249 static cache_item_t *cache_queue_head = NULL;
250 static cache_item_t *cache_queue_tail = NULL;
251 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
253 static int config_write_interval = 300;
254 static int config_write_jitter = 0;
255 static int config_flush_interval = 3600;
256 static int config_flush_at_shutdown = 0;
257 static char *config_pid_file = NULL;
258 static char *config_base_dir = NULL;
259 static size_t _config_base_dir_len = 0;
260 static int config_write_base_only = 0;
261 static size_t config_alloc_chunk = 1;
263 static listen_socket_t **config_listen_address_list = NULL;
264 static size_t config_listen_address_list_len = 0;
266 static uint64_t stats_queue_length = 0;
267 static uint64_t stats_updates_received = 0;
268 static uint64_t stats_flush_received = 0;
269 static uint64_t stats_updates_written = 0;
270 static uint64_t stats_data_sets_written = 0;
271 static uint64_t stats_journal_bytes = 0;
272 static uint64_t stats_journal_rotate = 0;
273 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL; /* current journal file handle */
282 static long journal_size = 0; /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
292 /*
293 * Functions
294 */
295 static void sig_common (const char *sig) /* {{{ */
296 {
297 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298 state = FLUSHING;
299 pthread_cond_broadcast(&flush_cond);
300 pthread_cond_broadcast(&queue_cond);
301 } /* }}} void sig_common */
303 static void sig_int_handler (int UNUSED(s)) /* {{{ */
304 {
305 sig_common("INT");
306 } /* }}} void sig_int_handler */
308 static void sig_term_handler (int UNUSED(s)) /* {{{ */
309 {
310 sig_common("TERM");
311 } /* }}} void sig_term_handler */
313 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
314 {
315 config_flush_at_shutdown = 1;
316 sig_common("USR1");
317 } /* }}} void sig_usr1_handler */
319 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
320 {
321 config_flush_at_shutdown = 0;
322 sig_common("USR2");
323 } /* }}} void sig_usr2_handler */
325 static void install_signal_handlers(void) /* {{{ */
326 {
327 /* These structures are static, because `sigaction' behaves weird if the are
328 * overwritten.. */
329 static struct sigaction sa_int;
330 static struct sigaction sa_term;
331 static struct sigaction sa_pipe;
332 static struct sigaction sa_usr1;
333 static struct sigaction sa_usr2;
335 /* Install signal handlers */
336 memset (&sa_int, 0, sizeof (sa_int));
337 sa_int.sa_handler = sig_int_handler;
338 sigaction (SIGINT, &sa_int, NULL);
340 memset (&sa_term, 0, sizeof (sa_term));
341 sa_term.sa_handler = sig_term_handler;
342 sigaction (SIGTERM, &sa_term, NULL);
344 memset (&sa_pipe, 0, sizeof (sa_pipe));
345 sa_pipe.sa_handler = SIG_IGN;
346 sigaction (SIGPIPE, &sa_pipe, NULL);
348 memset (&sa_pipe, 0, sizeof (sa_usr1));
349 sa_usr1.sa_handler = sig_usr1_handler;
350 sigaction (SIGUSR1, &sa_usr1, NULL);
352 memset (&sa_usr2, 0, sizeof (sa_usr2));
353 sa_usr2.sa_handler = sig_usr2_handler;
354 sigaction (SIGUSR2, &sa_usr2, NULL);
356 } /* }}} void install_signal_handlers */
358 static int open_pidfile(char *action, int oflag) /* {{{ */
359 {
360 int fd;
361 const char *file;
362 char *file_copy, *dir;
364 file = (config_pid_file != NULL)
365 ? config_pid_file
366 : LOCALSTATEDIR "/run/rrdcached.pid";
368 /* dirname may modify its argument */
369 file_copy = strdup(file);
370 if (file_copy == NULL)
371 {
372 fprintf(stderr, "rrdcached: strdup(): %s\n",
373 rrd_strerror(errno));
374 return -1;
375 }
377 dir = dirname(file_copy);
378 if (rrd_mkdir_p(dir, 0777) != 0)
379 {
380 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
381 dir, rrd_strerror(errno));
382 return -1;
383 }
385 free(file_copy);
387 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
388 if (fd < 0)
389 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
390 action, file, rrd_strerror(errno));
392 return(fd);
393 } /* }}} static int open_pidfile */
395 /* check existing pid file to see whether a daemon is running */
396 static int check_pidfile(void)
397 {
398 int pid_fd;
399 pid_t pid;
400 char pid_str[16];
402 pid_fd = open_pidfile("open", O_RDWR);
403 if (pid_fd < 0)
404 return pid_fd;
406 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
407 return -1;
409 pid = atoi(pid_str);
410 if (pid <= 0)
411 return -1;
413 /* another running process that we can signal COULD be
414 * a competing rrdcached */
415 if (pid != getpid() && kill(pid, 0) == 0)
416 {
417 fprintf(stderr,
418 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
419 close(pid_fd);
420 return -1;
421 }
423 lseek(pid_fd, 0, SEEK_SET);
424 if (ftruncate(pid_fd, 0) == -1)
425 {
426 fprintf(stderr,
427 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
428 close(pid_fd);
429 return -1;
430 }
432 fprintf(stderr,
433 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
434 "rrdcached: starting normally.\n", pid);
436 return pid_fd;
437 } /* }}} static int check_pidfile */
439 static int write_pidfile (int fd) /* {{{ */
440 {
441 pid_t pid;
442 FILE *fh;
444 pid = getpid ();
446 fh = fdopen (fd, "w");
447 if (fh == NULL)
448 {
449 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
450 close(fd);
451 return (-1);
452 }
454 fprintf (fh, "%i\n", (int) pid);
455 fclose (fh);
457 return (0);
458 } /* }}} int write_pidfile */
460 static int remove_pidfile (void) /* {{{ */
461 {
462 char *file;
463 int status;
465 file = (config_pid_file != NULL)
466 ? config_pid_file
467 : LOCALSTATEDIR "/run/rrdcached.pid";
469 status = unlink (file);
470 if (status == 0)
471 return (0);
472 return (errno);
473 } /* }}} int remove_pidfile */
475 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
476 {
477 char *eol;
479 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
480 sock->next_read - sock->next_cmd);
482 if (eol == NULL)
483 {
484 /* no commands left, move remainder back to front of rbuf */
485 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
486 sock->next_read - sock->next_cmd);
487 sock->next_read -= sock->next_cmd;
488 sock->next_cmd = 0;
489 *len = 0;
490 return NULL;
491 }
492 else
493 {
494 char *cmd = sock->rbuf + sock->next_cmd;
495 *eol = '\0';
497 sock->next_cmd = eol - sock->rbuf + 1;
499 if (eol > sock->rbuf && *(eol-1) == '\r')
500 *(--eol) = '\0'; /* handle "\r\n" EOL */
502 *len = eol - cmd;
504 return cmd;
505 }
507 /* NOTREACHED */
508 assert(1==0);
509 } /* }}} char *next_cmd */
511 /* add the characters directly to the write buffer */
512 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
513 {
514 char *new_buf;
516 assert(sock != NULL);
518 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
519 if (new_buf == NULL)
520 {
521 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
522 return -1;
523 }
525 strncpy(new_buf + sock->wbuf_len, str, len + 1);
527 sock->wbuf = new_buf;
528 sock->wbuf_len += len;
530 return 0;
531 } /* }}} static int add_to_wbuf */
533 /* add the text to the "extra" info that's sent after the status line */
534 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
535 {
536 va_list argp;
537 char buffer[CMD_MAX];
538 int len;
540 if (JOURNAL_REPLAY(sock)) return 0;
541 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
543 va_start(argp, fmt);
544 #ifdef HAVE_VSNPRINTF
545 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
546 #else
547 len = vsprintf(buffer, fmt, argp);
548 #endif
549 va_end(argp);
550 if (len < 0)
551 {
552 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
553 return -1;
554 }
556 return add_to_wbuf(sock, buffer, len);
557 } /* }}} static int add_response_info */
559 static int count_lines(char *str) /* {{{ */
560 {
561 int lines = 0;
563 if (str != NULL)
564 {
565 while ((str = strchr(str, '\n')) != NULL)
566 {
567 ++lines;
568 ++str;
569 }
570 }
572 return lines;
573 } /* }}} static int count_lines */
575 /* send the response back to the user.
576 * returns 0 on success, -1 on error
577 * write buffer is always zeroed after this call */
578 static int send_response (listen_socket_t *sock, response_code rc,
579 char *fmt, ...) /* {{{ */
580 {
581 va_list argp;
582 char buffer[CMD_MAX];
583 int lines;
584 ssize_t wrote;
585 int rclen, len;
587 if (JOURNAL_REPLAY(sock)) return rc;
589 if (sock->batch_start)
590 {
591 if (rc == RESP_OK)
592 return rc; /* no response on success during BATCH */
593 lines = sock->batch_cmd;
594 }
595 else if (rc == RESP_OK)
596 lines = count_lines(sock->wbuf);
597 else
598 lines = -1;
600 rclen = sprintf(buffer, "%d ", lines);
601 va_start(argp, fmt);
602 #ifdef HAVE_VSNPRINTF
603 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
604 #else
605 len = vsprintf(buffer+rclen, fmt, argp);
606 #endif
607 va_end(argp);
608 if (len < 0)
609 return -1;
611 len += rclen;
613 /* append the result to the wbuf, don't write to the user */
614 if (sock->batch_start)
615 return add_to_wbuf(sock, buffer, len);
617 /* first write must be complete */
618 if (len != write(sock->fd, buffer, len))
619 {
620 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
621 return -1;
622 }
624 if (sock->wbuf != NULL && rc == RESP_OK)
625 {
626 wrote = 0;
627 while (wrote < sock->wbuf_len)
628 {
629 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
630 if (wb <= 0)
631 {
632 RRDD_LOG(LOG_INFO, "send_response: could not write results");
633 return -1;
634 }
635 wrote += wb;
636 }
637 }
639 free(sock->wbuf); sock->wbuf = NULL;
640 sock->wbuf_len = 0;
642 return 0;
643 } /* }}} */
645 static void wipe_ci_values(cache_item_t *ci, time_t when)
646 {
647 ci->values = NULL;
648 ci->values_num = 0;
649 ci->values_alloc = 0;
651 ci->last_flush_time = when;
652 if (config_write_jitter > 0)
653 ci->last_flush_time += (rrd_random() % config_write_jitter);
654 }
656 /* remove_from_queue
657 * remove a "cache_item_t" item from the queue.
658 * must hold 'cache_lock' when calling this
659 */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
661 {
662 if (ci == NULL) return;
663 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
665 if (ci->prev == NULL)
666 cache_queue_head = ci->next; /* reset head */
667 else
668 ci->prev->next = ci->next;
670 if (ci->next == NULL)
671 cache_queue_tail = ci->prev; /* reset the tail */
672 else
673 ci->next->prev = ci->prev;
675 ci->next = ci->prev = NULL;
676 ci->flags &= ~CI_FLAGS_IN_QUEUE;
678 pthread_mutex_lock (&stats_lock);
679 assert (stats_queue_length > 0);
680 stats_queue_length--;
681 pthread_mutex_unlock (&stats_lock);
683 } /* }}} static void remove_from_queue */
685 /* free the resources associated with the cache_item_t
686 * must hold cache_lock when calling this function
687 */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
689 {
690 if (ci == NULL) return NULL;
692 remove_from_queue(ci);
694 for (size_t i=0; i < ci->values_num; i++)
695 free(ci->values[i]);
697 free (ci->values);
698 free (ci->file);
700 /* in case anyone is waiting */
701 pthread_cond_broadcast(&ci->flushed);
702 pthread_cond_destroy(&ci->flushed);
704 free (ci);
706 return NULL;
707 } /* }}} static void *free_cache_item */
709 /*
710 * enqueue_cache_item:
711 * `cache_lock' must be acquired before calling this function!
712 */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714 queue_side_t side)
715 {
716 if (ci == NULL)
717 return (-1);
719 if (ci->values_num == 0)
720 return (0);
722 if (side == HEAD)
723 {
724 if (cache_queue_head == ci)
725 return 0;
727 /* remove if further down in queue */
728 remove_from_queue(ci);
730 ci->prev = NULL;
731 ci->next = cache_queue_head;
732 if (ci->next != NULL)
733 ci->next->prev = ci;
734 cache_queue_head = ci;
736 if (cache_queue_tail == NULL)
737 cache_queue_tail = cache_queue_head;
738 }
739 else /* (side == TAIL) */
740 {
741 /* We don't move values back in the list.. */
742 if (ci->flags & CI_FLAGS_IN_QUEUE)
743 return (0);
745 assert (ci->next == NULL);
746 assert (ci->prev == NULL);
748 ci->prev = cache_queue_tail;
750 if (cache_queue_tail == NULL)
751 cache_queue_head = ci;
752 else
753 cache_queue_tail->next = ci;
755 cache_queue_tail = ci;
756 }
758 ci->flags |= CI_FLAGS_IN_QUEUE;
760 pthread_cond_signal(&queue_cond);
761 pthread_mutex_lock (&stats_lock);
762 stats_queue_length++;
763 pthread_mutex_unlock (&stats_lock);
765 return (0);
766 } /* }}} int enqueue_cache_item */
768 /*
769 * tree_callback_flush:
770 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771 * while this is in progress.
772 */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774 gpointer data)
775 {
776 cache_item_t *ci;
777 callback_flush_data_t *cfd;
779 ci = (cache_item_t *) value;
780 cfd = (callback_flush_data_t *) data;
782 if (ci->flags & CI_FLAGS_IN_QUEUE)
783 return FALSE;
785 if (ci->values_num > 0
786 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787 {
788 enqueue_cache_item (ci, TAIL);
789 }
790 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791 && (ci->values_num <= 0))
792 {
793 assert ((char *) key == ci->file);
794 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795 {
796 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797 return (FALSE);
798 }
799 }
801 return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
804 static int flush_old_values (int max_age)
805 {
806 callback_flush_data_t cfd;
807 size_t k;
809 memset (&cfd, 0, sizeof (cfd));
810 /* Pass the current time as user data so that we don't need to call
811 * `time' for each node. */
812 cfd.now = time (NULL);
813 cfd.keys = NULL;
814 cfd.keys_num = 0;
816 if (max_age > 0)
817 cfd.abs_timeout = cfd.now - max_age;
818 else
819 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
821 /* `tree_callback_flush' will return the keys of all values that haven't
822 * been touched in the last `config_flush_interval' seconds in `cfd'.
823 * The char*'s in this array point to the same memory as ci->file, so we
824 * don't need to free them separately. */
825 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
827 for (k = 0; k < cfd.keys_num; k++)
828 {
829 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830 /* should never fail, since we have held the cache_lock
831 * the entire time */
832 assert(status == TRUE);
833 }
835 if (cfd.keys != NULL)
836 {
837 free (cfd.keys);
838 cfd.keys = NULL;
839 }
841 return (0);
842 } /* int flush_old_values */
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
845 {
846 struct timeval now;
847 struct timespec next_flush;
848 int status;
850 gettimeofday (&now, NULL);
851 next_flush.tv_sec = now.tv_sec + config_flush_interval;
852 next_flush.tv_nsec = 1000 * now.tv_usec;
854 pthread_mutex_lock(&cache_lock);
856 while (state == RUNNING)
857 {
858 gettimeofday (&now, NULL);
859 if ((now.tv_sec > next_flush.tv_sec)
860 || ((now.tv_sec == next_flush.tv_sec)
861 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862 {
863 RRDD_LOG(LOG_DEBUG, "flushing old values");
865 /* Determine the time of the next cache flush. */
866 next_flush.tv_sec = now.tv_sec + config_flush_interval;
868 /* Flush all values that haven't been written in the last
869 * `config_write_interval' seconds. */
870 flush_old_values (config_write_interval);
872 /* unlock the cache while we rotate so we don't block incoming
873 * updates if the fsync() blocks on disk I/O */
874 pthread_mutex_unlock(&cache_lock);
875 journal_rotate();
876 pthread_mutex_lock(&cache_lock);
877 }
879 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880 if (status != 0 && status != ETIMEDOUT)
881 {
882 RRDD_LOG (LOG_ERR, "flush_thread_main: "
883 "pthread_cond_timedwait returned %i.", status);
884 }
885 }
887 if (config_flush_at_shutdown)
888 flush_old_values (-1); /* flush everything */
890 state = SHUTDOWN;
892 pthread_mutex_unlock(&cache_lock);
894 return NULL;
895 } /* void *flush_thread_main */
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
898 {
899 pthread_mutex_lock (&cache_lock);
901 while (state != SHUTDOWN
902 || (cache_queue_head != NULL && config_flush_at_shutdown))
903 {
904 cache_item_t *ci;
905 char *file;
906 char **values;
907 size_t values_num;
908 int status;
910 /* Now, check if there's something to store away. If not, wait until
911 * something comes in. */
912 if (cache_queue_head == NULL)
913 {
914 status = pthread_cond_wait (&queue_cond, &cache_lock);
915 if ((status != 0) && (status != ETIMEDOUT))
916 {
917 RRDD_LOG (LOG_ERR, "queue_thread_main: "
918 "pthread_cond_wait returned %i.", status);
919 }
920 }
922 /* Check if a value has arrived. This may be NULL if we timed out or there
923 * was an interrupt such as a signal. */
924 if (cache_queue_head == NULL)
925 continue;
927 ci = cache_queue_head;
929 /* copy the relevant parts */
930 file = strdup (ci->file);
931 if (file == NULL)
932 {
933 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934 continue;
935 }
937 assert(ci->values != NULL);
938 assert(ci->values_num > 0);
940 values = ci->values;
941 values_num = ci->values_num;
943 wipe_ci_values(ci, time(NULL));
944 remove_from_queue(ci);
946 pthread_mutex_unlock (&cache_lock);
948 rrd_clear_error ();
949 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950 if (status != 0)
951 {
952 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953 "rrd_update_r (%s) failed with status %i. (%s)",
954 file, status, rrd_get_error());
955 }
957 journal_write("wrote", file);
959 /* Search again in the tree. It's possible someone issued a "FORGET"
960 * while we were writing the update values. */
961 pthread_mutex_lock(&cache_lock);
962 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963 if (ci)
964 pthread_cond_broadcast(&ci->flushed);
965 pthread_mutex_unlock(&cache_lock);
967 if (status == 0)
968 {
969 pthread_mutex_lock (&stats_lock);
970 stats_updates_written++;
971 stats_data_sets_written += values_num;
972 pthread_mutex_unlock (&stats_lock);
973 }
975 rrd_free_ptrs((void ***) &values, &values_num);
976 free(file);
978 pthread_mutex_lock (&cache_lock);
979 }
980 pthread_mutex_unlock (&cache_lock);
982 return (NULL);
983 } /* }}} void *queue_thread_main */
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986 size_t *buffer_size_ret, char **field_ret)
987 {
988 char *buffer;
989 size_t buffer_pos;
990 size_t buffer_size;
991 char *field;
992 size_t field_size;
993 int status;
995 buffer = *buffer_ret;
996 buffer_pos = 0;
997 buffer_size = *buffer_size_ret;
998 field = *buffer_ret;
999 field_size = 0;
1001 if (buffer_size <= 0)
1002 return (-1);
1004 /* This is ensured by `handle_request'. */
1005 assert (buffer[buffer_size - 1] == '\0');
1007 status = -1;
1008 while (buffer_pos < buffer_size)
1009 {
1010 /* Check for end-of-field or end-of-buffer */
1011 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012 {
1013 field[field_size] = 0;
1014 field_size++;
1015 buffer_pos++;
1016 status = 0;
1017 break;
1018 }
1019 /* Handle escaped characters. */
1020 else if (buffer[buffer_pos] == '\\')
1021 {
1022 if (buffer_pos >= (buffer_size - 1))
1023 break;
1024 buffer_pos++;
1025 field[field_size] = buffer[buffer_pos];
1026 field_size++;
1027 buffer_pos++;
1028 }
1029 /* Normal operation */
1030 else
1031 {
1032 field[field_size] = buffer[buffer_pos];
1033 field_size++;
1034 buffer_pos++;
1035 }
1036 } /* while (buffer_pos < buffer_size) */
1038 if (status != 0)
1039 return (status);
1041 *buffer_ret = buffer + buffer_pos;
1042 *buffer_size_ret = buffer_size - buffer_pos;
1043 *field_ret = field;
1045 return (0);
1046 } /* }}} int buffer_get_field */
1048 /* if we're restricting writes to the base directory,
1049 * check whether the file falls within the dir
1050 * returns 1 if OK, otherwise 0
1051 */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1053 {
1054 assert(file != NULL);
1056 if (!config_write_base_only
1057 || JOURNAL_REPLAY(sock)
1058 || config_base_dir == NULL)
1059 return 1;
1061 if (strstr(file, "../") != NULL) goto err;
1063 /* relative paths without "../" are ok */
1064 if (*file != '/') return 1;
1066 /* file must be of the format base + "/" + <1+ char filename> */
1067 if (strlen(file) < _config_base_dir_len + 2) goto err;
1068 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069 if (*(file + _config_base_dir_len) != '/') goto err;
1071 return 1;
1073 err:
1074 if (sock != NULL && sock->fd >= 0)
1075 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1077 return 0;
1078 } /* }}} static int check_file_access */
1080 /* when using a base dir, convert relative paths to absolute paths.
1081 * if necessary, modifies the "filename" pointer to point
1082 * to the new path created in "tmp". "tmp" is provided
1083 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084 *
1085 * this allows us to optimize for the expected case (absolute path)
1086 * with a no-op.
1087 */
1088 static void get_abs_path(char **filename, char *tmp)
1089 {
1090 assert(tmp != NULL);
1091 assert(filename != NULL && *filename != NULL);
1093 if (config_base_dir == NULL || **filename == '/')
1094 return;
1096 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097 *filename = tmp;
1098 } /* }}} static int get_abs_path */
1100 static int flush_file (const char *filename) /* {{{ */
1101 {
1102 cache_item_t *ci;
1104 pthread_mutex_lock (&cache_lock);
1106 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107 if (ci == NULL)
1108 {
1109 pthread_mutex_unlock (&cache_lock);
1110 return (ENOENT);
1111 }
1113 if (ci->values_num > 0)
1114 {
1115 /* Enqueue at head */
1116 enqueue_cache_item (ci, HEAD);
1117 pthread_cond_wait(&ci->flushed, &cache_lock);
1118 }
1120 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1121 * may have been purged during our cond_wait() */
1123 pthread_mutex_unlock(&cache_lock);
1125 return (0);
1126 } /* }}} int flush_file */
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1129 {
1130 char *err = "Syntax error.\n";
1132 if (cmd && cmd->syntax)
1133 err = cmd->syntax;
1135 return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1139 {
1140 uint64_t copy_queue_length;
1141 uint64_t copy_updates_received;
1142 uint64_t copy_flush_received;
1143 uint64_t copy_updates_written;
1144 uint64_t copy_data_sets_written;
1145 uint64_t copy_journal_bytes;
1146 uint64_t copy_journal_rotate;
1148 uint64_t tree_nodes_number;
1149 uint64_t tree_depth;
1151 pthread_mutex_lock (&stats_lock);
1152 copy_queue_length = stats_queue_length;
1153 copy_updates_received = stats_updates_received;
1154 copy_flush_received = stats_flush_received;
1155 copy_updates_written = stats_updates_written;
1156 copy_data_sets_written = stats_data_sets_written;
1157 copy_journal_bytes = stats_journal_bytes;
1158 copy_journal_rotate = stats_journal_rotate;
1159 pthread_mutex_unlock (&stats_lock);
1161 pthread_mutex_lock (&cache_lock);
1162 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163 tree_depth = (uint64_t) g_tree_height (cache_tree);
1164 pthread_mutex_unlock (&cache_lock);
1166 add_response_info(sock,
1167 "QueueLength: %"PRIu64"\n", copy_queue_length);
1168 add_response_info(sock,
1169 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170 add_response_info(sock,
1171 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172 add_response_info(sock,
1173 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174 add_response_info(sock,
1175 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1181 send_response(sock, RESP_OK, "Statistics follow\n");
1183 return (0);
1184 } /* }}} int handle_request_stats */
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1187 {
1188 char *file, file_tmp[PATH_MAX];
1189 int status;
1191 status = buffer_get_field (&buffer, &buffer_size, &file);
1192 if (status != 0)
1193 {
1194 return syntax_error(sock,cmd);
1195 }
1196 else
1197 {
1198 pthread_mutex_lock(&stats_lock);
1199 stats_flush_received++;
1200 pthread_mutex_unlock(&stats_lock);
1202 get_abs_path(&file, file_tmp);
1203 if (!check_file_access(file, sock)) return 0;
1205 status = flush_file (file);
1206 if (status == 0)
1207 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208 else if (status == ENOENT)
1209 {
1210 /* no file in our tree; see whether it exists at all */
1211 struct stat statbuf;
1213 memset(&statbuf, 0, sizeof(statbuf));
1214 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216 else
1217 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218 }
1219 else if (status < 0)
1220 return send_response(sock, RESP_ERR, "Internal error.\n");
1221 else
1222 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223 }
1225 /* NOTREACHED */
1226 assert(1==0);
1227 } /* }}} int handle_request_flush */
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1230 {
1231 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1233 pthread_mutex_lock(&cache_lock);
1234 flush_old_values(-1);
1235 pthread_mutex_unlock(&cache_lock);
1237 return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1241 {
1242 int status;
1243 char *file, file_tmp[PATH_MAX];
1244 cache_item_t *ci;
1246 status = buffer_get_field(&buffer, &buffer_size, &file);
1247 if (status != 0)
1248 return syntax_error(sock,cmd);
1250 get_abs_path(&file, file_tmp);
1252 pthread_mutex_lock(&cache_lock);
1253 ci = g_tree_lookup(cache_tree, file);
1254 if (ci == NULL)
1255 {
1256 pthread_mutex_unlock(&cache_lock);
1257 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258 }
1260 for (size_t i=0; i < ci->values_num; i++)
1261 add_response_info(sock, "%s\n", ci->values[i]);
1263 pthread_mutex_unlock(&cache_lock);
1264 return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1268 {
1269 int status;
1270 gboolean found;
1271 char *file, file_tmp[PATH_MAX];
1273 status = buffer_get_field(&buffer, &buffer_size, &file);
1274 if (status != 0)
1275 return syntax_error(sock,cmd);
1277 get_abs_path(&file, file_tmp);
1278 if (!check_file_access(file, sock)) return 0;
1280 pthread_mutex_lock(&cache_lock);
1281 found = g_tree_remove(cache_tree, file);
1282 pthread_mutex_unlock(&cache_lock);
1284 if (found == TRUE)
1285 {
1286 if (!JOURNAL_REPLAY(sock))
1287 journal_write("forget", file);
1289 return send_response(sock, RESP_OK, "Gone!\n");
1290 }
1291 else
1292 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1294 /* NOTREACHED */
1295 assert(1==0);
1296 } /* }}} static int handle_request_forget */
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1299 {
1300 cache_item_t *ci;
1302 pthread_mutex_lock(&cache_lock);
1304 ci = cache_queue_head;
1305 while (ci != NULL)
1306 {
1307 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308 ci = ci->next;
1309 }
1311 pthread_mutex_unlock(&cache_lock);
1313 return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1317 {
1318 char *file, file_tmp[PATH_MAX];
1319 int values_num = 0;
1320 int status;
1321 char orig_buf[CMD_MAX];
1323 cache_item_t *ci;
1325 /* save it for the journal later */
1326 if (!JOURNAL_REPLAY(sock))
1327 strncpy(orig_buf, buffer, buffer_size);
1329 status = buffer_get_field (&buffer, &buffer_size, &file);
1330 if (status != 0)
1331 return syntax_error(sock,cmd);
1333 pthread_mutex_lock(&stats_lock);
1334 stats_updates_received++;
1335 pthread_mutex_unlock(&stats_lock);
1337 get_abs_path(&file, file_tmp);
1338 if (!check_file_access(file, sock)) return 0;
1340 pthread_mutex_lock (&cache_lock);
1341 ci = g_tree_lookup (cache_tree, file);
1343 if (ci == NULL) /* {{{ */
1344 {
1345 struct stat statbuf;
1346 cache_item_t *tmp;
1348 /* don't hold the lock while we setup; stat(2) might block */
1349 pthread_mutex_unlock(&cache_lock);
1351 memset (&statbuf, 0, sizeof (statbuf));
1352 status = stat (file, &statbuf);
1353 if (status != 0)
1354 {
1355 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357 status = errno;
1358 if (status == ENOENT)
1359 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360 else
1361 return send_response(sock, RESP_ERR,
1362 "stat failed with error %i.\n", status);
1363 }
1364 if (!S_ISREG (statbuf.st_mode))
1365 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367 if (access(file, R_OK|W_OK) != 0)
1368 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369 file, rrd_strerror(errno));
1371 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372 if (ci == NULL)
1373 {
1374 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376 return send_response(sock, RESP_ERR, "malloc failed.\n");
1377 }
1378 memset (ci, 0, sizeof (cache_item_t));
1380 ci->file = strdup (file);
1381 if (ci->file == NULL)
1382 {
1383 free (ci);
1384 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386 return send_response(sock, RESP_ERR, "strdup failed.\n");
1387 }
1389 wipe_ci_values(ci, now);
1390 ci->flags = CI_FLAGS_IN_TREE;
1391 pthread_cond_init(&ci->flushed, NULL);
1393 pthread_mutex_lock(&cache_lock);
1395 /* another UPDATE might have added this entry in the meantime */
1396 tmp = g_tree_lookup (cache_tree, file);
1397 if (tmp == NULL)
1398 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399 else
1400 {
1401 free_cache_item (ci);
1402 ci = tmp;
1403 }
1405 /* state may have changed while we were unlocked */
1406 if (state == SHUTDOWN)
1407 return -1;
1408 } /* }}} */
1409 assert (ci != NULL);
1411 /* don't re-write updates in replay mode */
1412 if (!JOURNAL_REPLAY(sock))
1413 journal_write("update", orig_buf);
1415 while (buffer_size > 0)
1416 {
1417 char *value;
1418 time_t stamp;
1419 char *eostamp;
1421 status = buffer_get_field (&buffer, &buffer_size, &value);
1422 if (status != 0)
1423 {
1424 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425 break;
1426 }
1428 /* make sure update time is always moving forward */
1429 stamp = strtol(value, &eostamp, 10);
1430 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1431 {
1432 pthread_mutex_unlock(&cache_lock);
1433 return send_response(sock, RESP_ERR,
1434 "Cannot find timestamp in '%s'!\n", value);
1435 }
1436 else if (stamp <= ci->last_update_stamp)
1437 {
1438 pthread_mutex_unlock(&cache_lock);
1439 return send_response(sock, RESP_ERR,
1440 "illegal attempt to update using time %ld when last"
1441 " update time is %ld (minimum one second step)\n",
1442 stamp, ci->last_update_stamp);
1443 }
1444 else
1445 ci->last_update_stamp = stamp;
1447 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1448 &ci->values_alloc, config_alloc_chunk))
1449 {
1450 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1451 continue;
1452 }
1454 values_num++;
1455 }
1457 if (((now - ci->last_flush_time) >= config_write_interval)
1458 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1459 && (ci->values_num > 0))
1460 {
1461 enqueue_cache_item (ci, TAIL);
1462 }
1464 pthread_mutex_unlock (&cache_lock);
1466 if (values_num < 1)
1467 return send_response(sock, RESP_ERR, "No values updated.\n");
1468 else
1469 return send_response(sock, RESP_OK,
1470 "errors, enqueued %i value(s).\n", values_num);
1472 /* NOTREACHED */
1473 assert(1==0);
1475 } /* }}} int handle_request_update */
1477 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1478 {
1479 char *file, file_tmp[PATH_MAX];
1480 char *cf;
1482 char *start_str;
1483 char *end_str;
1484 time_t start_tm;
1485 time_t end_tm;
1487 unsigned long step;
1488 unsigned long ds_cnt;
1489 char **ds_namv;
1490 rrd_value_t *data;
1492 int status;
1493 unsigned long i;
1494 time_t t;
1495 rrd_value_t *data_ptr;
1497 file = NULL;
1498 cf = NULL;
1499 start_str = NULL;
1500 end_str = NULL;
1502 /* Read the arguments */
1503 do /* while (0) */
1504 {
1505 status = buffer_get_field (&buffer, &buffer_size, &file);
1506 if (status != 0)
1507 break;
1509 status = buffer_get_field (&buffer, &buffer_size, &cf);
1510 if (status != 0)
1511 break;
1513 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1514 if (status != 0)
1515 {
1516 start_str = NULL;
1517 status = 0;
1518 break;
1519 }
1521 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1522 if (status != 0)
1523 {
1524 end_str = NULL;
1525 status = 0;
1526 break;
1527 }
1528 } while (0);
1530 if (status != 0)
1531 return (syntax_error(sock,cmd));
1533 get_abs_path(&file, file_tmp);
1534 if (!check_file_access(file, sock)) return 0;
1536 status = flush_file (file);
1537 if ((status != 0) && (status != ENOENT))
1538 return (send_response (sock, RESP_ERR,
1539 "flush_file (%s) failed with status %i.\n", file, status));
1541 t = time (NULL); /* "now" */
1543 /* Parse start time */
1544 if (start_str != NULL)
1545 {
1546 char *endptr;
1547 long value;
1549 endptr = NULL;
1550 errno = 0;
1551 value = strtol (start_str, &endptr, /* base = */ 0);
1552 if ((endptr == start_str) || (errno != 0))
1553 return (send_response(sock, RESP_ERR,
1554 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1555 start_str));
1557 if (value > 0)
1558 start_tm = (time_t) value;
1559 else
1560 start_tm = (time_t) (t + value);
1561 }
1562 else
1563 {
1564 start_tm = t - 86400;
1565 }
1567 /* Parse end time */
1568 if (end_str != NULL)
1569 {
1570 char *endptr;
1571 long value;
1573 endptr = NULL;
1574 errno = 0;
1575 value = strtol (end_str, &endptr, /* base = */ 0);
1576 if ((endptr == end_str) || (errno != 0))
1577 return (send_response(sock, RESP_ERR,
1578 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1579 end_str));
1581 if (value > 0)
1582 end_tm = (time_t) value;
1583 else
1584 end_tm = (time_t) (t + value);
1585 }
1586 else
1587 {
1588 end_tm = t;
1589 }
1591 step = -1;
1592 ds_cnt = 0;
1593 ds_namv = NULL;
1594 data = NULL;
1596 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1597 &ds_cnt, &ds_namv, &data);
1598 if (status != 0)
1599 return (send_response(sock, RESP_ERR,
1600 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1602 add_response_info (sock, "FlushVersion: %lu\n", 1);
1603 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1604 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1605 add_response_info (sock, "Step: %lu\n", step);
1606 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1608 #define SSTRCAT(buffer,str,buffer_fill) do { \
1609 size_t str_len = strlen (str); \
1610 if ((buffer_fill + str_len) > sizeof (buffer)) \
1611 str_len = sizeof (buffer) - buffer_fill; \
1612 if (str_len > 0) { \
1613 strncpy (buffer + buffer_fill, str, str_len); \
1614 buffer_fill += str_len; \
1615 assert (buffer_fill <= sizeof (buffer)); \
1616 if (buffer_fill == sizeof (buffer)) \
1617 buffer[buffer_fill - 1] = 0; \
1618 else \
1619 buffer[buffer_fill] = 0; \
1620 } \
1621 } while (0)
1623 { /* Add list of DS names */
1624 char linebuf[1024];
1625 size_t linebuf_fill;
1627 memset (linebuf, 0, sizeof (linebuf));
1628 linebuf_fill = 0;
1629 for (i = 0; i < ds_cnt; i++)
1630 {
1631 if (i > 0)
1632 SSTRCAT (linebuf, " ", linebuf_fill);
1633 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1634 rrd_freemem(ds_namv[i]);
1635 }
1636 rrd_freemem(ds_namv);
1637 add_response_info (sock, "DSName: %s\n", linebuf);
1638 }
1640 /* Add the actual data */
1641 assert (step > 0);
1642 data_ptr = data;
1643 for (t = start_tm + step; t <= end_tm; t += step)
1644 {
1645 char linebuf[1024];
1646 size_t linebuf_fill;
1647 char tmp[128];
1649 memset (linebuf, 0, sizeof (linebuf));
1650 linebuf_fill = 0;
1651 for (i = 0; i < ds_cnt; i++)
1652 {
1653 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1654 tmp[sizeof (tmp) - 1] = 0;
1655 SSTRCAT (linebuf, tmp, linebuf_fill);
1657 data_ptr++;
1658 }
1660 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1661 } /* for (t) */
1662 rrd_freemem(data);
1664 return (send_response (sock, RESP_OK, "Success\n"));
1665 #undef SSTRCAT
1666 } /* }}} int handle_request_fetch */
1668 /* we came across a "WROTE" entry during journal replay.
1669 * throw away any values that we have accumulated for this file
1670 */
1671 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1672 {
1673 cache_item_t *ci;
1674 const char *file = buffer;
1676 pthread_mutex_lock(&cache_lock);
1678 ci = g_tree_lookup(cache_tree, file);
1679 if (ci == NULL)
1680 {
1681 pthread_mutex_unlock(&cache_lock);
1682 return (0);
1683 }
1685 if (ci->values)
1686 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1688 wipe_ci_values(ci, now);
1689 remove_from_queue(ci);
1691 pthread_mutex_unlock(&cache_lock);
1692 return (0);
1693 } /* }}} int handle_request_wrote */
1695 /* start "BATCH" processing */
1696 static int batch_start (HANDLER_PROTO) /* {{{ */
1697 {
1698 int status;
1699 if (sock->batch_start)
1700 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1702 status = send_response(sock, RESP_OK,
1703 "Go ahead. End with dot '.' on its own line.\n");
1704 sock->batch_start = time(NULL);
1705 sock->batch_cmd = 0;
1707 return status;
1708 } /* }}} static int batch_start */
1710 /* finish "BATCH" processing and return results to the client */
1711 static int batch_done (HANDLER_PROTO) /* {{{ */
1712 {
1713 assert(sock->batch_start);
1714 sock->batch_start = 0;
1715 sock->batch_cmd = 0;
1716 return send_response(sock, RESP_OK, "errors\n");
1717 } /* }}} static int batch_done */
1719 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1720 {
1721 return -1;
1722 } /* }}} static int handle_request_quit */
1724 static command_t list_of_commands[] = { /* {{{ */
1725 {
1726 "UPDATE",
1727 handle_request_update,
1728 CMD_CONTEXT_ANY,
1729 "UPDATE <filename> <values> [<values> ...]\n"
1730 ,
1731 "Adds the given file to the internal cache if it is not yet known and\n"
1732 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1733 "for details.\n"
1734 "\n"
1735 "Each <values> has the following form:\n"
1736 " <values> = <time>:<value>[:<value>[...]]\n"
1737 "See the rrdupdate(1) manpage for details.\n"
1738 },
1739 {
1740 "WROTE",
1741 handle_request_wrote,
1742 CMD_CONTEXT_JOURNAL,
1743 NULL,
1744 NULL
1745 },
1746 {
1747 "FLUSH",
1748 handle_request_flush,
1749 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1750 "FLUSH <filename>\n"
1751 ,
1752 "Adds the given filename to the head of the update queue and returns\n"
1753 "after it has been dequeued.\n"
1754 },
1755 {
1756 "FLUSHALL",
1757 handle_request_flushall,
1758 CMD_CONTEXT_CLIENT,
1759 "FLUSHALL\n"
1760 ,
1761 "Triggers writing of all pending updates. Returns immediately.\n"
1762 },
1763 {
1764 "PENDING",
1765 handle_request_pending,
1766 CMD_CONTEXT_CLIENT,
1767 "PENDING <filename>\n"
1768 ,
1769 "Shows any 'pending' updates for a file, in order.\n"
1770 "The updates shown have not yet been written to the underlying RRD file.\n"
1771 },
1772 {
1773 "FORGET",
1774 handle_request_forget,
1775 CMD_CONTEXT_ANY,
1776 "FORGET <filename>\n"
1777 ,
1778 "Removes the file completely from the cache.\n"
1779 "Any pending updates for the file will be lost.\n"
1780 },
1781 {
1782 "QUEUE",
1783 handle_request_queue,
1784 CMD_CONTEXT_CLIENT,
1785 "QUEUE\n"
1786 ,
1787 "Shows all files in the output queue.\n"
1788 "The output is zero or more lines in the following format:\n"
1789 "(where <num_vals> is the number of values to be written)\n"
1790 "\n"
1791 "<num_vals> <filename>\n"
1792 },
1793 {
1794 "STATS",
1795 handle_request_stats,
1796 CMD_CONTEXT_CLIENT,
1797 "STATS\n"
1798 ,
1799 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1800 "a description of the values.\n"
1801 },
1802 {
1803 "HELP",
1804 handle_request_help,
1805 CMD_CONTEXT_CLIENT,
1806 "HELP [<command>]\n",
1807 NULL, /* special! */
1808 },
1809 {
1810 "BATCH",
1811 batch_start,
1812 CMD_CONTEXT_CLIENT,
1813 "BATCH\n"
1814 ,
1815 "The 'BATCH' command permits the client to initiate a bulk load\n"
1816 " of commands to rrdcached.\n"
1817 "\n"
1818 "Usage:\n"
1819 "\n"
1820 " client: BATCH\n"
1821 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1822 " client: command #1\n"
1823 " client: command #2\n"
1824 " client: ... and so on\n"
1825 " client: .\n"
1826 " server: 2 errors\n"
1827 " server: 7 message for command #7\n"
1828 " server: 9 message for command #9\n"
1829 "\n"
1830 "For more information, consult the rrdcached(1) documentation.\n"
1831 },
1832 {
1833 ".", /* BATCH terminator */
1834 batch_done,
1835 CMD_CONTEXT_BATCH,
1836 NULL,
1837 NULL
1838 },
1839 {
1840 "FETCH",
1841 handle_request_fetch,
1842 CMD_CONTEXT_CLIENT,
1843 "FETCH <file> <CF> [<start> [<end>]]\n"
1844 ,
1845 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1846 },
1847 {
1848 "QUIT",
1849 handle_request_quit,
1850 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1851 "QUIT\n"
1852 ,
1853 "Disconnect from rrdcached.\n"
1854 }
1855 }; /* }}} command_t list_of_commands[] */
1856 static size_t list_of_commands_len = sizeof (list_of_commands)
1857 / sizeof (list_of_commands[0]);
1859 static command_t *find_command(char *cmd)
1860 {
1861 size_t i;
1863 for (i = 0; i < list_of_commands_len; i++)
1864 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1865 return (&list_of_commands[i]);
1866 return NULL;
1867 }
1869 /* We currently use the index in the `list_of_commands' array as a bit position
1870 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1871 * outside these functions so that switching to a more elegant storage method
1872 * is easily possible. */
1873 static ssize_t find_command_index (const char *cmd) /* {{{ */
1874 {
1875 size_t i;
1877 for (i = 0; i < list_of_commands_len; i++)
1878 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1879 return ((ssize_t) i);
1880 return (-1);
1881 } /* }}} ssize_t find_command_index */
1883 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1884 const char *cmd)
1885 {
1886 ssize_t i;
1888 if (JOURNAL_REPLAY(sock))
1889 return (1);
1891 if (cmd == NULL)
1892 return (-1);
1894 if ((strcasecmp ("QUIT", cmd) == 0)
1895 || (strcasecmp ("HELP", cmd) == 0))
1896 return (1);
1897 else if (strcmp (".", cmd) == 0)
1898 cmd = "BATCH";
1900 i = find_command_index (cmd);
1901 if (i < 0)
1902 return (-1);
1903 assert (i < 32);
1905 if ((sock->permissions & (1 << i)) != 0)
1906 return (1);
1907 return (0);
1908 } /* }}} int socket_permission_check */
1910 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1911 const char *cmd)
1912 {
1913 ssize_t i;
1915 i = find_command_index (cmd);
1916 if (i < 0)
1917 return (-1);
1918 assert (i < 32);
1920 sock->permissions |= (1 << i);
1921 return (0);
1922 } /* }}} int socket_permission_add */
1924 /* check whether commands are received in the expected context */
1925 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1926 {
1927 if (JOURNAL_REPLAY(sock))
1928 return (cmd->context & CMD_CONTEXT_JOURNAL);
1929 else if (sock->batch_start)
1930 return (cmd->context & CMD_CONTEXT_BATCH);
1931 else
1932 return (cmd->context & CMD_CONTEXT_CLIENT);
1934 /* NOTREACHED */
1935 assert(1==0);
1936 }
1938 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1939 {
1940 int status;
1941 char *cmd_str;
1942 char *resp_txt;
1943 command_t *help = NULL;
1945 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1946 if (status == 0)
1947 help = find_command(cmd_str);
1949 if (help && (help->syntax || help->help))
1950 {
1951 char tmp[CMD_MAX];
1953 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1954 resp_txt = tmp;
1956 if (help->syntax)
1957 add_response_info(sock, "Usage: %s\n", help->syntax);
1959 if (help->help)
1960 add_response_info(sock, "%s\n", help->help);
1961 }
1962 else
1963 {
1964 size_t i;
1966 resp_txt = "Command overview\n";
1968 for (i = 0; i < list_of_commands_len; i++)
1969 {
1970 if (list_of_commands[i].syntax == NULL)
1971 continue;
1972 add_response_info (sock, "%s", list_of_commands[i].syntax);
1973 }
1974 }
1976 return send_response(sock, RESP_OK, resp_txt);
1977 } /* }}} int handle_request_help */
1979 static int handle_request (DISPATCH_PROTO) /* {{{ */
1980 {
1981 char *buffer_ptr = buffer;
1982 char *cmd_str = NULL;
1983 command_t *cmd = NULL;
1984 int status;
1986 assert (buffer[buffer_size - 1] == '\0');
1988 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1989 if (status != 0)
1990 {
1991 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1992 return (-1);
1993 }
1995 if (sock != NULL && sock->batch_start)
1996 sock->batch_cmd++;
1998 cmd = find_command(cmd_str);
1999 if (!cmd)
2000 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2002 if (!socket_permission_check (sock, cmd->cmd))
2003 return send_response(sock, RESP_ERR, "Permission denied.\n");
2005 if (!command_check_context(sock, cmd))
2006 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2008 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2009 } /* }}} int handle_request */
2011 static void journal_set_free (journal_set *js) /* {{{ */
2012 {
2013 if (js == NULL)
2014 return;
2016 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2018 free(js);
2019 } /* }}} journal_set_free */
2021 static void journal_set_remove (journal_set *js) /* {{{ */
2022 {
2023 if (js == NULL)
2024 return;
2026 for (uint i=0; i < js->files_num; i++)
2027 {
2028 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2029 unlink(js->files[i]);
2030 }
2031 } /* }}} journal_set_remove */
2033 /* close current journal file handle.
2034 * MUST hold journal_lock before calling */
2035 static void journal_close(void) /* {{{ */
2036 {
2037 if (journal_fh != NULL)
2038 {
2039 if (fclose(journal_fh) != 0)
2040 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2041 }
2043 journal_fh = NULL;
2044 journal_size = 0;
2045 } /* }}} journal_close */
2047 /* MUST hold journal_lock before calling */
2048 static void journal_new_file(void) /* {{{ */
2049 {
2050 struct timeval now;
2051 int new_fd;
2052 char new_file[PATH_MAX + 1];
2054 assert(journal_dir != NULL);
2055 assert(journal_cur != NULL);
2057 journal_close();
2059 gettimeofday(&now, NULL);
2060 /* this format assures that the files sort in strcmp() order */
2061 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2062 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2064 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2065 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2066 if (new_fd < 0)
2067 goto error;
2069 journal_fh = fdopen(new_fd, "a");
2070 if (journal_fh == NULL)
2071 goto error;
2073 journal_size = ftell(journal_fh);
2074 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2076 /* record the file in the journal set */
2077 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2079 return;
2081 error:
2082 RRDD_LOG(LOG_CRIT,
2083 "JOURNALING DISABLED: Error while trying to create %s : %s",
2084 new_file, rrd_strerror(errno));
2085 RRDD_LOG(LOG_CRIT,
2086 "JOURNALING DISABLED: All values will be flushed at shutdown");
2088 close(new_fd);
2089 config_flush_at_shutdown = 1;
2091 } /* }}} journal_new_file */
2093 /* MUST NOT hold journal_lock before calling this */
2094 static void journal_rotate(void) /* {{{ */
2095 {
2096 journal_set *old_js = NULL;
2098 if (journal_dir == NULL)
2099 return;
2101 RRDD_LOG(LOG_DEBUG, "rotating journals");
2103 pthread_mutex_lock(&stats_lock);
2104 ++stats_journal_rotate;
2105 pthread_mutex_unlock(&stats_lock);
2107 pthread_mutex_lock(&journal_lock);
2109 journal_close();
2111 /* rotate the journal sets */
2112 old_js = journal_old;
2113 journal_old = journal_cur;
2114 journal_cur = calloc(1, sizeof(journal_set));
2116 if (journal_cur != NULL)
2117 journal_new_file();
2118 else
2119 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2121 pthread_mutex_unlock(&journal_lock);
2123 journal_set_remove(old_js);
2124 journal_set_free (old_js);
2126 } /* }}} static void journal_rotate */
2128 /* MUST hold journal_lock when calling */
2129 static void journal_done(void) /* {{{ */
2130 {
2131 if (journal_cur == NULL)
2132 return;
2134 journal_close();
2136 if (config_flush_at_shutdown)
2137 {
2138 RRDD_LOG(LOG_INFO, "removing journals");
2139 journal_set_remove(journal_old);
2140 journal_set_remove(journal_cur);
2141 }
2142 else
2143 {
2144 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2145 "journals will be used at next startup");
2146 }
2148 journal_set_free(journal_cur);
2149 journal_set_free(journal_old);
2150 free(journal_dir);
2152 } /* }}} static void journal_done */
2154 static int journal_write(char *cmd, char *args) /* {{{ */
2155 {
2156 int chars;
2158 if (journal_fh == NULL)
2159 return 0;
2161 pthread_mutex_lock(&journal_lock);
2162 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2163 journal_size += chars;
2165 if (journal_size > JOURNAL_MAX)
2166 journal_new_file();
2168 pthread_mutex_unlock(&journal_lock);
2170 if (chars > 0)
2171 {
2172 pthread_mutex_lock(&stats_lock);
2173 stats_journal_bytes += chars;
2174 pthread_mutex_unlock(&stats_lock);
2175 }
2177 return chars;
2178 } /* }}} static int journal_write */
2180 static int journal_replay (const char *file) /* {{{ */
2181 {
2182 FILE *fh;
2183 int entry_cnt = 0;
2184 int fail_cnt = 0;
2185 uint64_t line = 0;
2186 char entry[CMD_MAX];
2187 time_t now;
2189 if (file == NULL) return 0;
2191 {
2192 char *reason = "unknown error";
2193 int status = 0;
2194 struct stat statbuf;
2196 memset(&statbuf, 0, sizeof(statbuf));
2197 if (stat(file, &statbuf) != 0)
2198 {
2199 reason = "stat error";
2200 status = errno;
2201 }
2202 else if (!S_ISREG(statbuf.st_mode))
2203 {
2204 reason = "not a regular file";
2205 status = EPERM;
2206 }
2207 if (statbuf.st_uid != daemon_uid)
2208 {
2209 reason = "not owned by daemon user";
2210 status = EACCES;
2211 }
2212 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2213 {
2214 reason = "must not be user/group writable";
2215 status = EACCES;
2216 }
2218 if (status != 0)
2219 {
2220 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2221 file, rrd_strerror(status), reason);
2222 return 0;
2223 }
2224 }
2226 fh = fopen(file, "r");
2227 if (fh == NULL)
2228 {
2229 if (errno != ENOENT)
2230 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2231 file, rrd_strerror(errno));
2232 return 0;
2233 }
2234 else
2235 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2237 now = time(NULL);
2239 while(!feof(fh))
2240 {
2241 size_t entry_len;
2243 ++line;
2244 if (fgets(entry, sizeof(entry), fh) == NULL)
2245 break;
2246 entry_len = strlen(entry);
2248 /* check \n termination in case journal writing crashed mid-line */
2249 if (entry_len == 0)
2250 continue;
2251 else if (entry[entry_len - 1] != '\n')
2252 {
2253 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2254 ++fail_cnt;
2255 continue;
2256 }
2258 entry[entry_len - 1] = '\0';
2260 if (handle_request(NULL, now, entry, entry_len) == 0)
2261 ++entry_cnt;
2262 else
2263 ++fail_cnt;
2264 }
2266 fclose(fh);
2268 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2269 entry_cnt, fail_cnt);
2271 return entry_cnt > 0 ? 1 : 0;
2272 } /* }}} static int journal_replay */
2274 static int journal_sort(const void *v1, const void *v2)
2275 {
2276 char **jn1 = (char **) v1;
2277 char **jn2 = (char **) v2;
2279 return strcmp(*jn1,*jn2);
2280 }
2282 static void journal_init(void) /* {{{ */
2283 {
2284 int had_journal = 0;
2285 DIR *dir;
2286 struct dirent *dent;
2287 char path[PATH_MAX+1];
2289 if (journal_dir == NULL) return;
2291 pthread_mutex_lock(&journal_lock);
2293 journal_cur = calloc(1, sizeof(journal_set));
2294 if (journal_cur == NULL)
2295 {
2296 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2297 return;
2298 }
2300 RRDD_LOG(LOG_INFO, "checking for journal files");
2302 /* Handle old journal files during transition. This gives them the
2303 * correct sort order. TODO: remove after first release
2304 */
2305 {
2306 char old_path[PATH_MAX+1];
2307 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2308 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2309 rename(old_path, path);
2311 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2312 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2313 rename(old_path, path);
2314 }
2316 dir = opendir(journal_dir);
2317 if (!dir) {
2318 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2319 return;
2320 }
2321 while ((dent = readdir(dir)) != NULL)
2322 {
2323 /* looks like a journal file? */
2324 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2325 continue;
2327 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2329 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2330 {
2331 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2332 dent->d_name);
2333 break;
2334 }
2335 }
2336 closedir(dir);
2338 qsort(journal_cur->files, journal_cur->files_num,
2339 sizeof(journal_cur->files[0]), journal_sort);
2341 for (uint i=0; i < journal_cur->files_num; i++)
2342 had_journal += journal_replay(journal_cur->files[i]);
2344 journal_new_file();
2346 /* it must have been a crash. start a flush */
2347 if (had_journal && config_flush_at_shutdown)
2348 flush_old_values(-1);
2350 pthread_mutex_unlock(&journal_lock);
2352 RRDD_LOG(LOG_INFO, "journal processing complete");
2354 } /* }}} static void journal_init */
2356 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2357 {
2358 assert(sock != NULL);
2360 free(sock->rbuf); sock->rbuf = NULL;
2361 free(sock->wbuf); sock->wbuf = NULL;
2362 free(sock);
2363 } /* }}} void free_listen_socket */
2365 static void close_connection(listen_socket_t *sock) /* {{{ */
2366 {
2367 if (sock->fd >= 0)
2368 {
2369 close(sock->fd);
2370 sock->fd = -1;
2371 }
2373 free_listen_socket(sock);
2375 } /* }}} void close_connection */
2377 static void *connection_thread_main (void *args) /* {{{ */
2378 {
2379 listen_socket_t *sock;
2380 int fd;
2382 sock = (listen_socket_t *) args;
2383 fd = sock->fd;
2385 /* init read buffers */
2386 sock->next_read = sock->next_cmd = 0;
2387 sock->rbuf = malloc(RBUF_SIZE);
2388 if (sock->rbuf == NULL)
2389 {
2390 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2391 close_connection(sock);
2392 return NULL;
2393 }
2395 pthread_mutex_lock (&connection_threads_lock);
2396 connection_threads_num++;
2397 pthread_mutex_unlock (&connection_threads_lock);
2399 while (state == RUNNING)
2400 {
2401 char *cmd;
2402 ssize_t cmd_len;
2403 ssize_t rbytes;
2404 time_t now;
2406 struct pollfd pollfd;
2407 int status;
2409 pollfd.fd = fd;
2410 pollfd.events = POLLIN | POLLPRI;
2411 pollfd.revents = 0;
2413 status = poll (&pollfd, 1, /* timeout = */ 500);
2414 if (state != RUNNING)
2415 break;
2416 else if (status == 0) /* timeout */
2417 continue;
2418 else if (status < 0) /* error */
2419 {
2420 status = errno;
2421 if (status != EINTR)
2422 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2423 continue;
2424 }
2426 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2427 break;
2428 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2429 {
2430 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2431 "poll(2) returned something unexpected: %#04hx",
2432 pollfd.revents);
2433 break;
2434 }
2436 rbytes = read(fd, sock->rbuf + sock->next_read,
2437 RBUF_SIZE - sock->next_read);
2438 if (rbytes < 0)
2439 {
2440 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2441 break;
2442 }
2443 else if (rbytes == 0)
2444 break; /* eof */
2446 sock->next_read += rbytes;
2448 if (sock->batch_start)
2449 now = sock->batch_start;
2450 else
2451 now = time(NULL);
2453 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2454 {
2455 status = handle_request (sock, now, cmd, cmd_len+1);
2456 if (status != 0)
2457 goto out_close;
2458 }
2459 }
2461 out_close:
2462 close_connection(sock);
2464 /* Remove this thread from the connection threads list */
2465 pthread_mutex_lock (&connection_threads_lock);
2466 connection_threads_num--;
2467 if (connection_threads_num <= 0)
2468 pthread_cond_broadcast(&connection_threads_done);
2469 pthread_mutex_unlock (&connection_threads_lock);
2471 return (NULL);
2472 } /* }}} void *connection_thread_main */
2474 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2475 {
2476 int fd;
2477 struct sockaddr_un sa;
2478 listen_socket_t *temp;
2479 int status;
2480 const char *path;
2481 char *path_copy, *dir;
2483 path = sock->addr;
2484 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2485 path += strlen("unix:");
2487 /* dirname may modify its argument */
2488 path_copy = strdup(path);
2489 if (path_copy == NULL)
2490 {
2491 fprintf(stderr, "rrdcached: strdup(): %s\n",
2492 rrd_strerror(errno));
2493 return (-1);
2494 }
2496 dir = dirname(path_copy);
2497 if (rrd_mkdir_p(dir, 0777) != 0)
2498 {
2499 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2500 dir, rrd_strerror(errno));
2501 return (-1);
2502 }
2504 free(path_copy);
2506 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2507 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2508 if (temp == NULL)
2509 {
2510 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2511 return (-1);
2512 }
2513 listen_fds = temp;
2514 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2516 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2517 if (fd < 0)
2518 {
2519 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2520 rrd_strerror(errno));
2521 return (-1);
2522 }
2524 memset (&sa, 0, sizeof (sa));
2525 sa.sun_family = AF_UNIX;
2526 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2528 /* if we've gotten this far, we own the pid file. any daemon started
2529 * with the same args must not be alive. therefore, ensure that we can
2530 * create the socket...
2531 */
2532 unlink(path);
2534 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2535 if (status != 0)
2536 {
2537 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2538 path, rrd_strerror(errno));
2539 close (fd);
2540 return (-1);
2541 }
2543 /* tweak the sockets group ownership */
2544 if (sock->socket_group != (gid_t)-1)
2545 {
2546 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2547 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2548 {
2549 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2550 }
2551 }
2553 if (sock->socket_permissions != (mode_t)-1)
2554 {
2555 if (chmod(path, sock->socket_permissions) != 0)
2556 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2557 (unsigned int)sock->socket_permissions, strerror(errno));
2558 }
2560 status = listen (fd, /* backlog = */ 10);
2561 if (status != 0)
2562 {
2563 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2564 path, rrd_strerror(errno));
2565 close (fd);
2566 unlink (path);
2567 return (-1);
2568 }
2570 listen_fds[listen_fds_num].fd = fd;
2571 listen_fds[listen_fds_num].family = PF_UNIX;
2572 strncpy(listen_fds[listen_fds_num].addr, path,
2573 sizeof (listen_fds[listen_fds_num].addr) - 1);
2574 listen_fds_num++;
2576 return (0);
2577 } /* }}} int open_listen_socket_unix */
2579 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2580 {
2581 struct addrinfo ai_hints;
2582 struct addrinfo *ai_res;
2583 struct addrinfo *ai_ptr;
2584 char addr_copy[NI_MAXHOST];
2585 char *addr;
2586 char *port;
2587 int status;
2589 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2590 addr_copy[sizeof (addr_copy) - 1] = 0;
2591 addr = addr_copy;
2593 memset (&ai_hints, 0, sizeof (ai_hints));
2594 ai_hints.ai_flags = 0;
2595 #ifdef AI_ADDRCONFIG
2596 ai_hints.ai_flags |= AI_ADDRCONFIG;
2597 #endif
2598 ai_hints.ai_family = AF_UNSPEC;
2599 ai_hints.ai_socktype = SOCK_STREAM;
2601 port = NULL;
2602 if (*addr == '[') /* IPv6+port format */
2603 {
2604 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2605 addr++;
2607 port = strchr (addr, ']');
2608 if (port == NULL)
2609 {
2610 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2611 return (-1);
2612 }
2613 *port = 0;
2614 port++;
2616 if (*port == ':')
2617 port++;
2618 else if (*port == 0)
2619 port = NULL;
2620 else
2621 {
2622 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2623 return (-1);
2624 }
2625 } /* if (*addr == '[') */
2626 else
2627 {
2628 port = rindex(addr, ':');
2629 if (port != NULL)
2630 {
2631 *port = 0;
2632 port++;
2633 }
2634 }
2635 ai_res = NULL;
2636 status = getaddrinfo (addr,
2637 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2638 &ai_hints, &ai_res);
2639 if (status != 0)
2640 {
2641 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2642 addr, gai_strerror (status));
2643 return (-1);
2644 }
2646 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2647 {
2648 int fd;
2649 listen_socket_t *temp;
2650 int one = 1;
2652 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2653 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2654 if (temp == NULL)
2655 {
2656 fprintf (stderr,
2657 "rrdcached: open_listen_socket_network: realloc failed.\n");
2658 continue;
2659 }
2660 listen_fds = temp;
2661 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2663 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2664 if (fd < 0)
2665 {
2666 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2667 rrd_strerror(errno));
2668 continue;
2669 }
2671 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2673 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2674 if (status != 0)
2675 {
2676 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2677 sock->addr, rrd_strerror(errno));
2678 close (fd);
2679 continue;
2680 }
2682 status = listen (fd, /* backlog = */ 10);
2683 if (status != 0)
2684 {
2685 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2686 sock->addr, rrd_strerror(errno));
2687 close (fd);
2688 freeaddrinfo(ai_res);
2689 return (-1);
2690 }
2692 listen_fds[listen_fds_num].fd = fd;
2693 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2694 listen_fds_num++;
2695 } /* for (ai_ptr) */
2697 freeaddrinfo(ai_res);
2698 return (0);
2699 } /* }}} static int open_listen_socket_network */
2701 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2702 {
2703 assert(sock != NULL);
2704 assert(sock->addr != NULL);
2706 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2707 || sock->addr[0] == '/')
2708 return (open_listen_socket_unix(sock));
2709 else
2710 return (open_listen_socket_network(sock));
2711 } /* }}} int open_listen_socket */
2713 static int close_listen_sockets (void) /* {{{ */
2714 {
2715 size_t i;
2717 for (i = 0; i < listen_fds_num; i++)
2718 {
2719 close (listen_fds[i].fd);
2721 if (listen_fds[i].family == PF_UNIX)
2722 unlink(listen_fds[i].addr);
2723 }
2725 free (listen_fds);
2726 listen_fds = NULL;
2727 listen_fds_num = 0;
2729 return (0);
2730 } /* }}} int close_listen_sockets */
2732 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2733 {
2734 struct pollfd *pollfds;
2735 int pollfds_num;
2736 int status;
2737 int i;
2739 if (listen_fds_num < 1)
2740 {
2741 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2742 return (NULL);
2743 }
2745 pollfds_num = listen_fds_num;
2746 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2747 if (pollfds == NULL)
2748 {
2749 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2750 return (NULL);
2751 }
2752 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2754 RRDD_LOG(LOG_INFO, "listening for connections");
2756 while (state == RUNNING)
2757 {
2758 for (i = 0; i < pollfds_num; i++)
2759 {
2760 pollfds[i].fd = listen_fds[i].fd;
2761 pollfds[i].events = POLLIN | POLLPRI;
2762 pollfds[i].revents = 0;
2763 }
2765 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2766 if (state != RUNNING)
2767 break;
2768 else if (status == 0) /* timeout */
2769 continue;
2770 else if (status < 0) /* error */
2771 {
2772 status = errno;
2773 if (status != EINTR)
2774 {
2775 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2776 }
2777 continue;
2778 }
2780 for (i = 0; i < pollfds_num; i++)
2781 {
2782 listen_socket_t *client_sock;
2783 struct sockaddr_storage client_sa;
2784 socklen_t client_sa_size;
2785 pthread_t tid;
2786 pthread_attr_t attr;
2788 if (pollfds[i].revents == 0)
2789 continue;
2791 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2792 {
2793 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2794 "poll(2) returned something unexpected for listen FD #%i.",
2795 pollfds[i].fd);
2796 continue;
2797 }
2799 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2800 if (client_sock == NULL)
2801 {
2802 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2803 continue;
2804 }
2805 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2807 client_sa_size = sizeof (client_sa);
2808 client_sock->fd = accept (pollfds[i].fd,
2809 (struct sockaddr *) &client_sa, &client_sa_size);
2810 if (client_sock->fd < 0)
2811 {
2812 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2813 free(client_sock);
2814 continue;
2815 }
2817 pthread_attr_init (&attr);
2818 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2820 status = pthread_create (&tid, &attr, connection_thread_main,
2821 client_sock);
2822 if (status != 0)
2823 {
2824 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2825 close_connection(client_sock);
2826 continue;
2827 }
2828 } /* for (pollfds_num) */
2829 } /* while (state == RUNNING) */
2831 RRDD_LOG(LOG_INFO, "starting shutdown");
2833 close_listen_sockets ();
2835 pthread_mutex_lock (&connection_threads_lock);
2836 while (connection_threads_num > 0)
2837 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2838 pthread_mutex_unlock (&connection_threads_lock);
2840 free(pollfds);
2842 return (NULL);
2843 } /* }}} void *listen_thread_main */
2845 static int daemonize (void) /* {{{ */
2846 {
2847 int pid_fd;
2848 char *base_dir;
2850 daemon_uid = geteuid();
2852 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2853 if (pid_fd < 0)
2854 pid_fd = check_pidfile();
2855 if (pid_fd < 0)
2856 return pid_fd;
2858 /* open all the listen sockets */
2859 if (config_listen_address_list_len > 0)
2860 {
2861 for (size_t i = 0; i < config_listen_address_list_len; i++)
2862 open_listen_socket (config_listen_address_list[i]);
2864 rrd_free_ptrs((void ***) &config_listen_address_list,
2865 &config_listen_address_list_len);
2866 }
2867 else
2868 {
2869 listen_socket_t sock;
2870 memset(&sock, 0, sizeof(sock));
2871 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2872 open_listen_socket (&sock);
2873 }
2875 if (listen_fds_num < 1)
2876 {
2877 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2878 goto error;
2879 }
2881 if (!stay_foreground)
2882 {
2883 pid_t child;
2885 child = fork ();
2886 if (child < 0)
2887 {
2888 fprintf (stderr, "daemonize: fork(2) failed.\n");
2889 goto error;
2890 }
2891 else if (child > 0)
2892 exit(0);
2894 /* Become session leader */
2895 setsid ();
2897 /* Open the first three file descriptors to /dev/null */
2898 close (2);
2899 close (1);
2900 close (0);
2902 open ("/dev/null", O_RDWR);
2903 if (dup(0) == -1 || dup(0) == -1){
2904 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2905 }
2906 } /* if (!stay_foreground) */
2908 /* Change into the /tmp directory. */
2909 base_dir = (config_base_dir != NULL)
2910 ? config_base_dir
2911 : "/tmp";
2913 if (chdir (base_dir) != 0)
2914 {
2915 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2916 goto error;
2917 }
2919 install_signal_handlers();
2921 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2922 RRDD_LOG(LOG_INFO, "starting up");
2924 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2925 (GDestroyNotify) free_cache_item);
2926 if (cache_tree == NULL)
2927 {
2928 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2929 goto error;
2930 }
2932 return write_pidfile (pid_fd);
2934 error:
2935 remove_pidfile();
2936 return -1;
2937 } /* }}} int daemonize */
2939 static int cleanup (void) /* {{{ */
2940 {
2941 pthread_cond_broadcast (&flush_cond);
2942 pthread_join (flush_thread, NULL);
2944 pthread_cond_broadcast (&queue_cond);
2945 for (int i = 0; i < config_queue_threads; i++)
2946 pthread_join (queue_threads[i], NULL);
2948 if (config_flush_at_shutdown)
2949 {
2950 assert(cache_queue_head == NULL);
2951 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2952 }
2954 free(queue_threads);
2955 free(config_base_dir);
2957 pthread_mutex_lock(&cache_lock);
2958 g_tree_destroy(cache_tree);
2960 pthread_mutex_lock(&journal_lock);
2961 journal_done();
2963 RRDD_LOG(LOG_INFO, "goodbye");
2964 closelog ();
2966 remove_pidfile ();
2967 free(config_pid_file);
2969 return (0);
2970 } /* }}} int cleanup */
2972 static int read_options (int argc, char **argv) /* {{{ */
2973 {
2974 int option;
2975 int status = 0;
2977 char **permissions = NULL;
2978 size_t permissions_len = 0;
2980 gid_t socket_group = (gid_t)-1;
2981 mode_t socket_permissions = (mode_t)-1;
2983 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2984 {
2985 switch (option)
2986 {
2987 case 'g':
2988 stay_foreground=1;
2989 break;
2991 case 'l':
2992 {
2993 listen_socket_t *new;
2995 new = malloc(sizeof(listen_socket_t));
2996 if (new == NULL)
2997 {
2998 fprintf(stderr, "read_options: malloc failed.\n");
2999 return(2);
3000 }
3001 memset(new, 0, sizeof(listen_socket_t));
3003 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3005 /* Add permissions to the socket {{{ */
3006 if (permissions_len != 0)
3007 {
3008 size_t i;
3009 for (i = 0; i < permissions_len; i++)
3010 {
3011 status = socket_permission_add (new, permissions[i]);
3012 if (status != 0)
3013 {
3014 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3015 "socket failed. Most likely, this permission doesn't "
3016 "exist. Check your command line.\n", permissions[i]);
3017 status = 4;
3018 }
3019 }
3020 }
3021 else /* if (permissions_len == 0) */
3022 {
3023 /* Add permission for ALL commands to the socket. */
3024 size_t i;
3025 for (i = 0; i < list_of_commands_len; i++)
3026 {
3027 status = socket_permission_add (new, list_of_commands[i].cmd);
3028 if (status != 0)
3029 {
3030 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3031 "socket failed. This should never happen, ever! Sorry.\n",
3032 permissions[i]);
3033 status = 4;
3034 }
3035 }
3036 }
3037 /* }}} Done adding permissions. */
3039 new->socket_group = socket_group;
3040 new->socket_permissions = socket_permissions;
3042 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3043 &config_listen_address_list_len, new))
3044 {
3045 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3046 return (2);
3047 }
3048 }
3049 break;
3051 /* set socket group permissions */
3052 case 's':
3053 {
3054 gid_t group_gid;
3055 struct group *grp;
3057 group_gid = strtoul(optarg, NULL, 10);
3058 if (errno != EINVAL && group_gid>0)
3059 {
3060 /* we were passed a number */
3061 grp = getgrgid(group_gid);
3062 }
3063 else
3064 {
3065 grp = getgrnam(optarg);
3066 }
3068 if (grp)
3069 {
3070 socket_group = grp->gr_gid;
3071 }
3072 else
3073 {
3074 /* no idea what the user wanted... */
3075 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3076 return (5);
3077 }
3078 }
3079 break;
3081 /* set socket file permissions */
3082 case 'm':
3083 {
3084 long tmp;
3085 char *endptr = NULL;
3087 tmp = strtol (optarg, &endptr, 8);
3088 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3089 || (tmp > 07777) || (tmp < 0)) {
3090 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3091 optarg);
3092 return (5);
3093 }
3095 socket_permissions = (mode_t)tmp;
3096 }
3097 break;
3099 case 'P':
3100 {
3101 char *optcopy;
3102 char *saveptr;
3103 char *dummy;
3104 char *ptr;
3106 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3108 optcopy = strdup (optarg);
3109 dummy = optcopy;
3110 saveptr = NULL;
3111 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3112 {
3113 dummy = NULL;
3114 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3115 }
3117 free (optcopy);
3118 }
3119 break;
3121 case 'f':
3122 {
3123 int temp;
3125 temp = atoi (optarg);
3126 if (temp > 0)
3127 config_flush_interval = temp;
3128 else
3129 {
3130 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3131 status = 3;
3132 }
3133 }
3134 break;
3136 case 'w':
3137 {
3138 int temp;
3140 temp = atoi (optarg);
3141 if (temp > 0)
3142 config_write_interval = temp;
3143 else
3144 {
3145 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3146 status = 2;
3147 }
3148 }
3149 break;
3151 case 'z':
3152 {
3153 int temp;
3155 temp = atoi(optarg);
3156 if (temp > 0)
3157 config_write_jitter = temp;
3158 else
3159 {
3160 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3161 status = 2;
3162 }
3164 break;
3165 }
3167 case 't':
3168 {
3169 int threads;
3170 threads = atoi(optarg);
3171 if (threads >= 1)
3172 config_queue_threads = threads;
3173 else
3174 {
3175 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3176 return 1;
3177 }
3178 }
3179 break;
3181 case 'B':
3182 config_write_base_only = 1;
3183 break;
3185 case 'b':
3186 {
3187 size_t len;
3188 char base_realpath[PATH_MAX];
3190 if (config_base_dir != NULL)
3191 free (config_base_dir);
3192 config_base_dir = strdup (optarg);
3193 if (config_base_dir == NULL)
3194 {
3195 fprintf (stderr, "read_options: strdup failed.\n");
3196 return (3);
3197 }
3199 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3200 {
3201 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3202 config_base_dir, rrd_strerror (errno));
3203 return (3);
3204 }
3206 /* make sure that the base directory is not resolved via
3207 * symbolic links. this makes some performance-enhancing
3208 * assumptions possible (we don't have to resolve paths
3209 * that start with a "/")
3210 */
3211 if (realpath(config_base_dir, base_realpath) == NULL)
3212 {
3213 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3214 "%s\n", config_base_dir, rrd_strerror(errno));
3215 return 5;
3216 }
3218 len = strlen (config_base_dir);
3219 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3220 {
3221 config_base_dir[len - 1] = 0;
3222 len--;
3223 }
3225 if (len < 1)
3226 {
3227 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3228 return (4);
3229 }
3231 _config_base_dir_len = len;
3233 len = strlen (base_realpath);
3234 while ((len > 0) && (base_realpath[len - 1] == '/'))
3235 {
3236 base_realpath[len - 1] = '\0';
3237 len--;
3238 }
3240 if (strncmp(config_base_dir,
3241 base_realpath, sizeof(base_realpath)) != 0)
3242 {
3243 fprintf(stderr,
3244 "Base directory (-b) resolved via file system links!\n"
3245 "Please consult rrdcached '-b' documentation!\n"
3246 "Consider specifying the real directory (%s)\n",
3247 base_realpath);
3248 return 5;
3249 }
3250 }
3251 break;
3253 case 'p':
3254 {
3255 if (config_pid_file != NULL)
3256 free (config_pid_file);
3257 config_pid_file = strdup (optarg);
3258 if (config_pid_file == NULL)
3259 {
3260 fprintf (stderr, "read_options: strdup failed.\n");
3261 return (3);
3262 }
3263 }
3264 break;
3266 case 'F':
3267 config_flush_at_shutdown = 1;
3268 break;
3270 case 'j':
3271 {
3272 char journal_dir_actual[PATH_MAX];
3273 const char *dir;
3274 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3276 status = rrd_mkdir_p(dir, 0777);
3277 if (status != 0)
3278 {
3279 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3280 dir, rrd_strerror(errno));
3281 return 6;
3282 }
3284 if (access(dir, R_OK|W_OK|X_OK) != 0)
3285 {
3286 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3287 errno ? rrd_strerror(errno) : "");
3288 return 6;
3289 }
3290 }
3291 break;
3293 case 'a':
3294 {
3295 int temp = atoi(optarg);
3296 if (temp > 0)
3297 config_alloc_chunk = temp;
3298 else
3299 {
3300 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3301 return 10;
3302 }
3303 }
3304 break;
3306 case 'h':
3307 case '?':
3308 printf ("RRDCacheD %s\n"
3309 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3310 "\n"
3311 "Usage: rrdcached [options]\n"
3312 "\n"
3313 "Valid options are:\n"
3314 " -l <address> Socket address to listen to.\n"
3315 " -P <perms> Sets the permissions to assign to all following "
3316 "sockets\n"
3317 " -w <seconds> Interval in which to write data.\n"
3318 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3319 " -t <threads> Number of write threads.\n"
3320 " -f <seconds> Interval in which to flush dead data.\n"
3321 " -p <file> Location of the PID-file.\n"
3322 " -b <dir> Base directory to change to.\n"
3323 " -B Restrict file access to paths within -b <dir>\n"
3324 " -g Do not fork and run in the foreground.\n"
3325 " -j <dir> Directory in which to create the journal files.\n"
3326 " -F Always flush all updates at shutdown\n"
3327 " -s <id|name> Group owner of all following UNIX sockets\n"
3328 " (the socket will also have read/write permissions "
3329 "for that group)\n"
3330 " -m <mode> File permissions (octal) of all following UNIX "
3331 "sockets\n"
3332 " -a <size> Memory allocation chunk size. Default is 1."
3333 "\n"
3334 "For more information and a detailed description of all options "
3335 "please refer\n"
3336 "to the rrdcached(1) manual page.\n",
3337 VERSION);
3338 if (option == 'h')
3339 status = -1;
3340 else
3341 status = 1;
3342 break;
3343 } /* switch (option) */
3344 } /* while (getopt) */
3346 /* advise the user when values are not sane */
3347 if (config_flush_interval < 2 * config_write_interval)
3348 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3349 " 2x write interval (-w) !\n");
3350 if (config_write_jitter > config_write_interval)
3351 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3352 " write interval (-w) !\n");
3354 if (config_write_base_only && config_base_dir == NULL)
3355 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3356 " Consult the rrdcached documentation\n");
3358 if (journal_dir == NULL)
3359 config_flush_at_shutdown = 1;
3361 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3363 return (status);
3364 } /* }}} int read_options */
3366 int main (int argc, char **argv)
3367 {
3368 int status;
3370 status = read_options (argc, argv);
3371 if (status != 0)
3372 {
3373 if (status < 0)
3374 status = 0;
3375 return (status);
3376 }
3378 status = daemonize ();
3379 if (status != 0)
3380 {
3381 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3382 return (1);
3383 }
3385 journal_init();
3387 /* start the queue threads */
3388 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3389 if (queue_threads == NULL)
3390 {
3391 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3392 cleanup();
3393 return (1);
3394 }
3395 for (int i = 0; i < config_queue_threads; i++)
3396 {
3397 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3398 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3399 if (status != 0)
3400 {
3401 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3402 cleanup();
3403 return (1);
3404 }
3405 }
3407 /* start the flush thread */
3408 memset(&flush_thread, 0, sizeof(flush_thread));
3409 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3410 if (status != 0)
3411 {
3412 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3413 cleanup();
3414 return (1);
3415 }
3417 listen_thread_main (NULL);
3418 cleanup ();
3420 return (0);
3421 } /* int main */
3423 /*
3424 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3425 */