0610d388b997f42b11b948850f2cf95e4b4c1c57
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
67 #include "rrd_tool.h"
68 #include "rrd_client.h"
69 #include "unused.h"
71 #include <stdlib.h>
73 #ifndef WIN32
74 #ifdef HAVE_STDINT_H
75 # include <stdint.h>
76 #endif
77 #include <unistd.h>
78 #include <strings.h>
79 #include <inttypes.h>
80 #include <sys/socket.h>
82 #else
84 #endif
85 #include <stdio.h>
86 #include <string.h>
88 #include <sys/types.h>
89 #include <sys/stat.h>
90 #include <dirent.h>
91 #include <fcntl.h>
92 #include <signal.h>
93 #include <sys/un.h>
94 #include <netdb.h>
95 #include <poll.h>
96 #include <syslog.h>
97 #include <pthread.h>
98 #include <errno.h>
99 #include <assert.h>
100 #include <sys/time.h>
101 #include <time.h>
102 #include <libgen.h>
103 #include <grp.h>
105 #ifdef HAVE_LIBWRAP
106 #include <tcpd.h>
107 #endif /* HAVE_LIBWRAP */
109 #include <glib-2.0/glib.h>
110 /* }}} */
112 #define RRDD_LOG(severity, ...) \
113 do { \
114 if (stay_foreground) { \
115 fprintf(stderr, __VA_ARGS__); \
116 fprintf(stderr, "\n"); } \
117 syslog ((severity), __VA_ARGS__); \
118 } while (0)
120 /*
121 * Types
122 */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125 struct listen_socket_s /* per-socket state: descriptor, address, BATCH state, I/O buffers and permission settings */
126 {
127 int fd; /* descriptor that send_response() writes replies to */
128 char addr[PATH_MAX + 1];
129 int family;
131 /* state for BATCH processing */
132 time_t batch_start; /* non-zero while in BATCH; add_response_info()/send_response() then suppress or buffer output */
133 int batch_cmd; /* reported by send_response() as the leading number of an error status during BATCH */
135 /* buffered IO */
136 char *rbuf; /* read buffer scanned by next_cmd() */
137 off_t next_cmd; /* offset in rbuf at which the next unparsed command starts */
138 off_t next_read; /* offset in rbuf just past the buffered data; next_cmd() searches [next_cmd, next_read) */
140 char *wbuf; /* heap buffer of pending response text, grown by add_to_wbuf(), released by send_response() */
141 ssize_t wbuf_len; /* number of bytes accumulated in wbuf (terminator not counted) */
143 uint32_t permissions;
145 gid_t socket_group;
146 mode_t socket_permissions;
147 };
148 typedef struct listen_socket_s listen_socket_t;
150 struct command_s; /* forward declaration: HANDLER_PROTO below refers to command_t */
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
154 time_t UNUSED(now),\
155 char UNUSED(*buffer),\
156 size_t UNUSED(buffer_size)
158 #define HANDLER_PROTO command_t UNUSED(*cmd),\
159 DISPATCH_PROTO
161 struct command_s { /* description of one protocol command */
162 char *cmd;
163 int (*handler)(HANDLER_PROTO); /* function implementing the command; parameters are given by HANDLER_PROTO */
165 char context; /* where we expect to see it: bitmask of the CMD_CONTEXT_* values below */
166 #define CMD_CONTEXT_CLIENT (1<<0)
167 #define CMD_CONTEXT_BATCH (1<<1)
168 #define CMD_CONTEXT_JOURNAL (1<<2)
169 #define CMD_CONTEXT_ANY (0x7f)
171 char *syntax; /* usage text; syntax_error() sends it after "Usage: " when it is non-NULL */
172 char *help;
173 };
175 struct cache_item_s;
176 typedef struct cache_item_s cache_item_t;
177 struct cache_item_s /* cached pending updates for one file; stored in cache_tree and linked into the write queue */
178 {
179 char *file; /* file name; the same pointer is used as the key in cache_tree (see tree_callback_flush) */
180 char **values; /* pending update strings; handed to rrd_update_r() by queue_thread_main() */
181 size_t values_num; /* number of valid pointers */
182 size_t values_alloc; /* number of allocated pointers */
183 time_t last_flush_time; /* set by wipe_ci_values(), optionally with random jitter added */
184 double last_update_stamp;
185 #define CI_FLAGS_IN_TREE (1<<0)
186 #define CI_FLAGS_IN_QUEUE (1<<1)
187 int flags; /* bitmask of the CI_FLAGS_* values above */
188 pthread_cond_t flushed; /* broadcast after the item was written or freed; flush_file() waits on it */
189 cache_item_t *prev; /* previous item in the write queue, NULL at the head */
190 cache_item_t *next; /* next item in the write queue, NULL at the tail */
191 };
193 struct callback_flush_data_s /* user data passed from flush_old_values() to tree_callback_flush() */
194 {
195 time_t now; /* time of the flush pass, taken once so the callback need not call time() per node */
196 time_t abs_timeout; /* items whose last_flush_time is <= this value get enqueued for writing */
197 char **keys; /* keys of empty, expired items collected for removal from cache_tree */
198 size_t keys_num; /* number of entries in keys */
199 };
200 typedef struct callback_flush_data_s callback_flush_data_t;
202 enum queue_side_e
203 {
204 HEAD,
205 TAIL
206 };
207 typedef enum queue_side_e queue_side_t;
209 /* describe a set of journal files */
210 typedef struct {
211 char **files;
212 size_t files_num;
213 } journal_set;
215 #define RBUF_SIZE (RRD_CMD_MAX*2)
217 /*
218 * Variables
219 */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 static listen_socket_t default_socket;
228 enum {
229 RUNNING, /* normal operation */
230 FLUSHING, /* flushing remaining values */
231 SHUTDOWN /* shutting down */
232 } state = RUNNING;
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
245 /* Cache stuff */
246 static GTree *cache_tree = NULL;
247 static cache_item_t *cache_queue_head = NULL;
248 static cache_item_t *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
251 static int config_write_interval = 300;
252 static int config_write_jitter = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
259 static size_t config_alloc_chunk = 1;
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
273 static int opt_no_overwrite = 0; /* default for the daemon */
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL; /* current journal file handle */
282 static long journal_size = 0; /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
289 /* prototypes for forward references */
290 static int handle_request_help (HANDLER_PROTO);
292 /*
293 * Functions
294 */
295 static void sig_common (const char *sig) /* {{{ */
296 {
297 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298 if (state == RUNNING) {
299 state = FLUSHING;
300 }
301 pthread_cond_broadcast(&flush_cond);
302 pthread_cond_broadcast(&queue_cond);
303 } /* }}} void sig_common */
/* SIGINT handler: start the common shutdown sequence. The signal number
 * itself is not needed. */
static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
  sig_common ("INT");
} /* }}} void sig_int_handler */
/* SIGTERM handler: start the common shutdown sequence. The signal number
 * itself is not needed. */
static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
  sig_common ("TERM");
} /* }}} void sig_term_handler */
315 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
316 {
317 config_flush_at_shutdown = 1;
318 sig_common("USR1");
319 } /* }}} void sig_usr1_handler */
321 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
322 {
323 config_flush_at_shutdown = 0;
324 sig_common("USR2");
325 } /* }}} void sig_usr2_handler */
327 static void install_signal_handlers(void) /* {{{ */
328 {
329 /* These structures are static, because `sigaction' behaves weird if the are
330 * overwritten.. */
331 static struct sigaction sa_int;
332 static struct sigaction sa_term;
333 static struct sigaction sa_pipe;
334 static struct sigaction sa_usr1;
335 static struct sigaction sa_usr2;
337 /* Install signal handlers */
338 memset (&sa_int, 0, sizeof (sa_int));
339 sa_int.sa_handler = sig_int_handler;
340 sigaction (SIGINT, &sa_int, NULL);
342 memset (&sa_term, 0, sizeof (sa_term));
343 sa_term.sa_handler = sig_term_handler;
344 sigaction (SIGTERM, &sa_term, NULL);
346 memset (&sa_pipe, 0, sizeof (sa_pipe));
347 sa_pipe.sa_handler = SIG_IGN;
348 sigaction (SIGPIPE, &sa_pipe, NULL);
350 memset (&sa_pipe, 0, sizeof (sa_usr1));
351 sa_usr1.sa_handler = sig_usr1_handler;
352 sigaction (SIGUSR1, &sa_usr1, NULL);
354 memset (&sa_usr2, 0, sizeof (sa_usr2));
355 sa_usr2.sa_handler = sig_usr2_handler;
356 sigaction (SIGUSR2, &sa_usr2, NULL);
358 } /* }}} void install_signal_handlers */
360 static int open_pidfile(char *action, int oflag) /* {{{ */
361 {
362 int fd;
363 const char *file;
364 char *file_copy, *dir;
366 file = (config_pid_file != NULL)
367 ? config_pid_file
368 : LOCALSTATEDIR "/run/rrdcached.pid";
370 /* dirname may modify its argument */
371 file_copy = strdup(file);
372 if (file_copy == NULL)
373 {
374 fprintf(stderr, "rrdcached: strdup(): %s\n",
375 rrd_strerror(errno));
376 return -1;
377 }
379 dir = dirname(file_copy);
380 if (rrd_mkdir_p(dir, 0777) != 0)
381 {
382 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
383 dir, rrd_strerror(errno));
384 return -1;
385 }
387 free(file_copy);
389 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
390 if (fd < 0)
391 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
392 action, file, rrd_strerror(errno));
394 return(fd);
395 } /* }}} static int open_pidfile */
/*
 * Check an existing pid file to see whether a daemon is running.
 *
 * Reads the PID stored in the file. If that PID belongs to another process
 * that can be signalled, a competing rrdcached is assumed and the call
 * fails. Otherwise the stale file is truncated.
 *
 * Returns the open, truncated pid file descriptor on success, or a negative
 * value on failure. The descriptor is closed on every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t pid_len;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator that atoi() relies on */
  pid_len = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (pid_len <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[pid_len] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
        (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * Write the PID of the calling process, followed by a newline, to the
 * already opened pid file descriptor.
 *
 * fd: descriptor of the pid file; it is closed in all cases (via fclose()
 *     on success, via close() if fdopen() fails).
 *
 * Returns 0 on success and -1 if the descriptor could not be wrapped in a
 * stdio stream.
 */
static int write_pidfile (int fd) /* {{{ */
{
  pid_t self = getpid ();
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) self);
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
462 static int remove_pidfile (void) /* {{{ */
463 {
464 char *file;
465 int status;
467 file = (config_pid_file != NULL)
468 ? config_pid_file
469 : LOCALSTATEDIR "/run/rrdcached.pid";
471 status = unlink (file);
472 if (status == 0)
473 return (0);
474 return (errno);
475 } /* }}} int remove_pidfile */
477 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
478 {
479 char *eol;
481 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
482 sock->next_read - sock->next_cmd);
484 if (eol == NULL)
485 {
486 /* no commands left, move remainder back to front of rbuf */
487 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
488 sock->next_read - sock->next_cmd);
489 sock->next_read -= sock->next_cmd;
490 sock->next_cmd = 0;
491 *len = 0;
492 return NULL;
493 }
494 else
495 {
496 char *cmd = sock->rbuf + sock->next_cmd;
497 *eol = '\0';
499 sock->next_cmd = eol - sock->rbuf + 1;
501 if (eol > sock->rbuf && *(eol-1) == '\r')
502 *(--eol) = '\0'; /* handle "\r\n" EOL */
504 *len = eol - cmd;
506 return cmd;
507 }
509 /* NOTREACHED */
510 assert(1==0);
511 } /* }}} char *next_cmd */
513 /* add the characters directly to the write buffer */
514 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
515 {
516 char *new_buf;
518 assert(sock != NULL);
520 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
521 if (new_buf == NULL)
522 {
523 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
524 return -1;
525 }
527 strncpy(new_buf + sock->wbuf_len, str, len + 1);
529 sock->wbuf = new_buf;
530 sock->wbuf_len += len;
532 return 0;
533 } /* }}} static int add_to_wbuf */
535 /* add the text to the "extra" info that's sent after the status line */
536 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
537 {
538 va_list argp;
539 char buffer[RRD_CMD_MAX];
540 int len;
542 if (JOURNAL_REPLAY(sock)) return 0;
543 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
545 va_start(argp, fmt);
546 #ifdef HAVE_VSNPRINTF
547 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
548 #else
549 len = vsprintf(buffer, fmt, argp);
550 #endif
551 va_end(argp);
552 if (len < 0)
553 {
554 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
555 return -1;
556 }
558 return add_to_wbuf(sock, buffer, len);
559 } /* }}} static int add_response_info */
/*
 * Count the newline characters in a NUL-terminated string.
 * A NULL pointer counts as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
577 /* send the response back to the user.
578 * returns 0 on success, -1 on error
579 * write buffer is always zeroed after this call */
580 static int send_response (listen_socket_t *sock, response_code rc,
581 char *fmt, ...) /* {{{ */
582 {
583 va_list argp;
584 char buffer[RRD_CMD_MAX];
585 int lines;
586 ssize_t wrote;
587 int rclen, len;
589 if (JOURNAL_REPLAY(sock)) return rc;
591 if (sock->batch_start)
592 {
593 if (rc == RESP_OK)
594 return rc; /* no response on success during BATCH */
595 lines = sock->batch_cmd;
596 }
597 else if (rc == RESP_OK)
598 lines = count_lines(sock->wbuf);
599 else
600 lines = -1;
602 rclen = sprintf(buffer, "%d ", lines);
603 va_start(argp, fmt);
604 #ifdef HAVE_VSNPRINTF
605 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
606 #else
607 len = vsprintf(buffer+rclen, fmt, argp);
608 #endif
609 va_end(argp);
610 if (len < 0)
611 return -1;
613 len += rclen;
615 /* append the result to the wbuf, don't write to the user */
616 if (sock->batch_start)
617 return add_to_wbuf(sock, buffer, len);
619 /* first write must be complete */
620 if (len != write(sock->fd, buffer, len))
621 {
622 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
623 return -1;
624 }
626 if (sock->wbuf != NULL && rc == RESP_OK)
627 {
628 wrote = 0;
629 while (wrote < sock->wbuf_len)
630 {
631 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
632 if (wb <= 0)
633 {
634 RRDD_LOG(LOG_INFO, "send_response: could not write results");
635 return -1;
636 }
637 wrote += wb;
638 }
639 }
641 free(sock->wbuf); sock->wbuf = NULL;
642 sock->wbuf_len = 0;
644 return 0;
645 } /* }}} */
647 static void wipe_ci_values(cache_item_t *ci, time_t when)
648 {
649 ci->values = NULL;
650 ci->values_num = 0;
651 ci->values_alloc = 0;
653 ci->last_flush_time = when;
654 if (config_write_jitter > 0)
655 ci->last_flush_time += (rrd_random() % config_write_jitter);
656 }
658 /* remove_from_queue
659 * remove a "cache_item_t" item from the queue.
660 * must hold 'cache_lock' when calling this
661 */
662 static void remove_from_queue(cache_item_t *ci) /* {{{ */
663 {
664 if (ci == NULL) return;
665 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
667 if (ci->prev == NULL)
668 cache_queue_head = ci->next; /* reset head */
669 else
670 ci->prev->next = ci->next;
672 if (ci->next == NULL)
673 cache_queue_tail = ci->prev; /* reset the tail */
674 else
675 ci->next->prev = ci->prev;
677 ci->next = ci->prev = NULL;
678 ci->flags &= ~CI_FLAGS_IN_QUEUE;
680 pthread_mutex_lock (&stats_lock);
681 assert (stats_queue_length > 0);
682 stats_queue_length--;
683 pthread_mutex_unlock (&stats_lock);
685 } /* }}} static void remove_from_queue */
687 /* free the resources associated with the cache_item_t
688 * must hold cache_lock when calling this function
689 */
690 static void *free_cache_item(cache_item_t *ci) /* {{{ */
691 {
692 if (ci == NULL) return NULL;
694 remove_from_queue(ci);
696 for (size_t i=0; i < ci->values_num; i++)
697 free(ci->values[i]);
699 free (ci->values);
700 free (ci->file);
702 /* in case anyone is waiting */
703 pthread_cond_broadcast(&ci->flushed);
704 pthread_cond_destroy(&ci->flushed);
706 free (ci);
708 return NULL;
709 } /* }}} static void *free_cache_item */
711 /*
712 * enqueue_cache_item:
713 * `cache_lock' must be acquired before calling this function!
714 */
715 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
716 queue_side_t side)
717 {
718 if (ci == NULL)
719 return (-1);
721 if (ci->values_num == 0)
722 return (0);
724 if (side == HEAD)
725 {
726 if (cache_queue_head == ci)
727 return 0;
729 /* remove if further down in queue */
730 remove_from_queue(ci);
732 ci->prev = NULL;
733 ci->next = cache_queue_head;
734 if (ci->next != NULL)
735 ci->next->prev = ci;
736 cache_queue_head = ci;
738 if (cache_queue_tail == NULL)
739 cache_queue_tail = cache_queue_head;
740 }
741 else /* (side == TAIL) */
742 {
743 /* We don't move values back in the list.. */
744 if (ci->flags & CI_FLAGS_IN_QUEUE)
745 return (0);
747 assert (ci->next == NULL);
748 assert (ci->prev == NULL);
750 ci->prev = cache_queue_tail;
752 if (cache_queue_tail == NULL)
753 cache_queue_head = ci;
754 else
755 cache_queue_tail->next = ci;
757 cache_queue_tail = ci;
758 }
760 ci->flags |= CI_FLAGS_IN_QUEUE;
762 pthread_cond_signal(&queue_cond);
763 pthread_mutex_lock (&stats_lock);
764 stats_queue_length++;
765 pthread_mutex_unlock (&stats_lock);
767 return (0);
768 } /* }}} int enqueue_cache_item */
770 /*
771 * tree_callback_flush:
772 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
773 * while this is in progress.
774 */
775 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
776 gpointer data)
777 {
778 cache_item_t *ci;
779 callback_flush_data_t *cfd;
781 ci = (cache_item_t *) value;
782 cfd = (callback_flush_data_t *) data;
784 if (ci->flags & CI_FLAGS_IN_QUEUE)
785 return FALSE;
787 if (ci->values_num > 0
788 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
789 {
790 enqueue_cache_item (ci, TAIL);
791 }
792 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
793 && (ci->values_num <= 0))
794 {
795 assert ((char *) key == ci->file);
796 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
797 {
798 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
799 return (FALSE);
800 }
801 }
803 return (FALSE);
804 } /* }}} gboolean tree_callback_flush */
806 static int flush_old_values (int max_age)
807 {
808 callback_flush_data_t cfd;
809 size_t k;
811 memset (&cfd, 0, sizeof (cfd));
812 /* Pass the current time as user data so that we don't need to call
813 * `time' for each node. */
814 cfd.now = time (NULL);
815 cfd.keys = NULL;
816 cfd.keys_num = 0;
818 if (max_age > 0)
819 cfd.abs_timeout = cfd.now - max_age;
820 else
821 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
823 /* `tree_callback_flush' will return the keys of all values that haven't
824 * been touched in the last `config_flush_interval' seconds in `cfd'.
825 * The char*'s in this array point to the same memory as ci->file, so we
826 * don't need to free them separately. */
827 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
829 for (k = 0; k < cfd.keys_num; k++)
830 {
831 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
832 /* should never fail, since we have held the cache_lock
833 * the entire time */
834 assert(status == TRUE);
835 }
837 if (cfd.keys != NULL)
838 {
839 free (cfd.keys);
840 cfd.keys = NULL;
841 }
843 return (0);
844 } /* int flush_old_values */
846 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
847 {
848 struct timeval now;
849 struct timespec next_flush;
850 int status;
852 gettimeofday (&now, NULL);
853 next_flush.tv_sec = now.tv_sec + config_flush_interval;
854 next_flush.tv_nsec = 1000 * now.tv_usec;
856 pthread_mutex_lock(&cache_lock);
858 while (state == RUNNING)
859 {
860 gettimeofday (&now, NULL);
861 if ((now.tv_sec > next_flush.tv_sec)
862 || ((now.tv_sec == next_flush.tv_sec)
863 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
864 {
865 RRDD_LOG(LOG_DEBUG, "flushing old values");
867 /* Determine the time of the next cache flush. */
868 next_flush.tv_sec = now.tv_sec + config_flush_interval;
870 /* Flush all values that haven't been written in the last
871 * `config_write_interval' seconds. */
872 flush_old_values (config_write_interval);
874 /* unlock the cache while we rotate so we don't block incoming
875 * updates if the fsync() blocks on disk I/O */
876 pthread_mutex_unlock(&cache_lock);
877 journal_rotate();
878 pthread_mutex_lock(&cache_lock);
879 }
881 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
882 if (status != 0 && status != ETIMEDOUT)
883 {
884 RRDD_LOG (LOG_ERR, "flush_thread_main: "
885 "pthread_cond_timedwait returned %i.", status);
886 }
887 }
889 if (config_flush_at_shutdown)
890 flush_old_values (-1); /* flush everything */
892 state = SHUTDOWN;
894 pthread_mutex_unlock(&cache_lock);
896 return NULL;
897 } /* void *flush_thread_main */
899 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
900 {
901 pthread_mutex_lock (&cache_lock);
903 while (state != SHUTDOWN
904 || (cache_queue_head != NULL && config_flush_at_shutdown))
905 {
906 cache_item_t *ci;
907 char *file;
908 char **values;
909 size_t values_num;
910 int status;
912 /* Now, check if there's something to store away. If not, wait until
913 * something comes in. */
914 if (cache_queue_head == NULL)
915 {
916 status = pthread_cond_wait (&queue_cond, &cache_lock);
917 if ((status != 0) && (status != ETIMEDOUT))
918 {
919 RRDD_LOG (LOG_ERR, "queue_thread_main: "
920 "pthread_cond_wait returned %i.", status);
921 }
922 }
924 /* Check if a value has arrived. This may be NULL if we timed out or there
925 * was an interrupt such as a signal. */
926 if (cache_queue_head == NULL)
927 continue;
929 ci = cache_queue_head;
931 /* copy the relevant parts */
932 file = strdup (ci->file);
933 if (file == NULL)
934 {
935 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
936 continue;
937 }
939 assert(ci->values != NULL);
940 assert(ci->values_num > 0);
942 values = ci->values;
943 values_num = ci->values_num;
945 wipe_ci_values(ci, time(NULL));
946 remove_from_queue(ci);
948 pthread_mutex_unlock (&cache_lock);
950 rrd_clear_error ();
951 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
952 if (status != 0)
953 {
954 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
955 "rrd_update_r (%s) failed with status %i. (%s)",
956 file, status, rrd_get_error());
957 }
959 journal_write("wrote", file);
961 /* Search again in the tree. It's possible someone issued a "FORGET"
962 * while we were writing the update values. */
963 pthread_mutex_lock(&cache_lock);
964 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
965 if (ci)
966 pthread_cond_broadcast(&ci->flushed);
967 pthread_mutex_unlock(&cache_lock);
969 if (status == 0)
970 {
971 pthread_mutex_lock (&stats_lock);
972 stats_updates_written++;
973 stats_data_sets_written += values_num;
974 pthread_mutex_unlock (&stats_lock);
975 }
977 rrd_free_ptrs((void ***) &values, &values_num);
978 free(file);
980 pthread_mutex_lock (&cache_lock);
981 }
982 pthread_mutex_unlock (&cache_lock);
984 return (NULL);
985 } /* }}} void *queue_thread_main */
/*
 * Split the next space-separated field off the front of a command buffer.
 *
 * buffer_ret:      in: start of the unparsed text; out: first byte after
 *                  the field's separator.
 * buffer_size_ret: in: number of bytes at *buffer_ret including the final
 *                  NUL; out: number of bytes remaining.
 * field_ret:       out: the field, NUL-terminated in place.
 *
 * A backslash makes the following character literal (so "\ " yields a
 * space inside the field). The field is rewritten in place at the start of
 * the buffer, so the input text is modified.
 *
 * Returns 0 on success. Returns -1, leaving the three output parameters
 * untouched, if the buffer is empty or a backslash is the last character
 * before the terminating NUL.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const text = *buffer_ret;
  const size_t text_size = *buffer_size_ret;
  size_t rd = 0; /* read position */
  size_t wr = 0; /* write position, never ahead of rd */

  if (text_size == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (text[text_size - 1] == '\0');

  for (;;)
  {
    if (rd >= text_size)
      return (-1);

    /* end-of-field or end-of-buffer */
    if (text[rd] == ' ' || text[rd] == '\0')
      break;

    /* escaped character: skip the backslash, copy what follows */
    if (text[rd] == '\\')
    {
      if (rd >= (text_size - 1))
        return (-1);
      rd++;
    }

    text[wr++] = text[rd++];
  }

  /* terminate the field and step over the separator */
  text[wr] = 0;
  rd++;

  *buffer_ret = text + rd;
  *buffer_size_ret = text_size - rd;
  *field_ret = text;

  return (0);
} /* }}} int buffer_get_field */
1050 /* if we're restricting writes to the base directory,
1051 * check whether the file falls within the dir
1052 * returns 1 if OK, otherwise 0
1053 */
1054 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1055 {
1056 assert(file != NULL);
1058 if (!config_write_base_only
1059 || JOURNAL_REPLAY(sock)
1060 || config_base_dir == NULL)
1061 return 1;
1063 if (strstr(file, "../") != NULL) goto err;
1065 /* relative paths without "../" are ok */
1066 if (*file != '/') return 1;
1068 /* file must be of the format base + "/" + <1+ char filename> */
1069 if (strlen(file) < _config_base_dir_len + 2) goto err;
1070 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1071 if (*(file + _config_base_dir_len) != '/') goto err;
1073 return 1;
1075 err:
1076 if (sock != NULL && sock->fd >= 0)
1077 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1079 return 0;
1080 } /* }}} static int check_file_access */
1082 /* when using a base dir, convert relative paths to absolute paths.
1083 * if necessary, modifies the "filename" pointer to point
1084 * to the new path created in "tmp". "tmp" is provided
1085 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1086 *
1087 * this allows us to optimize for the expected case (absolute path)
1088 * with a no-op.
1089 */
1090 static void get_abs_path(char **filename, char *tmp)
1091 {
1092 assert(tmp != NULL);
1093 assert(filename != NULL && *filename != NULL);
1095 if (config_base_dir == NULL || **filename == '/')
1096 return;
1098 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1099 *filename = tmp;
1100 } /* }}} static int get_abs_path */
1102 static int flush_file (const char *filename) /* {{{ */
1103 {
1104 cache_item_t *ci;
1106 pthread_mutex_lock (&cache_lock);
1108 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1109 if (ci == NULL)
1110 {
1111 pthread_mutex_unlock (&cache_lock);
1112 return (ENOENT);
1113 }
1115 if (ci->values_num > 0)
1116 {
1117 /* Enqueue at head */
1118 enqueue_cache_item (ci, HEAD);
1119 pthread_cond_wait(&ci->flushed, &cache_lock);
1120 }
1122 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1123 * may have been purged during our cond_wait() */
1125 pthread_mutex_unlock(&cache_lock);
1127 return (0);
1128 } /* }}} int flush_file */
1130 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1131 {
1132 char *err = "Syntax error.\n";
1134 if (cmd && cmd->syntax)
1135 err = cmd->syntax;
1137 return send_response(sock, RESP_ERR, "Usage: %s", err);
1138 } /* }}} static int syntax_error() */
1140 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1141 {
1142 uint64_t copy_queue_length;
1143 uint64_t copy_updates_received;
1144 uint64_t copy_flush_received;
1145 uint64_t copy_updates_written;
1146 uint64_t copy_data_sets_written;
1147 uint64_t copy_journal_bytes;
1148 uint64_t copy_journal_rotate;
1150 uint64_t tree_nodes_number;
1151 uint64_t tree_depth;
1153 pthread_mutex_lock (&stats_lock);
1154 copy_queue_length = stats_queue_length;
1155 copy_updates_received = stats_updates_received;
1156 copy_flush_received = stats_flush_received;
1157 copy_updates_written = stats_updates_written;
1158 copy_data_sets_written = stats_data_sets_written;
1159 copy_journal_bytes = stats_journal_bytes;
1160 copy_journal_rotate = stats_journal_rotate;
1161 pthread_mutex_unlock (&stats_lock);
1163 pthread_mutex_lock (&cache_lock);
1164 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1165 tree_depth = (uint64_t) g_tree_height (cache_tree);
1166 pthread_mutex_unlock (&cache_lock);
1168 add_response_info(sock,
1169 "QueueLength: %"PRIu64"\n", copy_queue_length);
1170 add_response_info(sock,
1171 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1172 add_response_info(sock,
1173 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1174 add_response_info(sock,
1175 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1176 add_response_info(sock,
1177 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1178 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1179 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1180 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1181 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1183 send_response(sock, RESP_OK, "Statistics follow\n");
1185 return (0);
1186 } /* }}} int handle_request_stats */
1188 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1189 {
1190 char *file, file_tmp[PATH_MAX];
1191 int status;
1193 status = buffer_get_field (&buffer, &buffer_size, &file);
1194 if (status != 0)
1195 {
1196 return syntax_error(sock,cmd);
1197 }
1198 else
1199 {
1200 pthread_mutex_lock(&stats_lock);
1201 stats_flush_received++;
1202 pthread_mutex_unlock(&stats_lock);
1204 get_abs_path(&file, file_tmp);
1205 if (!check_file_access(file, sock)) return 0;
1207 status = flush_file (file);
1208 if (status == 0)
1209 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1210 else if (status == ENOENT)
1211 {
1212 /* no file in our tree; see whether it exists at all */
1213 struct stat statbuf;
1215 memset(&statbuf, 0, sizeof(statbuf));
1216 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1217 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1218 else
1219 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1220 }
1221 else if (status < 0)
1222 return send_response(sock, RESP_ERR, "Internal error.\n");
1223 else
1224 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1225 }
1227 /* NOTREACHED */
1228 assert(1==0);
1229 } /* }}} int handle_request_flush */
1231 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1232 {
1233 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1235 pthread_mutex_lock(&cache_lock);
1236 flush_old_values(-1);
1237 pthread_mutex_unlock(&cache_lock);
1239 return send_response(sock, RESP_OK, "Started flush.\n");
1240 } /* }}} static int handle_request_flushall */
1242 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1243 {
1244 int status;
1245 char *file, file_tmp[PATH_MAX];
1246 cache_item_t *ci;
1248 status = buffer_get_field(&buffer, &buffer_size, &file);
1249 if (status != 0)
1250 return syntax_error(sock,cmd);
1252 get_abs_path(&file, file_tmp);
1254 pthread_mutex_lock(&cache_lock);
1255 ci = g_tree_lookup(cache_tree, file);
1256 if (ci == NULL)
1257 {
1258 pthread_mutex_unlock(&cache_lock);
1259 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1260 }
1262 for (size_t i=0; i < ci->values_num; i++)
1263 add_response_info(sock, "%s\n", ci->values[i]);
1265 pthread_mutex_unlock(&cache_lock);
1266 return send_response(sock, RESP_OK, "updates pending\n");
1267 } /* }}} static int handle_request_pending */
1269 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1270 {
1271 int status;
1272 gboolean found;
1273 char *file, file_tmp[PATH_MAX];
1275 status = buffer_get_field(&buffer, &buffer_size, &file);
1276 if (status != 0)
1277 return syntax_error(sock,cmd);
1279 get_abs_path(&file, file_tmp);
1280 if (!check_file_access(file, sock)) return 0;
1282 pthread_mutex_lock(&cache_lock);
1283 found = g_tree_remove(cache_tree, file);
1284 pthread_mutex_unlock(&cache_lock);
1286 if (found == TRUE)
1287 {
1288 if (!JOURNAL_REPLAY(sock))
1289 journal_write("forget", file);
1291 return send_response(sock, RESP_OK, "Gone!\n");
1292 }
1293 else
1294 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1296 /* NOTREACHED */
1297 assert(1==0);
1298 } /* }}} static int handle_request_forget */
1300 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1301 {
1302 cache_item_t *ci;
1304 pthread_mutex_lock(&cache_lock);
1306 ci = cache_queue_head;
1307 while (ci != NULL)
1308 {
1309 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1310 ci = ci->next;
1311 }
1313 pthread_mutex_unlock(&cache_lock);
1315 return send_response(sock, RESP_OK, "in queue.\n");
1316 } /* }}} int handle_request_queue */
1318 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1319 {
1320 char *file, file_tmp[PATH_MAX];
1321 int values_num = 0;
1322 int status;
1323 char orig_buf[RRD_CMD_MAX];
1325 cache_item_t *ci;
1327 /* save it for the journal later */
1328 if (!JOURNAL_REPLAY(sock))
1329 strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1331 status = buffer_get_field (&buffer, &buffer_size, &file);
1332 if (status != 0)
1333 return syntax_error(sock,cmd);
1335 pthread_mutex_lock(&stats_lock);
1336 stats_updates_received++;
1337 pthread_mutex_unlock(&stats_lock);
1339 get_abs_path(&file, file_tmp);
1340 if (!check_file_access(file, sock)) return 0;
1342 pthread_mutex_lock (&cache_lock);
1343 ci = g_tree_lookup (cache_tree, file);
1345 if (ci == NULL) /* {{{ */
1346 {
1347 struct stat statbuf;
1348 cache_item_t *tmp;
1350 /* don't hold the lock while we setup; stat(2) might block */
1351 pthread_mutex_unlock(&cache_lock);
1353 memset (&statbuf, 0, sizeof (statbuf));
1354 status = stat (file, &statbuf);
1355 if (status != 0)
1356 {
1357 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1359 status = errno;
1360 if (status == ENOENT)
1361 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1362 else
1363 return send_response(sock, RESP_ERR,
1364 "stat failed with error %i.\n", status);
1365 }
1366 if (!S_ISREG (statbuf.st_mode))
1367 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1369 if (access(file, R_OK|W_OK) != 0)
1370 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1371 file, rrd_strerror(errno));
1373 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1374 if (ci == NULL)
1375 {
1376 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1378 return send_response(sock, RESP_ERR, "malloc failed.\n");
1379 }
1380 memset (ci, 0, sizeof (cache_item_t));
1382 ci->file = strdup (file);
1383 if (ci->file == NULL)
1384 {
1385 free (ci);
1386 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1388 return send_response(sock, RESP_ERR, "strdup failed.\n");
1389 }
1391 wipe_ci_values(ci, now);
1392 ci->flags = CI_FLAGS_IN_TREE;
1393 pthread_cond_init(&ci->flushed, NULL);
1395 pthread_mutex_lock(&cache_lock);
1397 /* another UPDATE might have added this entry in the meantime */
1398 tmp = g_tree_lookup (cache_tree, file);
1399 if (tmp == NULL)
1400 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1401 else
1402 {
1403 free_cache_item (ci);
1404 ci = tmp;
1405 }
1407 /* state may have changed while we were unlocked */
1408 if (state == SHUTDOWN)
1409 return -1;
1410 } /* }}} */
1411 assert (ci != NULL);
1413 /* don't re-write updates in replay mode */
1414 if (!JOURNAL_REPLAY(sock))
1415 journal_write("update", orig_buf);
1417 while (buffer_size > 0)
1418 {
1419 char *value;
1420 double stamp;
1421 char *eostamp;
1423 status = buffer_get_field (&buffer, &buffer_size, &value);
1424 if (status != 0)
1425 {
1426 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1427 break;
1428 }
1430 /* make sure update time is always moving forward. We use double here since
1431 update does support subsecond precision for timestamps ... */
1432 stamp = strtod(value, &eostamp);
1433 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434 {
1435 pthread_mutex_unlock(&cache_lock);
1436 return send_response(sock, RESP_ERR,
1437 "Cannot find timestamp in '%s'!\n", value);
1438 }
1439 else if (stamp <= ci->last_update_stamp)
1440 {
1441 pthread_mutex_unlock(&cache_lock);
1442 return send_response(sock, RESP_ERR,
1443 "illegal attempt to update using time %lf when last"
1444 " update time is %lf (minimum one second step)\n",
1445 stamp, ci->last_update_stamp);
1446 }
1447 else
1448 ci->last_update_stamp = stamp;
1450 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451 &ci->values_alloc, config_alloc_chunk))
1452 {
1453 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454 continue;
1455 }
1457 values_num++;
1458 }
1460 if (((now - ci->last_flush_time) >= config_write_interval)
1461 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462 && (ci->values_num > 0))
1463 {
1464 enqueue_cache_item (ci, TAIL);
1465 }
1467 pthread_mutex_unlock (&cache_lock);
1469 if (values_num < 1)
1470 return send_response(sock, RESP_ERR, "No values updated.\n");
1471 else
1472 return send_response(sock, RESP_OK,
1473 "errors, enqueued %i value(s).\n", values_num);
1475 /* NOTREACHED */
1476 assert(1==0);
1478 } /* }}} int handle_request_update */
1480 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1481 {
1482 char *file, file_tmp[PATH_MAX];
1483 char *cf;
1485 char *start_str;
1486 char *end_str;
1487 time_t start_tm;
1488 time_t end_tm;
1490 unsigned long step;
1491 unsigned long ds_cnt;
1492 char **ds_namv;
1493 rrd_value_t *data;
1495 int status;
1496 unsigned long i;
1497 time_t t;
1498 rrd_value_t *data_ptr;
1500 file = NULL;
1501 cf = NULL;
1502 start_str = NULL;
1503 end_str = NULL;
1505 /* Read the arguments */
1506 do /* while (0) */
1507 {
1508 status = buffer_get_field (&buffer, &buffer_size, &file);
1509 if (status != 0)
1510 break;
1512 status = buffer_get_field (&buffer, &buffer_size, &cf);
1513 if (status != 0)
1514 break;
1516 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1517 if (status != 0)
1518 {
1519 start_str = NULL;
1520 status = 0;
1521 break;
1522 }
1524 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1525 if (status != 0)
1526 {
1527 end_str = NULL;
1528 status = 0;
1529 break;
1530 }
1531 } while (0);
1533 if (status != 0)
1534 return (syntax_error(sock,cmd));
1536 get_abs_path(&file, file_tmp);
1537 if (!check_file_access(file, sock)) return 0;
1539 status = flush_file (file);
1540 if ((status != 0) && (status != ENOENT))
1541 return (send_response (sock, RESP_ERR,
1542 "flush_file (%s) failed with status %i.\n", file, status));
1544 t = time (NULL); /* "now" */
1546 /* Parse start time */
1547 if (start_str != NULL)
1548 {
1549 char *endptr;
1550 long value;
1552 endptr = NULL;
1553 errno = 0;
1554 value = strtol (start_str, &endptr, /* base = */ 0);
1555 if ((endptr == start_str) || (errno != 0))
1556 return (send_response(sock, RESP_ERR,
1557 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1558 start_str));
1560 if (value > 0)
1561 start_tm = (time_t) value;
1562 else
1563 start_tm = (time_t) (t + value);
1564 }
1565 else
1566 {
1567 start_tm = t - 86400;
1568 }
1570 /* Parse end time */
1571 if (end_str != NULL)
1572 {
1573 char *endptr;
1574 long value;
1576 endptr = NULL;
1577 errno = 0;
1578 value = strtol (end_str, &endptr, /* base = */ 0);
1579 if ((endptr == end_str) || (errno != 0))
1580 return (send_response(sock, RESP_ERR,
1581 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1582 end_str));
1584 if (value > 0)
1585 end_tm = (time_t) value;
1586 else
1587 end_tm = (time_t) (t + value);
1588 }
1589 else
1590 {
1591 end_tm = t;
1592 }
1594 step = -1;
1595 ds_cnt = 0;
1596 ds_namv = NULL;
1597 data = NULL;
1599 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1600 &ds_cnt, &ds_namv, &data);
1601 if (status != 0)
1602 return (send_response(sock, RESP_ERR,
1603 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1605 add_response_info (sock, "FlushVersion: %lu\n", 1);
1606 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1607 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1608 add_response_info (sock, "Step: %lu\n", step);
1609 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1611 #define SSTRCAT(buffer,str,buffer_fill) do { \
1612 size_t str_len = strlen (str); \
1613 if ((buffer_fill + str_len) > sizeof (buffer)) \
1614 str_len = sizeof (buffer) - buffer_fill; \
1615 if (str_len > 0) { \
1616 strncpy (buffer + buffer_fill, str, str_len); \
1617 buffer_fill += str_len; \
1618 assert (buffer_fill <= sizeof (buffer)); \
1619 if (buffer_fill == sizeof (buffer)) \
1620 buffer[buffer_fill - 1] = 0; \
1621 else \
1622 buffer[buffer_fill] = 0; \
1623 } \
1624 } while (0)
1626 { /* Add list of DS names */
1627 char linebuf[1024];
1628 size_t linebuf_fill;
1630 memset (linebuf, 0, sizeof (linebuf));
1631 linebuf_fill = 0;
1632 for (i = 0; i < ds_cnt; i++)
1633 {
1634 if (i > 0)
1635 SSTRCAT (linebuf, " ", linebuf_fill);
1636 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1637 rrd_freemem(ds_namv[i]);
1638 }
1639 rrd_freemem(ds_namv);
1640 add_response_info (sock, "DSName: %s\n", linebuf);
1641 }
1643 /* Add the actual data */
1644 assert (step > 0);
1645 data_ptr = data;
1646 for (t = start_tm + step; t <= end_tm; t += step)
1647 {
1648 char linebuf[1024];
1649 size_t linebuf_fill;
1650 char tmp[128];
1652 memset (linebuf, 0, sizeof (linebuf));
1653 linebuf_fill = 0;
1654 for (i = 0; i < ds_cnt; i++)
1655 {
1656 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1657 tmp[sizeof (tmp) - 1] = 0;
1658 SSTRCAT (linebuf, tmp, linebuf_fill);
1660 data_ptr++;
1661 }
1663 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1664 } /* for (t) */
1665 rrd_freemem(data);
1667 return (send_response (sock, RESP_OK, "Success\n"));
1668 #undef SSTRCAT
1669 } /* }}} int handle_request_fetch */
1671 /* we came across a "WROTE" entry during journal replay.
1672 * throw away any values that we have accumulated for this file
1673 */
1674 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1675 {
1676 cache_item_t *ci;
1677 const char *file = buffer;
1679 pthread_mutex_lock(&cache_lock);
1681 ci = g_tree_lookup(cache_tree, file);
1682 if (ci == NULL)
1683 {
1684 pthread_mutex_unlock(&cache_lock);
1685 return (0);
1686 }
1688 if (ci->values)
1689 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1691 wipe_ci_values(ci, now);
1692 remove_from_queue(ci);
1694 pthread_mutex_unlock(&cache_lock);
1695 return (0);
1696 } /* }}} int handle_request_wrote */
1698 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1699 {
1700 char *file, file_tmp[PATH_MAX];
1701 int status;
1702 rrd_info_t *info;
1704 /* obtain filename */
1705 status = buffer_get_field(&buffer, &buffer_size, &file);
1706 if (status != 0)
1707 return syntax_error(sock,cmd);
1708 /* get full pathname */
1709 get_abs_path(&file, file_tmp);
1710 if (!check_file_access(file, sock)) {
1711 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1712 }
1713 /* get data */
1714 rrd_clear_error ();
1715 info = rrd_info_r(file);
1716 if(!info) {
1717 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1718 }
1719 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1720 switch (data->type) {
1721 case RD_I_VAL:
1722 if (isnan(data->value.u_val))
1723 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1724 else
1725 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1726 break;
1727 case RD_I_CNT:
1728 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1729 break;
1730 case RD_I_INT:
1731 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1732 break;
1733 case RD_I_STR:
1734 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1735 break;
1736 case RD_I_BLO:
1737 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1738 break;
1739 }
1740 }
1742 rrd_info_free(info);
1744 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1745 } /* }}} static int handle_request_info */
1747 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1748 {
1749 char *i, *file, file_tmp[PATH_MAX];
1750 int status;
1751 int idx;
1752 time_t t;
1754 /* obtain filename */
1755 status = buffer_get_field(&buffer, &buffer_size, &file);
1756 if (status != 0)
1757 return syntax_error(sock,cmd);
1758 /* get full pathname */
1759 get_abs_path(&file, file_tmp);
1760 if (!check_file_access(file, sock)) {
1761 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1762 }
1764 status = buffer_get_field(&buffer, &buffer_size, &i);
1765 if (status != 0)
1766 return syntax_error(sock,cmd);
1767 idx = atoi(i);
1768 if(idx<0) {
1769 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1770 }
1772 /* get data */
1773 rrd_clear_error ();
1774 t = rrd_first_r(file,idx);
1775 if(t<1) {
1776 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1777 }
1778 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1779 } /* }}} static int handle_request_first */
1782 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1783 {
1784 char *file, file_tmp[PATH_MAX];
1785 int status;
1786 time_t t, from_file, step;
1787 rrd_file_t * rrd_file;
1788 cache_item_t * ci;
1789 rrd_t rrd;
1791 /* obtain filename */
1792 status = buffer_get_field(&buffer, &buffer_size, &file);
1793 if (status != 0)
1794 return syntax_error(sock,cmd);
1795 /* get full pathname */
1796 get_abs_path(&file, file_tmp);
1797 if (!check_file_access(file, sock)) {
1798 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1799 }
1800 rrd_clear_error();
1801 rrd_init(&rrd);
1802 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1803 if(!rrd_file) {
1804 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1805 }
1806 from_file = rrd.live_head->last_up;
1807 step = rrd.stat_head->pdp_step;
1808 rrd_close(rrd_file);
1809 pthread_mutex_lock(&cache_lock);
1810 ci = g_tree_lookup(cache_tree, file);
1811 if (ci)
1812 t = ci->last_update_stamp;
1813 else
1814 t = from_file;
1815 pthread_mutex_unlock(&cache_lock);
1816 t -= t % step;
1817 rrd_free(&rrd);
1818 if(t<1) {
1819 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1820 }
1821 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1822 } /* }}} static int handle_request_last */
1824 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1825 {
1826 char *file, file_tmp[PATH_MAX];
1827 char *tok;
1828 int ac = 0;
1829 char *av[128];
1830 int status;
1831 unsigned long step = 300;
1832 time_t last_up = time(NULL)-10;
1833 int no_overwrite = opt_no_overwrite;
1836 /* obtain filename */
1837 status = buffer_get_field(&buffer, &buffer_size, &file);
1838 if (status != 0)
1839 return syntax_error(sock,cmd);
1840 /* get full pathname */
1841 get_abs_path(&file, file_tmp);
1842 if (!check_file_access(file, sock)) {
1843 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1844 }
1845 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1847 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1848 if( ! strncmp(tok,"-b",2) ) {
1849 status = buffer_get_field(&buffer, &buffer_size, &tok );
1850 if (status != 0) return syntax_error(sock,cmd);
1851 last_up = (time_t) atol(tok);
1852 continue;
1853 }
1854 if( ! strncmp(tok,"-s",2) ) {
1855 status = buffer_get_field(&buffer, &buffer_size, &tok );
1856 if (status != 0) return syntax_error(sock,cmd);
1857 step = atol(tok);
1858 continue;
1859 }
1860 if( ! strncmp(tok,"-O",2) ) {
1861 no_overwrite = 1;
1862 continue;
1863 }
1864 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1865 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1866 return syntax_error(sock,cmd);
1867 }
1868 if(step<1) {
1869 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1870 }
1871 if (last_up < 3600 * 24 * 365 * 10) {
1872 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1873 }
1875 rrd_clear_error ();
1876 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1878 if(!status) {
1879 return send_response(sock, RESP_OK, "RRD created OK\n");
1880 }
1881 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1882 } /* }}} static int handle_request_create */
1884 /* start "BATCH" processing */
1885 static int batch_start (HANDLER_PROTO) /* {{{ */
1886 {
1887 int status;
1888 if (sock->batch_start)
1889 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1891 status = send_response(sock, RESP_OK,
1892 "Go ahead. End with dot '.' on its own line.\n");
1893 sock->batch_start = time(NULL);
1894 sock->batch_cmd = 0;
1896 return status;
1897 } /* }}} static int batch_start */
1899 /* finish "BATCH" processing and return results to the client */
1900 static int batch_done (HANDLER_PROTO) /* {{{ */
1901 {
1902 assert(sock->batch_start);
1903 sock->batch_start = 0;
1904 sock->batch_cmd = 0;
1905 return send_response(sock, RESP_OK, "errors\n");
1906 } /* }}} static int batch_done */
/* Handler for the "QUIT" command: does nothing except return -1.
 * NOTE(review): presumably the caller treats a negative handler result as
 * "close this connection" - confirm against the connection loop. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
1913 static command_t list_of_commands[] = { /* {{{ */
1914 {
1915 "UPDATE",
1916 handle_request_update,
1917 CMD_CONTEXT_ANY,
1918 "UPDATE <filename> <values> [<values> ...]\n"
1919 ,
1920 "Adds the given file to the internal cache if it is not yet known and\n"
1921 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1922 "for details.\n"
1923 "\n"
1924 "Each <values> has the following form:\n"
1925 " <values> = <time>:<value>[:<value>[...]]\n"
1926 "See the rrdupdate(1) manpage for details.\n"
1927 },
1928 {
1929 "WROTE",
1930 handle_request_wrote,
1931 CMD_CONTEXT_JOURNAL,
1932 NULL,
1933 NULL
1934 },
1935 {
1936 "FLUSH",
1937 handle_request_flush,
1938 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1939 "FLUSH <filename>\n"
1940 ,
1941 "Adds the given filename to the head of the update queue and returns\n"
1942 "after it has been dequeued.\n"
1943 },
1944 {
1945 "FLUSHALL",
1946 handle_request_flushall,
1947 CMD_CONTEXT_CLIENT,
1948 "FLUSHALL\n"
1949 ,
1950 "Triggers writing of all pending updates. Returns immediately.\n"
1951 },
1952 {
1953 "PENDING",
1954 handle_request_pending,
1955 CMD_CONTEXT_CLIENT,
1956 "PENDING <filename>\n"
1957 ,
1958 "Shows any 'pending' updates for a file, in order.\n"
1959 "The updates shown have not yet been written to the underlying RRD file.\n"
1960 },
1961 {
1962 "FORGET",
1963 handle_request_forget,
1964 CMD_CONTEXT_ANY,
1965 "FORGET <filename>\n"
1966 ,
1967 "Removes the file completely from the cache.\n"
1968 "Any pending updates for the file will be lost.\n"
1969 },
1970 {
1971 "QUEUE",
1972 handle_request_queue,
1973 CMD_CONTEXT_CLIENT,
1974 "QUEUE\n"
1975 ,
1976 "Shows all files in the output queue.\n"
1977 "The output is zero or more lines in the following format:\n"
1978 "(where <num_vals> is the number of values to be written)\n"
1979 "\n"
1980 "<num_vals> <filename>\n"
1981 },
1982 {
1983 "STATS",
1984 handle_request_stats,
1985 CMD_CONTEXT_CLIENT,
1986 "STATS\n"
1987 ,
1988 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1989 "a description of the values.\n"
1990 },
1991 {
1992 "HELP",
1993 handle_request_help,
1994 CMD_CONTEXT_CLIENT,
1995 "HELP [<command>]\n",
1996 NULL, /* special! */
1997 },
1998 {
1999 "BATCH",
2000 batch_start,
2001 CMD_CONTEXT_CLIENT,
2002 "BATCH\n"
2003 ,
2004 "The 'BATCH' command permits the client to initiate a bulk load\n"
2005 " of commands to rrdcached.\n"
2006 "\n"
2007 "Usage:\n"
2008 "\n"
2009 " client: BATCH\n"
2010 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2011 " client: command #1\n"
2012 " client: command #2\n"
2013 " client: ... and so on\n"
2014 " client: .\n"
2015 " server: 2 errors\n"
2016 " server: 7 message for command #7\n"
2017 " server: 9 message for command #9\n"
2018 "\n"
2019 "For more information, consult the rrdcached(1) documentation.\n"
2020 },
2021 {
2022 ".", /* BATCH terminator */
2023 batch_done,
2024 CMD_CONTEXT_BATCH,
2025 NULL,
2026 NULL
2027 },
2028 {
2029 "FETCH",
2030 handle_request_fetch,
2031 CMD_CONTEXT_CLIENT,
2032 "FETCH <file> <CF> [<start> [<end>]]\n"
2033 ,
2034 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2035 },
2036 {
2037 "INFO",
2038 handle_request_info,
2039 CMD_CONTEXT_CLIENT,
2040 "INFO <filename>\n",
2041 "The INFO command retrieves information about a specified RRD file.\n"
2042 "This is returned in standard rrdinfo format, a sequence of lines\n"
2043 "with the format <keyname> = <value>\n"
2044 "Note that this is the data as of the last update of the RRD file itself,\n"
2045 "not the last time data was received via rrdcached, so there may be pending\n"
2046 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2047 },
2048 {
2049 "FIRST",
2050 handle_request_first,
2051 CMD_CONTEXT_CLIENT,
2052 "FIRST <filename> <rra index>\n",
2053 "The FIRST command retrieves the first data time for a specified RRA in\n"
2054 "an RRD file.\n"
2055 },
2056 {
2057 "LAST",
2058 handle_request_last,
2059 CMD_CONTEXT_CLIENT,
2060 "LAST <filename>\n",
2061 "The LAST command retrieves the last update time for a specified RRD file.\n"
2062 "Note that this is the time of the last update of the RRD file itself, not\n"
2063 "the last time data was received via rrdcached, so there may be pending\n"
2064 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2065 },
2066 {
2067 "CREATE",
2068 handle_request_create,
2069 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2070 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2071 "The CREATE command will create an RRD file, overwriting any existing file\n"
2072 "unless the -O option is given or rrdcached was started with the -O option.\n"
2073 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2074 "not acceptable) and the step is in seconds (default is 300).\n"
2075 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2076 },
2077 {
2078 "QUIT",
2079 handle_request_quit,
2080 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2081 "QUIT\n"
2082 ,
2083 "Disconnect from rrdcached.\n"
2084 }
2085 }; /* }}} command_t list_of_commands[] */
2086 static size_t list_of_commands_len = sizeof (list_of_commands)
2087 / sizeof (list_of_commands[0]);
2089 static command_t *find_command(char *cmd)
2090 {
2091 size_t i;
2093 for (i = 0; i < list_of_commands_len; i++)
2094 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2095 return (&list_of_commands[i]);
2096 return NULL;
2097 }
2099 /* We currently use the index in the `list_of_commands' array as a bit position
2100 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2101 * outside these functions so that switching to a more elegant storage method
2102 * is easily possible. */
2103 static ssize_t find_command_index (const char *cmd) /* {{{ */
2104 {
2105 size_t i;
2107 for (i = 0; i < list_of_commands_len; i++)
2108 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2109 return ((ssize_t) i);
2110 return (-1);
2111 } /* }}} ssize_t find_command_index */
2113 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2114 const char *cmd)
2115 {
2116 ssize_t i;
2118 if (JOURNAL_REPLAY(sock))
2119 return (1);
2121 if (cmd == NULL)
2122 return (-1);
2124 if ((strcasecmp ("QUIT", cmd) == 0)
2125 || (strcasecmp ("HELP", cmd) == 0))
2126 return (1);
2127 else if (strcmp (".", cmd) == 0)
2128 cmd = "BATCH";
2130 i = find_command_index (cmd);
2131 if (i < 0)
2132 return (-1);
2133 assert (i < 32);
2135 if ((sock->permissions & (1 << i)) != 0)
2136 return (1);
2137 return (0);
2138 } /* }}} int socket_permission_check */
2140 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2141 const char *cmd)
2142 {
2143 ssize_t i;
2145 i = find_command_index (cmd);
2146 if (i < 0)
2147 return (-1);
2148 assert (i < 32);
2150 sock->permissions |= (1 << i);
2151 return (0);
2152 } /* }}} int socket_permission_add */
2154 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2155 {
2156 sock->permissions = 0;
2157 } /* }}} socket_permission_clear */
2159 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2160 listen_socket_t *src)
2161 {
2162 dest->permissions = src->permissions;
2163 } /* }}} socket_permission_copy */
2165 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2166 {
2167 size_t i;
2169 sock->permissions = 0;
2170 for (i = 0; i < list_of_commands_len; i++)
2171 sock->permissions |= (1 << i);
2172 } /* }}} void socket_permission_set_all */
2174 /* check whether commands are received in the expected context */
2175 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2176 {
2177 if (JOURNAL_REPLAY(sock))
2178 return (cmd->context & CMD_CONTEXT_JOURNAL);
2179 else if (sock->batch_start)
2180 return (cmd->context & CMD_CONTEXT_BATCH);
2181 else
2182 return (cmd->context & CMD_CONTEXT_CLIENT);
2184 /* NOTREACHED */
2185 assert(1==0);
2186 }
2188 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2189 {
2190 int status;
2191 char *cmd_str;
2192 char *resp_txt;
2193 command_t *help = NULL;
2195 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2196 if (status == 0)
2197 help = find_command(cmd_str);
2199 if (help && (help->syntax || help->help))
2200 {
2201 char tmp[RRD_CMD_MAX];
2203 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2204 resp_txt = tmp;
2206 if (help->syntax)
2207 add_response_info(sock, "Usage: %s\n", help->syntax);
2209 if (help->help)
2210 add_response_info(sock, "%s\n", help->help);
2211 }
2212 else
2213 {
2214 size_t i;
2216 resp_txt = "Command overview\n";
2218 for (i = 0; i < list_of_commands_len; i++)
2219 {
2220 if (list_of_commands[i].syntax == NULL)
2221 continue;
2222 add_response_info (sock, "%s", list_of_commands[i].syntax);
2223 }
2224 }
2226 return send_response(sock, RESP_OK, resp_txt);
2227 } /* }}} int handle_request_help */
2229 static int handle_request (DISPATCH_PROTO) /* {{{ */
2230 {
2231 char *buffer_ptr = buffer;
2232 char *cmd_str = NULL;
2233 command_t *cmd = NULL;
2234 int status;
2236 assert (buffer[buffer_size - 1] == '\0');
2238 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2239 if (status != 0)
2240 {
2241 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2242 return (-1);
2243 }
2245 if (sock != NULL && sock->batch_start)
2246 sock->batch_cmd++;
2248 cmd = find_command(cmd_str);
2249 if (!cmd)
2250 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2252 if (!socket_permission_check (sock, cmd->cmd))
2253 return send_response(sock, RESP_ERR, "Permission denied.\n");
2255 if (!command_check_context(sock, cmd))
2256 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2258 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2259 } /* }}} int handle_request */
2261 static void journal_set_free (journal_set *js) /* {{{ */
2262 {
2263 if (js == NULL)
2264 return;
2266 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2268 free(js);
2269 } /* }}} journal_set_free */
2271 static void journal_set_remove (journal_set *js) /* {{{ */
2272 {
2273 if (js == NULL)
2274 return;
2276 for (uint i=0; i < js->files_num; i++)
2277 {
2278 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2279 unlink(js->files[i]);
2280 }
2281 } /* }}} journal_set_remove */
2283 /* close current journal file handle.
2284 * MUST hold journal_lock before calling */
2285 static void journal_close(void) /* {{{ */
2286 {
2287 if (journal_fh != NULL)
2288 {
2289 if (fclose(journal_fh) != 0)
2290 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2291 }
2293 journal_fh = NULL;
2294 journal_size = 0;
2295 } /* }}} journal_close */
2297 /* MUST hold journal_lock before calling */
2298 static void journal_new_file(void) /* {{{ */
2299 {
2300 struct timeval now;
2301 int new_fd;
2302 char new_file[PATH_MAX + 1];
2304 assert(journal_dir != NULL);
2305 assert(journal_cur != NULL);
2307 journal_close();
2309 gettimeofday(&now, NULL);
2310 /* this format assures that the files sort in strcmp() order */
2311 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2312 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2314 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2315 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2316 if (new_fd < 0)
2317 goto error;
2319 journal_fh = fdopen(new_fd, "a");
2320 if (journal_fh == NULL)
2321 goto error;
2323 journal_size = ftell(journal_fh);
2324 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2326 /* record the file in the journal set */
2327 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2329 return;
2331 error:
2332 RRDD_LOG(LOG_CRIT,
2333 "JOURNALING DISABLED: Error while trying to create %s : %s",
2334 new_file, rrd_strerror(errno));
2335 RRDD_LOG(LOG_CRIT,
2336 "JOURNALING DISABLED: All values will be flushed at shutdown");
2338 close(new_fd);
2339 config_flush_at_shutdown = 1;
2341 } /* }}} journal_new_file */
2343 /* MUST NOT hold journal_lock before calling this */
2344 static void journal_rotate(void) /* {{{ */
2345 {
2346 journal_set *old_js = NULL;
2348 if (journal_dir == NULL)
2349 return;
2351 RRDD_LOG(LOG_DEBUG, "rotating journals");
2353 pthread_mutex_lock(&stats_lock);
2354 ++stats_journal_rotate;
2355 pthread_mutex_unlock(&stats_lock);
2357 pthread_mutex_lock(&journal_lock);
2359 journal_close();
2361 /* rotate the journal sets */
2362 old_js = journal_old;
2363 journal_old = journal_cur;
2364 journal_cur = calloc(1, sizeof(journal_set));
2366 if (journal_cur != NULL)
2367 journal_new_file();
2368 else
2369 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2371 pthread_mutex_unlock(&journal_lock);
2373 journal_set_remove(old_js);
2374 journal_set_free (old_js);
2376 } /* }}} static void journal_rotate */
2378 /* MUST hold journal_lock when calling */
2379 static void journal_done(void) /* {{{ */
2380 {
2381 if (journal_cur == NULL)
2382 return;
2384 journal_close();
2386 if (config_flush_at_shutdown)
2387 {
2388 RRDD_LOG(LOG_INFO, "removing journals");
2389 journal_set_remove(journal_old);
2390 journal_set_remove(journal_cur);
2391 }
2392 else
2393 {
2394 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2395 "journals will be used at next startup");
2396 }
2398 journal_set_free(journal_cur);
2399 journal_set_free(journal_old);
2400 free(journal_dir);
2402 } /* }}} static void journal_done */
2404 static int journal_write(char *cmd, char *args) /* {{{ */
2405 {
2406 int chars;
2408 if (journal_fh == NULL)
2409 return 0;
2411 pthread_mutex_lock(&journal_lock);
2412 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2413 journal_size += chars;
2415 if (journal_size > JOURNAL_MAX)
2416 journal_new_file();
2418 pthread_mutex_unlock(&journal_lock);
2420 if (chars > 0)
2421 {
2422 pthread_mutex_lock(&stats_lock);
2423 stats_journal_bytes += chars;
2424 pthread_mutex_unlock(&stats_lock);
2425 }
2427 return chars;
2428 } /* }}} static int journal_write */
2430 static int journal_replay (const char *file) /* {{{ */
2431 {
2432 FILE *fh;
2433 int entry_cnt = 0;
2434 int fail_cnt = 0;
2435 uint64_t line = 0;
2436 char entry[RRD_CMD_MAX];
2437 time_t now;
2439 if (file == NULL) return 0;
2441 {
2442 char *reason = "unknown error";
2443 int status = 0;
2444 struct stat statbuf;
2446 memset(&statbuf, 0, sizeof(statbuf));
2447 if (stat(file, &statbuf) != 0)
2448 {
2449 reason = "stat error";
2450 status = errno;
2451 }
2452 else if (!S_ISREG(statbuf.st_mode))
2453 {
2454 reason = "not a regular file";
2455 status = EPERM;
2456 }
2457 if (statbuf.st_uid != daemon_uid)
2458 {
2459 reason = "not owned by daemon user";
2460 status = EACCES;
2461 }
2462 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2463 {
2464 reason = "must not be user/group writable";
2465 status = EACCES;
2466 }
2468 if (status != 0)
2469 {
2470 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2471 file, rrd_strerror(status), reason);
2472 return 0;
2473 }
2474 }
2476 fh = fopen(file, "r");
2477 if (fh == NULL)
2478 {
2479 if (errno != ENOENT)
2480 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2481 file, rrd_strerror(errno));
2482 return 0;
2483 }
2484 else
2485 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2487 now = time(NULL);
2489 while(!feof(fh))
2490 {
2491 size_t entry_len;
2493 ++line;
2494 if (fgets(entry, sizeof(entry), fh) == NULL)
2495 break;
2496 entry_len = strlen(entry);
2498 /* check \n termination in case journal writing crashed mid-line */
2499 if (entry_len == 0)
2500 continue;
2501 else if (entry[entry_len - 1] != '\n')
2502 {
2503 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2504 ++fail_cnt;
2505 continue;
2506 }
2508 entry[entry_len - 1] = '\0';
2510 if (handle_request(NULL, now, entry, entry_len) == 0)
2511 ++entry_cnt;
2512 else
2513 ++fail_cnt;
2514 }
2516 fclose(fh);
2518 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2519 entry_cnt, fail_cnt);
2521 return entry_cnt > 0 ? 1 : 0;
2522 } /* }}} static int journal_replay */
/*
 * qsort() comparator for an array of C strings (journal file names).
 *
 * v1 and v2 point at array elements, i.e. at `char *` values; the strings
 * they refer to are compared with strcmp(). Nothing is modified, so the
 * const qualifier of the arguments is preserved.
 */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *jn1 = v1;
  const char *const *jn2 = v2;

  return strcmp(*jn1, *jn2);
}
2532 static void journal_init(void) /* {{{ */
2533 {
2534 int had_journal = 0;
2535 DIR *dir;
2536 struct dirent *dent;
2537 char path[PATH_MAX+1];
2539 if (journal_dir == NULL) return;
2541 pthread_mutex_lock(&journal_lock);
2543 journal_cur = calloc(1, sizeof(journal_set));
2544 if (journal_cur == NULL)
2545 {
2546 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2547 return;
2548 }
2550 RRDD_LOG(LOG_INFO, "checking for journal files");
2552 /* Handle old journal files during transition. This gives them the
2553 * correct sort order. TODO: remove after first release
2554 */
2555 {
2556 char old_path[PATH_MAX+1];
2557 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2558 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2559 rename(old_path, path);
2561 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2562 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2563 rename(old_path, path);
2564 }
2566 dir = opendir(journal_dir);
2567 if (!dir) {
2568 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2569 return;
2570 }
2571 while ((dent = readdir(dir)) != NULL)
2572 {
2573 /* looks like a journal file? */
2574 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2575 continue;
2577 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2579 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2580 {
2581 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2582 dent->d_name);
2583 break;
2584 }
2585 }
2586 closedir(dir);
2588 qsort(journal_cur->files, journal_cur->files_num,
2589 sizeof(journal_cur->files[0]), journal_sort);
2591 for (uint i=0; i < journal_cur->files_num; i++)
2592 had_journal += journal_replay(journal_cur->files[i]);
2594 journal_new_file();
2596 /* it must have been a crash. start a flush */
2597 if (had_journal && config_flush_at_shutdown)
2598 flush_old_values(-1);
2600 pthread_mutex_unlock(&journal_lock);
2602 RRDD_LOG(LOG_INFO, "journal processing complete");
2604 } /* }}} static void journal_init */
2606 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2607 {
2608 assert(sock != NULL);
2610 free(sock->rbuf); sock->rbuf = NULL;
2611 free(sock->wbuf); sock->wbuf = NULL;
2612 free(sock);
2613 } /* }}} void free_listen_socket */
2615 static void close_connection(listen_socket_t *sock) /* {{{ */
2616 {
2617 if (sock->fd >= 0)
2618 {
2619 close(sock->fd);
2620 sock->fd = -1;
2621 }
2623 free_listen_socket(sock);
2625 } /* }}} void close_connection */
2627 static void *connection_thread_main (void *args) /* {{{ */
2628 {
2629 listen_socket_t *sock;
2630 int fd;
2632 sock = (listen_socket_t *) args;
2633 fd = sock->fd;
2635 /* init read buffers */
2636 sock->next_read = sock->next_cmd = 0;
2637 sock->rbuf = malloc(RBUF_SIZE);
2638 if (sock->rbuf == NULL)
2639 {
2640 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2641 close_connection(sock);
2642 return NULL;
2643 }
2645 pthread_mutex_lock (&connection_threads_lock);
2646 #ifdef HAVE_LIBWRAP
2647 /* LIBWRAP does not support multiple threads! By putting this code
2648 inside pthread_mutex_lock we do not have to worry about request_info
2649 getting overwritten by another thread.
2650 */
2651 struct request_info req;
2652 request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2653 fromhost(&req);
2654 if(!hosts_access(&req)) {
2655 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2656 pthread_mutex_unlock (&connection_threads_lock);
2657 close_connection(sock);
2658 return NULL;
2659 }
2660 #endif /* HAVE_LIBWRAP */
2661 connection_threads_num++;
2662 pthread_mutex_unlock (&connection_threads_lock);
2664 while (state == RUNNING)
2665 {
2666 char *cmd;
2667 ssize_t cmd_len;
2668 ssize_t rbytes;
2669 time_t now;
2671 struct pollfd pollfd;
2672 int status;
2674 pollfd.fd = fd;
2675 pollfd.events = POLLIN | POLLPRI;
2676 pollfd.revents = 0;
2678 status = poll (&pollfd, 1, /* timeout = */ 500);
2679 if (state != RUNNING)
2680 break;
2681 else if (status == 0) /* timeout */
2682 continue;
2683 else if (status < 0) /* error */
2684 {
2685 status = errno;
2686 if (status != EINTR)
2687 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2688 continue;
2689 }
2691 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2692 break;
2693 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2694 {
2695 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2696 "poll(2) returned something unexpected: %#04hx",
2697 pollfd.revents);
2698 break;
2699 }
2701 rbytes = read(fd, sock->rbuf + sock->next_read,
2702 RBUF_SIZE - sock->next_read);
2703 if (rbytes < 0)
2704 {
2705 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2706 break;
2707 }
2708 else if (rbytes == 0)
2709 break; /* eof */
2711 sock->next_read += rbytes;
2713 if (sock->batch_start)
2714 now = sock->batch_start;
2715 else
2716 now = time(NULL);
2718 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2719 {
2720 status = handle_request (sock, now, cmd, cmd_len+1);
2721 if (status != 0)
2722 goto out_close;
2723 }
2724 }
2726 out_close:
2727 close_connection(sock);
2729 /* Remove this thread from the connection threads list */
2730 pthread_mutex_lock (&connection_threads_lock);
2731 connection_threads_num--;
2732 if (connection_threads_num <= 0)
2733 pthread_cond_broadcast(&connection_threads_done);
2734 pthread_mutex_unlock (&connection_threads_lock);
2736 return (NULL);
2737 } /* }}} void *connection_thread_main */
2739 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2740 {
2741 int fd;
2742 struct sockaddr_un sa;
2743 listen_socket_t *temp;
2744 int status;
2745 const char *path;
2746 char *path_copy, *dir;
2748 path = sock->addr;
2749 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2750 path += strlen("unix:");
2752 /* dirname may modify its argument */
2753 path_copy = strdup(path);
2754 if (path_copy == NULL)
2755 {
2756 fprintf(stderr, "rrdcached: strdup(): %s\n",
2757 rrd_strerror(errno));
2758 return (-1);
2759 }
2761 dir = dirname(path_copy);
2762 if (rrd_mkdir_p(dir, 0777) != 0)
2763 {
2764 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2765 dir, rrd_strerror(errno));
2766 return (-1);
2767 }
2769 free(path_copy);
2771 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2772 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2773 if (temp == NULL)
2774 {
2775 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2776 return (-1);
2777 }
2778 listen_fds = temp;
2779 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2781 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2782 if (fd < 0)
2783 {
2784 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2785 rrd_strerror(errno));
2786 return (-1);
2787 }
2789 memset (&sa, 0, sizeof (sa));
2790 sa.sun_family = AF_UNIX;
2791 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2793 /* if we've gotten this far, we own the pid file. any daemon started
2794 * with the same args must not be alive. therefore, ensure that we can
2795 * create the socket...
2796 */
2797 unlink(path);
2799 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2800 if (status != 0)
2801 {
2802 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2803 path, rrd_strerror(errno));
2804 close (fd);
2805 return (-1);
2806 }
2808 /* tweak the sockets group ownership */
2809 if (sock->socket_group != (gid_t)-1)
2810 {
2811 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2812 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2813 {
2814 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2815 }
2816 }
2818 if (sock->socket_permissions != (mode_t)-1)
2819 {
2820 if (chmod(path, sock->socket_permissions) != 0)
2821 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2822 (unsigned int)sock->socket_permissions, strerror(errno));
2823 }
2825 status = listen (fd, /* backlog = */ 10);
2826 if (status != 0)
2827 {
2828 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2829 path, rrd_strerror(errno));
2830 close (fd);
2831 unlink (path);
2832 return (-1);
2833 }
2835 listen_fds[listen_fds_num].fd = fd;
2836 listen_fds[listen_fds_num].family = PF_UNIX;
2837 strncpy(listen_fds[listen_fds_num].addr, path,
2838 sizeof (listen_fds[listen_fds_num].addr) - 1);
2839 listen_fds_num++;
2841 return (0);
2842 } /* }}} int open_listen_socket_unix */
/*
 * Open listening TCP sockets for a network address and append them to
 * listen_fds.
 *
 * sock->addr is "host", "host:port", "[ipv6]" or "[ipv6]:port"; without a
 * port RRDCACHED_DEFAULT_PORT is used. One socket is attempted for every
 * address getaddrinfo() returns; socket() and bind() failures skip that
 * address.
 *
 * Returns -1 on a malformed address, a getaddrinfo() failure or a listen()
 * failure, 0 otherwise (even when no socket could be bound).
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* everything after the last ':' is the port */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* the new entry starts as a copy of the template socket */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2966 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2967 {
2968 assert(sock != NULL);
2969 assert(sock->addr != NULL);
2971 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2972 || sock->addr[0] == '/')
2973 return (open_listen_socket_unix(sock));
2974 else
2975 return (open_listen_socket_network(sock));
2976 } /* }}} int open_listen_socket */
2978 #ifndef SD_LISTEN_FDS_START
2979 # define SD_LISTEN_FDS_START 3
2980 #endif
2981 /*
2982 * returns number of descriptors passed from systemd
2983 */
2984 static int open_listen_sockets_systemd(void) /* {{{ */
2985 {
2986 listen_socket_t *temp;
2987 struct sockaddr_un sa;
2988 socklen_t l;
2989 int sd_fd;
2990 const char *env;
2991 unsigned long n;
2993 /* check if it for us */
2994 env = getenv("LISTEN_PID");
2995 if (!env)
2996 return 0;
2998 n = strtoul(env, NULL, 10);
2999 if (!n || n == ULONG_MAX || (pid_t)n != getpid())
3000 return 0;
3002 /* get the number of passed descriptors */
3003 env = getenv("LISTEN_FDS");
3004 if (!env)
3005 return 0;
3007 n = strtoul(env, NULL, 10);
3008 if (!n || n == ULONG_MAX)
3009 return 0;
3011 temp = (listen_socket_t *) rrd_realloc (listen_fds,
3012 sizeof (listen_fds[0]) * (listen_fds_num + n));
3013 if (temp == NULL)
3014 {
3015 fprintf (stderr, "rrdcached: open_listen_socket_systemd: realloc failed.\n");
3016 return 0;
3017 }
3018 listen_fds = temp;
3020 for (unsigned int i = 0; i < n; i++)
3021 {
3022 sd_fd = SD_LISTEN_FDS_START + i;
3024 l = sizeof(sa);
3025 memset(&sa, 0, l);
3026 if (getsockname(sd_fd, &sa, &l) < 0)
3027 {
3028 fprintf(stderr, "open_listen_sockets_systemd: problem getting fd %d: %s\n", sd_fd, rrd_strerror (errno));
3029 return i;
3030 }
3032 listen_fds[listen_fds_num].fd = sd_fd;
3033 listen_fds[listen_fds_num].family = sa.sun_family;
3034 listen_fds_num++;
3035 }
3037 return n;
3038 } /* }}} open_listen_sockets_systemd */
3040 static void open_listen_sockets_traditional(void) /* {{{ */
3041 {
3042 if (config_listen_address_list_len > 0)
3043 {
3044 for (size_t i = 0; i < config_listen_address_list_len; i++)
3045 open_listen_socket (config_listen_address_list[i]);
3047 rrd_free_ptrs((void ***) &config_listen_address_list,
3048 &config_listen_address_list_len);
3049 }
3050 else
3051 {
3052 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3053 sizeof(default_socket.addr) - 1);
3054 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3056 if (default_socket.permissions == 0)
3057 socket_permission_set_all (&default_socket);
3059 open_listen_socket (&default_socket);
3060 }
3061 } /* }}} open_list_sockets_traditional */
3063 static int close_listen_sockets (void) /* {{{ */
3064 {
3065 size_t i;
3067 for (i = 0; i < listen_fds_num; i++)
3068 {
3069 close (listen_fds[i].fd);
3071 if (listen_fds[i].family == PF_UNIX)
3072 unlink(listen_fds[i].addr);
3073 }
3075 free (listen_fds);
3076 listen_fds = NULL;
3077 listen_fds_num = 0;
3079 return (0);
3080 } /* }}} int close_listen_sockets */
3082 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3083 {
3084 struct pollfd *pollfds;
3085 int pollfds_num;
3086 int status;
3087 int i;
3089 if (listen_fds_num < 1)
3090 {
3091 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3092 return (NULL);
3093 }
3095 pollfds_num = listen_fds_num;
3096 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3097 if (pollfds == NULL)
3098 {
3099 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3100 return (NULL);
3101 }
3102 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3104 RRDD_LOG(LOG_INFO, "listening for connections");
3106 while (state == RUNNING)
3107 {
3108 for (i = 0; i < pollfds_num; i++)
3109 {
3110 pollfds[i].fd = listen_fds[i].fd;
3111 pollfds[i].events = POLLIN | POLLPRI;
3112 pollfds[i].revents = 0;
3113 }
3115 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3116 if (state != RUNNING)
3117 break;
3118 else if (status == 0) /* timeout */
3119 continue;
3120 else if (status < 0) /* error */
3121 {
3122 status = errno;
3123 if (status != EINTR)
3124 {
3125 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3126 }
3127 continue;
3128 }
3130 for (i = 0; i < pollfds_num; i++)
3131 {
3132 listen_socket_t *client_sock;
3133 struct sockaddr_storage client_sa;
3134 socklen_t client_sa_size;
3135 pthread_t tid;
3136 pthread_attr_t attr;
3138 if (pollfds[i].revents == 0)
3139 continue;
3141 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3142 {
3143 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3144 "poll(2) returned something unexpected for listen FD #%i.",
3145 pollfds[i].fd);
3146 continue;
3147 }
3149 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3150 if (client_sock == NULL)
3151 {
3152 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3153 continue;
3154 }
3155 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3157 client_sa_size = sizeof (client_sa);
3158 client_sock->fd = accept (pollfds[i].fd,
3159 (struct sockaddr *) &client_sa, &client_sa_size);
3160 if (client_sock->fd < 0)
3161 {
3162 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3163 free(client_sock);
3164 continue;
3165 }
3167 pthread_attr_init (&attr);
3168 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3170 status = pthread_create (&tid, &attr, connection_thread_main,
3171 client_sock);
3172 if (status != 0)
3173 {
3174 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3175 close_connection(client_sock);
3176 continue;
3177 }
3178 } /* for (pollfds_num) */
3179 } /* while (state == RUNNING) */
3181 RRDD_LOG(LOG_INFO, "starting shutdown");
3183 close_listen_sockets ();
3185 pthread_mutex_lock (&connection_threads_lock);
3186 while (connection_threads_num > 0)
3187 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3188 pthread_mutex_unlock (&connection_threads_lock);
3190 free(pollfds);
3192 return (NULL);
3193 } /* }}} void *listen_thread_main */
3195 static int daemonize (void) /* {{{ */
3196 {
3197 int pid_fd;
3198 char *base_dir;
3200 daemon_uid = geteuid();
3202 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3203 if (pid_fd < 0)
3204 pid_fd = check_pidfile();
3205 if (pid_fd < 0)
3206 return pid_fd;
3208 /* gather sockets passed from systemd;
3209 * if none, open all the listen sockets from config or default */
3211 if (!(open_listen_sockets_systemd() > 0))
3212 open_listen_sockets_traditional();
3214 if (listen_fds_num < 1)
3215 {
3216 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3217 goto error;
3218 }
3220 if (!stay_foreground)
3221 {
3222 pid_t child;
3224 child = fork ();
3225 if (child < 0)
3226 {
3227 fprintf (stderr, "daemonize: fork(2) failed.\n");
3228 goto error;
3229 }
3230 else if (child > 0)
3231 exit(0);
3233 /* Become session leader */
3234 setsid ();
3236 /* Open the first three file descriptors to /dev/null */
3237 close (2);
3238 close (1);
3239 close (0);
3241 open ("/dev/null", O_RDWR);
3242 if (dup(0) == -1 || dup(0) == -1){
3243 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3244 }
3245 } /* if (!stay_foreground) */
3247 /* Change into the /tmp directory. */
3248 base_dir = (config_base_dir != NULL)
3249 ? config_base_dir
3250 : "/tmp";
3252 if (chdir (base_dir) != 0)
3253 {
3254 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3255 goto error;
3256 }
3258 install_signal_handlers();
3260 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3261 RRDD_LOG(LOG_INFO, "starting up");
3263 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3264 (GDestroyNotify) free_cache_item);
3265 if (cache_tree == NULL)
3266 {
3267 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3268 goto error;
3269 }
3271 return write_pidfile (pid_fd);
3273 error:
3274 remove_pidfile();
3275 return -1;
3276 } /* }}} int daemonize */
3278 static int cleanup (void) /* {{{ */
3279 {
3280 pthread_cond_broadcast (&flush_cond);
3281 pthread_join (flush_thread, NULL);
3283 pthread_cond_broadcast (&queue_cond);
3284 for (int i = 0; i < config_queue_threads; i++)
3285 pthread_join (queue_threads[i], NULL);
3287 if (config_flush_at_shutdown)
3288 {
3289 assert(cache_queue_head == NULL);
3290 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3291 }
3293 free(queue_threads);
3294 free(config_base_dir);
3296 pthread_mutex_lock(&cache_lock);
3297 g_tree_destroy(cache_tree);
3299 pthread_mutex_lock(&journal_lock);
3300 journal_done();
3302 RRDD_LOG(LOG_INFO, "goodbye");
3303 closelog ();
3305 remove_pidfile ();
3306 free(config_pid_file);
3308 return (0);
3309 } /* }}} int cleanup */
3311 static int read_options (int argc, char **argv) /* {{{ */
3312 {
3313 int option;
3314 int status = 0;
3316 socket_permission_clear (&default_socket);
3318 default_socket.socket_group = (gid_t)-1;
3319 default_socket.socket_permissions = (mode_t)-1;
3321 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3322 {
3323 switch (option)
3324 {
3325 case 'O':
3326 opt_no_overwrite = 1;
3327 break;
3329 case 'g':
3330 stay_foreground=1;
3331 break;
3333 case 'l':
3334 {
3335 listen_socket_t *new;
3337 new = malloc(sizeof(listen_socket_t));
3338 if (new == NULL)
3339 {
3340 fprintf(stderr, "read_options: malloc failed.\n");
3341 return(2);
3342 }
3343 memset(new, 0, sizeof(listen_socket_t));
3345 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3347 /* Add permissions to the socket {{{ */
3348 if (default_socket.permissions != 0)
3349 {
3350 socket_permission_copy (new, &default_socket);
3351 }
3352 else /* if (default_socket.permissions == 0) */
3353 {
3354 /* Add permission for ALL commands to the socket. */
3355 socket_permission_set_all (new);
3356 }
3357 /* }}} Done adding permissions. */
3359 new->socket_group = default_socket.socket_group;
3360 new->socket_permissions = default_socket.socket_permissions;
3362 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3363 &config_listen_address_list_len, new))
3364 {
3365 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3366 return (2);
3367 }
3368 }
3369 break;
3371 /* set socket group permissions */
3372 case 's':
3373 {
3374 gid_t group_gid;
3375 struct group *grp;
3377 group_gid = strtoul(optarg, NULL, 10);
3378 if (errno != EINVAL && group_gid>0)
3379 {
3380 /* we were passed a number */
3381 grp = getgrgid(group_gid);
3382 }
3383 else
3384 {
3385 grp = getgrnam(optarg);
3386 }
3388 if (grp)
3389 {
3390 default_socket.socket_group = grp->gr_gid;
3391 }
3392 else
3393 {
3394 /* no idea what the user wanted... */
3395 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3396 return (5);
3397 }
3398 }
3399 break;
3401 /* set socket file permissions */
3402 case 'm':
3403 {
3404 long tmp;
3405 char *endptr = NULL;
3407 tmp = strtol (optarg, &endptr, 8);
3408 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3409 || (tmp > 07777) || (tmp < 0)) {
3410 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3411 optarg);
3412 return (5);
3413 }
3415 default_socket.socket_permissions = (mode_t)tmp;
3416 }
3417 break;
3419 case 'P':
3420 {
3421 char *optcopy;
3422 char *saveptr;
3423 char *dummy;
3424 char *ptr;
3426 socket_permission_clear (&default_socket);
3428 optcopy = strdup (optarg);
3429 dummy = optcopy;
3430 saveptr = NULL;
3431 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3432 {
3433 dummy = NULL;
3434 status = socket_permission_add (&default_socket, ptr);
3435 if (status != 0)
3436 {
3437 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3438 "socket failed. Most likely, this permission doesn't "
3439 "exist. Check your command line.\n", ptr);
3440 status = 4;
3441 }
3442 }
3444 free (optcopy);
3445 }
3446 break;
3448 case 'f':
3449 {
3450 int temp;
3452 temp = atoi (optarg);
3453 if (temp > 0)
3454 config_flush_interval = temp;
3455 else
3456 {
3457 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3458 status = 3;
3459 }
3460 }
3461 break;
3463 case 'w':
3464 {
3465 int temp;
3467 temp = atoi (optarg);
3468 if (temp > 0)
3469 config_write_interval = temp;
3470 else
3471 {
3472 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3473 status = 2;
3474 }
3475 }
3476 break;
3478 case 'z':
3479 {
3480 int temp;
3482 temp = atoi(optarg);
3483 if (temp > 0)
3484 config_write_jitter = temp;
3485 else
3486 {
3487 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3488 status = 2;
3489 }
3491 break;
3492 }
3494 case 't':
3495 {
3496 int threads;
3497 threads = atoi(optarg);
3498 if (threads >= 1)
3499 config_queue_threads = threads;
3500 else
3501 {
3502 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3503 return 1;
3504 }
3505 }
3506 break;
3508 case 'B':
3509 config_write_base_only = 1;
3510 break;
3512 case 'b':
3513 {
3514 size_t len;
3515 char base_realpath[PATH_MAX];
3517 if (config_base_dir != NULL)
3518 free (config_base_dir);
3519 config_base_dir = strdup (optarg);
3520 if (config_base_dir == NULL)
3521 {
3522 fprintf (stderr, "read_options: strdup failed.\n");
3523 return (3);
3524 }
3526 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3527 {
3528 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3529 config_base_dir, rrd_strerror (errno));
3530 return (3);
3531 }
3533 /* make sure that the base directory is not resolved via
3534 * symbolic links. this makes some performance-enhancing
3535 * assumptions possible (we don't have to resolve paths
3536 * that start with a "/")
3537 */
3538 if (realpath(config_base_dir, base_realpath) == NULL)
3539 {
3540 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3541 "%s\n", config_base_dir, rrd_strerror(errno));
3542 return 5;
3543 }
3545 len = strlen (config_base_dir);
3546 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3547 {
3548 config_base_dir[len - 1] = 0;
3549 len--;
3550 }
3552 if (len < 1)
3553 {
3554 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3555 return (4);
3556 }
3558 _config_base_dir_len = len;
3560 len = strlen (base_realpath);
3561 while ((len > 0) && (base_realpath[len - 1] == '/'))
3562 {
3563 base_realpath[len - 1] = '\0';
3564 len--;
3565 }
3567 if (strncmp(config_base_dir,
3568 base_realpath, sizeof(base_realpath)) != 0)
3569 {
3570 fprintf(stderr,
3571 "Base directory (-b) resolved via file system links!\n"
3572 "Please consult rrdcached '-b' documentation!\n"
3573 "Consider specifying the real directory (%s)\n",
3574 base_realpath);
3575 return 5;
3576 }
3577 }
3578 break;
3580 case 'p':
3581 {
3582 if (config_pid_file != NULL)
3583 free (config_pid_file);
3584 config_pid_file = strdup (optarg);
3585 if (config_pid_file == NULL)
3586 {
3587 fprintf (stderr, "read_options: strdup failed.\n");
3588 return (3);
3589 }
3590 }
3591 break;
3593 case 'F':
3594 config_flush_at_shutdown = 1;
3595 break;
3597 case 'j':
3598 {
3599 char journal_dir_actual[PATH_MAX];
3600 journal_dir = realpath((const char *)optarg, journal_dir_actual);
3601 if (journal_dir)
3602 {
3603 // if we were able to properly resolve the path, lets have a copy
3604 // for use outside this block.
3605 journal_dir = strdup(journal_dir);
3606 status = rrd_mkdir_p(journal_dir, 0777);
3607 if (status != 0)
3608 {
3609 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3610 journal_dir, rrd_strerror(errno));
3611 return 6;
3612 }
3613 if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3614 {
3615 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3616 errno ? rrd_strerror(errno) : "");
3617 return 6;
3618 }
3619 } else {
3620 fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3621 errno ? rrd_strerror(errno) : "");
3622 return 6;
3623 }
3624 }
3625 break;
3627 case 'a':
3628 {
3629 int temp = atoi(optarg);
3630 if (temp > 0)
3631 config_alloc_chunk = temp;
3632 else
3633 {
3634 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3635 return 10;
3636 }
3637 }
3638 break;
3640 case 'h':
3641 case '?':
3642 printf ("RRDCacheD %s\n"
3643 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3644 "\n"
3645 "Usage: rrdcached [options]\n"
3646 "\n"
3647 "Valid options are:\n"
3648 " -l <address> Socket address to listen to.\n"
3649 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3650 " -P <perms> Sets the permissions to assign to all following "
3651 "sockets\n"
3652 " -w <seconds> Interval in which to write data.\n"
3653 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3654 " -t <threads> Number of write threads.\n"
3655 " -f <seconds> Interval in which to flush dead data.\n"
3656 " -p <file> Location of the PID-file.\n"
3657 " -b <dir> Base directory to change to.\n"
3658 " -B Restrict file access to paths within -b <dir>\n"
3659 " -g Do not fork and run in the foreground.\n"
3660 " -j <dir> Directory in which to create the journal files.\n"
3661 " -F Always flush all updates at shutdown\n"
3662 " -s <id|name> Group owner of all following UNIX sockets\n"
3663 " (the socket will also have read/write permissions "
3664 "for that group)\n"
3665 " -m <mode> File permissions (octal) of all following UNIX "
3666 "sockets\n"
3667 " -a <size> Memory allocation chunk size. Default is 1.\n"
3668 " -O Do not allow CREATE commands to overwrite existing\n"
3669 " files, even if asked to.\n"
3670 "\n"
3671 "For more information and a detailed description of all options "
3672 "please refer\n"
3673 "to the rrdcached(1) manual page.\n",
3674 VERSION);
3675 if (option == 'h')
3676 status = -1;
3677 else
3678 status = 1;
3679 break;
3680 } /* switch (option) */
3681 } /* while (getopt) */
3683 /* advise the user when values are not sane */
3684 if (config_flush_interval < 2 * config_write_interval)
3685 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3686 " 2x write interval (-w) !\n");
3687 if (config_write_jitter > config_write_interval)
3688 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3689 " write interval (-w) !\n");
3691 if (config_write_base_only && config_base_dir == NULL)
3692 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3693 " Consult the rrdcached documentation\n");
3695 if (journal_dir == NULL)
3696 config_flush_at_shutdown = 1;
3698 return (status);
3699 } /* }}} int read_options */
3701 int main (int argc, char **argv)
3702 {
3703 int status;
3705 status = read_options (argc, argv);
3706 if (status != 0)
3707 {
3708 if (status < 0)
3709 status = 0;
3710 return (status);
3711 }
3713 status = daemonize ();
3714 if (status != 0)
3715 {
3716 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3717 return (1);
3718 }
3720 journal_init();
3722 /* start the queue threads */
3723 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3724 if (queue_threads == NULL)
3725 {
3726 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3727 cleanup();
3728 return (1);
3729 }
3730 for (int i = 0; i < config_queue_threads; i++)
3731 {
3732 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3733 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3734 if (status != 0)
3735 {
3736 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3737 cleanup();
3738 return (1);
3739 }
3740 }
3742 /* start the flush thread */
3743 memset(&flush_thread, 0, sizeof(flush_thread));
3744 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3745 if (status != 0)
3746 {
3747 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3748 cleanup();
3749 return (1);
3750 }
3752 listen_thread_main (NULL);
3753 cleanup ();
3755 return (0);
3756 } /* int main */
3758 /*
3759 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3760 */