1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120 do { \
121 if (stay_foreground) { \
122 fprintf(stderr, __VA_ARGS__); \
123 fprintf(stderr, "\n"); } \
124 syslog ((severity), __VA_ARGS__); \
125 } while (0)
127 /*
128 * Types
129 */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
133 {
134 int fd;
135 char addr[PATH_MAX + 1];
136 int family;
138 /* state for BATCH processing */
139 time_t batch_start;
140 int batch_cmd;
142 /* buffered IO */
143 char *rbuf;
144 off_t next_cmd;
145 off_t next_read;
147 char *wbuf;
148 ssize_t wbuf_len;
150 uint32_t permissions;
152 gid_t socket_group;
153 mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
161 time_t UNUSED(now),\
162 char UNUSED(*buffer),\
163 size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO command_t UNUSED(*cmd),\
166 DISPATCH_PROTO
168 struct command_s {
169 char *cmd;
170 int (*handler)(HANDLER_PROTO);
172 char context; /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT (1<<0)
174 #define CMD_CONTEXT_BATCH (1<<1)
175 #define CMD_CONTEXT_JOURNAL (1<<2)
176 #define CMD_CONTEXT_ANY (0x7f)
178 char *syntax;
179 char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
185 {
186 char *file;
187 char **values;
188 size_t values_num; /* number of valid pointers */
189 size_t values_alloc; /* number of allocated pointers */
190 time_t last_flush_time;
191 double last_update_stamp;
192 #define CI_FLAGS_IN_TREE (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194 int flags;
195 pthread_cond_t flushed;
196 cache_item_t *prev;
197 cache_item_t *next;
198 };
200 struct callback_flush_data_s
201 {
202 time_t now;
203 time_t abs_timeout;
204 char **keys;
205 size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
209 enum queue_side_e
210 {
211 HEAD,
212 TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
216 /* describe a set of journal files */
217 typedef struct {
218 char **files;
219 size_t files_num;
220 } journal_set;
222 #define RBUF_SIZE (RRD_CMD_MAX*2)
224 /*
225 * Variables
226 */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
233 static listen_socket_t default_socket;
235 enum {
236 RUNNING, /* normal operation */
237 FLUSHING, /* flushing remaining values */
238 SHUTDOWN /* shutting down */
239 } state = RUNNING;
241 static pthread_t *queue_threads;
242 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
243 static int config_queue_threads = 4;
245 static pthread_t flush_thread;
246 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
248 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
249 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
250 static int connection_threads_num = 0;
252 /* Cache stuff */
253 static GTree *cache_tree = NULL;
254 static cache_item_t *cache_queue_head = NULL;
255 static cache_item_t *cache_queue_tail = NULL;
256 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
258 static int config_write_interval = 300;
259 static int config_write_jitter = 0;
260 static int config_flush_interval = 3600;
261 static int config_flush_at_shutdown = 0;
262 static char *config_pid_file = NULL;
263 static char *config_base_dir = NULL;
264 static size_t _config_base_dir_len = 0;
265 static int config_write_base_only = 0;
266 static size_t config_alloc_chunk = 1;
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
280 static int opt_no_overwrite = 0; /* default for the daemon */
282 /* Journaled updates */
283 #define JOURNAL_REPLAY(s) ((s) == NULL)
284 #define JOURNAL_BASE "rrd.journal"
285 static journal_set *journal_cur = NULL;
286 static journal_set *journal_old = NULL;
287 static char *journal_dir = NULL;
288 static FILE *journal_fh = NULL; /* current journal file handle */
289 static long journal_size = 0; /* current journal size */
290 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
291 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
292 static int journal_write(char *cmd, char *args);
293 static void journal_done(void);
294 static void journal_rotate(void);
296 /* prototypes for forward refernces */
297 static int handle_request_help (HANDLER_PROTO);
299 /*
300 * Functions
301 */
302 static void sig_common (const char *sig) /* {{{ */
303 {
304 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
305 state = FLUSHING;
306 pthread_cond_broadcast(&flush_cond);
307 pthread_cond_broadcast(&queue_cond);
308 } /* }}} void sig_common */
310 static void sig_int_handler (int UNUSED(s)) /* {{{ */
311 {
312 sig_common("INT");
313 } /* }}} void sig_int_handler */
315 static void sig_term_handler (int UNUSED(s)) /* {{{ */
316 {
317 sig_common("TERM");
318 } /* }}} void sig_term_handler */
320 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
321 {
322 config_flush_at_shutdown = 1;
323 sig_common("USR1");
324 } /* }}} void sig_usr1_handler */
326 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
327 {
328 config_flush_at_shutdown = 0;
329 sig_common("USR2");
330 } /* }}} void sig_usr2_handler */
332 static void install_signal_handlers(void) /* {{{ */
333 {
334 /* These structures are static, because `sigaction' behaves weird if the are
335 * overwritten.. */
336 static struct sigaction sa_int;
337 static struct sigaction sa_term;
338 static struct sigaction sa_pipe;
339 static struct sigaction sa_usr1;
340 static struct sigaction sa_usr2;
342 /* Install signal handlers */
343 memset (&sa_int, 0, sizeof (sa_int));
344 sa_int.sa_handler = sig_int_handler;
345 sigaction (SIGINT, &sa_int, NULL);
347 memset (&sa_term, 0, sizeof (sa_term));
348 sa_term.sa_handler = sig_term_handler;
349 sigaction (SIGTERM, &sa_term, NULL);
351 memset (&sa_pipe, 0, sizeof (sa_pipe));
352 sa_pipe.sa_handler = SIG_IGN;
353 sigaction (SIGPIPE, &sa_pipe, NULL);
355 memset (&sa_pipe, 0, sizeof (sa_usr1));
356 sa_usr1.sa_handler = sig_usr1_handler;
357 sigaction (SIGUSR1, &sa_usr1, NULL);
359 memset (&sa_usr2, 0, sizeof (sa_usr2));
360 sa_usr2.sa_handler = sig_usr2_handler;
361 sigaction (SIGUSR2, &sa_usr2, NULL);
363 } /* }}} void install_signal_handlers */
365 static int open_pidfile(char *action, int oflag) /* {{{ */
366 {
367 int fd;
368 const char *file;
369 char *file_copy, *dir;
371 file = (config_pid_file != NULL)
372 ? config_pid_file
373 : LOCALSTATEDIR "/run/rrdcached.pid";
375 /* dirname may modify its argument */
376 file_copy = strdup(file);
377 if (file_copy == NULL)
378 {
379 fprintf(stderr, "rrdcached: strdup(): %s\n",
380 rrd_strerror(errno));
381 return -1;
382 }
384 dir = dirname(file_copy);
385 if (rrd_mkdir_p(dir, 0777) != 0)
386 {
387 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
388 dir, rrd_strerror(errno));
389 return -1;
390 }
392 free(file_copy);
394 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
395 if (fd < 0)
396 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
397 action, file, rrd_strerror(errno));
399 return(fd);
400 } /* }}} static int open_pidfile */
402 /* check existing pid file to see whether a daemon is running */
403 static int check_pidfile(void)
404 {
405 int pid_fd;
406 pid_t pid;
407 char pid_str[16];
409 pid_fd = open_pidfile("open", O_RDWR);
410 if (pid_fd < 0)
411 return pid_fd;
413 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
414 return -1;
416 pid = atoi(pid_str);
417 if (pid <= 0)
418 return -1;
420 /* another running process that we can signal COULD be
421 * a competing rrdcached */
422 if (pid != getpid() && kill(pid, 0) == 0)
423 {
424 fprintf(stderr,
425 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
426 close(pid_fd);
427 return -1;
428 }
430 lseek(pid_fd, 0, SEEK_SET);
431 if (ftruncate(pid_fd, 0) == -1)
432 {
433 fprintf(stderr,
434 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
435 close(pid_fd);
436 return -1;
437 }
439 fprintf(stderr,
440 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
441 "rrdcached: starting normally.\n", pid);
443 return pid_fd;
444 } /* }}} static int check_pidfile */
446 static int write_pidfile (int fd) /* {{{ */
447 {
448 pid_t pid;
449 FILE *fh;
451 pid = getpid ();
453 fh = fdopen (fd, "w");
454 if (fh == NULL)
455 {
456 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
457 close(fd);
458 return (-1);
459 }
461 fprintf (fh, "%i\n", (int) pid);
462 fclose (fh);
464 return (0);
465 } /* }}} int write_pidfile */
467 static int remove_pidfile (void) /* {{{ */
468 {
469 char *file;
470 int status;
472 file = (config_pid_file != NULL)
473 ? config_pid_file
474 : LOCALSTATEDIR "/run/rrdcached.pid";
476 status = unlink (file);
477 if (status == 0)
478 return (0);
479 return (errno);
480 } /* }}} int remove_pidfile */
482 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
483 {
484 char *eol;
486 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
487 sock->next_read - sock->next_cmd);
489 if (eol == NULL)
490 {
491 /* no commands left, move remainder back to front of rbuf */
492 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
493 sock->next_read - sock->next_cmd);
494 sock->next_read -= sock->next_cmd;
495 sock->next_cmd = 0;
496 *len = 0;
497 return NULL;
498 }
499 else
500 {
501 char *cmd = sock->rbuf + sock->next_cmd;
502 *eol = '\0';
504 sock->next_cmd = eol - sock->rbuf + 1;
506 if (eol > sock->rbuf && *(eol-1) == '\r')
507 *(--eol) = '\0'; /* handle "\r\n" EOL */
509 *len = eol - cmd;
511 return cmd;
512 }
514 /* NOTREACHED */
515 assert(1==0);
516 } /* }}} char *next_cmd */
518 /* add the characters directly to the write buffer */
519 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
520 {
521 char *new_buf;
523 assert(sock != NULL);
525 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
526 if (new_buf == NULL)
527 {
528 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
529 return -1;
530 }
532 strncpy(new_buf + sock->wbuf_len, str, len + 1);
534 sock->wbuf = new_buf;
535 sock->wbuf_len += len;
537 return 0;
538 } /* }}} static int add_to_wbuf */
540 /* add the text to the "extra" info that's sent after the status line */
541 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
542 {
543 va_list argp;
544 char buffer[RRD_CMD_MAX];
545 int len;
547 if (JOURNAL_REPLAY(sock)) return 0;
548 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
550 va_start(argp, fmt);
551 #ifdef HAVE_VSNPRINTF
552 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
553 #else
554 len = vsprintf(buffer, fmt, argp);
555 #endif
556 va_end(argp);
557 if (len < 0)
558 {
559 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
560 return -1;
561 }
563 return add_to_wbuf(sock, buffer, len);
564 } /* }}} static int add_response_info */
566 static int count_lines(char *str) /* {{{ */
567 {
568 int lines = 0;
570 if (str != NULL)
571 {
572 while ((str = strchr(str, '\n')) != NULL)
573 {
574 ++lines;
575 ++str;
576 }
577 }
579 return lines;
580 } /* }}} static int count_lines */
582 /* send the response back to the user.
583 * returns 0 on success, -1 on error
584 * write buffer is always zeroed after this call */
585 static int send_response (listen_socket_t *sock, response_code rc,
586 char *fmt, ...) /* {{{ */
587 {
588 va_list argp;
589 char buffer[RRD_CMD_MAX];
590 int lines;
591 ssize_t wrote;
592 int rclen, len;
594 if (JOURNAL_REPLAY(sock)) return rc;
596 if (sock->batch_start)
597 {
598 if (rc == RESP_OK)
599 return rc; /* no response on success during BATCH */
600 lines = sock->batch_cmd;
601 }
602 else if (rc == RESP_OK)
603 lines = count_lines(sock->wbuf);
604 else
605 lines = -1;
607 rclen = sprintf(buffer, "%d ", lines);
608 va_start(argp, fmt);
609 #ifdef HAVE_VSNPRINTF
610 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
611 #else
612 len = vsprintf(buffer+rclen, fmt, argp);
613 #endif
614 va_end(argp);
615 if (len < 0)
616 return -1;
618 len += rclen;
620 /* append the result to the wbuf, don't write to the user */
621 if (sock->batch_start)
622 return add_to_wbuf(sock, buffer, len);
624 /* first write must be complete */
625 if (len != write(sock->fd, buffer, len))
626 {
627 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
628 return -1;
629 }
631 if (sock->wbuf != NULL && rc == RESP_OK)
632 {
633 wrote = 0;
634 while (wrote < sock->wbuf_len)
635 {
636 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
637 if (wb <= 0)
638 {
639 RRDD_LOG(LOG_INFO, "send_response: could not write results");
640 return -1;
641 }
642 wrote += wb;
643 }
644 }
646 free(sock->wbuf); sock->wbuf = NULL;
647 sock->wbuf_len = 0;
649 return 0;
650 } /* }}} */
652 static void wipe_ci_values(cache_item_t *ci, time_t when)
653 {
654 ci->values = NULL;
655 ci->values_num = 0;
656 ci->values_alloc = 0;
658 ci->last_flush_time = when;
659 if (config_write_jitter > 0)
660 ci->last_flush_time += (rrd_random() % config_write_jitter);
661 }
663 /* remove_from_queue
664 * remove a "cache_item_t" item from the queue.
665 * must hold 'cache_lock' when calling this
666 */
667 static void remove_from_queue(cache_item_t *ci) /* {{{ */
668 {
669 if (ci == NULL) return;
670 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
672 if (ci->prev == NULL)
673 cache_queue_head = ci->next; /* reset head */
674 else
675 ci->prev->next = ci->next;
677 if (ci->next == NULL)
678 cache_queue_tail = ci->prev; /* reset the tail */
679 else
680 ci->next->prev = ci->prev;
682 ci->next = ci->prev = NULL;
683 ci->flags &= ~CI_FLAGS_IN_QUEUE;
685 pthread_mutex_lock (&stats_lock);
686 assert (stats_queue_length > 0);
687 stats_queue_length--;
688 pthread_mutex_unlock (&stats_lock);
690 } /* }}} static void remove_from_queue */
692 /* free the resources associated with the cache_item_t
693 * must hold cache_lock when calling this function
694 */
695 static void *free_cache_item(cache_item_t *ci) /* {{{ */
696 {
697 if (ci == NULL) return NULL;
699 remove_from_queue(ci);
701 for (size_t i=0; i < ci->values_num; i++)
702 free(ci->values[i]);
704 free (ci->values);
705 free (ci->file);
707 /* in case anyone is waiting */
708 pthread_cond_broadcast(&ci->flushed);
709 pthread_cond_destroy(&ci->flushed);
711 free (ci);
713 return NULL;
714 } /* }}} static void *free_cache_item */
716 /*
717 * enqueue_cache_item:
718 * `cache_lock' must be acquired before calling this function!
719 */
720 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
721 queue_side_t side)
722 {
723 if (ci == NULL)
724 return (-1);
726 if (ci->values_num == 0)
727 return (0);
729 if (side == HEAD)
730 {
731 if (cache_queue_head == ci)
732 return 0;
734 /* remove if further down in queue */
735 remove_from_queue(ci);
737 ci->prev = NULL;
738 ci->next = cache_queue_head;
739 if (ci->next != NULL)
740 ci->next->prev = ci;
741 cache_queue_head = ci;
743 if (cache_queue_tail == NULL)
744 cache_queue_tail = cache_queue_head;
745 }
746 else /* (side == TAIL) */
747 {
748 /* We don't move values back in the list.. */
749 if (ci->flags & CI_FLAGS_IN_QUEUE)
750 return (0);
752 assert (ci->next == NULL);
753 assert (ci->prev == NULL);
755 ci->prev = cache_queue_tail;
757 if (cache_queue_tail == NULL)
758 cache_queue_head = ci;
759 else
760 cache_queue_tail->next = ci;
762 cache_queue_tail = ci;
763 }
765 ci->flags |= CI_FLAGS_IN_QUEUE;
767 pthread_cond_signal(&queue_cond);
768 pthread_mutex_lock (&stats_lock);
769 stats_queue_length++;
770 pthread_mutex_unlock (&stats_lock);
772 return (0);
773 } /* }}} int enqueue_cache_item */
775 /*
776 * tree_callback_flush:
777 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
778 * while this is in progress.
779 */
780 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
781 gpointer data)
782 {
783 cache_item_t *ci;
784 callback_flush_data_t *cfd;
786 ci = (cache_item_t *) value;
787 cfd = (callback_flush_data_t *) data;
789 if (ci->flags & CI_FLAGS_IN_QUEUE)
790 return FALSE;
792 if (ci->values_num > 0
793 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
794 {
795 enqueue_cache_item (ci, TAIL);
796 }
797 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
798 && (ci->values_num <= 0))
799 {
800 assert ((char *) key == ci->file);
801 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
802 {
803 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
804 return (FALSE);
805 }
806 }
808 return (FALSE);
809 } /* }}} gboolean tree_callback_flush */
811 static int flush_old_values (int max_age)
812 {
813 callback_flush_data_t cfd;
814 size_t k;
816 memset (&cfd, 0, sizeof (cfd));
817 /* Pass the current time as user data so that we don't need to call
818 * `time' for each node. */
819 cfd.now = time (NULL);
820 cfd.keys = NULL;
821 cfd.keys_num = 0;
823 if (max_age > 0)
824 cfd.abs_timeout = cfd.now - max_age;
825 else
826 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
828 /* `tree_callback_flush' will return the keys of all values that haven't
829 * been touched in the last `config_flush_interval' seconds in `cfd'.
830 * The char*'s in this array point to the same memory as ci->file, so we
831 * don't need to free them separately. */
832 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
834 for (k = 0; k < cfd.keys_num; k++)
835 {
836 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
837 /* should never fail, since we have held the cache_lock
838 * the entire time */
839 assert(status == TRUE);
840 }
842 if (cfd.keys != NULL)
843 {
844 free (cfd.keys);
845 cfd.keys = NULL;
846 }
848 return (0);
849 } /* int flush_old_values */
851 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
852 {
853 struct timeval now;
854 struct timespec next_flush;
855 int status;
857 gettimeofday (&now, NULL);
858 next_flush.tv_sec = now.tv_sec + config_flush_interval;
859 next_flush.tv_nsec = 1000 * now.tv_usec;
861 pthread_mutex_lock(&cache_lock);
863 while (state == RUNNING)
864 {
865 gettimeofday (&now, NULL);
866 if ((now.tv_sec > next_flush.tv_sec)
867 || ((now.tv_sec == next_flush.tv_sec)
868 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
869 {
870 RRDD_LOG(LOG_DEBUG, "flushing old values");
872 /* Determine the time of the next cache flush. */
873 next_flush.tv_sec = now.tv_sec + config_flush_interval;
875 /* Flush all values that haven't been written in the last
876 * `config_write_interval' seconds. */
877 flush_old_values (config_write_interval);
879 /* unlock the cache while we rotate so we don't block incoming
880 * updates if the fsync() blocks on disk I/O */
881 pthread_mutex_unlock(&cache_lock);
882 journal_rotate();
883 pthread_mutex_lock(&cache_lock);
884 }
886 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
887 if (status != 0 && status != ETIMEDOUT)
888 {
889 RRDD_LOG (LOG_ERR, "flush_thread_main: "
890 "pthread_cond_timedwait returned %i.", status);
891 }
892 }
894 if (config_flush_at_shutdown)
895 flush_old_values (-1); /* flush everything */
897 state = SHUTDOWN;
899 pthread_mutex_unlock(&cache_lock);
901 return NULL;
902 } /* void *flush_thread_main */
904 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
905 {
906 pthread_mutex_lock (&cache_lock);
908 while (state != SHUTDOWN
909 || (cache_queue_head != NULL && config_flush_at_shutdown))
910 {
911 cache_item_t *ci;
912 char *file;
913 char **values;
914 size_t values_num;
915 int status;
917 /* Now, check if there's something to store away. If not, wait until
918 * something comes in. */
919 if (cache_queue_head == NULL)
920 {
921 status = pthread_cond_wait (&queue_cond, &cache_lock);
922 if ((status != 0) && (status != ETIMEDOUT))
923 {
924 RRDD_LOG (LOG_ERR, "queue_thread_main: "
925 "pthread_cond_wait returned %i.", status);
926 }
927 }
929 /* Check if a value has arrived. This may be NULL if we timed out or there
930 * was an interrupt such as a signal. */
931 if (cache_queue_head == NULL)
932 continue;
934 ci = cache_queue_head;
936 /* copy the relevant parts */
937 file = strdup (ci->file);
938 if (file == NULL)
939 {
940 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
941 continue;
942 }
944 assert(ci->values != NULL);
945 assert(ci->values_num > 0);
947 values = ci->values;
948 values_num = ci->values_num;
950 wipe_ci_values(ci, time(NULL));
951 remove_from_queue(ci);
953 pthread_mutex_unlock (&cache_lock);
955 rrd_clear_error ();
956 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
957 if (status != 0)
958 {
959 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
960 "rrd_update_r (%s) failed with status %i. (%s)",
961 file, status, rrd_get_error());
962 }
964 journal_write("wrote", file);
966 /* Search again in the tree. It's possible someone issued a "FORGET"
967 * while we were writing the update values. */
968 pthread_mutex_lock(&cache_lock);
969 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
970 if (ci)
971 pthread_cond_broadcast(&ci->flushed);
972 pthread_mutex_unlock(&cache_lock);
974 if (status == 0)
975 {
976 pthread_mutex_lock (&stats_lock);
977 stats_updates_written++;
978 stats_data_sets_written += values_num;
979 pthread_mutex_unlock (&stats_lock);
980 }
982 rrd_free_ptrs((void ***) &values, &values_num);
983 free(file);
985 pthread_mutex_lock (&cache_lock);
986 }
987 pthread_mutex_unlock (&cache_lock);
989 return (NULL);
990 } /* }}} void *queue_thread_main */
992 static int buffer_get_field (char **buffer_ret, /* {{{ */
993 size_t *buffer_size_ret, char **field_ret)
994 {
995 char *buffer;
996 size_t buffer_pos;
997 size_t buffer_size;
998 char *field;
999 size_t field_size;
1000 int status;
1002 buffer = *buffer_ret;
1003 buffer_pos = 0;
1004 buffer_size = *buffer_size_ret;
1005 field = *buffer_ret;
1006 field_size = 0;
1008 if (buffer_size <= 0)
1009 return (-1);
1011 /* This is ensured by `handle_request'. */
1012 assert (buffer[buffer_size - 1] == '\0');
1014 status = -1;
1015 while (buffer_pos < buffer_size)
1016 {
1017 /* Check for end-of-field or end-of-buffer */
1018 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1019 {
1020 field[field_size] = 0;
1021 field_size++;
1022 buffer_pos++;
1023 status = 0;
1024 break;
1025 }
1026 /* Handle escaped characters. */
1027 else if (buffer[buffer_pos] == '\\')
1028 {
1029 if (buffer_pos >= (buffer_size - 1))
1030 break;
1031 buffer_pos++;
1032 field[field_size] = buffer[buffer_pos];
1033 field_size++;
1034 buffer_pos++;
1035 }
1036 /* Normal operation */
1037 else
1038 {
1039 field[field_size] = buffer[buffer_pos];
1040 field_size++;
1041 buffer_pos++;
1042 }
1043 } /* while (buffer_pos < buffer_size) */
1045 if (status != 0)
1046 return (status);
1048 *buffer_ret = buffer + buffer_pos;
1049 *buffer_size_ret = buffer_size - buffer_pos;
1050 *field_ret = field;
1052 return (0);
1053 } /* }}} int buffer_get_field */
1055 /* if we're restricting writes to the base directory,
1056 * check whether the file falls within the dir
1057 * returns 1 if OK, otherwise 0
1058 */
1059 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1060 {
1061 assert(file != NULL);
1063 if (!config_write_base_only
1064 || JOURNAL_REPLAY(sock)
1065 || config_base_dir == NULL)
1066 return 1;
1068 if (strstr(file, "../") != NULL) goto err;
1070 /* relative paths without "../" are ok */
1071 if (*file != '/') return 1;
1073 /* file must be of the format base + "/" + <1+ char filename> */
1074 if (strlen(file) < _config_base_dir_len + 2) goto err;
1075 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1076 if (*(file + _config_base_dir_len) != '/') goto err;
1078 return 1;
1080 err:
1081 if (sock != NULL && sock->fd >= 0)
1082 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1084 return 0;
1085 } /* }}} static int check_file_access */
1087 /* when using a base dir, convert relative paths to absolute paths.
1088 * if necessary, modifies the "filename" pointer to point
1089 * to the new path created in "tmp". "tmp" is provided
1090 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1091 *
1092 * this allows us to optimize for the expected case (absolute path)
1093 * with a no-op.
1094 */
1095 static void get_abs_path(char **filename, char *tmp)
1096 {
1097 assert(tmp != NULL);
1098 assert(filename != NULL && *filename != NULL);
1100 if (config_base_dir == NULL || **filename == '/')
1101 return;
1103 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1104 *filename = tmp;
1105 } /* }}} static int get_abs_path */
1107 static int flush_file (const char *filename) /* {{{ */
1108 {
1109 cache_item_t *ci;
1111 pthread_mutex_lock (&cache_lock);
1113 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1114 if (ci == NULL)
1115 {
1116 pthread_mutex_unlock (&cache_lock);
1117 return (ENOENT);
1118 }
1120 if (ci->values_num > 0)
1121 {
1122 /* Enqueue at head */
1123 enqueue_cache_item (ci, HEAD);
1124 pthread_cond_wait(&ci->flushed, &cache_lock);
1125 }
1127 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1128 * may have been purged during our cond_wait() */
1130 pthread_mutex_unlock(&cache_lock);
1132 return (0);
1133 } /* }}} int flush_file */
1135 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1136 {
1137 char *err = "Syntax error.\n";
1139 if (cmd && cmd->syntax)
1140 err = cmd->syntax;
1142 return send_response(sock, RESP_ERR, "Usage: %s", err);
1143 } /* }}} static int syntax_error() */
1145 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1146 {
1147 uint64_t copy_queue_length;
1148 uint64_t copy_updates_received;
1149 uint64_t copy_flush_received;
1150 uint64_t copy_updates_written;
1151 uint64_t copy_data_sets_written;
1152 uint64_t copy_journal_bytes;
1153 uint64_t copy_journal_rotate;
1155 uint64_t tree_nodes_number;
1156 uint64_t tree_depth;
1158 pthread_mutex_lock (&stats_lock);
1159 copy_queue_length = stats_queue_length;
1160 copy_updates_received = stats_updates_received;
1161 copy_flush_received = stats_flush_received;
1162 copy_updates_written = stats_updates_written;
1163 copy_data_sets_written = stats_data_sets_written;
1164 copy_journal_bytes = stats_journal_bytes;
1165 copy_journal_rotate = stats_journal_rotate;
1166 pthread_mutex_unlock (&stats_lock);
1168 pthread_mutex_lock (&cache_lock);
1169 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1170 tree_depth = (uint64_t) g_tree_height (cache_tree);
1171 pthread_mutex_unlock (&cache_lock);
1173 add_response_info(sock,
1174 "QueueLength: %"PRIu64"\n", copy_queue_length);
1175 add_response_info(sock,
1176 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1177 add_response_info(sock,
1178 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1179 add_response_info(sock,
1180 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1181 add_response_info(sock,
1182 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1183 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1184 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1185 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1186 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1188 send_response(sock, RESP_OK, "Statistics follow\n");
1190 return (0);
1191 } /* }}} int handle_request_stats */
1193 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1194 {
1195 char *file, file_tmp[PATH_MAX];
1196 int status;
1198 status = buffer_get_field (&buffer, &buffer_size, &file);
1199 if (status != 0)
1200 {
1201 return syntax_error(sock,cmd);
1202 }
1203 else
1204 {
1205 pthread_mutex_lock(&stats_lock);
1206 stats_flush_received++;
1207 pthread_mutex_unlock(&stats_lock);
1209 get_abs_path(&file, file_tmp);
1210 if (!check_file_access(file, sock)) return 0;
1212 status = flush_file (file);
1213 if (status == 0)
1214 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1215 else if (status == ENOENT)
1216 {
1217 /* no file in our tree; see whether it exists at all */
1218 struct stat statbuf;
1220 memset(&statbuf, 0, sizeof(statbuf));
1221 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1222 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1223 else
1224 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1225 }
1226 else if (status < 0)
1227 return send_response(sock, RESP_ERR, "Internal error.\n");
1228 else
1229 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1230 }
1232 /* NOTREACHED */
1233 assert(1==0);
1234 } /* }}} int handle_request_flush */
1236 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1237 {
1238 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1240 pthread_mutex_lock(&cache_lock);
1241 flush_old_values(-1);
1242 pthread_mutex_unlock(&cache_lock);
1244 return send_response(sock, RESP_OK, "Started flush.\n");
1245 } /* }}} static int handle_request_flushall */
1247 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1248 {
1249 int status;
1250 char *file, file_tmp[PATH_MAX];
1251 cache_item_t *ci;
1253 status = buffer_get_field(&buffer, &buffer_size, &file);
1254 if (status != 0)
1255 return syntax_error(sock,cmd);
1257 get_abs_path(&file, file_tmp);
1259 pthread_mutex_lock(&cache_lock);
1260 ci = g_tree_lookup(cache_tree, file);
1261 if (ci == NULL)
1262 {
1263 pthread_mutex_unlock(&cache_lock);
1264 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265 }
1267 for (size_t i=0; i < ci->values_num; i++)
1268 add_response_info(sock, "%s\n", ci->values[i]);
1270 pthread_mutex_unlock(&cache_lock);
1271 return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1274 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1275 {
1276 int status;
1277 gboolean found;
1278 char *file, file_tmp[PATH_MAX];
1280 status = buffer_get_field(&buffer, &buffer_size, &file);
1281 if (status != 0)
1282 return syntax_error(sock,cmd);
1284 get_abs_path(&file, file_tmp);
1285 if (!check_file_access(file, sock)) return 0;
1287 pthread_mutex_lock(&cache_lock);
1288 found = g_tree_remove(cache_tree, file);
1289 pthread_mutex_unlock(&cache_lock);
1291 if (found == TRUE)
1292 {
1293 if (!JOURNAL_REPLAY(sock))
1294 journal_write("forget", file);
1296 return send_response(sock, RESP_OK, "Gone!\n");
1297 }
1298 else
1299 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1301 /* NOTREACHED */
1302 assert(1==0);
1303 } /* }}} static int handle_request_forget */
1305 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1306 {
1307 cache_item_t *ci;
1309 pthread_mutex_lock(&cache_lock);
1311 ci = cache_queue_head;
1312 while (ci != NULL)
1313 {
1314 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1315 ci = ci->next;
1316 }
1318 pthread_mutex_unlock(&cache_lock);
1320 return send_response(sock, RESP_OK, "in queue.\n");
1321 } /* }}} int handle_request_queue */
1323 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1324 {
1325 char *file, file_tmp[PATH_MAX];
1326 int values_num = 0;
1327 int status;
1328 char orig_buf[RRD_CMD_MAX];
1330 cache_item_t *ci;
1332 /* save it for the journal later */
1333 if (!JOURNAL_REPLAY(sock))
1334 strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1336 status = buffer_get_field (&buffer, &buffer_size, &file);
1337 if (status != 0)
1338 return syntax_error(sock,cmd);
1340 pthread_mutex_lock(&stats_lock);
1341 stats_updates_received++;
1342 pthread_mutex_unlock(&stats_lock);
1344 get_abs_path(&file, file_tmp);
1345 if (!check_file_access(file, sock)) return 0;
1347 pthread_mutex_lock (&cache_lock);
1348 ci = g_tree_lookup (cache_tree, file);
1350 if (ci == NULL) /* {{{ */
1351 {
1352 struct stat statbuf;
1353 cache_item_t *tmp;
1355 /* don't hold the lock while we setup; stat(2) might block */
1356 pthread_mutex_unlock(&cache_lock);
1358 memset (&statbuf, 0, sizeof (statbuf));
1359 status = stat (file, &statbuf);
1360 if (status != 0)
1361 {
1362 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1364 status = errno;
1365 if (status == ENOENT)
1366 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1367 else
1368 return send_response(sock, RESP_ERR,
1369 "stat failed with error %i.\n", status);
1370 }
1371 if (!S_ISREG (statbuf.st_mode))
1372 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1374 if (access(file, R_OK|W_OK) != 0)
1375 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1376 file, rrd_strerror(errno));
1378 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1379 if (ci == NULL)
1380 {
1381 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1383 return send_response(sock, RESP_ERR, "malloc failed.\n");
1384 }
1385 memset (ci, 0, sizeof (cache_item_t));
1387 ci->file = strdup (file);
1388 if (ci->file == NULL)
1389 {
1390 free (ci);
1391 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1393 return send_response(sock, RESP_ERR, "strdup failed.\n");
1394 }
1396 wipe_ci_values(ci, now);
1397 ci->flags = CI_FLAGS_IN_TREE;
1398 pthread_cond_init(&ci->flushed, NULL);
1400 pthread_mutex_lock(&cache_lock);
1402 /* another UPDATE might have added this entry in the meantime */
1403 tmp = g_tree_lookup (cache_tree, file);
1404 if (tmp == NULL)
1405 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1406 else
1407 {
1408 free_cache_item (ci);
1409 ci = tmp;
1410 }
1412 /* state may have changed while we were unlocked */
1413 if (state == SHUTDOWN)
1414 return -1;
1415 } /* }}} */
1416 assert (ci != NULL);
1418 /* don't re-write updates in replay mode */
1419 if (!JOURNAL_REPLAY(sock))
1420 journal_write("update", orig_buf);
1422 while (buffer_size > 0)
1423 {
1424 char *value;
1425 double stamp;
1426 char *eostamp;
1428 status = buffer_get_field (&buffer, &buffer_size, &value);
1429 if (status != 0)
1430 {
1431 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1432 break;
1433 }
1435 /* make sure update time is always moving forward. We use double here since
1436 update does support subsecond precision for timestamps ... */
1437 stamp = strtod(value, &eostamp);
1438 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1439 {
1440 pthread_mutex_unlock(&cache_lock);
1441 return send_response(sock, RESP_ERR,
1442 "Cannot find timestamp in '%s'!\n", value);
1443 }
1444 else if (stamp <= ci->last_update_stamp)
1445 {
1446 pthread_mutex_unlock(&cache_lock);
1447 return send_response(sock, RESP_ERR,
1448 "illegal attempt to update using time %lf when last"
1449 " update time is %lf (minimum one second step)\n",
1450 stamp, ci->last_update_stamp);
1451 }
1452 else
1453 ci->last_update_stamp = stamp;
1455 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1456 &ci->values_alloc, config_alloc_chunk))
1457 {
1458 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1459 continue;
1460 }
1462 values_num++;
1463 }
1465 if (((now - ci->last_flush_time) >= config_write_interval)
1466 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1467 && (ci->values_num > 0))
1468 {
1469 enqueue_cache_item (ci, TAIL);
1470 }
1472 pthread_mutex_unlock (&cache_lock);
1474 if (values_num < 1)
1475 return send_response(sock, RESP_ERR, "No values updated.\n");
1476 else
1477 return send_response(sock, RESP_OK,
1478 "errors, enqueued %i value(s).\n", values_num);
1480 /* NOTREACHED */
1481 assert(1==0);
1483 } /* }}} int handle_request_update */
1485 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1486 {
1487 char *file, file_tmp[PATH_MAX];
1488 char *cf;
1490 char *start_str;
1491 char *end_str;
1492 time_t start_tm;
1493 time_t end_tm;
1495 unsigned long step;
1496 unsigned long ds_cnt;
1497 char **ds_namv;
1498 rrd_value_t *data;
1500 int status;
1501 unsigned long i;
1502 time_t t;
1503 rrd_value_t *data_ptr;
1505 file = NULL;
1506 cf = NULL;
1507 start_str = NULL;
1508 end_str = NULL;
1510 /* Read the arguments */
1511 do /* while (0) */
1512 {
1513 status = buffer_get_field (&buffer, &buffer_size, &file);
1514 if (status != 0)
1515 break;
1517 status = buffer_get_field (&buffer, &buffer_size, &cf);
1518 if (status != 0)
1519 break;
1521 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1522 if (status != 0)
1523 {
1524 start_str = NULL;
1525 status = 0;
1526 break;
1527 }
1529 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1530 if (status != 0)
1531 {
1532 end_str = NULL;
1533 status = 0;
1534 break;
1535 }
1536 } while (0);
1538 if (status != 0)
1539 return (syntax_error(sock,cmd));
1541 get_abs_path(&file, file_tmp);
1542 if (!check_file_access(file, sock)) return 0;
1544 status = flush_file (file);
1545 if ((status != 0) && (status != ENOENT))
1546 return (send_response (sock, RESP_ERR,
1547 "flush_file (%s) failed with status %i.\n", file, status));
1549 t = time (NULL); /* "now" */
1551 /* Parse start time */
1552 if (start_str != NULL)
1553 {
1554 char *endptr;
1555 long value;
1557 endptr = NULL;
1558 errno = 0;
1559 value = strtol (start_str, &endptr, /* base = */ 0);
1560 if ((endptr == start_str) || (errno != 0))
1561 return (send_response(sock, RESP_ERR,
1562 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1563 start_str));
1565 if (value > 0)
1566 start_tm = (time_t) value;
1567 else
1568 start_tm = (time_t) (t + value);
1569 }
1570 else
1571 {
1572 start_tm = t - 86400;
1573 }
1575 /* Parse end time */
1576 if (end_str != NULL)
1577 {
1578 char *endptr;
1579 long value;
1581 endptr = NULL;
1582 errno = 0;
1583 value = strtol (end_str, &endptr, /* base = */ 0);
1584 if ((endptr == end_str) || (errno != 0))
1585 return (send_response(sock, RESP_ERR,
1586 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1587 end_str));
1589 if (value > 0)
1590 end_tm = (time_t) value;
1591 else
1592 end_tm = (time_t) (t + value);
1593 }
1594 else
1595 {
1596 end_tm = t;
1597 }
1599 step = -1;
1600 ds_cnt = 0;
1601 ds_namv = NULL;
1602 data = NULL;
1604 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1605 &ds_cnt, &ds_namv, &data);
1606 if (status != 0)
1607 return (send_response(sock, RESP_ERR,
1608 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1610 add_response_info (sock, "FlushVersion: %lu\n", 1);
1611 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1612 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1613 add_response_info (sock, "Step: %lu\n", step);
1614 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1616 #define SSTRCAT(buffer,str,buffer_fill) do { \
1617 size_t str_len = strlen (str); \
1618 if ((buffer_fill + str_len) > sizeof (buffer)) \
1619 str_len = sizeof (buffer) - buffer_fill; \
1620 if (str_len > 0) { \
1621 strncpy (buffer + buffer_fill, str, str_len); \
1622 buffer_fill += str_len; \
1623 assert (buffer_fill <= sizeof (buffer)); \
1624 if (buffer_fill == sizeof (buffer)) \
1625 buffer[buffer_fill - 1] = 0; \
1626 else \
1627 buffer[buffer_fill] = 0; \
1628 } \
1629 } while (0)
1631 { /* Add list of DS names */
1632 char linebuf[1024];
1633 size_t linebuf_fill;
1635 memset (linebuf, 0, sizeof (linebuf));
1636 linebuf_fill = 0;
1637 for (i = 0; i < ds_cnt; i++)
1638 {
1639 if (i > 0)
1640 SSTRCAT (linebuf, " ", linebuf_fill);
1641 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1642 rrd_freemem(ds_namv[i]);
1643 }
1644 rrd_freemem(ds_namv);
1645 add_response_info (sock, "DSName: %s\n", linebuf);
1646 }
1648 /* Add the actual data */
1649 assert (step > 0);
1650 data_ptr = data;
1651 for (t = start_tm + step; t <= end_tm; t += step)
1652 {
1653 char linebuf[1024];
1654 size_t linebuf_fill;
1655 char tmp[128];
1657 memset (linebuf, 0, sizeof (linebuf));
1658 linebuf_fill = 0;
1659 for (i = 0; i < ds_cnt; i++)
1660 {
1661 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1662 tmp[sizeof (tmp) - 1] = 0;
1663 SSTRCAT (linebuf, tmp, linebuf_fill);
1665 data_ptr++;
1666 }
1668 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1669 } /* for (t) */
1670 rrd_freemem(data);
1672 return (send_response (sock, RESP_OK, "Success\n"));
1673 #undef SSTRCAT
1674 } /* }}} int handle_request_fetch */
1676 /* we came across a "WROTE" entry during journal replay.
1677 * throw away any values that we have accumulated for this file
1678 */
1679 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1680 {
1681 cache_item_t *ci;
1682 const char *file = buffer;
1684 pthread_mutex_lock(&cache_lock);
1686 ci = g_tree_lookup(cache_tree, file);
1687 if (ci == NULL)
1688 {
1689 pthread_mutex_unlock(&cache_lock);
1690 return (0);
1691 }
1693 if (ci->values)
1694 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1696 wipe_ci_values(ci, now);
1697 remove_from_queue(ci);
1699 pthread_mutex_unlock(&cache_lock);
1700 return (0);
1701 } /* }}} int handle_request_wrote */
1703 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1704 {
1705 char *file, file_tmp[PATH_MAX];
1706 int status;
1707 rrd_info_t *info;
1709 /* obtain filename */
1710 status = buffer_get_field(&buffer, &buffer_size, &file);
1711 if (status != 0)
1712 return syntax_error(sock,cmd);
1713 /* get full pathname */
1714 get_abs_path(&file, file_tmp);
1715 if (!check_file_access(file, sock)) {
1716 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1717 }
1718 /* get data */
1719 rrd_clear_error ();
1720 info = rrd_info_r(file);
1721 if(!info) {
1722 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1723 }
1724 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1725 switch (data->type) {
1726 case RD_I_VAL:
1727 if (isnan(data->value.u_val))
1728 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1729 else
1730 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1731 break;
1732 case RD_I_CNT:
1733 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1734 break;
1735 case RD_I_INT:
1736 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1737 break;
1738 case RD_I_STR:
1739 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1740 break;
1741 case RD_I_BLO:
1742 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1743 break;
1744 }
1745 }
1747 rrd_info_free(info);
1749 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1750 } /* }}} static int handle_request_info */
1752 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1753 {
1754 char *i, *file, file_tmp[PATH_MAX];
1755 int status;
1756 int idx;
1757 time_t t;
1759 /* obtain filename */
1760 status = buffer_get_field(&buffer, &buffer_size, &file);
1761 if (status != 0)
1762 return syntax_error(sock,cmd);
1763 /* get full pathname */
1764 get_abs_path(&file, file_tmp);
1765 if (!check_file_access(file, sock)) {
1766 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1767 }
1769 status = buffer_get_field(&buffer, &buffer_size, &i);
1770 if (status != 0)
1771 return syntax_error(sock,cmd);
1772 idx = atoi(i);
1773 if(idx<0) {
1774 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1775 }
1777 /* get data */
1778 rrd_clear_error ();
1779 t = rrd_first_r(file,idx);
1780 if(t<1) {
1781 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1782 }
1783 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1784 } /* }}} static int handle_request_first */
1787 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1788 {
1789 char *file, file_tmp[PATH_MAX];
1790 int status;
1791 time_t t, from_file, step;
1792 rrd_file_t * rrd_file;
1793 cache_item_t * ci;
1794 rrd_t rrd;
1796 /* obtain filename */
1797 status = buffer_get_field(&buffer, &buffer_size, &file);
1798 if (status != 0)
1799 return syntax_error(sock,cmd);
1800 /* get full pathname */
1801 get_abs_path(&file, file_tmp);
1802 if (!check_file_access(file, sock)) {
1803 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1804 }
1805 rrd_clear_error();
1806 rrd_init(&rrd);
1807 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1808 if(!rrd_file) {
1809 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1810 }
1811 from_file = rrd.live_head->last_up;
1812 step = rrd.stat_head->pdp_step;
1813 rrd_close(rrd_file);
1814 pthread_mutex_lock(&cache_lock);
1815 ci = g_tree_lookup(cache_tree, file);
1816 if (ci)
1817 t = ci->last_update_stamp;
1818 else
1819 t = from_file;
1820 pthread_mutex_unlock(&cache_lock);
1821 t -= t % step;
1822 rrd_free(&rrd);
1823 if(t<1) {
1824 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1825 }
1826 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1827 } /* }}} static int handle_request_last */
1829 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1830 {
1831 char *file, file_tmp[PATH_MAX];
1832 char *tok;
1833 int ac = 0;
1834 char *av[128];
1835 int status;
1836 unsigned long step = 300;
1837 time_t last_up = time(NULL)-10;
1838 int no_overwrite = opt_no_overwrite;
1841 /* obtain filename */
1842 status = buffer_get_field(&buffer, &buffer_size, &file);
1843 if (status != 0)
1844 return syntax_error(sock,cmd);
1845 /* get full pathname */
1846 get_abs_path(&file, file_tmp);
1847 if (!check_file_access(file, sock)) {
1848 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1849 }
1850 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1852 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1853 if( ! strncmp(tok,"-b",2) ) {
1854 status = buffer_get_field(&buffer, &buffer_size, &tok );
1855 if (status != 0) return syntax_error(sock,cmd);
1856 last_up = (time_t) atol(tok);
1857 continue;
1858 }
1859 if( ! strncmp(tok,"-s",2) ) {
1860 status = buffer_get_field(&buffer, &buffer_size, &tok );
1861 if (status != 0) return syntax_error(sock,cmd);
1862 step = atol(tok);
1863 continue;
1864 }
1865 if( ! strncmp(tok,"-O",2) ) {
1866 no_overwrite = 1;
1867 continue;
1868 }
1869 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1870 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1871 return syntax_error(sock,cmd);
1872 }
1873 if(step<1) {
1874 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1875 }
1876 if (last_up < 3600 * 24 * 365 * 10) {
1877 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1878 }
1880 rrd_clear_error ();
1881 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1883 if(!status) {
1884 return send_response(sock, RESP_OK, "RRD created OK\n");
1885 }
1886 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1887 } /* }}} static int handle_request_create */
1889 /* start "BATCH" processing */
1890 static int batch_start (HANDLER_PROTO) /* {{{ */
1891 {
1892 int status;
1893 if (sock->batch_start)
1894 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1896 status = send_response(sock, RESP_OK,
1897 "Go ahead. End with dot '.' on its own line.\n");
1898 sock->batch_start = time(NULL);
1899 sock->batch_cmd = 0;
1901 return status;
1902 } /* }}} static int batch_start */
1904 /* finish "BATCH" processing and return results to the client */
1905 static int batch_done (HANDLER_PROTO) /* {{{ */
1906 {
1907 assert(sock->batch_start);
1908 sock->batch_start = 0;
1909 sock->batch_cmd = 0;
1910 return send_response(sock, RESP_OK, "errors\n");
1911 } /* }}} static int batch_done */
1913 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1914 {
1915 return -1;
1916 } /* }}} static int handle_request_quit */
1918 static command_t list_of_commands[] = { /* {{{ */
1919 {
1920 "UPDATE",
1921 handle_request_update,
1922 CMD_CONTEXT_ANY,
1923 "UPDATE <filename> <values> [<values> ...]\n"
1924 ,
1925 "Adds the given file to the internal cache if it is not yet known and\n"
1926 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1927 "for details.\n"
1928 "\n"
1929 "Each <values> has the following form:\n"
1930 " <values> = <time>:<value>[:<value>[...]]\n"
1931 "See the rrdupdate(1) manpage for details.\n"
1932 },
1933 {
1934 "WROTE",
1935 handle_request_wrote,
1936 CMD_CONTEXT_JOURNAL,
1937 NULL,
1938 NULL
1939 },
1940 {
1941 "FLUSH",
1942 handle_request_flush,
1943 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1944 "FLUSH <filename>\n"
1945 ,
1946 "Adds the given filename to the head of the update queue and returns\n"
1947 "after it has been dequeued.\n"
1948 },
1949 {
1950 "FLUSHALL",
1951 handle_request_flushall,
1952 CMD_CONTEXT_CLIENT,
1953 "FLUSHALL\n"
1954 ,
1955 "Triggers writing of all pending updates. Returns immediately.\n"
1956 },
1957 {
1958 "PENDING",
1959 handle_request_pending,
1960 CMD_CONTEXT_CLIENT,
1961 "PENDING <filename>\n"
1962 ,
1963 "Shows any 'pending' updates for a file, in order.\n"
1964 "The updates shown have not yet been written to the underlying RRD file.\n"
1965 },
1966 {
1967 "FORGET",
1968 handle_request_forget,
1969 CMD_CONTEXT_ANY,
1970 "FORGET <filename>\n"
1971 ,
1972 "Removes the file completely from the cache.\n"
1973 "Any pending updates for the file will be lost.\n"
1974 },
1975 {
1976 "QUEUE",
1977 handle_request_queue,
1978 CMD_CONTEXT_CLIENT,
1979 "QUEUE\n"
1980 ,
1981 "Shows all files in the output queue.\n"
1982 "The output is zero or more lines in the following format:\n"
1983 "(where <num_vals> is the number of values to be written)\n"
1984 "\n"
1985 "<num_vals> <filename>\n"
1986 },
1987 {
1988 "STATS",
1989 handle_request_stats,
1990 CMD_CONTEXT_CLIENT,
1991 "STATS\n"
1992 ,
1993 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1994 "a description of the values.\n"
1995 },
1996 {
1997 "HELP",
1998 handle_request_help,
1999 CMD_CONTEXT_CLIENT,
2000 "HELP [<command>]\n",
2001 NULL, /* special! */
2002 },
2003 {
2004 "BATCH",
2005 batch_start,
2006 CMD_CONTEXT_CLIENT,
2007 "BATCH\n"
2008 ,
2009 "The 'BATCH' command permits the client to initiate a bulk load\n"
2010 " of commands to rrdcached.\n"
2011 "\n"
2012 "Usage:\n"
2013 "\n"
2014 " client: BATCH\n"
2015 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2016 " client: command #1\n"
2017 " client: command #2\n"
2018 " client: ... and so on\n"
2019 " client: .\n"
2020 " server: 2 errors\n"
2021 " server: 7 message for command #7\n"
2022 " server: 9 message for command #9\n"
2023 "\n"
2024 "For more information, consult the rrdcached(1) documentation.\n"
2025 },
2026 {
2027 ".", /* BATCH terminator */
2028 batch_done,
2029 CMD_CONTEXT_BATCH,
2030 NULL,
2031 NULL
2032 },
2033 {
2034 "FETCH",
2035 handle_request_fetch,
2036 CMD_CONTEXT_CLIENT,
2037 "FETCH <file> <CF> [<start> [<end>]]\n"
2038 ,
2039 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2040 },
2041 {
2042 "INFO",
2043 handle_request_info,
2044 CMD_CONTEXT_CLIENT,
2045 "INFO <filename>\n",
2046 "The INFO command retrieves information about a specified RRD file.\n"
2047 "This is returned in standard rrdinfo format, a sequence of lines\n"
2048 "with the format <keyname> = <value>\n"
2049 "Note that this is the data as of the last update of the RRD file itself,\n"
2050 "not the last time data was received via rrdcached, so there may be pending\n"
2051 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2052 },
2053 {
2054 "FIRST",
2055 handle_request_first,
2056 CMD_CONTEXT_CLIENT,
2057 "FIRST <filename> <rra index>\n",
2058 "The FIRST command retrieves the first data time for a specified RRA in\n"
2059 "an RRD file.\n"
2060 },
2061 {
2062 "LAST",
2063 handle_request_last,
2064 CMD_CONTEXT_CLIENT,
2065 "LAST <filename>\n",
2066 "The LAST command retrieves the last update time for a specified RRD file.\n"
2067 "Note that this is the time of the last update of the RRD file itself, not\n"
2068 "the last time data was received via rrdcached, so there may be pending\n"
2069 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2070 },
2071 {
2072 "CREATE",
2073 handle_request_create,
2074 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2075 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2076 "The CREATE command will create an RRD file, overwriting any existing file\n"
2077 "unless the -O option is given or rrdcached was started with the -O option.\n"
2078 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2079 "not acceptable) and the step is in seconds (default is 300).\n"
2080 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2081 },
2082 {
2083 "QUIT",
2084 handle_request_quit,
2085 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2086 "QUIT\n"
2087 ,
2088 "Disconnect from rrdcached.\n"
2089 }
2090 }; /* }}} command_t list_of_commands[] */
2091 static size_t list_of_commands_len = sizeof (list_of_commands)
2092 / sizeof (list_of_commands[0]);
2094 static command_t *find_command(char *cmd)
2095 {
2096 size_t i;
2098 for (i = 0; i < list_of_commands_len; i++)
2099 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2100 return (&list_of_commands[i]);
2101 return NULL;
2102 }
2104 /* We currently use the index in the `list_of_commands' array as a bit position
2105 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2106 * outside these functions so that switching to a more elegant storage method
2107 * is easily possible. */
2108 static ssize_t find_command_index (const char *cmd) /* {{{ */
2109 {
2110 size_t i;
2112 for (i = 0; i < list_of_commands_len; i++)
2113 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2114 return ((ssize_t) i);
2115 return (-1);
2116 } /* }}} ssize_t find_command_index */
2118 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2119 const char *cmd)
2120 {
2121 ssize_t i;
2123 if (JOURNAL_REPLAY(sock))
2124 return (1);
2126 if (cmd == NULL)
2127 return (-1);
2129 if ((strcasecmp ("QUIT", cmd) == 0)
2130 || (strcasecmp ("HELP", cmd) == 0))
2131 return (1);
2132 else if (strcmp (".", cmd) == 0)
2133 cmd = "BATCH";
2135 i = find_command_index (cmd);
2136 if (i < 0)
2137 return (-1);
2138 assert (i < 32);
2140 if ((sock->permissions & (1 << i)) != 0)
2141 return (1);
2142 return (0);
2143 } /* }}} int socket_permission_check */
2145 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2146 const char *cmd)
2147 {
2148 ssize_t i;
2150 i = find_command_index (cmd);
2151 if (i < 0)
2152 return (-1);
2153 assert (i < 32);
2155 sock->permissions |= (1 << i);
2156 return (0);
2157 } /* }}} int socket_permission_add */
2159 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2160 {
2161 sock->permissions = 0;
2162 } /* }}} socket_permission_clear */
2164 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2165 listen_socket_t *src)
2166 {
2167 dest->permissions = src->permissions;
2168 } /* }}} socket_permission_copy */
2170 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2171 {
2172 size_t i;
2174 sock->permissions = 0;
2175 for (i = 0; i < list_of_commands_len; i++)
2176 sock->permissions |= (1 << i);
2177 } /* }}} void socket_permission_set_all */
2179 /* check whether commands are received in the expected context */
2180 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2181 {
2182 if (JOURNAL_REPLAY(sock))
2183 return (cmd->context & CMD_CONTEXT_JOURNAL);
2184 else if (sock->batch_start)
2185 return (cmd->context & CMD_CONTEXT_BATCH);
2186 else
2187 return (cmd->context & CMD_CONTEXT_CLIENT);
2189 /* NOTREACHED */
2190 assert(1==0);
2191 }
2193 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2194 {
2195 int status;
2196 char *cmd_str;
2197 char *resp_txt;
2198 command_t *help = NULL;
2200 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2201 if (status == 0)
2202 help = find_command(cmd_str);
2204 if (help && (help->syntax || help->help))
2205 {
2206 char tmp[RRD_CMD_MAX];
2208 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2209 resp_txt = tmp;
2211 if (help->syntax)
2212 add_response_info(sock, "Usage: %s\n", help->syntax);
2214 if (help->help)
2215 add_response_info(sock, "%s\n", help->help);
2216 }
2217 else
2218 {
2219 size_t i;
2221 resp_txt = "Command overview\n";
2223 for (i = 0; i < list_of_commands_len; i++)
2224 {
2225 if (list_of_commands[i].syntax == NULL)
2226 continue;
2227 add_response_info (sock, "%s", list_of_commands[i].syntax);
2228 }
2229 }
2231 return send_response(sock, RESP_OK, resp_txt);
2232 } /* }}} int handle_request_help */
2234 static int handle_request (DISPATCH_PROTO) /* {{{ */
2235 {
2236 char *buffer_ptr = buffer;
2237 char *cmd_str = NULL;
2238 command_t *cmd = NULL;
2239 int status;
2241 assert (buffer[buffer_size - 1] == '\0');
2243 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2244 if (status != 0)
2245 {
2246 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2247 return (-1);
2248 }
2250 if (sock != NULL && sock->batch_start)
2251 sock->batch_cmd++;
2253 cmd = find_command(cmd_str);
2254 if (!cmd)
2255 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2257 if (!socket_permission_check (sock, cmd->cmd))
2258 return send_response(sock, RESP_ERR, "Permission denied.\n");
2260 if (!command_check_context(sock, cmd))
2261 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2263 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2264 } /* }}} int handle_request */
2266 static void journal_set_free (journal_set *js) /* {{{ */
2267 {
2268 if (js == NULL)
2269 return;
2271 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2273 free(js);
2274 } /* }}} journal_set_free */
2276 static void journal_set_remove (journal_set *js) /* {{{ */
2277 {
2278 if (js == NULL)
2279 return;
2281 for (uint i=0; i < js->files_num; i++)
2282 {
2283 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2284 unlink(js->files[i]);
2285 }
2286 } /* }}} journal_set_remove */
2288 /* close current journal file handle.
2289 * MUST hold journal_lock before calling */
2290 static void journal_close(void) /* {{{ */
2291 {
2292 if (journal_fh != NULL)
2293 {
2294 if (fclose(journal_fh) != 0)
2295 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2296 }
2298 journal_fh = NULL;
2299 journal_size = 0;
2300 } /* }}} journal_close */
2302 /* MUST hold journal_lock before calling */
2303 static void journal_new_file(void) /* {{{ */
2304 {
2305 struct timeval now;
2306 int new_fd;
2307 char new_file[PATH_MAX + 1];
2309 assert(journal_dir != NULL);
2310 assert(journal_cur != NULL);
2312 journal_close();
2314 gettimeofday(&now, NULL);
2315 /* this format assures that the files sort in strcmp() order */
2316 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2317 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2319 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2320 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2321 if (new_fd < 0)
2322 goto error;
2324 journal_fh = fdopen(new_fd, "a");
2325 if (journal_fh == NULL)
2326 goto error;
2328 journal_size = ftell(journal_fh);
2329 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2331 /* record the file in the journal set */
2332 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2334 return;
2336 error:
2337 RRDD_LOG(LOG_CRIT,
2338 "JOURNALING DISABLED: Error while trying to create %s : %s",
2339 new_file, rrd_strerror(errno));
2340 RRDD_LOG(LOG_CRIT,
2341 "JOURNALING DISABLED: All values will be flushed at shutdown");
2343 close(new_fd);
2344 config_flush_at_shutdown = 1;
2346 } /* }}} journal_new_file */
2348 /* MUST NOT hold journal_lock before calling this */
2349 static void journal_rotate(void) /* {{{ */
2350 {
2351 journal_set *old_js = NULL;
2353 if (journal_dir == NULL)
2354 return;
2356 RRDD_LOG(LOG_DEBUG, "rotating journals");
2358 pthread_mutex_lock(&stats_lock);
2359 ++stats_journal_rotate;
2360 pthread_mutex_unlock(&stats_lock);
2362 pthread_mutex_lock(&journal_lock);
2364 journal_close();
2366 /* rotate the journal sets */
2367 old_js = journal_old;
2368 journal_old = journal_cur;
2369 journal_cur = calloc(1, sizeof(journal_set));
2371 if (journal_cur != NULL)
2372 journal_new_file();
2373 else
2374 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2376 pthread_mutex_unlock(&journal_lock);
2378 journal_set_remove(old_js);
2379 journal_set_free (old_js);
2381 } /* }}} static void journal_rotate */
2383 /* MUST hold journal_lock when calling */
2384 static void journal_done(void) /* {{{ */
2385 {
2386 if (journal_cur == NULL)
2387 return;
2389 journal_close();
2391 if (config_flush_at_shutdown)
2392 {
2393 RRDD_LOG(LOG_INFO, "removing journals");
2394 journal_set_remove(journal_old);
2395 journal_set_remove(journal_cur);
2396 }
2397 else
2398 {
2399 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2400 "journals will be used at next startup");
2401 }
2403 journal_set_free(journal_cur);
2404 journal_set_free(journal_old);
2405 free(journal_dir);
2407 } /* }}} static void journal_done */
2409 static int journal_write(char *cmd, char *args) /* {{{ */
2410 {
2411 int chars;
2413 if (journal_fh == NULL)
2414 return 0;
2416 pthread_mutex_lock(&journal_lock);
2417 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2418 journal_size += chars;
2420 if (journal_size > JOURNAL_MAX)
2421 journal_new_file();
2423 pthread_mutex_unlock(&journal_lock);
2425 if (chars > 0)
2426 {
2427 pthread_mutex_lock(&stats_lock);
2428 stats_journal_bytes += chars;
2429 pthread_mutex_unlock(&stats_lock);
2430 }
2432 return chars;
2433 } /* }}} static int journal_write */
2435 static int journal_replay (const char *file) /* {{{ */
2436 {
2437 FILE *fh;
2438 int entry_cnt = 0;
2439 int fail_cnt = 0;
2440 uint64_t line = 0;
2441 char entry[RRD_CMD_MAX];
2442 time_t now;
2444 if (file == NULL) return 0;
2446 {
2447 char *reason = "unknown error";
2448 int status = 0;
2449 struct stat statbuf;
2451 memset(&statbuf, 0, sizeof(statbuf));
2452 if (stat(file, &statbuf) != 0)
2453 {
2454 reason = "stat error";
2455 status = errno;
2456 }
2457 else if (!S_ISREG(statbuf.st_mode))
2458 {
2459 reason = "not a regular file";
2460 status = EPERM;
2461 }
2462 if (statbuf.st_uid != daemon_uid)
2463 {
2464 reason = "not owned by daemon user";
2465 status = EACCES;
2466 }
2467 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2468 {
2469 reason = "must not be user/group writable";
2470 status = EACCES;
2471 }
2473 if (status != 0)
2474 {
2475 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2476 file, rrd_strerror(status), reason);
2477 return 0;
2478 }
2479 }
2481 fh = fopen(file, "r");
2482 if (fh == NULL)
2483 {
2484 if (errno != ENOENT)
2485 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2486 file, rrd_strerror(errno));
2487 return 0;
2488 }
2489 else
2490 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2492 now = time(NULL);
2494 while(!feof(fh))
2495 {
2496 size_t entry_len;
2498 ++line;
2499 if (fgets(entry, sizeof(entry), fh) == NULL)
2500 break;
2501 entry_len = strlen(entry);
2503 /* check \n termination in case journal writing crashed mid-line */
2504 if (entry_len == 0)
2505 continue;
2506 else if (entry[entry_len - 1] != '\n')
2507 {
2508 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2509 ++fail_cnt;
2510 continue;
2511 }
2513 entry[entry_len - 1] = '\0';
2515 if (handle_request(NULL, now, entry, entry_len) == 0)
2516 ++entry_cnt;
2517 else
2518 ++fail_cnt;
2519 }
2521 fclose(fh);
2523 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2524 entry_cnt, fail_cnt);
2526 return entry_cnt > 0 ? 1 : 0;
2527 } /* }}} static int journal_replay */
2529 static int journal_sort(const void *v1, const void *v2)
2530 {
2531 char **jn1 = (char **) v1;
2532 char **jn2 = (char **) v2;
2534 return strcmp(*jn1,*jn2);
2535 }
2537 static void journal_init(void) /* {{{ */
2538 {
2539 int had_journal = 0;
2540 DIR *dir;
2541 struct dirent *dent;
2542 char path[PATH_MAX+1];
2544 if (journal_dir == NULL) return;
2546 pthread_mutex_lock(&journal_lock);
2548 journal_cur = calloc(1, sizeof(journal_set));
2549 if (journal_cur == NULL)
2550 {
2551 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2552 return;
2553 }
2555 RRDD_LOG(LOG_INFO, "checking for journal files");
2557 /* Handle old journal files during transition. This gives them the
2558 * correct sort order. TODO: remove after first release
2559 */
2560 {
2561 char old_path[PATH_MAX+1];
2562 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2563 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2564 rename(old_path, path);
2566 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2567 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2568 rename(old_path, path);
2569 }
2571 dir = opendir(journal_dir);
2572 if (!dir) {
2573 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2574 return;
2575 }
2576 while ((dent = readdir(dir)) != NULL)
2577 {
2578 /* looks like a journal file? */
2579 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2580 continue;
2582 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2584 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2585 {
2586 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2587 dent->d_name);
2588 break;
2589 }
2590 }
2591 closedir(dir);
2593 qsort(journal_cur->files, journal_cur->files_num,
2594 sizeof(journal_cur->files[0]), journal_sort);
2596 for (uint i=0; i < journal_cur->files_num; i++)
2597 had_journal += journal_replay(journal_cur->files[i]);
2599 journal_new_file();
2601 /* it must have been a crash. start a flush */
2602 if (had_journal && config_flush_at_shutdown)
2603 flush_old_values(-1);
2605 pthread_mutex_unlock(&journal_lock);
2607 RRDD_LOG(LOG_INFO, "journal processing complete");
2609 } /* }}} static void journal_init */
2611 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2612 {
2613 assert(sock != NULL);
2615 free(sock->rbuf); sock->rbuf = NULL;
2616 free(sock->wbuf); sock->wbuf = NULL;
2617 free(sock);
2618 } /* }}} void free_listen_socket */
2620 static void close_connection(listen_socket_t *sock) /* {{{ */
2621 {
2622 if (sock->fd >= 0)
2623 {
2624 close(sock->fd);
2625 sock->fd = -1;
2626 }
2628 free_listen_socket(sock);
2630 } /* }}} void close_connection */
2632 static void *connection_thread_main (void *args) /* {{{ */
2633 {
2634 listen_socket_t *sock;
2635 int fd;
2637 sock = (listen_socket_t *) args;
2638 fd = sock->fd;
2640 /* init read buffers */
2641 sock->next_read = sock->next_cmd = 0;
2642 sock->rbuf = malloc(RBUF_SIZE);
2643 if (sock->rbuf == NULL)
2644 {
2645 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2646 close_connection(sock);
2647 return NULL;
2648 }
2650 pthread_mutex_lock (&connection_threads_lock);
2651 #ifdef HAVE_LIBWRAP
2652 /* LIBWRAP does not support multiple threads! By putting this code
2653 inside pthread_mutex_lock we do not have to worry about request_info
2654 getting overwritten by another thread.
2655 */
2656 struct request_info req;
2657 request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2658 fromhost(&req);
2659 if(!hosts_access(&req)) {
2660 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2661 pthread_mutex_unlock (&connection_threads_lock);
2662 close_connection(sock);
2663 return NULL;
2664 }
2665 #endif /* HAVE_LIBWRAP */
2666 connection_threads_num++;
2667 pthread_mutex_unlock (&connection_threads_lock);
2669 while (state == RUNNING)
2670 {
2671 char *cmd;
2672 ssize_t cmd_len;
2673 ssize_t rbytes;
2674 time_t now;
2676 struct pollfd pollfd;
2677 int status;
2679 pollfd.fd = fd;
2680 pollfd.events = POLLIN | POLLPRI;
2681 pollfd.revents = 0;
2683 status = poll (&pollfd, 1, /* timeout = */ 500);
2684 if (state != RUNNING)
2685 break;
2686 else if (status == 0) /* timeout */
2687 continue;
2688 else if (status < 0) /* error */
2689 {
2690 status = errno;
2691 if (status != EINTR)
2692 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2693 continue;
2694 }
2696 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2697 break;
2698 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2699 {
2700 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2701 "poll(2) returned something unexpected: %#04hx",
2702 pollfd.revents);
2703 break;
2704 }
2706 rbytes = read(fd, sock->rbuf + sock->next_read,
2707 RBUF_SIZE - sock->next_read);
2708 if (rbytes < 0)
2709 {
2710 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2711 break;
2712 }
2713 else if (rbytes == 0)
2714 break; /* eof */
2716 sock->next_read += rbytes;
2718 if (sock->batch_start)
2719 now = sock->batch_start;
2720 else
2721 now = time(NULL);
2723 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2724 {
2725 status = handle_request (sock, now, cmd, cmd_len+1);
2726 if (status != 0)
2727 goto out_close;
2728 }
2729 }
2731 out_close:
2732 close_connection(sock);
2734 /* Remove this thread from the connection threads list */
2735 pthread_mutex_lock (&connection_threads_lock);
2736 connection_threads_num--;
2737 if (connection_threads_num <= 0)
2738 pthread_cond_broadcast(&connection_threads_done);
2739 pthread_mutex_unlock (&connection_threads_lock);
2741 return (NULL);
2742 } /* }}} void *connection_thread_main */
2744 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2745 {
2746 int fd;
2747 struct sockaddr_un sa;
2748 listen_socket_t *temp;
2749 int status;
2750 const char *path;
2751 char *path_copy, *dir;
2753 path = sock->addr;
2754 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2755 path += strlen("unix:");
2757 /* dirname may modify its argument */
2758 path_copy = strdup(path);
2759 if (path_copy == NULL)
2760 {
2761 fprintf(stderr, "rrdcached: strdup(): %s\n",
2762 rrd_strerror(errno));
2763 return (-1);
2764 }
2766 dir = dirname(path_copy);
2767 if (rrd_mkdir_p(dir, 0777) != 0)
2768 {
2769 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2770 dir, rrd_strerror(errno));
2771 return (-1);
2772 }
2774 free(path_copy);
2776 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2777 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2778 if (temp == NULL)
2779 {
2780 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2781 return (-1);
2782 }
2783 listen_fds = temp;
2784 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2786 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2787 if (fd < 0)
2788 {
2789 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2790 rrd_strerror(errno));
2791 return (-1);
2792 }
2794 memset (&sa, 0, sizeof (sa));
2795 sa.sun_family = AF_UNIX;
2796 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2798 /* if we've gotten this far, we own the pid file. any daemon started
2799 * with the same args must not be alive. therefore, ensure that we can
2800 * create the socket...
2801 */
2802 unlink(path);
2804 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2805 if (status != 0)
2806 {
2807 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2808 path, rrd_strerror(errno));
2809 close (fd);
2810 return (-1);
2811 }
2813 /* tweak the sockets group ownership */
2814 if (sock->socket_group != (gid_t)-1)
2815 {
2816 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2817 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2818 {
2819 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2820 }
2821 }
2823 if (sock->socket_permissions != (mode_t)-1)
2824 {
2825 if (chmod(path, sock->socket_permissions) != 0)
2826 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2827 (unsigned int)sock->socket_permissions, strerror(errno));
2828 }
2830 status = listen (fd, /* backlog = */ 10);
2831 if (status != 0)
2832 {
2833 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2834 path, rrd_strerror(errno));
2835 close (fd);
2836 unlink (path);
2837 return (-1);
2838 }
2840 listen_fds[listen_fds_num].fd = fd;
2841 listen_fds[listen_fds_num].family = PF_UNIX;
2842 strncpy(listen_fds[listen_fds_num].addr, path,
2843 sizeof (listen_fds[listen_fds_num].addr) - 1);
2844 listen_fds_num++;
2846 return (0);
2847 } /* }}} int open_listen_socket_unix */
2849 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2850 {
2851 struct addrinfo ai_hints;
2852 struct addrinfo *ai_res;
2853 struct addrinfo *ai_ptr;
2854 char addr_copy[NI_MAXHOST];
2855 char *addr;
2856 char *port;
2857 int status;
2859 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2860 addr_copy[sizeof (addr_copy) - 1] = 0;
2861 addr = addr_copy;
2863 memset (&ai_hints, 0, sizeof (ai_hints));
2864 ai_hints.ai_flags = 0;
2865 #ifdef AI_ADDRCONFIG
2866 ai_hints.ai_flags |= AI_ADDRCONFIG;
2867 #endif
2868 ai_hints.ai_family = AF_UNSPEC;
2869 ai_hints.ai_socktype = SOCK_STREAM;
2871 port = NULL;
2872 if (*addr == '[') /* IPv6+port format */
2873 {
2874 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2875 addr++;
2877 port = strchr (addr, ']');
2878 if (port == NULL)
2879 {
2880 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2881 return (-1);
2882 }
2883 *port = 0;
2884 port++;
2886 if (*port == ':')
2887 port++;
2888 else if (*port == 0)
2889 port = NULL;
2890 else
2891 {
2892 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2893 return (-1);
2894 }
2895 } /* if (*addr == '[') */
2896 else
2897 {
2898 port = rindex(addr, ':');
2899 if (port != NULL)
2900 {
2901 *port = 0;
2902 port++;
2903 }
2904 }
2905 ai_res = NULL;
2906 status = getaddrinfo (addr,
2907 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2908 &ai_hints, &ai_res);
2909 if (status != 0)
2910 {
2911 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2912 addr, gai_strerror (status));
2913 return (-1);
2914 }
2916 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2917 {
2918 int fd;
2919 listen_socket_t *temp;
2920 int one = 1;
2922 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2923 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2924 if (temp == NULL)
2925 {
2926 fprintf (stderr,
2927 "rrdcached: open_listen_socket_network: realloc failed.\n");
2928 continue;
2929 }
2930 listen_fds = temp;
2931 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2933 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2934 if (fd < 0)
2935 {
2936 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2937 rrd_strerror(errno));
2938 continue;
2939 }
2941 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2943 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2944 if (status != 0)
2945 {
2946 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2947 sock->addr, rrd_strerror(errno));
2948 close (fd);
2949 continue;
2950 }
2952 status = listen (fd, /* backlog = */ 10);
2953 if (status != 0)
2954 {
2955 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2956 sock->addr, rrd_strerror(errno));
2957 close (fd);
2958 freeaddrinfo(ai_res);
2959 return (-1);
2960 }
2962 listen_fds[listen_fds_num].fd = fd;
2963 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2964 listen_fds_num++;
2965 } /* for (ai_ptr) */
2967 freeaddrinfo(ai_res);
2968 return (0);
2969 } /* }}} static int open_listen_socket_network */
2971 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2972 {
2973 assert(sock != NULL);
2974 assert(sock->addr != NULL);
2976 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2977 || sock->addr[0] == '/')
2978 return (open_listen_socket_unix(sock));
2979 else
2980 return (open_listen_socket_network(sock));
2981 } /* }}} int open_listen_socket */
2983 static int close_listen_sockets (void) /* {{{ */
2984 {
2985 size_t i;
2987 for (i = 0; i < listen_fds_num; i++)
2988 {
2989 close (listen_fds[i].fd);
2991 if (listen_fds[i].family == PF_UNIX)
2992 unlink(listen_fds[i].addr);
2993 }
2995 free (listen_fds);
2996 listen_fds = NULL;
2997 listen_fds_num = 0;
2999 return (0);
3000 } /* }}} int close_listen_sockets */
3002 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3003 {
3004 struct pollfd *pollfds;
3005 int pollfds_num;
3006 int status;
3007 int i;
3009 if (listen_fds_num < 1)
3010 {
3011 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3012 return (NULL);
3013 }
3015 pollfds_num = listen_fds_num;
3016 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3017 if (pollfds == NULL)
3018 {
3019 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3020 return (NULL);
3021 }
3022 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3024 RRDD_LOG(LOG_INFO, "listening for connections");
3026 while (state == RUNNING)
3027 {
3028 for (i = 0; i < pollfds_num; i++)
3029 {
3030 pollfds[i].fd = listen_fds[i].fd;
3031 pollfds[i].events = POLLIN | POLLPRI;
3032 pollfds[i].revents = 0;
3033 }
3035 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3036 if (state != RUNNING)
3037 break;
3038 else if (status == 0) /* timeout */
3039 continue;
3040 else if (status < 0) /* error */
3041 {
3042 status = errno;
3043 if (status != EINTR)
3044 {
3045 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3046 }
3047 continue;
3048 }
3050 for (i = 0; i < pollfds_num; i++)
3051 {
3052 listen_socket_t *client_sock;
3053 struct sockaddr_storage client_sa;
3054 socklen_t client_sa_size;
3055 pthread_t tid;
3056 pthread_attr_t attr;
3058 if (pollfds[i].revents == 0)
3059 continue;
3061 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3062 {
3063 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3064 "poll(2) returned something unexpected for listen FD #%i.",
3065 pollfds[i].fd);
3066 continue;
3067 }
3069 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3070 if (client_sock == NULL)
3071 {
3072 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3073 continue;
3074 }
3075 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3077 client_sa_size = sizeof (client_sa);
3078 client_sock->fd = accept (pollfds[i].fd,
3079 (struct sockaddr *) &client_sa, &client_sa_size);
3080 if (client_sock->fd < 0)
3081 {
3082 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3083 free(client_sock);
3084 continue;
3085 }
3087 pthread_attr_init (&attr);
3088 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3090 status = pthread_create (&tid, &attr, connection_thread_main,
3091 client_sock);
3092 if (status != 0)
3093 {
3094 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3095 close_connection(client_sock);
3096 continue;
3097 }
3098 } /* for (pollfds_num) */
3099 } /* while (state == RUNNING) */
3101 RRDD_LOG(LOG_INFO, "starting shutdown");
3103 close_listen_sockets ();
3105 pthread_mutex_lock (&connection_threads_lock);
3106 while (connection_threads_num > 0)
3107 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3108 pthread_mutex_unlock (&connection_threads_lock);
3110 free(pollfds);
3112 return (NULL);
3113 } /* }}} void *listen_thread_main */
3115 static int daemonize (void) /* {{{ */
3116 {
3117 int pid_fd;
3118 char *base_dir;
3120 daemon_uid = geteuid();
3122 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3123 if (pid_fd < 0)
3124 pid_fd = check_pidfile();
3125 if (pid_fd < 0)
3126 return pid_fd;
3128 /* open all the listen sockets */
3129 if (config_listen_address_list_len > 0)
3130 {
3131 for (size_t i = 0; i < config_listen_address_list_len; i++)
3132 open_listen_socket (config_listen_address_list[i]);
3134 rrd_free_ptrs((void ***) &config_listen_address_list,
3135 &config_listen_address_list_len);
3136 }
3137 else
3138 {
3139 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3140 sizeof(default_socket.addr) - 1);
3141 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3143 if (default_socket.permissions == 0)
3144 socket_permission_set_all (&default_socket);
3146 open_listen_socket (&default_socket);
3147 }
3149 if (listen_fds_num < 1)
3150 {
3151 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3152 goto error;
3153 }
3155 if (!stay_foreground)
3156 {
3157 pid_t child;
3159 child = fork ();
3160 if (child < 0)
3161 {
3162 fprintf (stderr, "daemonize: fork(2) failed.\n");
3163 goto error;
3164 }
3165 else if (child > 0)
3166 exit(0);
3168 /* Become session leader */
3169 setsid ();
3171 /* Open the first three file descriptors to /dev/null */
3172 close (2);
3173 close (1);
3174 close (0);
3176 open ("/dev/null", O_RDWR);
3177 if (dup(0) == -1 || dup(0) == -1){
3178 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3179 }
3180 } /* if (!stay_foreground) */
3182 /* Change into the /tmp directory. */
3183 base_dir = (config_base_dir != NULL)
3184 ? config_base_dir
3185 : "/tmp";
3187 if (chdir (base_dir) != 0)
3188 {
3189 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3190 goto error;
3191 }
3193 install_signal_handlers();
3195 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3196 RRDD_LOG(LOG_INFO, "starting up");
3198 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3199 (GDestroyNotify) free_cache_item);
3200 if (cache_tree == NULL)
3201 {
3202 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3203 goto error;
3204 }
3206 return write_pidfile (pid_fd);
3208 error:
3209 remove_pidfile();
3210 return -1;
3211 } /* }}} int daemonize */
3213 static int cleanup (void) /* {{{ */
3214 {
3215 pthread_cond_broadcast (&flush_cond);
3216 pthread_join (flush_thread, NULL);
3218 pthread_cond_broadcast (&queue_cond);
3219 for (int i = 0; i < config_queue_threads; i++)
3220 pthread_join (queue_threads[i], NULL);
3222 if (config_flush_at_shutdown)
3223 {
3224 assert(cache_queue_head == NULL);
3225 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3226 }
3228 free(queue_threads);
3229 free(config_base_dir);
3231 pthread_mutex_lock(&cache_lock);
3232 g_tree_destroy(cache_tree);
3234 pthread_mutex_lock(&journal_lock);
3235 journal_done();
3237 RRDD_LOG(LOG_INFO, "goodbye");
3238 closelog ();
3240 remove_pidfile ();
3241 free(config_pid_file);
3243 return (0);
3244 } /* }}} int cleanup */
3246 static int read_options (int argc, char **argv) /* {{{ */
3247 {
3248 int option;
3249 int status = 0;
3251 socket_permission_clear (&default_socket);
3253 default_socket.socket_group = (gid_t)-1;
3254 default_socket.socket_permissions = (mode_t)-1;
3256 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3257 {
3258 switch (option)
3259 {
3260 case 'O':
3261 opt_no_overwrite = 1;
3262 break;
3264 case 'g':
3265 stay_foreground=1;
3266 break;
3268 case 'l':
3269 {
3270 listen_socket_t *new;
3272 new = malloc(sizeof(listen_socket_t));
3273 if (new == NULL)
3274 {
3275 fprintf(stderr, "read_options: malloc failed.\n");
3276 return(2);
3277 }
3278 memset(new, 0, sizeof(listen_socket_t));
3280 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3282 /* Add permissions to the socket {{{ */
3283 if (default_socket.permissions != 0)
3284 {
3285 socket_permission_copy (new, &default_socket);
3286 }
3287 else /* if (default_socket.permissions == 0) */
3288 {
3289 /* Add permission for ALL commands to the socket. */
3290 socket_permission_set_all (new);
3291 }
3292 /* }}} Done adding permissions. */
3294 new->socket_group = default_socket.socket_group;
3295 new->socket_permissions = default_socket.socket_permissions;
3297 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3298 &config_listen_address_list_len, new))
3299 {
3300 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3301 return (2);
3302 }
3303 }
3304 break;
3306 /* set socket group permissions */
3307 case 's':
3308 {
3309 gid_t group_gid;
3310 struct group *grp;
3312 group_gid = strtoul(optarg, NULL, 10);
3313 if (errno != EINVAL && group_gid>0)
3314 {
3315 /* we were passed a number */
3316 grp = getgrgid(group_gid);
3317 }
3318 else
3319 {
3320 grp = getgrnam(optarg);
3321 }
3323 if (grp)
3324 {
3325 default_socket.socket_group = grp->gr_gid;
3326 }
3327 else
3328 {
3329 /* no idea what the user wanted... */
3330 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3331 return (5);
3332 }
3333 }
3334 break;
3336 /* set socket file permissions */
3337 case 'm':
3338 {
3339 long tmp;
3340 char *endptr = NULL;
3342 tmp = strtol (optarg, &endptr, 8);
3343 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3344 || (tmp > 07777) || (tmp < 0)) {
3345 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3346 optarg);
3347 return (5);
3348 }
3350 default_socket.socket_permissions = (mode_t)tmp;
3351 }
3352 break;
3354 case 'P':
3355 {
3356 char *optcopy;
3357 char *saveptr;
3358 char *dummy;
3359 char *ptr;
3361 socket_permission_clear (&default_socket);
3363 optcopy = strdup (optarg);
3364 dummy = optcopy;
3365 saveptr = NULL;
3366 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3367 {
3368 dummy = NULL;
3369 status = socket_permission_add (&default_socket, ptr);
3370 if (status != 0)
3371 {
3372 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3373 "socket failed. Most likely, this permission doesn't "
3374 "exist. Check your command line.\n", ptr);
3375 status = 4;
3376 }
3377 }
3379 free (optcopy);
3380 }
3381 break;
3383 case 'f':
3384 {
3385 int temp;
3387 temp = atoi (optarg);
3388 if (temp > 0)
3389 config_flush_interval = temp;
3390 else
3391 {
3392 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3393 status = 3;
3394 }
3395 }
3396 break;
3398 case 'w':
3399 {
3400 int temp;
3402 temp = atoi (optarg);
3403 if (temp > 0)
3404 config_write_interval = temp;
3405 else
3406 {
3407 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3408 status = 2;
3409 }
3410 }
3411 break;
3413 case 'z':
3414 {
3415 int temp;
3417 temp = atoi(optarg);
3418 if (temp > 0)
3419 config_write_jitter = temp;
3420 else
3421 {
3422 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3423 status = 2;
3424 }
3426 break;
3427 }
3429 case 't':
3430 {
3431 int threads;
3432 threads = atoi(optarg);
3433 if (threads >= 1)
3434 config_queue_threads = threads;
3435 else
3436 {
3437 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3438 return 1;
3439 }
3440 }
3441 break;
3443 case 'B':
3444 config_write_base_only = 1;
3445 break;
3447 case 'b':
3448 {
3449 size_t len;
3450 char base_realpath[PATH_MAX];
3452 if (config_base_dir != NULL)
3453 free (config_base_dir);
3454 config_base_dir = strdup (optarg);
3455 if (config_base_dir == NULL)
3456 {
3457 fprintf (stderr, "read_options: strdup failed.\n");
3458 return (3);
3459 }
3461 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3462 {
3463 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3464 config_base_dir, rrd_strerror (errno));
3465 return (3);
3466 }
3468 /* make sure that the base directory is not resolved via
3469 * symbolic links. this makes some performance-enhancing
3470 * assumptions possible (we don't have to resolve paths
3471 * that start with a "/")
3472 */
3473 if (realpath(config_base_dir, base_realpath) == NULL)
3474 {
3475 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3476 "%s\n", config_base_dir, rrd_strerror(errno));
3477 return 5;
3478 }
3480 len = strlen (config_base_dir);
3481 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3482 {
3483 config_base_dir[len - 1] = 0;
3484 len--;
3485 }
3487 if (len < 1)
3488 {
3489 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3490 return (4);
3491 }
3493 _config_base_dir_len = len;
3495 len = strlen (base_realpath);
3496 while ((len > 0) && (base_realpath[len - 1] == '/'))
3497 {
3498 base_realpath[len - 1] = '\0';
3499 len--;
3500 }
3502 if (strncmp(config_base_dir,
3503 base_realpath, sizeof(base_realpath)) != 0)
3504 {
3505 fprintf(stderr,
3506 "Base directory (-b) resolved via file system links!\n"
3507 "Please consult rrdcached '-b' documentation!\n"
3508 "Consider specifying the real directory (%s)\n",
3509 base_realpath);
3510 return 5;
3511 }
3512 }
3513 break;
3515 case 'p':
3516 {
3517 if (config_pid_file != NULL)
3518 free (config_pid_file);
3519 config_pid_file = strdup (optarg);
3520 if (config_pid_file == NULL)
3521 {
3522 fprintf (stderr, "read_options: strdup failed.\n");
3523 return (3);
3524 }
3525 }
3526 break;
3528 case 'F':
3529 config_flush_at_shutdown = 1;
3530 break;
3532 case 'j':
3533 {
3534 char journal_dir_actual[PATH_MAX];
3535 journal_dir = realpath((const char *)optarg, journal_dir_actual);
3536 if (journal_dir)
3537 {
3538 // if we were able to properly resolve the path, lets have a copy
3539 // for use outside this block.
3540 journal_dir = strdup(journal_dir);
3541 status = rrd_mkdir_p(journal_dir, 0777);
3542 if (status != 0)
3543 {
3544 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3545 journal_dir, rrd_strerror(errno));
3546 return 6;
3547 }
3548 if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3549 {
3550 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3551 errno ? rrd_strerror(errno) : "");
3552 return 6;
3553 }
3554 } else {
3555 fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3556 errno ? rrd_strerror(errno) : "");
3557 return 6;
3558 }
3559 }
3560 break;
3562 case 'a':
3563 {
3564 int temp = atoi(optarg);
3565 if (temp > 0)
3566 config_alloc_chunk = temp;
3567 else
3568 {
3569 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3570 return 10;
3571 }
3572 }
3573 break;
3575 case 'h':
3576 case '?':
3577 printf ("RRDCacheD %s\n"
3578 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3579 "\n"
3580 "Usage: rrdcached [options]\n"
3581 "\n"
3582 "Valid options are:\n"
3583 " -l <address> Socket address to listen to.\n"
3584 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3585 " -P <perms> Sets the permissions to assign to all following "
3586 "sockets\n"
3587 " -w <seconds> Interval in which to write data.\n"
3588 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3589 " -t <threads> Number of write threads.\n"
3590 " -f <seconds> Interval in which to flush dead data.\n"
3591 " -p <file> Location of the PID-file.\n"
3592 " -b <dir> Base directory to change to.\n"
3593 " -B Restrict file access to paths within -b <dir>\n"
3594 " -g Do not fork and run in the foreground.\n"
3595 " -j <dir> Directory in which to create the journal files.\n"
3596 " -F Always flush all updates at shutdown\n"
3597 " -s <id|name> Group owner of all following UNIX sockets\n"
3598 " (the socket will also have read/write permissions "
3599 "for that group)\n"
3600 " -m <mode> File permissions (octal) of all following UNIX "
3601 "sockets\n"
3602 " -a <size> Memory allocation chunk size. Default is 1.\n"
3603 " -O Do not allow CREATE commands to overwrite existing\n"
3604 " files, even if asked to.\n"
3605 "\n"
3606 "For more information and a detailed description of all options "
3607 "please refer\n"
3608 "to the rrdcached(1) manual page.\n",
3609 VERSION);
3610 if (option == 'h')
3611 status = -1;
3612 else
3613 status = 1;
3614 break;
3615 } /* switch (option) */
3616 } /* while (getopt) */
3618 /* advise the user when values are not sane */
3619 if (config_flush_interval < 2 * config_write_interval)
3620 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3621 " 2x write interval (-w) !\n");
3622 if (config_write_jitter > config_write_interval)
3623 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3624 " write interval (-w) !\n");
3626 if (config_write_base_only && config_base_dir == NULL)
3627 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3628 " Consult the rrdcached documentation\n");
3630 if (journal_dir == NULL)
3631 config_flush_at_shutdown = 1;
3633 return (status);
3634 } /* }}} int read_options */
3636 int main (int argc, char **argv)
3637 {
3638 int status;
3640 status = read_options (argc, argv);
3641 if (status != 0)
3642 {
3643 if (status < 0)
3644 status = 0;
3645 return (status);
3646 }
3648 status = daemonize ();
3649 if (status != 0)
3650 {
3651 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3652 return (1);
3653 }
3655 journal_init();
3657 /* start the queue threads */
3658 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3659 if (queue_threads == NULL)
3660 {
3661 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3662 cleanup();
3663 return (1);
3664 }
3665 for (int i = 0; i < config_queue_threads; i++)
3666 {
3667 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3668 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3669 if (status != 0)
3670 {
3671 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3672 cleanup();
3673 return (1);
3674 }
3675 }
3677 /* start the flush thread */
3678 memset(&flush_thread, 0, sizeof(flush_thread));
3679 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3680 if (status != 0)
3681 {
3682 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3683 cleanup();
3684 return (1);
3685 }
3687 listen_thread_main (NULL);
3688 cleanup ();
3690 return (0);
3691 } /* int main */
3693 /*
3694 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3695 */