1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
111 #include <glib-2.0/glib.h>
112 /* }}} */
/*
 * RRDD_LOG: log a printf-style message through syslog() at `severity'.
 * When `stay_foreground' is non-zero the same message is first printed to
 * stderr as well. The variadic arguments are expanded twice, so they must
 * not have side effects.
 */
#define RRDD_LOG(severity, ...) \
  do { \
    if (stay_foreground) \
      fprintf(stderr, __VA_ARGS__); \
    syslog ((severity), __VA_ARGS__); \
  } while (0)

/* Compilers other than GCC get `__attribute__(...)' expanded to nothing. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
125 /*
126 * Types
127 */
/* Status passed to send_response(): RESP_OK on success, RESP_ERR on error. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;

/*
 * Per-socket state: the descriptor itself, BATCH bookkeeping and the
 * read/write buffers used by next_cmd(), add_to_wbuf() and send_response().
 */
struct listen_socket_s
{
  int fd;                  /* descriptor that responses are written to */
  char addr[PATH_MAX + 1];
  int family;

  /* state for BATCH processing */
  time_t batch_start;      /* non-zero while the socket is in BATCH mode */
  int batch_cmd;           /* reported as the line number in BATCH errors */

  /* buffered IO */
  char *rbuf;              /* read buffer scanned by next_cmd() */
  off_t next_cmd;          /* offset in rbuf of the next unparsed command */
  off_t next_read;         /* offset in rbuf just past the buffered data */

  char *wbuf;              /* pending response text, grown by add_to_wbuf() */
  ssize_t wbuf_len;        /* bytes in wbuf, excluding the terminating NUL */

  /* NOTE(review): the three fields below are not used in this part of the
   * file; presumably access rights and unix-socket ownership -- confirm. */
  uint32_t permissions;

  gid_t socket_group;
  mode_t socket_permissions;
};
typedef struct listen_socket_s listen_socket_t;
struct command_s;
typedef struct command_s command_t;
/* note: guard against "unused" warnings in the handlers */
/* DISPATCH_PROTO: parameter list shared by the dispatching functions:
 * the socket, the current time and the argument buffer with its size. */
#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
  time_t now __attribute__((unused)),\
  char *buffer __attribute__((unused)),\
  size_t buffer_size __attribute__((unused))

/* HANDLER_PROTO: DISPATCH_PROTO preceded by the command descriptor. */
#define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
  DISPATCH_PROTO

/* Describes one protocol command: its name, handler and help texts. */
struct command_s {
  char *cmd;                      /* command name */
  int (*handler)(HANDLER_PROTO);  /* function that executes the command */

  char context; /* where we expect to see it */
  /* bit flags for `context' */
#define CMD_CONTEXT_CLIENT (1<<0)
#define CMD_CONTEXT_BATCH (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY (0x7f)

  char *syntax; /* usage text; sent by syntax_error() when non-NULL */
  char *help;
};
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/*
 * One cached file: the pending update strings plus the links that place the
 * item in the write queue (cache_queue_head / cache_queue_tail).
 */
struct cache_item_s
{
  char *file;               /* file name; also the key in cache_tree */
  char **values;            /* pending update strings */
  size_t values_num; /* number of valid pointers */
  size_t values_alloc; /* number of allocated pointers */
  time_t last_flush_time;   /* set by wipe_ci_values(), may include jitter */
  time_t last_update_stamp;
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;                /* combination of the CI_FLAGS_* bits above */
  pthread_cond_t flushed;   /* broadcast after the values were written */
  cache_item_t *prev;       /* neighbour towards the queue head */
  cache_item_t *next;       /* neighbour towards the queue tail */
};
/* User data handed to tree_callback_flush() by flush_old_values(). */
struct callback_flush_data_s
{
  time_t now;          /* time at which the flush pass started */
  time_t abs_timeout;  /* items last flushed at or before this get queued */
  char **keys;         /* keys of idle, empty items to remove afterwards */
  size_t keys_num;     /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Selects the end of the write queue used by enqueue_cache_item(). */
enum queue_side_e
{
  HEAD, /* put the item at the front (used by flush_file) */
  TAIL  /* append the item at the back */
};
typedef enum queue_side_e queue_side_t;
/* describe a set of journal files */
typedef struct {
  char **files;      /* array of file names */
  size_t files_num;  /* number of entries in `files' */
} journal_set;

/* max length of socket command or response */
#define CMD_MAX 4096
/* read buffer size: twice the maximum command length */
#define RBUF_SIZE (CMD_MAX*2)
224 /*
225 * Variables
226 */
/* non-zero: RRDD_LOG also prints to stderr */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Daemon life cycle. The signal handlers move it to FLUSHING and
 * flush_thread_main() moves it to SHUTDOWN once it is done. */
enum {
  RUNNING, /* normal operation */
  FLUSHING, /* flushing remaining values */
  SHUTDOWN /* shutting down */
} state = RUNNING;

static pthread_t *queue_threads;
/* signalled when an item is queued, broadcast on shutdown signals */
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

static pthread_t flush_thread;
/* wakes flush_thread_main(); broadcast on shutdown signals */
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
/* cache_tree maps file names to cache_item_t; the queue pointers form a
 * doubly linked list of items waiting to be written. All of them are
 * protected by cache_lock. */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;
static size_t config_alloc_chunk = 1;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Counters reported by handle_request_stats(); guarded by stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
/* a NULL socket marks a command that is being replayed from the journal */
#define JOURNAL_REPLAY(s) ((s) == NULL)
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL; /* current journal file handle */
static long journal_size = 0; /* current journal size */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);
/* prototypes for forward references */
293 static int handle_request_help (HANDLER_PROTO);
295 /*
296 * Functions
297 */
298 static void sig_common (const char *sig) /* {{{ */
299 {
300 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
301 state = FLUSHING;
302 pthread_cond_broadcast(&flush_cond);
303 pthread_cond_broadcast(&queue_cond);
304 } /* }}} void sig_common */
/* SIGINT handler: start the common shutdown sequence. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "INT";

  sig_common(name);
} /* }}} void sig_int_handler */
/* SIGTERM handler: start the common shutdown sequence. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "TERM";

  sig_common(name);
} /* }}} void sig_term_handler */
316 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
317 {
318 config_flush_at_shutdown = 1;
319 sig_common("USR1");
320 } /* }}} void sig_usr1_handler */
322 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
323 {
324 config_flush_at_shutdown = 0;
325 sig_common("USR2");
326 } /* }}} void sig_usr2_handler */
328 static void install_signal_handlers(void) /* {{{ */
329 {
330 /* These structures are static, because `sigaction' behaves weird if the are
331 * overwritten.. */
332 static struct sigaction sa_int;
333 static struct sigaction sa_term;
334 static struct sigaction sa_pipe;
335 static struct sigaction sa_usr1;
336 static struct sigaction sa_usr2;
338 /* Install signal handlers */
339 memset (&sa_int, 0, sizeof (sa_int));
340 sa_int.sa_handler = sig_int_handler;
341 sigaction (SIGINT, &sa_int, NULL);
343 memset (&sa_term, 0, sizeof (sa_term));
344 sa_term.sa_handler = sig_term_handler;
345 sigaction (SIGTERM, &sa_term, NULL);
347 memset (&sa_pipe, 0, sizeof (sa_pipe));
348 sa_pipe.sa_handler = SIG_IGN;
349 sigaction (SIGPIPE, &sa_pipe, NULL);
351 memset (&sa_pipe, 0, sizeof (sa_usr1));
352 sa_usr1.sa_handler = sig_usr1_handler;
353 sigaction (SIGUSR1, &sa_usr1, NULL);
355 memset (&sa_usr2, 0, sizeof (sa_usr2));
356 sa_usr2.sa_handler = sig_usr2_handler;
357 sigaction (SIGUSR2, &sa_usr2, NULL);
359 } /* }}} void install_signal_handlers */
361 static int open_pidfile(char *action, int oflag) /* {{{ */
362 {
363 int fd;
364 const char *file;
365 char *file_copy, *dir;
367 file = (config_pid_file != NULL)
368 ? config_pid_file
369 : LOCALSTATEDIR "/run/rrdcached.pid";
371 /* dirname may modify its argument */
372 file_copy = strdup(file);
373 if (file_copy == NULL)
374 {
375 fprintf(stderr, "rrdcached: strdup(): %s\n",
376 rrd_strerror(errno));
377 return -1;
378 }
380 dir = dirname(file_copy);
381 if (rrd_mkdir_p(dir, 0777) != 0)
382 {
383 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
384 dir, rrd_strerror(errno));
385 return -1;
386 }
388 free(file_copy);
390 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
391 if (fd < 0)
392 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
393 action, file, rrd_strerror(errno));
395 return(fd);
396 } /* }}} static int open_pidfile */
/*
 * Check an existing pid file to see whether a daemon is running.
 *
 * Reads the PID stored in the file. If a process other than ourselves can
 * be signalled under that PID, another daemon is assumed to be running and
 * the check fails. Otherwise the stale file is truncated so that it can be
 * rewritten.
 *
 * Returns the open pid file descriptor (positioned at offset 0) on success
 * or a negative value on failure; the descriptor is closed on every failure
 * path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not add one */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * Write our own PID, followed by a newline, to the already opened pid file
 * descriptor `fd'. The descriptor is closed in every case.
 *
 * Returns 0 on success, -1 if no stream could be attached to `fd'.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
463 static int remove_pidfile (void) /* {{{ */
464 {
465 char *file;
466 int status;
468 file = (config_pid_file != NULL)
469 ? config_pid_file
470 : LOCALSTATEDIR "/run/rrdcached.pid";
472 status = unlink (file);
473 if (status == 0)
474 return (0);
475 return (errno);
476 } /* }}} int remove_pidfile */
478 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
479 {
480 char *eol;
482 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
483 sock->next_read - sock->next_cmd);
485 if (eol == NULL)
486 {
487 /* no commands left, move remainder back to front of rbuf */
488 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
489 sock->next_read - sock->next_cmd);
490 sock->next_read -= sock->next_cmd;
491 sock->next_cmd = 0;
492 *len = 0;
493 return NULL;
494 }
495 else
496 {
497 char *cmd = sock->rbuf + sock->next_cmd;
498 *eol = '\0';
500 sock->next_cmd = eol - sock->rbuf + 1;
502 if (eol > sock->rbuf && *(eol-1) == '\r')
503 *(--eol) = '\0'; /* handle "\r\n" EOL */
505 *len = eol - cmd;
507 return cmd;
508 }
510 /* NOTREACHED */
511 assert(1==0);
512 } /* }}} char *next_cmd */
514 /* add the characters directly to the write buffer */
515 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
516 {
517 char *new_buf;
519 assert(sock != NULL);
521 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
522 if (new_buf == NULL)
523 {
524 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
525 return -1;
526 }
528 strncpy(new_buf + sock->wbuf_len, str, len + 1);
530 sock->wbuf = new_buf;
531 sock->wbuf_len += len;
533 return 0;
534 } /* }}} static int add_to_wbuf */
536 /* add the text to the "extra" info that's sent after the status line */
537 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
538 {
539 va_list argp;
540 char buffer[CMD_MAX];
541 int len;
543 if (JOURNAL_REPLAY(sock)) return 0;
544 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
546 va_start(argp, fmt);
547 #ifdef HAVE_VSNPRINTF
548 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
549 #else
550 len = vsprintf(buffer, fmt, argp);
551 #endif
552 va_end(argp);
553 if (len < 0)
554 {
555 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
556 return -1;
557 }
559 return add_to_wbuf(sock, buffer, len);
560 } /* }}} static int add_response_info */
/*
 * Count the newline characters in `str'.
 * A NULL pointer counts as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
578 /* send the response back to the user.
579 * returns 0 on success, -1 on error
580 * write buffer is always zeroed after this call */
581 static int send_response (listen_socket_t *sock, response_code rc,
582 char *fmt, ...) /* {{{ */
583 {
584 va_list argp;
585 char buffer[CMD_MAX];
586 int lines;
587 ssize_t wrote;
588 int rclen, len;
590 if (JOURNAL_REPLAY(sock)) return rc;
592 if (sock->batch_start)
593 {
594 if (rc == RESP_OK)
595 return rc; /* no response on success during BATCH */
596 lines = sock->batch_cmd;
597 }
598 else if (rc == RESP_OK)
599 lines = count_lines(sock->wbuf);
600 else
601 lines = -1;
603 rclen = sprintf(buffer, "%d ", lines);
604 va_start(argp, fmt);
605 #ifdef HAVE_VSNPRINTF
606 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
607 #else
608 len = vsprintf(buffer+rclen, fmt, argp);
609 #endif
610 va_end(argp);
611 if (len < 0)
612 return -1;
614 len += rclen;
616 /* append the result to the wbuf, don't write to the user */
617 if (sock->batch_start)
618 return add_to_wbuf(sock, buffer, len);
620 /* first write must be complete */
621 if (len != write(sock->fd, buffer, len))
622 {
623 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
624 return -1;
625 }
627 if (sock->wbuf != NULL && rc == RESP_OK)
628 {
629 wrote = 0;
630 while (wrote < sock->wbuf_len)
631 {
632 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
633 if (wb <= 0)
634 {
635 RRDD_LOG(LOG_INFO, "send_response: could not write results");
636 return -1;
637 }
638 wrote += wb;
639 }
640 }
642 free(sock->wbuf); sock->wbuf = NULL;
643 sock->wbuf_len = 0;
645 return 0;
646 } /* }}} */
648 static void wipe_ci_values(cache_item_t *ci, time_t when)
649 {
650 ci->values = NULL;
651 ci->values_num = 0;
652 ci->values_alloc = 0;
654 ci->last_flush_time = when;
655 if (config_write_jitter > 0)
656 ci->last_flush_time += (rrd_random() % config_write_jitter);
657 }
659 /* remove_from_queue
660 * remove a "cache_item_t" item from the queue.
661 * must hold 'cache_lock' when calling this
662 */
663 static void remove_from_queue(cache_item_t *ci) /* {{{ */
664 {
665 if (ci == NULL) return;
666 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
668 if (ci->prev == NULL)
669 cache_queue_head = ci->next; /* reset head */
670 else
671 ci->prev->next = ci->next;
673 if (ci->next == NULL)
674 cache_queue_tail = ci->prev; /* reset the tail */
675 else
676 ci->next->prev = ci->prev;
678 ci->next = ci->prev = NULL;
679 ci->flags &= ~CI_FLAGS_IN_QUEUE;
681 pthread_mutex_lock (&stats_lock);
682 assert (stats_queue_length > 0);
683 stats_queue_length--;
684 pthread_mutex_unlock (&stats_lock);
686 } /* }}} static void remove_from_queue */
688 /* free the resources associated with the cache_item_t
689 * must hold cache_lock when calling this function
690 */
691 static void *free_cache_item(cache_item_t *ci) /* {{{ */
692 {
693 if (ci == NULL) return NULL;
695 remove_from_queue(ci);
697 for (size_t i=0; i < ci->values_num; i++)
698 free(ci->values[i]);
700 free (ci->values);
701 free (ci->file);
703 /* in case anyone is waiting */
704 pthread_cond_broadcast(&ci->flushed);
705 pthread_cond_destroy(&ci->flushed);
707 free (ci);
709 return NULL;
710 } /* }}} static void *free_cache_item */
712 /*
713 * enqueue_cache_item:
714 * `cache_lock' must be acquired before calling this function!
715 */
716 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
717 queue_side_t side)
718 {
719 if (ci == NULL)
720 return (-1);
722 if (ci->values_num == 0)
723 return (0);
725 if (side == HEAD)
726 {
727 if (cache_queue_head == ci)
728 return 0;
730 /* remove if further down in queue */
731 remove_from_queue(ci);
733 ci->prev = NULL;
734 ci->next = cache_queue_head;
735 if (ci->next != NULL)
736 ci->next->prev = ci;
737 cache_queue_head = ci;
739 if (cache_queue_tail == NULL)
740 cache_queue_tail = cache_queue_head;
741 }
742 else /* (side == TAIL) */
743 {
744 /* We don't move values back in the list.. */
745 if (ci->flags & CI_FLAGS_IN_QUEUE)
746 return (0);
748 assert (ci->next == NULL);
749 assert (ci->prev == NULL);
751 ci->prev = cache_queue_tail;
753 if (cache_queue_tail == NULL)
754 cache_queue_head = ci;
755 else
756 cache_queue_tail->next = ci;
758 cache_queue_tail = ci;
759 }
761 ci->flags |= CI_FLAGS_IN_QUEUE;
763 pthread_cond_signal(&queue_cond);
764 pthread_mutex_lock (&stats_lock);
765 stats_queue_length++;
766 pthread_mutex_unlock (&stats_lock);
768 return (0);
769 } /* }}} int enqueue_cache_item */
771 /*
772 * tree_callback_flush:
773 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
774 * while this is in progress.
775 */
776 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
777 gpointer data)
778 {
779 cache_item_t *ci;
780 callback_flush_data_t *cfd;
782 ci = (cache_item_t *) value;
783 cfd = (callback_flush_data_t *) data;
785 if (ci->flags & CI_FLAGS_IN_QUEUE)
786 return FALSE;
788 if (ci->values_num > 0
789 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
790 {
791 enqueue_cache_item (ci, TAIL);
792 }
793 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
794 && (ci->values_num <= 0))
795 {
796 assert ((char *) key == ci->file);
797 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
798 {
799 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
800 return (FALSE);
801 }
802 }
804 return (FALSE);
805 } /* }}} gboolean tree_callback_flush */
807 static int flush_old_values (int max_age)
808 {
809 callback_flush_data_t cfd;
810 size_t k;
812 memset (&cfd, 0, sizeof (cfd));
813 /* Pass the current time as user data so that we don't need to call
814 * `time' for each node. */
815 cfd.now = time (NULL);
816 cfd.keys = NULL;
817 cfd.keys_num = 0;
819 if (max_age > 0)
820 cfd.abs_timeout = cfd.now - max_age;
821 else
822 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
824 /* `tree_callback_flush' will return the keys of all values that haven't
825 * been touched in the last `config_flush_interval' seconds in `cfd'.
826 * The char*'s in this array point to the same memory as ci->file, so we
827 * don't need to free them separately. */
828 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
830 for (k = 0; k < cfd.keys_num; k++)
831 {
832 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
833 /* should never fail, since we have held the cache_lock
834 * the entire time */
835 assert(status == TRUE);
836 }
838 if (cfd.keys != NULL)
839 {
840 free (cfd.keys);
841 cfd.keys = NULL;
842 }
844 return (0);
845 } /* int flush_old_values */
847 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
848 {
849 struct timeval now;
850 struct timespec next_flush;
851 int status;
853 gettimeofday (&now, NULL);
854 next_flush.tv_sec = now.tv_sec + config_flush_interval;
855 next_flush.tv_nsec = 1000 * now.tv_usec;
857 pthread_mutex_lock(&cache_lock);
859 while (state == RUNNING)
860 {
861 gettimeofday (&now, NULL);
862 if ((now.tv_sec > next_flush.tv_sec)
863 || ((now.tv_sec == next_flush.tv_sec)
864 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
865 {
866 RRDD_LOG(LOG_DEBUG, "flushing old values");
868 /* Determine the time of the next cache flush. */
869 next_flush.tv_sec = now.tv_sec + config_flush_interval;
871 /* Flush all values that haven't been written in the last
872 * `config_write_interval' seconds. */
873 flush_old_values (config_write_interval);
875 /* unlock the cache while we rotate so we don't block incoming
876 * updates if the fsync() blocks on disk I/O */
877 pthread_mutex_unlock(&cache_lock);
878 journal_rotate();
879 pthread_mutex_lock(&cache_lock);
880 }
882 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
883 if (status != 0 && status != ETIMEDOUT)
884 {
885 RRDD_LOG (LOG_ERR, "flush_thread_main: "
886 "pthread_cond_timedwait returned %i.", status);
887 }
888 }
890 if (config_flush_at_shutdown)
891 flush_old_values (-1); /* flush everything */
893 state = SHUTDOWN;
895 pthread_mutex_unlock(&cache_lock);
897 return NULL;
898 } /* void *flush_thread_main */
/*
 * Body of a queue (writer) thread. Repeatedly takes the item at the head of
 * the write queue, detaches its pending values under `cache_lock' and
 * writes them with rrd_update_r() while the lock is released. The loop ends
 * once the state is SHUTDOWN and, if flushing at shutdown is requested, the
 * queue is empty. Always returns NULL.
 */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  pthread_mutex_lock (&cache_lock);

  while (state != SHUTDOWN
      || (cache_queue_head != NULL && config_flush_at_shutdown))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    size_t values_num;
    int status;

    /* Now, check if there's something to store away. If not, wait until
     * something comes in. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_wait (&queue_cond, &cache_lock);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_wait returned %i.", status);
      }
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    /* NOTE(review): if strdup fails the item stays at the queue head and
     * the loop retries at once, still holding cache_lock. */
    file = strdup (ci->file);
    if (file == NULL)
    {
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* take over the value array; wipe_ci_values() only resets the item's
     * pointer and counters, the array is freed further below */
    values = ci->values;
    values_num = ci->values_num;

    wipe_ci_values(ci, time(NULL));
    remove_from_queue(ci);

    /* do the write without holding the cache lock */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    /* a "wrote" journal entry is written whether or not the update worked */
    journal_write("wrote", file);

    /* Search again in the tree. It's possible someone issued a "FORGET"
     * while we were writing the update values. */
    pthread_mutex_lock(&cache_lock);
    ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
    if (ci)
      pthread_cond_broadcast(&ci->flushed); /* wakes flush_file() waiters */
    pthread_mutex_unlock(&cache_lock);

    /* only successful writes are counted */
    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    rrd_free_ptrs((void ***) &values, &values_num);
    free(file);

    /* re-acquire the lock for the loop condition and the next iteration */
    pthread_mutex_lock (&cache_lock);
  }
  pthread_mutex_unlock (&cache_lock);

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * Split the next space-separated field off the front of a command buffer.
 *
 * The field is unescaped in place: a backslash makes the following
 * character literal (so "\ " yields a space inside the field). The field is
 * terminated by an unescaped space or by the NUL at the end of the buffer,
 * which must be the buffer's last byte.
 *
 * On success returns 0, stores the NUL-terminated field in `*field_ret' and
 * advances `*buffer_ret' / `*buffer_size_ret' past the field and its
 * terminator. Returns -1, leaving the three outputs untouched, if the
 * buffer is empty or ends before a terminator is found.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  char *dst = *buffer_ret;
  size_t avail = *buffer_size_ret;
  size_t rd = 0; /* read position in src */
  size_t wr = 0; /* write position in dst */

  if (avail <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[avail - 1] == '\0');

  for (;;)
  {
    char c;

    /* ran off the end without seeing a terminator */
    if (rd >= avail)
      return (-1);

    c = src[rd];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
      break;

    /* Handle escaped characters. */
    if (c == '\\')
    {
      if (rd >= (avail - 1))
        return (-1);
      rd++;
      c = src[rd];
    }

    dst[wr] = c;
    wr++;
    rd++;
  }

  /* terminate the field and step over the separator */
  dst[wr] = 0;
  rd++;

  *buffer_ret = src + rd;
  *buffer_size_ret = avail - rd;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
1051 /* if we're restricting writes to the base directory,
1052 * check whether the file falls within the dir
1053 * returns 1 if OK, otherwise 0
1054 */
1055 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1056 {
1057 assert(file != NULL);
1059 if (!config_write_base_only
1060 || JOURNAL_REPLAY(sock)
1061 || config_base_dir == NULL)
1062 return 1;
1064 if (strstr(file, "../") != NULL) goto err;
1066 /* relative paths without "../" are ok */
1067 if (*file != '/') return 1;
1069 /* file must be of the format base + "/" + <1+ char filename> */
1070 if (strlen(file) < _config_base_dir_len + 2) goto err;
1071 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1072 if (*(file + _config_base_dir_len) != '/') goto err;
1074 return 1;
1076 err:
1077 if (sock != NULL && sock->fd >= 0)
1078 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1080 return 0;
1081 } /* }}} static int check_file_access */
1083 /* when using a base dir, convert relative paths to absolute paths.
1084 * if necessary, modifies the "filename" pointer to point
1085 * to the new path created in "tmp". "tmp" is provided
1086 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1087 *
1088 * this allows us to optimize for the expected case (absolute path)
1089 * with a no-op.
1090 */
1091 static void get_abs_path(char **filename, char *tmp)
1092 {
1093 assert(tmp != NULL);
1094 assert(filename != NULL && *filename != NULL);
1096 if (config_base_dir == NULL || **filename == '/')
1097 return;
1099 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1100 *filename = tmp;
1101 } /* }}} static int get_abs_path */
1103 static int flush_file (const char *filename) /* {{{ */
1104 {
1105 cache_item_t *ci;
1107 pthread_mutex_lock (&cache_lock);
1109 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1110 if (ci == NULL)
1111 {
1112 pthread_mutex_unlock (&cache_lock);
1113 return (ENOENT);
1114 }
1116 if (ci->values_num > 0)
1117 {
1118 /* Enqueue at head */
1119 enqueue_cache_item (ci, HEAD);
1120 pthread_cond_wait(&ci->flushed, &cache_lock);
1121 }
1123 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1124 * may have been purged during our cond_wait() */
1126 pthread_mutex_unlock(&cache_lock);
1128 return (0);
1129 } /* }}} int flush_file */
1131 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1132 {
1133 char *err = "Syntax error.\n";
1135 if (cmd && cmd->syntax)
1136 err = cmd->syntax;
1138 return send_response(sock, RESP_ERR, "Usage: %s", err);
1139 } /* }}} static int syntax_error() */
1141 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1142 {
1143 uint64_t copy_queue_length;
1144 uint64_t copy_updates_received;
1145 uint64_t copy_flush_received;
1146 uint64_t copy_updates_written;
1147 uint64_t copy_data_sets_written;
1148 uint64_t copy_journal_bytes;
1149 uint64_t copy_journal_rotate;
1151 uint64_t tree_nodes_number;
1152 uint64_t tree_depth;
1154 pthread_mutex_lock (&stats_lock);
1155 copy_queue_length = stats_queue_length;
1156 copy_updates_received = stats_updates_received;
1157 copy_flush_received = stats_flush_received;
1158 copy_updates_written = stats_updates_written;
1159 copy_data_sets_written = stats_data_sets_written;
1160 copy_journal_bytes = stats_journal_bytes;
1161 copy_journal_rotate = stats_journal_rotate;
1162 pthread_mutex_unlock (&stats_lock);
1164 pthread_mutex_lock (&cache_lock);
1165 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1166 tree_depth = (uint64_t) g_tree_height (cache_tree);
1167 pthread_mutex_unlock (&cache_lock);
1169 add_response_info(sock,
1170 "QueueLength: %"PRIu64"\n", copy_queue_length);
1171 add_response_info(sock,
1172 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1173 add_response_info(sock,
1174 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1175 add_response_info(sock,
1176 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1177 add_response_info(sock,
1178 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1179 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1180 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1181 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1182 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1184 send_response(sock, RESP_OK, "Statistics follow\n");
1186 return (0);
1187 } /* }}} int handle_request_stats */
1189 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1190 {
1191 char *file, file_tmp[PATH_MAX];
1192 int status;
1194 status = buffer_get_field (&buffer, &buffer_size, &file);
1195 if (status != 0)
1196 {
1197 return syntax_error(sock,cmd);
1198 }
1199 else
1200 {
1201 pthread_mutex_lock(&stats_lock);
1202 stats_flush_received++;
1203 pthread_mutex_unlock(&stats_lock);
1205 get_abs_path(&file, file_tmp);
1206 if (!check_file_access(file, sock)) return 0;
1208 status = flush_file (file);
1209 if (status == 0)
1210 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1211 else if (status == ENOENT)
1212 {
1213 /* no file in our tree; see whether it exists at all */
1214 struct stat statbuf;
1216 memset(&statbuf, 0, sizeof(statbuf));
1217 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1218 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1219 else
1220 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1221 }
1222 else if (status < 0)
1223 return send_response(sock, RESP_ERR, "Internal error.\n");
1224 else
1225 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1226 }
1228 /* NOTREACHED */
1229 assert(1==0);
1230 } /* }}} int handle_request_flush */
1232 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1233 {
1234 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236 pthread_mutex_lock(&cache_lock);
1237 flush_old_values(-1);
1238 pthread_mutex_unlock(&cache_lock);
1240 return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1244 {
1245 int status;
1246 char *file, file_tmp[PATH_MAX];
1247 cache_item_t *ci;
1249 status = buffer_get_field(&buffer, &buffer_size, &file);
1250 if (status != 0)
1251 return syntax_error(sock,cmd);
1253 get_abs_path(&file, file_tmp);
1255 pthread_mutex_lock(&cache_lock);
1256 ci = g_tree_lookup(cache_tree, file);
1257 if (ci == NULL)
1258 {
1259 pthread_mutex_unlock(&cache_lock);
1260 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1261 }
1263 for (size_t i=0; i < ci->values_num; i++)
1264 add_response_info(sock, "%s\n", ci->values[i]);
1266 pthread_mutex_unlock(&cache_lock);
1267 return send_response(sock, RESP_OK, "updates pending\n");
1268 } /* }}} static int handle_request_pending */
1270 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1271 {
1272 int status;
1273 gboolean found;
1274 char *file, file_tmp[PATH_MAX];
1276 status = buffer_get_field(&buffer, &buffer_size, &file);
1277 if (status != 0)
1278 return syntax_error(sock,cmd);
1280 get_abs_path(&file, file_tmp);
1281 if (!check_file_access(file, sock)) return 0;
1283 pthread_mutex_lock(&cache_lock);
1284 found = g_tree_remove(cache_tree, file);
1285 pthread_mutex_unlock(&cache_lock);
1287 if (found == TRUE)
1288 {
1289 if (!JOURNAL_REPLAY(sock))
1290 journal_write("forget", file);
1292 return send_response(sock, RESP_OK, "Gone!\n");
1293 }
1294 else
1295 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1297 /* NOTREACHED */
1298 assert(1==0);
1299 } /* }}} static int handle_request_forget */
1301 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1302 {
1303 cache_item_t *ci;
1305 pthread_mutex_lock(&cache_lock);
1307 ci = cache_queue_head;
1308 while (ci != NULL)
1309 {
1310 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1311 ci = ci->next;
1312 }
1314 pthread_mutex_unlock(&cache_lock);
1316 return send_response(sock, RESP_OK, "in queue.\n");
1317 } /* }}} int handle_request_queue */
1319 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1320 {
1321 char *file, file_tmp[PATH_MAX];
1322 int values_num = 0;
1323 int status;
1324 char orig_buf[CMD_MAX];
1326 cache_item_t *ci;
1328 /* save it for the journal later */
1329 if (!JOURNAL_REPLAY(sock))
1330 strncpy(orig_buf, buffer, buffer_size);
1332 status = buffer_get_field (&buffer, &buffer_size, &file);
1333 if (status != 0)
1334 return syntax_error(sock,cmd);
1336 pthread_mutex_lock(&stats_lock);
1337 stats_updates_received++;
1338 pthread_mutex_unlock(&stats_lock);
1340 get_abs_path(&file, file_tmp);
1341 if (!check_file_access(file, sock)) return 0;
1343 pthread_mutex_lock (&cache_lock);
1344 ci = g_tree_lookup (cache_tree, file);
1346 if (ci == NULL) /* {{{ */
1347 {
1348 struct stat statbuf;
1349 cache_item_t *tmp;
1351 /* don't hold the lock while we setup; stat(2) might block */
1352 pthread_mutex_unlock(&cache_lock);
1354 memset (&statbuf, 0, sizeof (statbuf));
1355 status = stat (file, &statbuf);
1356 if (status != 0)
1357 {
1358 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1360 status = errno;
1361 if (status == ENOENT)
1362 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1363 else
1364 return send_response(sock, RESP_ERR,
1365 "stat failed with error %i.\n", status);
1366 }
1367 if (!S_ISREG (statbuf.st_mode))
1368 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1370 if (access(file, R_OK|W_OK) != 0)
1371 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1372 file, rrd_strerror(errno));
1374 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1375 if (ci == NULL)
1376 {
1377 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1379 return send_response(sock, RESP_ERR, "malloc failed.\n");
1380 }
1381 memset (ci, 0, sizeof (cache_item_t));
1383 ci->file = strdup (file);
1384 if (ci->file == NULL)
1385 {
1386 free (ci);
1387 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1389 return send_response(sock, RESP_ERR, "strdup failed.\n");
1390 }
1392 wipe_ci_values(ci, now);
1393 ci->flags = CI_FLAGS_IN_TREE;
1394 pthread_cond_init(&ci->flushed, NULL);
1396 pthread_mutex_lock(&cache_lock);
1398 /* another UPDATE might have added this entry in the meantime */
1399 tmp = g_tree_lookup (cache_tree, file);
1400 if (tmp == NULL)
1401 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1402 else
1403 {
1404 free_cache_item (ci);
1405 ci = tmp;
1406 }
1408 /* state may have changed while we were unlocked */
1409 if (state == SHUTDOWN)
1410 return -1;
1411 } /* }}} */
1412 assert (ci != NULL);
1414 /* don't re-write updates in replay mode */
1415 if (!JOURNAL_REPLAY(sock))
1416 journal_write("update", orig_buf);
1418 while (buffer_size > 0)
1419 {
1420 char *value;
1421 time_t stamp;
1422 char *eostamp;
1424 status = buffer_get_field (&buffer, &buffer_size, &value);
1425 if (status != 0)
1426 {
1427 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1428 break;
1429 }
1431 /* make sure update time is always moving forward */
1432 stamp = strtol(value, &eostamp, 10);
1433 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434 {
1435 pthread_mutex_unlock(&cache_lock);
1436 return send_response(sock, RESP_ERR,
1437 "Cannot find timestamp in '%s'!\n", value);
1438 }
1439 else if (stamp <= ci->last_update_stamp)
1440 {
1441 pthread_mutex_unlock(&cache_lock);
1442 return send_response(sock, RESP_ERR,
1443 "illegal attempt to update using time %ld when last"
1444 " update time is %ld (minimum one second step)\n",
1445 stamp, ci->last_update_stamp);
1446 }
1447 else
1448 ci->last_update_stamp = stamp;
1450 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451 &ci->values_alloc, config_alloc_chunk))
1452 {
1453 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454 continue;
1455 }
1457 values_num++;
1458 }
1460 if (((now - ci->last_flush_time) >= config_write_interval)
1461 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462 && (ci->values_num > 0))
1463 {
1464 enqueue_cache_item (ci, TAIL);
1465 }
1467 pthread_mutex_unlock (&cache_lock);
1469 if (values_num < 1)
1470 return send_response(sock, RESP_ERR, "No values updated.\n");
1471 else
1472 return send_response(sock, RESP_OK,
1473 "errors, enqueued %i value(s).\n", values_num);
1475 /* NOTREACHED */
1476 assert(1==0);
1478 } /* }}} int handle_request_update */
1480 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1481 {
1482 char *file;
1483 char *cf;
1485 char *start_str;
1486 char *end_str;
1487 rrd_time_value_t start_tv;
1488 rrd_time_value_t end_tv;
1489 time_t start_tm;
1490 time_t end_tm;
1492 unsigned long step;
1493 unsigned long ds_cnt;
1494 char **ds_namv;
1495 rrd_value_t *data;
1497 int status;
1498 unsigned long i;
1499 time_t t;
1500 rrd_value_t *data_ptr;
1502 file = NULL;
1503 cf = NULL;
1504 start_str = NULL;
1505 end_str = NULL;
1507 /* Read the arguments */
1508 do /* while (0) */
1509 {
1510 status = buffer_get_field (&buffer, &buffer_size, &file);
1511 if (status != 0)
1512 break;
1514 status = buffer_get_field (&buffer, &buffer_size, &cf);
1515 if (status != 0)
1516 break;
1518 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1519 if (status != 0)
1520 {
1521 start_str = NULL;
1522 status = 0;
1523 break;
1524 }
1526 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1527 if (status != 0)
1528 {
1529 end_str = NULL;
1530 status = 0;
1531 break;
1532 }
1533 } while (0);
1535 if (status != 0)
1536 return (syntax_error(sock,cmd));
1538 status = flush_file (file);
1539 if ((status != 0) && (status != ENOENT))
1540 return (send_response (sock, RESP_ERR,
1541 "flush_file (%s) failed with status %i.\n", file, status));
1543 /* Parse start time */
1544 if (start_str != NULL)
1545 {
1546 const char *errmsg;
1548 errmsg = rrd_parsetime (start_str, &start_tv);
1549 if (errmsg != NULL)
1550 return (send_response(sock, RESP_ERR,
1551 "Cannot parse start time `%s': %s\n", start_str, errmsg));
1552 }
1553 else
1554 rrd_parsetime ("-86400", &start_tv);
1556 /* Parse end time */
1557 if (end_str != NULL)
1558 {
1559 const char *errmsg;
1561 errmsg = rrd_parsetime (end_str, &end_tv);
1562 if (errmsg != NULL)
1563 return (send_response(sock, RESP_ERR,
1564 "Cannot parse end time `%s': %s\n", end_str, errmsg));
1565 }
1566 else
1567 rrd_parsetime ("now", &end_tv);
1569 start_tm = 0;
1570 end_tm = 0;
1571 status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm);
1572 if (status != 0)
1573 return (send_response(sock, RESP_ERR,
1574 "rrd_proc_start_end failed: %s\n", rrd_get_error ()));
1576 step = -1;
1577 ds_cnt = 0;
1578 ds_namv = NULL;
1579 data = NULL;
1581 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1582 &ds_cnt, &ds_namv, &data);
1583 if (status != 0)
1584 return (send_response(sock, RESP_ERR,
1585 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1587 add_response_info (sock, "FlushVersion: %lu\n", 1);
1588 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1589 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1590 add_response_info (sock, "Step: %lu\n", step);
1591 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1593 #define SSTRCAT(buffer,str,buffer_fill) do { \
1594 size_t str_len = strlen (str); \
1595 if ((buffer_fill + str_len) > sizeof (buffer)) \
1596 str_len = sizeof (buffer) - buffer_fill; \
1597 if (str_len > 0) { \
1598 strncpy (buffer + buffer_fill, str, str_len); \
1599 buffer_fill += str_len; \
1600 assert (buffer_fill <= sizeof (buffer)); \
1601 if (buffer_fill == sizeof (buffer)) \
1602 buffer[buffer_fill - 1] = 0; \
1603 else \
1604 buffer[buffer_fill] = 0; \
1605 } \
1606 } while (0)
1608 { /* Add list of DS names */
1609 char linebuf[1024];
1610 size_t linebuf_fill;
1612 memset (linebuf, 0, sizeof (linebuf));
1613 linebuf_fill = 0;
1614 for (i = 0; i < ds_cnt; i++)
1615 {
1616 if (i > 0)
1617 SSTRCAT (linebuf, " ", linebuf_fill);
1618 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1619 }
1620 add_response_info (sock, "DSName: %s\n", linebuf);
1621 }
1623 /* Add the actual data */
1624 assert (step > 0);
1625 data_ptr = data;
1626 for (t = start_tm + step; t <= end_tm; t += step)
1627 {
1628 char linebuf[1024];
1629 size_t linebuf_fill;
1630 char tmp[128];
1632 memset (linebuf, 0, sizeof (linebuf));
1633 linebuf_fill = 0;
1634 for (i = 0; i < ds_cnt; i++)
1635 {
1636 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1637 tmp[sizeof (tmp) - 1] = 0;
1638 SSTRCAT (linebuf, tmp, linebuf_fill);
1640 data_ptr++;
1641 }
1643 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1644 } /* for (t) */
1646 return (send_response (sock, RESP_OK, "Success\n"));
1647 #undef SSTRCAT
1648 } /* }}} int handle_request_fetch */
1650 /* we came across a "WROTE" entry during journal replay.
1651 * throw away any values that we have accumulated for this file
1652 */
1653 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1654 {
1655 cache_item_t *ci;
1656 const char *file = buffer;
1658 pthread_mutex_lock(&cache_lock);
1660 ci = g_tree_lookup(cache_tree, file);
1661 if (ci == NULL)
1662 {
1663 pthread_mutex_unlock(&cache_lock);
1664 return (0);
1665 }
1667 if (ci->values)
1668 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1670 wipe_ci_values(ci, now);
1671 remove_from_queue(ci);
1673 pthread_mutex_unlock(&cache_lock);
1674 return (0);
1675 } /* }}} int handle_request_wrote */
1677 /* start "BATCH" processing */
1678 static int batch_start (HANDLER_PROTO) /* {{{ */
1679 {
1680 int status;
1681 if (sock->batch_start)
1682 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1684 status = send_response(sock, RESP_OK,
1685 "Go ahead. End with dot '.' on its own line.\n");
1686 sock->batch_start = time(NULL);
1687 sock->batch_cmd = 0;
1689 return status;
1690 } /* }}} static int batch_start */
1692 /* finish "BATCH" processing and return results to the client */
1693 static int batch_done (HANDLER_PROTO) /* {{{ */
1694 {
1695 assert(sock->batch_start);
1696 sock->batch_start = 0;
1697 sock->batch_cmd = 0;
1698 return send_response(sock, RESP_OK, "errors\n");
1699 } /* }}} static int batch_done */
/* QUIT: always reports a non-zero status.  connection_thread_main() closes
 * the connection whenever handle_request() returns non-zero. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
1706 static command_t list_of_commands[] = { /* {{{ */
1707 {
1708 "UPDATE",
1709 handle_request_update,
1710 CMD_CONTEXT_ANY,
1711 "UPDATE <filename> <values> [<values> ...]\n"
1712 ,
1713 "Adds the given file to the internal cache if it is not yet known and\n"
1714 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1715 "for details.\n"
1716 "\n"
1717 "Each <values> has the following form:\n"
1718 " <values> = <time>:<value>[:<value>[...]]\n"
1719 "See the rrdupdate(1) manpage for details.\n"
1720 },
1721 {
1722 "WROTE",
1723 handle_request_wrote,
1724 CMD_CONTEXT_JOURNAL,
1725 NULL,
1726 NULL
1727 },
1728 {
1729 "FLUSH",
1730 handle_request_flush,
1731 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1732 "FLUSH <filename>\n"
1733 ,
1734 "Adds the given filename to the head of the update queue and returns\n"
1735 "after it has been dequeued.\n"
1736 },
1737 {
1738 "FLUSHALL",
1739 handle_request_flushall,
1740 CMD_CONTEXT_CLIENT,
1741 "FLUSHALL\n"
1742 ,
1743 "Triggers writing of all pending updates. Returns immediately.\n"
1744 },
1745 {
1746 "PENDING",
1747 handle_request_pending,
1748 CMD_CONTEXT_CLIENT,
1749 "PENDING <filename>\n"
1750 ,
1751 "Shows any 'pending' updates for a file, in order.\n"
1752 "The updates shown have not yet been written to the underlying RRD file.\n"
1753 },
1754 {
1755 "FORGET",
1756 handle_request_forget,
1757 CMD_CONTEXT_ANY,
1758 "FORGET <filename>\n"
1759 ,
1760 "Removes the file completely from the cache.\n"
1761 "Any pending updates for the file will be lost.\n"
1762 },
1763 {
1764 "QUEUE",
1765 handle_request_queue,
1766 CMD_CONTEXT_CLIENT,
1767 "QUEUE\n"
1768 ,
1769 "Shows all files in the output queue.\n"
1770 "The output is zero or more lines in the following format:\n"
1771 "(where <num_vals> is the number of values to be written)\n"
1772 "\n"
1773 "<num_vals> <filename>\n"
1774 },
1775 {
1776 "STATS",
1777 handle_request_stats,
1778 CMD_CONTEXT_CLIENT,
1779 "STATS\n"
1780 ,
1781 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1782 "a description of the values.\n"
1783 },
1784 {
1785 "HELP",
1786 handle_request_help,
1787 CMD_CONTEXT_CLIENT,
1788 "HELP [<command>]\n",
1789 NULL, /* special! */
1790 },
1791 {
1792 "BATCH",
1793 batch_start,
1794 CMD_CONTEXT_CLIENT,
1795 "BATCH\n"
1796 ,
1797 "The 'BATCH' command permits the client to initiate a bulk load\n"
1798 " of commands to rrdcached.\n"
1799 "\n"
1800 "Usage:\n"
1801 "\n"
1802 " client: BATCH\n"
1803 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1804 " client: command #1\n"
1805 " client: command #2\n"
1806 " client: ... and so on\n"
1807 " client: .\n"
1808 " server: 2 errors\n"
1809 " server: 7 message for command #7\n"
1810 " server: 9 message for command #9\n"
1811 "\n"
1812 "For more information, consult the rrdcached(1) documentation.\n"
1813 },
1814 {
1815 ".", /* BATCH terminator */
1816 batch_done,
1817 CMD_CONTEXT_BATCH,
1818 NULL,
1819 NULL
1820 },
1821 {
1822 "FETCH",
1823 handle_request_fetch,
1824 CMD_CONTEXT_CLIENT,
1825 "FETCH <file> <CF> [<start> [<end>]]\n"
1826 ,
1827 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1828 },
1829 {
1830 "QUIT",
1831 handle_request_quit,
1832 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1833 "QUIT\n"
1834 ,
1835 "Disconnect from rrdcached.\n"
1836 }
1837 }; /* }}} command_t list_of_commands[] */
1838 static size_t list_of_commands_len = sizeof (list_of_commands)
1839 / sizeof (list_of_commands[0]);
1841 static command_t *find_command(char *cmd)
1842 {
1843 size_t i;
1845 for (i = 0; i < list_of_commands_len; i++)
1846 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1847 return (&list_of_commands[i]);
1848 return NULL;
1849 }
1851 /* We currently use the index in the `list_of_commands' array as a bit position
1852 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1853 * outside these functions so that switching to a more elegant storage method
1854 * is easily possible. */
1855 static ssize_t find_command_index (const char *cmd) /* {{{ */
1856 {
1857 size_t i;
1859 for (i = 0; i < list_of_commands_len; i++)
1860 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1861 return ((ssize_t) i);
1862 return (-1);
1863 } /* }}} ssize_t find_command_index */
1865 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1866 const char *cmd)
1867 {
1868 ssize_t i;
1870 if (JOURNAL_REPLAY(sock))
1871 return (1);
1873 if (cmd == NULL)
1874 return (-1);
1876 if ((strcasecmp ("QUIT", cmd) == 0)
1877 || (strcasecmp ("HELP", cmd) == 0))
1878 return (1);
1879 else if (strcmp (".", cmd) == 0)
1880 cmd = "BATCH";
1882 i = find_command_index (cmd);
1883 if (i < 0)
1884 return (-1);
1885 assert (i < 32);
1887 if ((sock->permissions & (1 << i)) != 0)
1888 return (1);
1889 return (0);
1890 } /* }}} int socket_permission_check */
1892 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1893 const char *cmd)
1894 {
1895 ssize_t i;
1897 i = find_command_index (cmd);
1898 if (i < 0)
1899 return (-1);
1900 assert (i < 32);
1902 sock->permissions |= (1 << i);
1903 return (0);
1904 } /* }}} int socket_permission_add */
1906 /* check whether commands are received in the expected context */
1907 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1908 {
1909 if (JOURNAL_REPLAY(sock))
1910 return (cmd->context & CMD_CONTEXT_JOURNAL);
1911 else if (sock->batch_start)
1912 return (cmd->context & CMD_CONTEXT_BATCH);
1913 else
1914 return (cmd->context & CMD_CONTEXT_CLIENT);
1916 /* NOTREACHED */
1917 assert(1==0);
1918 }
1920 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1921 {
1922 int status;
1923 char *cmd_str;
1924 char *resp_txt;
1925 command_t *help = NULL;
1927 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1928 if (status == 0)
1929 help = find_command(cmd_str);
1931 if (help && (help->syntax || help->help))
1932 {
1933 char tmp[CMD_MAX];
1935 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1936 resp_txt = tmp;
1938 if (help->syntax)
1939 add_response_info(sock, "Usage: %s\n", help->syntax);
1941 if (help->help)
1942 add_response_info(sock, "%s\n", help->help);
1943 }
1944 else
1945 {
1946 size_t i;
1948 resp_txt = "Command overview\n";
1950 for (i = 0; i < list_of_commands_len; i++)
1951 {
1952 if (list_of_commands[i].syntax == NULL)
1953 continue;
1954 add_response_info (sock, "%s", list_of_commands[i].syntax);
1955 }
1956 }
1958 return send_response(sock, RESP_OK, resp_txt);
1959 } /* }}} int handle_request_help */
1961 static int handle_request (DISPATCH_PROTO) /* {{{ */
1962 {
1963 char *buffer_ptr = buffer;
1964 char *cmd_str = NULL;
1965 command_t *cmd = NULL;
1966 int status;
1968 assert (buffer[buffer_size - 1] == '\0');
1970 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1971 if (status != 0)
1972 {
1973 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1974 return (-1);
1975 }
1977 if (sock != NULL && sock->batch_start)
1978 sock->batch_cmd++;
1980 cmd = find_command(cmd_str);
1981 if (!cmd)
1982 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1984 if (!socket_permission_check (sock, cmd->cmd))
1985 return send_response(sock, RESP_ERR, "Permission denied.\n");
1987 if (!command_check_context(sock, cmd))
1988 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1990 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1991 } /* }}} int handle_request */
1993 static void journal_set_free (journal_set *js) /* {{{ */
1994 {
1995 if (js == NULL)
1996 return;
1998 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2000 free(js);
2001 } /* }}} journal_set_free */
2003 static void journal_set_remove (journal_set *js) /* {{{ */
2004 {
2005 if (js == NULL)
2006 return;
2008 for (uint i=0; i < js->files_num; i++)
2009 {
2010 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2011 unlink(js->files[i]);
2012 }
2013 } /* }}} journal_set_remove */
2015 /* close current journal file handle.
2016 * MUST hold journal_lock before calling */
2017 static void journal_close(void) /* {{{ */
2018 {
2019 if (journal_fh != NULL)
2020 {
2021 if (fclose(journal_fh) != 0)
2022 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2023 }
2025 journal_fh = NULL;
2026 journal_size = 0;
2027 } /* }}} journal_close */
2029 /* MUST hold journal_lock before calling */
2030 static void journal_new_file(void) /* {{{ */
2031 {
2032 struct timeval now;
2033 int new_fd;
2034 char new_file[PATH_MAX + 1];
2036 assert(journal_dir != NULL);
2037 assert(journal_cur != NULL);
2039 journal_close();
2041 gettimeofday(&now, NULL);
2042 /* this format assures that the files sort in strcmp() order */
2043 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2044 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2046 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2047 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2048 if (new_fd < 0)
2049 goto error;
2051 journal_fh = fdopen(new_fd, "a");
2052 if (journal_fh == NULL)
2053 goto error;
2055 journal_size = ftell(journal_fh);
2056 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2058 /* record the file in the journal set */
2059 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2061 return;
2063 error:
2064 RRDD_LOG(LOG_CRIT,
2065 "JOURNALING DISABLED: Error while trying to create %s : %s",
2066 new_file, rrd_strerror(errno));
2067 RRDD_LOG(LOG_CRIT,
2068 "JOURNALING DISABLED: All values will be flushed at shutdown");
2070 close(new_fd);
2071 config_flush_at_shutdown = 1;
2073 } /* }}} journal_new_file */
2075 /* MUST NOT hold journal_lock before calling this */
2076 static void journal_rotate(void) /* {{{ */
2077 {
2078 journal_set *old_js = NULL;
2080 if (journal_dir == NULL)
2081 return;
2083 RRDD_LOG(LOG_DEBUG, "rotating journals");
2085 pthread_mutex_lock(&stats_lock);
2086 ++stats_journal_rotate;
2087 pthread_mutex_unlock(&stats_lock);
2089 pthread_mutex_lock(&journal_lock);
2091 journal_close();
2093 /* rotate the journal sets */
2094 old_js = journal_old;
2095 journal_old = journal_cur;
2096 journal_cur = calloc(1, sizeof(journal_set));
2098 if (journal_cur != NULL)
2099 journal_new_file();
2100 else
2101 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2103 pthread_mutex_unlock(&journal_lock);
2105 journal_set_remove(old_js);
2106 journal_set_free (old_js);
2108 } /* }}} static void journal_rotate */
2110 /* MUST hold journal_lock when calling */
2111 static void journal_done(void) /* {{{ */
2112 {
2113 if (journal_cur == NULL)
2114 return;
2116 journal_close();
2118 if (config_flush_at_shutdown)
2119 {
2120 RRDD_LOG(LOG_INFO, "removing journals");
2121 journal_set_remove(journal_old);
2122 journal_set_remove(journal_cur);
2123 }
2124 else
2125 {
2126 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2127 "journals will be used at next startup");
2128 }
2130 journal_set_free(journal_cur);
2131 journal_set_free(journal_old);
2132 free(journal_dir);
2134 } /* }}} static void journal_done */
2136 static int journal_write(char *cmd, char *args) /* {{{ */
2137 {
2138 int chars;
2140 if (journal_fh == NULL)
2141 return 0;
2143 pthread_mutex_lock(&journal_lock);
2144 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2145 journal_size += chars;
2147 if (journal_size > JOURNAL_MAX)
2148 journal_new_file();
2150 pthread_mutex_unlock(&journal_lock);
2152 if (chars > 0)
2153 {
2154 pthread_mutex_lock(&stats_lock);
2155 stats_journal_bytes += chars;
2156 pthread_mutex_unlock(&stats_lock);
2157 }
2159 return chars;
2160 } /* }}} static int journal_write */
2162 static int journal_replay (const char *file) /* {{{ */
2163 {
2164 FILE *fh;
2165 int entry_cnt = 0;
2166 int fail_cnt = 0;
2167 uint64_t line = 0;
2168 char entry[CMD_MAX];
2169 time_t now;
2171 if (file == NULL) return 0;
2173 {
2174 char *reason = "unknown error";
2175 int status = 0;
2176 struct stat statbuf;
2178 memset(&statbuf, 0, sizeof(statbuf));
2179 if (stat(file, &statbuf) != 0)
2180 {
2181 reason = "stat error";
2182 status = errno;
2183 }
2184 else if (!S_ISREG(statbuf.st_mode))
2185 {
2186 reason = "not a regular file";
2187 status = EPERM;
2188 }
2189 if (statbuf.st_uid != daemon_uid)
2190 {
2191 reason = "not owned by daemon user";
2192 status = EACCES;
2193 }
2194 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2195 {
2196 reason = "must not be user/group writable";
2197 status = EACCES;
2198 }
2200 if (status != 0)
2201 {
2202 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2203 file, rrd_strerror(status), reason);
2204 return 0;
2205 }
2206 }
2208 fh = fopen(file, "r");
2209 if (fh == NULL)
2210 {
2211 if (errno != ENOENT)
2212 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2213 file, rrd_strerror(errno));
2214 return 0;
2215 }
2216 else
2217 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2219 now = time(NULL);
2221 while(!feof(fh))
2222 {
2223 size_t entry_len;
2225 ++line;
2226 if (fgets(entry, sizeof(entry), fh) == NULL)
2227 break;
2228 entry_len = strlen(entry);
2230 /* check \n termination in case journal writing crashed mid-line */
2231 if (entry_len == 0)
2232 continue;
2233 else if (entry[entry_len - 1] != '\n')
2234 {
2235 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2236 ++fail_cnt;
2237 continue;
2238 }
2240 entry[entry_len - 1] = '\0';
2242 if (handle_request(NULL, now, entry, entry_len) == 0)
2243 ++entry_cnt;
2244 else
2245 ++fail_cnt;
2246 }
2248 fclose(fh);
2250 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2251 entry_cnt, fail_cnt);
2253 return entry_cnt > 0 ? 1 : 0;
2254 } /* }}} static int journal_replay */
/* qsort() comparison callback for an array of C strings: each argument
 * points at an array element (a `char *'), and the strings they refer to
 * are ordered with strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *lhs = *(const char * const *) v1;
  const char *rhs = *(const char * const *) v2;

  return strcmp(lhs, rhs);
}
2264 static void journal_init(void) /* {{{ */
2265 {
2266 int had_journal = 0;
2267 DIR *dir;
2268 struct dirent *dent;
2269 char path[PATH_MAX+1];
2271 if (journal_dir == NULL) return;
2273 pthread_mutex_lock(&journal_lock);
2275 journal_cur = calloc(1, sizeof(journal_set));
2276 if (journal_cur == NULL)
2277 {
2278 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2279 return;
2280 }
2282 RRDD_LOG(LOG_INFO, "checking for journal files");
2284 /* Handle old journal files during transition. This gives them the
2285 * correct sort order. TODO: remove after first release
2286 */
2287 {
2288 char old_path[PATH_MAX+1];
2289 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2290 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2291 rename(old_path, path);
2293 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2294 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2295 rename(old_path, path);
2296 }
2298 dir = opendir(journal_dir);
2299 while ((dent = readdir(dir)) != NULL)
2300 {
2301 /* looks like a journal file? */
2302 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2303 continue;
2305 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2307 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2308 {
2309 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2310 dent->d_name);
2311 break;
2312 }
2313 }
2314 closedir(dir);
2316 qsort(journal_cur->files, journal_cur->files_num,
2317 sizeof(journal_cur->files[0]), journal_sort);
2319 for (uint i=0; i < journal_cur->files_num; i++)
2320 had_journal += journal_replay(journal_cur->files[i]);
2322 journal_new_file();
2324 /* it must have been a crash. start a flush */
2325 if (had_journal && config_flush_at_shutdown)
2326 flush_old_values(-1);
2328 pthread_mutex_unlock(&journal_lock);
2330 RRDD_LOG(LOG_INFO, "journal processing complete");
2332 } /* }}} static void journal_init */
2334 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2335 {
2336 assert(sock != NULL);
2338 free(sock->rbuf); sock->rbuf = NULL;
2339 free(sock->wbuf); sock->wbuf = NULL;
2340 free(sock);
2341 } /* }}} void free_listen_socket */
2343 static void close_connection(listen_socket_t *sock) /* {{{ */
2344 {
2345 if (sock->fd >= 0)
2346 {
2347 close(sock->fd);
2348 sock->fd = -1;
2349 }
2351 free_listen_socket(sock);
2353 } /* }}} void close_connection */
2355 static void *connection_thread_main (void *args) /* {{{ */
2356 {
2357 listen_socket_t *sock;
2358 int fd;
2360 sock = (listen_socket_t *) args;
2361 fd = sock->fd;
2363 /* init read buffers */
2364 sock->next_read = sock->next_cmd = 0;
2365 sock->rbuf = malloc(RBUF_SIZE);
2366 if (sock->rbuf == NULL)
2367 {
2368 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2369 close_connection(sock);
2370 return NULL;
2371 }
2373 pthread_mutex_lock (&connection_threads_lock);
2374 connection_threads_num++;
2375 pthread_mutex_unlock (&connection_threads_lock);
2377 while (state == RUNNING)
2378 {
2379 char *cmd;
2380 ssize_t cmd_len;
2381 ssize_t rbytes;
2382 time_t now;
2384 struct pollfd pollfd;
2385 int status;
2387 pollfd.fd = fd;
2388 pollfd.events = POLLIN | POLLPRI;
2389 pollfd.revents = 0;
2391 status = poll (&pollfd, 1, /* timeout = */ 500);
2392 if (state != RUNNING)
2393 break;
2394 else if (status == 0) /* timeout */
2395 continue;
2396 else if (status < 0) /* error */
2397 {
2398 status = errno;
2399 if (status != EINTR)
2400 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2401 continue;
2402 }
2404 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2405 break;
2406 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2407 {
2408 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2409 "poll(2) returned something unexpected: %#04hx",
2410 pollfd.revents);
2411 break;
2412 }
2414 rbytes = read(fd, sock->rbuf + sock->next_read,
2415 RBUF_SIZE - sock->next_read);
2416 if (rbytes < 0)
2417 {
2418 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2419 break;
2420 }
2421 else if (rbytes == 0)
2422 break; /* eof */
2424 sock->next_read += rbytes;
2426 if (sock->batch_start)
2427 now = sock->batch_start;
2428 else
2429 now = time(NULL);
2431 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2432 {
2433 status = handle_request (sock, now, cmd, cmd_len+1);
2434 if (status != 0)
2435 goto out_close;
2436 }
2437 }
2439 out_close:
2440 close_connection(sock);
2442 /* Remove this thread from the connection threads list */
2443 pthread_mutex_lock (&connection_threads_lock);
2444 connection_threads_num--;
2445 if (connection_threads_num <= 0)
2446 pthread_cond_broadcast(&connection_threads_done);
2447 pthread_mutex_unlock (&connection_threads_lock);
2449 return (NULL);
2450 } /* }}} void *connection_thread_main */
2452 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2453 {
2454 int fd;
2455 struct sockaddr_un sa;
2456 listen_socket_t *temp;
2457 int status;
2458 const char *path;
2459 char *path_copy, *dir;
2461 path = sock->addr;
2462 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2463 path += strlen("unix:");
2465 /* dirname may modify its argument */
2466 path_copy = strdup(path);
2467 if (path_copy == NULL)
2468 {
2469 fprintf(stderr, "rrdcached: strdup(): %s\n",
2470 rrd_strerror(errno));
2471 return (-1);
2472 }
2474 dir = dirname(path_copy);
2475 if (rrd_mkdir_p(dir, 0777) != 0)
2476 {
2477 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2478 dir, rrd_strerror(errno));
2479 return (-1);
2480 }
2482 free(path_copy);
2484 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2485 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2486 if (temp == NULL)
2487 {
2488 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2489 return (-1);
2490 }
2491 listen_fds = temp;
2492 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2494 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2495 if (fd < 0)
2496 {
2497 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2498 rrd_strerror(errno));
2499 return (-1);
2500 }
2502 memset (&sa, 0, sizeof (sa));
2503 sa.sun_family = AF_UNIX;
2504 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2506 /* if we've gotten this far, we own the pid file. any daemon started
2507 * with the same args must not be alive. therefore, ensure that we can
2508 * create the socket...
2509 */
2510 unlink(path);
2512 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2513 if (status != 0)
2514 {
2515 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2516 path, rrd_strerror(errno));
2517 close (fd);
2518 return (-1);
2519 }
2521 /* tweak the sockets group ownership */
2522 if (sock->socket_group != (gid_t)-1)
2523 {
2524 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2525 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2526 {
2527 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2528 }
2529 }
2531 if (sock->socket_permissions != (mode_t)-1)
2532 {
2533 if (chmod(path, sock->socket_permissions) != 0)
2534 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2535 (unsigned int)sock->socket_permissions, strerror(errno));
2536 }
2538 status = listen (fd, /* backlog = */ 10);
2539 if (status != 0)
2540 {
2541 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2542 path, rrd_strerror(errno));
2543 close (fd);
2544 unlink (path);
2545 return (-1);
2546 }
2548 listen_fds[listen_fds_num].fd = fd;
2549 listen_fds[listen_fds_num].family = PF_UNIX;
2550 strncpy(listen_fds[listen_fds_num].addr, path,
2551 sizeof (listen_fds[listen_fds_num].addr) - 1);
2552 listen_fds_num++;
2554 return (0);
2555 } /* }}} int open_listen_socket_unix */
/*
 * Open one listening TCP socket for every address that sock->addr resolves
 * to and append each of them to the global `listen_fds' array.
 *
 * Accepted address forms are "host", "host:port", "[ipv6]" and
 * "[ipv6]:port". When no port is given, RRDCACHED_DEFAULT_PORT is used.
 *
 * Failures of realloc, socket(2) or bind(2) for a single resolved address
 * are reported and that address is skipped. A parse error, a getaddrinfo()
 * failure or a listen(2) failure aborts with -1. Otherwise 0 is returned,
 * even if no socket could be opened; the caller has to check listen_fds_num.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* Work on a private copy because the host/port split writes NUL bytes. */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Split at the last colon; strrchr is the standard C equivalent of the
     * legacy rindex(). */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    /* Reserve a slot; it only becomes "live" when listen_fds_num is
     * incremented after a successful listen(2). */
    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2679 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2680 {
2681 assert(sock != NULL);
2682 assert(sock->addr != NULL);
2684 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2685 || sock->addr[0] == '/')
2686 return (open_listen_socket_unix(sock));
2687 else
2688 return (open_listen_socket_network(sock));
2689 } /* }}} int open_listen_socket */
2691 static int close_listen_sockets (void) /* {{{ */
2692 {
2693 size_t i;
2695 for (i = 0; i < listen_fds_num; i++)
2696 {
2697 close (listen_fds[i].fd);
2699 if (listen_fds[i].family == PF_UNIX)
2700 unlink(listen_fds[i].addr);
2701 }
2703 free (listen_fds);
2704 listen_fds = NULL;
2705 listen_fds_num = 0;
2707 return (0);
2708 } /* }}} int close_listen_sockets */
2710 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2711 {
2712 struct pollfd *pollfds;
2713 int pollfds_num;
2714 int status;
2715 int i;
2717 if (listen_fds_num < 1)
2718 {
2719 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2720 return (NULL);
2721 }
2723 pollfds_num = listen_fds_num;
2724 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2725 if (pollfds == NULL)
2726 {
2727 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2728 return (NULL);
2729 }
2730 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2732 RRDD_LOG(LOG_INFO, "listening for connections");
2734 while (state == RUNNING)
2735 {
2736 for (i = 0; i < pollfds_num; i++)
2737 {
2738 pollfds[i].fd = listen_fds[i].fd;
2739 pollfds[i].events = POLLIN | POLLPRI;
2740 pollfds[i].revents = 0;
2741 }
2743 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2744 if (state != RUNNING)
2745 break;
2746 else if (status == 0) /* timeout */
2747 continue;
2748 else if (status < 0) /* error */
2749 {
2750 status = errno;
2751 if (status != EINTR)
2752 {
2753 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2754 }
2755 continue;
2756 }
2758 for (i = 0; i < pollfds_num; i++)
2759 {
2760 listen_socket_t *client_sock;
2761 struct sockaddr_storage client_sa;
2762 socklen_t client_sa_size;
2763 pthread_t tid;
2764 pthread_attr_t attr;
2766 if (pollfds[i].revents == 0)
2767 continue;
2769 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2770 {
2771 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2772 "poll(2) returned something unexpected for listen FD #%i.",
2773 pollfds[i].fd);
2774 continue;
2775 }
2777 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2778 if (client_sock == NULL)
2779 {
2780 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2781 continue;
2782 }
2783 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2785 client_sa_size = sizeof (client_sa);
2786 client_sock->fd = accept (pollfds[i].fd,
2787 (struct sockaddr *) &client_sa, &client_sa_size);
2788 if (client_sock->fd < 0)
2789 {
2790 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2791 free(client_sock);
2792 continue;
2793 }
2795 pthread_attr_init (&attr);
2796 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2798 status = pthread_create (&tid, &attr, connection_thread_main,
2799 client_sock);
2800 if (status != 0)
2801 {
2802 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2803 close_connection(client_sock);
2804 continue;
2805 }
2806 } /* for (pollfds_num) */
2807 } /* while (state == RUNNING) */
2809 RRDD_LOG(LOG_INFO, "starting shutdown");
2811 close_listen_sockets ();
2813 pthread_mutex_lock (&connection_threads_lock);
2814 while (connection_threads_num > 0)
2815 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2816 pthread_mutex_unlock (&connection_threads_lock);
2818 free(pollfds);
2820 return (NULL);
2821 } /* }}} void *listen_thread_main */
2823 static int daemonize (void) /* {{{ */
2824 {
2825 int pid_fd;
2826 char *base_dir;
2828 daemon_uid = geteuid();
2830 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2831 if (pid_fd < 0)
2832 pid_fd = check_pidfile();
2833 if (pid_fd < 0)
2834 return pid_fd;
2836 /* open all the listen sockets */
2837 if (config_listen_address_list_len > 0)
2838 {
2839 for (size_t i = 0; i < config_listen_address_list_len; i++)
2840 open_listen_socket (config_listen_address_list[i]);
2842 rrd_free_ptrs((void ***) &config_listen_address_list,
2843 &config_listen_address_list_len);
2844 }
2845 else
2846 {
2847 listen_socket_t sock;
2848 memset(&sock, 0, sizeof(sock));
2849 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2850 open_listen_socket (&sock);
2851 }
2853 if (listen_fds_num < 1)
2854 {
2855 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2856 goto error;
2857 }
2859 if (!stay_foreground)
2860 {
2861 pid_t child;
2863 child = fork ();
2864 if (child < 0)
2865 {
2866 fprintf (stderr, "daemonize: fork(2) failed.\n");
2867 goto error;
2868 }
2869 else if (child > 0)
2870 exit(0);
2872 /* Become session leader */
2873 setsid ();
2875 /* Open the first three file descriptors to /dev/null */
2876 close (2);
2877 close (1);
2878 close (0);
2880 open ("/dev/null", O_RDWR);
2881 if (dup(0) == -1 || dup(0) == -1){
2882 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2883 }
2884 } /* if (!stay_foreground) */
2886 /* Change into the /tmp directory. */
2887 base_dir = (config_base_dir != NULL)
2888 ? config_base_dir
2889 : "/tmp";
2891 if (chdir (base_dir) != 0)
2892 {
2893 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2894 goto error;
2895 }
2897 install_signal_handlers();
2899 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2900 RRDD_LOG(LOG_INFO, "starting up");
2902 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2903 (GDestroyNotify) free_cache_item);
2904 if (cache_tree == NULL)
2905 {
2906 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2907 goto error;
2908 }
2910 return write_pidfile (pid_fd);
2912 error:
2913 remove_pidfile();
2914 return -1;
2915 } /* }}} int daemonize */
2917 static int cleanup (void) /* {{{ */
2918 {
2919 pthread_cond_broadcast (&flush_cond);
2920 pthread_join (flush_thread, NULL);
2922 pthread_cond_broadcast (&queue_cond);
2923 for (int i = 0; i < config_queue_threads; i++)
2924 pthread_join (queue_threads[i], NULL);
2926 if (config_flush_at_shutdown)
2927 {
2928 assert(cache_queue_head == NULL);
2929 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2930 }
2932 free(queue_threads);
2933 free(config_base_dir);
2935 pthread_mutex_lock(&cache_lock);
2936 g_tree_destroy(cache_tree);
2938 pthread_mutex_lock(&journal_lock);
2939 journal_done();
2941 RRDD_LOG(LOG_INFO, "goodbye");
2942 closelog ();
2944 remove_pidfile ();
2945 free(config_pid_file);
2947 return (0);
2948 } /* }}} int cleanup */
2950 static int read_options (int argc, char **argv) /* {{{ */
2951 {
2952 int option;
2953 int status = 0;
2955 char **permissions = NULL;
2956 size_t permissions_len = 0;
2958 gid_t socket_group = (gid_t)-1;
2959 mode_t socket_permissions = (mode_t)-1;
2961 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2962 {
2963 switch (option)
2964 {
2965 case 'g':
2966 stay_foreground=1;
2967 break;
2969 case 'l':
2970 {
2971 listen_socket_t *new;
2973 new = malloc(sizeof(listen_socket_t));
2974 if (new == NULL)
2975 {
2976 fprintf(stderr, "read_options: malloc failed.\n");
2977 return(2);
2978 }
2979 memset(new, 0, sizeof(listen_socket_t));
2981 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2983 /* Add permissions to the socket {{{ */
2984 if (permissions_len != 0)
2985 {
2986 size_t i;
2987 for (i = 0; i < permissions_len; i++)
2988 {
2989 status = socket_permission_add (new, permissions[i]);
2990 if (status != 0)
2991 {
2992 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2993 "socket failed. Most likely, this permission doesn't "
2994 "exist. Check your command line.\n", permissions[i]);
2995 status = 4;
2996 }
2997 }
2998 }
2999 else /* if (permissions_len == 0) */
3000 {
3001 /* Add permission for ALL commands to the socket. */
3002 size_t i;
3003 for (i = 0; i < list_of_commands_len; i++)
3004 {
3005 status = socket_permission_add (new, list_of_commands[i].cmd);
3006 if (status != 0)
3007 {
3008 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3009 "socket failed. This should never happen, ever! Sorry.\n",
3010 permissions[i]);
3011 status = 4;
3012 }
3013 }
3014 }
3015 /* }}} Done adding permissions. */
3017 new->socket_group = socket_group;
3018 new->socket_permissions = socket_permissions;
3020 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3021 &config_listen_address_list_len, new))
3022 {
3023 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3024 return (2);
3025 }
3026 }
3027 break;
3029 /* set socket group permissions */
3030 case 's':
3031 {
3032 gid_t group_gid;
3033 struct group *grp;
3035 group_gid = strtoul(optarg, NULL, 10);
3036 if (errno != EINVAL && group_gid>0)
3037 {
3038 /* we were passed a number */
3039 grp = getgrgid(group_gid);
3040 }
3041 else
3042 {
3043 grp = getgrnam(optarg);
3044 }
3046 if (grp)
3047 {
3048 socket_group = grp->gr_gid;
3049 }
3050 else
3051 {
3052 /* no idea what the user wanted... */
3053 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3054 return (5);
3055 }
3056 }
3057 break;
3059 /* set socket file permissions */
3060 case 'm':
3061 {
3062 long tmp;
3063 char *endptr = NULL;
3065 tmp = strtol (optarg, &endptr, 8);
3066 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3067 || (tmp > 07777) || (tmp < 0)) {
3068 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3069 optarg);
3070 return (5);
3071 }
3073 socket_permissions = (mode_t)tmp;
3074 }
3075 break;
3077 case 'P':
3078 {
3079 char *optcopy;
3080 char *saveptr;
3081 char *dummy;
3082 char *ptr;
3084 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3086 optcopy = strdup (optarg);
3087 dummy = optcopy;
3088 saveptr = NULL;
3089 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3090 {
3091 dummy = NULL;
3092 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3093 }
3095 free (optcopy);
3096 }
3097 break;
3099 case 'f':
3100 {
3101 int temp;
3103 temp = atoi (optarg);
3104 if (temp > 0)
3105 config_flush_interval = temp;
3106 else
3107 {
3108 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3109 status = 3;
3110 }
3111 }
3112 break;
3114 case 'w':
3115 {
3116 int temp;
3118 temp = atoi (optarg);
3119 if (temp > 0)
3120 config_write_interval = temp;
3121 else
3122 {
3123 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3124 status = 2;
3125 }
3126 }
3127 break;
3129 case 'z':
3130 {
3131 int temp;
3133 temp = atoi(optarg);
3134 if (temp > 0)
3135 config_write_jitter = temp;
3136 else
3137 {
3138 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3139 status = 2;
3140 }
3142 break;
3143 }
3145 case 't':
3146 {
3147 int threads;
3148 threads = atoi(optarg);
3149 if (threads >= 1)
3150 config_queue_threads = threads;
3151 else
3152 {
3153 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3154 return 1;
3155 }
3156 }
3157 break;
3159 case 'B':
3160 config_write_base_only = 1;
3161 break;
3163 case 'b':
3164 {
3165 size_t len;
3166 char base_realpath[PATH_MAX];
3168 if (config_base_dir != NULL)
3169 free (config_base_dir);
3170 config_base_dir = strdup (optarg);
3171 if (config_base_dir == NULL)
3172 {
3173 fprintf (stderr, "read_options: strdup failed.\n");
3174 return (3);
3175 }
3177 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3178 {
3179 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3180 config_base_dir, rrd_strerror (errno));
3181 return (3);
3182 }
3184 /* make sure that the base directory is not resolved via
3185 * symbolic links. this makes some performance-enhancing
3186 * assumptions possible (we don't have to resolve paths
3187 * that start with a "/")
3188 */
3189 if (realpath(config_base_dir, base_realpath) == NULL)
3190 {
3191 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3192 "%s\n", config_base_dir, rrd_strerror(errno));
3193 return 5;
3194 }
3196 len = strlen (config_base_dir);
3197 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3198 {
3199 config_base_dir[len - 1] = 0;
3200 len--;
3201 }
3203 if (len < 1)
3204 {
3205 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3206 return (4);
3207 }
3209 _config_base_dir_len = len;
3211 len = strlen (base_realpath);
3212 while ((len > 0) && (base_realpath[len - 1] == '/'))
3213 {
3214 base_realpath[len - 1] = '\0';
3215 len--;
3216 }
3218 if (strncmp(config_base_dir,
3219 base_realpath, sizeof(base_realpath)) != 0)
3220 {
3221 fprintf(stderr,
3222 "Base directory (-b) resolved via file system links!\n"
3223 "Please consult rrdcached '-b' documentation!\n"
3224 "Consider specifying the real directory (%s)\n",
3225 base_realpath);
3226 return 5;
3227 }
3228 }
3229 break;
3231 case 'p':
3232 {
3233 if (config_pid_file != NULL)
3234 free (config_pid_file);
3235 config_pid_file = strdup (optarg);
3236 if (config_pid_file == NULL)
3237 {
3238 fprintf (stderr, "read_options: strdup failed.\n");
3239 return (3);
3240 }
3241 }
3242 break;
3244 case 'F':
3245 config_flush_at_shutdown = 1;
3246 break;
3248 case 'j':
3249 {
3250 const char *dir = journal_dir = strdup(optarg);
3252 status = rrd_mkdir_p(dir, 0777);
3253 if (status != 0)
3254 {
3255 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3256 dir, rrd_strerror(errno));
3257 return 6;
3258 }
3260 if (access(dir, R_OK|W_OK|X_OK) != 0)
3261 {
3262 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3263 errno ? rrd_strerror(errno) : "");
3264 return 6;
3265 }
3266 }
3267 break;
3269 case 'a':
3270 {
3271 int temp = atoi(optarg);
3272 if (temp > 0)
3273 config_alloc_chunk = temp;
3274 else
3275 {
3276 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3277 return 10;
3278 }
3279 }
3280 break;
3282 case 'h':
3283 case '?':
3284 printf ("RRDCacheD %s\n"
3285 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3286 "\n"
3287 "Usage: rrdcached [options]\n"
3288 "\n"
3289 "Valid options are:\n"
3290 " -l <address> Socket address to listen to.\n"
3291 " -P <perms> Sets the permissions to assign to all following "
3292 "sockets\n"
3293 " -w <seconds> Interval in which to write data.\n"
3294 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3295 " -t <threads> Number of write threads.\n"
3296 " -f <seconds> Interval in which to flush dead data.\n"
3297 " -p <file> Location of the PID-file.\n"
3298 " -b <dir> Base directory to change to.\n"
3299 " -B Restrict file access to paths within -b <dir>\n"
3300 " -g Do not fork and run in the foreground.\n"
3301 " -j <dir> Directory in which to create the journal files.\n"
3302 " -F Always flush all updates at shutdown\n"
3303 " -s <id|name> Group owner of all following UNIX sockets\n"
3304 " (the socket will also have read/write permissions "
3305 "for that group)\n"
3306 " -m <mode> File permissions (octal) of all following UNIX "
3307 "sockets\n"
3308 " -a <size> Memory allocation chunk size. Default is 1."
3309 "\n"
3310 "For more information and a detailed description of all options "
3311 "please refer\n"
3312 "to the rrdcached(1) manual page.\n",
3313 VERSION);
3314 if (option == 'h')
3315 status = -1;
3316 else
3317 status = 1;
3318 break;
3319 } /* switch (option) */
3320 } /* while (getopt) */
3322 /* advise the user when values are not sane */
3323 if (config_flush_interval < 2 * config_write_interval)
3324 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3325 " 2x write interval (-w) !\n");
3326 if (config_write_jitter > config_write_interval)
3327 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3328 " write interval (-w) !\n");
3330 if (config_write_base_only && config_base_dir == NULL)
3331 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3332 " Consult the rrdcached documentation\n");
3334 if (journal_dir == NULL)
3335 config_flush_at_shutdown = 1;
3337 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3339 return (status);
3340 } /* }}} int read_options */
3342 int main (int argc, char **argv)
3343 {
3344 int status;
3346 status = read_options (argc, argv);
3347 if (status != 0)
3348 {
3349 if (status < 0)
3350 status = 0;
3351 return (status);
3352 }
3354 status = daemonize ();
3355 if (status != 0)
3356 {
3357 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3358 return (1);
3359 }
3361 journal_init();
3363 /* start the queue threads */
3364 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3365 if (queue_threads == NULL)
3366 {
3367 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3368 cleanup();
3369 return (1);
3370 }
3371 for (int i = 0; i < config_queue_threads; i++)
3372 {
3373 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3374 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3375 if (status != 0)
3376 {
3377 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3378 cleanup();
3379 return (1);
3380 }
3381 }
3383 /* start the flush thread */
3384 memset(&flush_thread, 0, sizeof(flush_thread));
3385 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3386 if (status != 0)
3387 {
3388 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3389 cleanup();
3390 return (1);
3391 }
3393 listen_thread_main (NULL);
3394 cleanup ();
3396 return (0);
3397 } /* int main */
3399 /*
3400 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3401 */