1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
/*
 * RRDD_LOG: log a printf-style message with the given syslog severity.
 * When `stay_foreground' is non-zero the message is additionally printed to
 * stderr, followed by a newline. The variadic arguments are expanded twice
 * in that case, so they should not have side effects.
 */
#define RRDD_LOG(severity, ...) \
  do { \
    if (stay_foreground) { \
      fprintf(stderr, __VA_ARGS__); \
      fprintf(stderr, "\n"); } \
    syslog ((severity), __VA_ARGS__); \
  } while (0)
123 /*
124 * Types
125 */
/* Result code handed to send_response(): RESP_OK for success, RESP_ERR for
 * failure. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/*
 * State attached to one socket: the descriptor itself plus the read/write
 * buffers and the BATCH bookkeeping used by next_cmd(), add_to_wbuf(),
 * add_response_info() and send_response().
 */
struct listen_socket_s
{
  int fd;                    /* descriptor responses are written to */
  char addr[PATH_MAX + 1];   /* NOTE(review): presumably the socket address string - confirm */
  int family;                /* NOTE(review): presumably the address family - confirm */

  /* state for BATCH processing */
  time_t batch_start;        /* non-zero while a BATCH is in progress */
  int batch_cmd;             /* reported as the line number in BATCH error responses */

  /* buffered IO */
  char *rbuf;                /* read buffer */
  off_t next_cmd;            /* offset in rbuf of the next unparsed command */
  off_t next_read;           /* offset in rbuf just past the last byte read */

  char *wbuf;                /* pending response text (NUL terminated), or NULL */
  ssize_t wbuf_len;          /* number of bytes in wbuf, excluding the NUL */

  uint32_t permissions;      /* NOTE(review): not used in the visible code - confirm meaning */

  gid_t socket_group;        /* NOTE(review): not used in the visible code - confirm meaning */
  mode_t socket_permissions; /* NOTE(review): not used in the visible code - confirm meaning */
};
typedef struct listen_socket_s listen_socket_t;
struct command_s;
typedef struct command_s command_t;

/* Parameter lists shared by all command handlers.
 * note: guard against "unused" warnings in the handlers */
#define DISPATCH_PROTO	listen_socket_t UNUSED(*sock),\
			time_t UNUSED(now),\
			char  UNUSED(*buffer),\
			size_t UNUSED(buffer_size)

#define HANDLER_PROTO	command_t UNUSED(*cmd),\
			DISPATCH_PROTO

/* Description of one protocol command. */
struct command_s {
  char   *cmd;                    /* NOTE(review): presumably the command keyword - confirm */
  int (*handler)(HANDLER_PROTO);  /* function implementing the command */

  char  context;		/* where we expect to see it */
#define CMD_CONTEXT_CLIENT	(1<<0)
#define CMD_CONTEXT_BATCH	(1<<1)
#define CMD_CONTEXT_JOURNAL	(1<<2)
#define CMD_CONTEXT_ANY		(0x7f)

  char *syntax;                   /* usage text sent by syntax_error() */
  char *help;                     /* NOTE(review): presumably the long help text - confirm */
};
struct cache_item_s;
typedef struct cache_item_s cache_item_t;

/* One cached RRD file: its pending update strings plus the links that place
 * it in the write queue. */
struct cache_item_s
{
  char *file;               /* file name; the same pointer is the key in cache_tree */
  char **values;            /* pending update strings handed to rrd_update_r() */
  size_t values_num;		/* number of valid pointers */
  size_t values_alloc;		/* number of allocated pointers */
  time_t last_flush_time;   /* set by wipe_ci_values(), may include write jitter */
  time_t last_update_stamp; /* NOTE(review): not used in the visible code - confirm meaning */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;                /* combination of the CI_FLAGS_* bits */
  pthread_cond_t  flushed;  /* broadcast after the item's values were written */
  cache_item_t *prev;       /* previous item in the write queue, NULL at the head */
  cache_item_t *next;       /* next item in the write queue, NULL at the tail */
};
/* User data passed from flush_old_values() to tree_callback_flush(). */
struct callback_flush_data_s
{
  time_t now;          /* time at which the traversal started */
  time_t abs_timeout;  /* items last flushed at or before this time get queued */
  char **keys;         /* keys of idle, empty items to remove from the tree */
  size_t keys_num;     /* number of entries in keys */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Selects the end of the write queue at which enqueue_cache_item() inserts. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;
/* describe a set of journal files */
typedef struct {
  char **files;      /* array of journal file names */
  size_t files_num;  /* number of entries in files */
} journal_set;
/* max length of socket command or response */
#define CMD_MAX 4096
/* the read buffer is sized to hold two maximum-length commands */
#define RBUF_SIZE (CMD_MAX*2)
222 /*
223 * Variables
224 */
/* non-zero: RRDD_LOG also prints to stderr */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

static listen_socket_t default_socket;

/* Daemon life cycle. The signal handlers switch to FLUSHING; the flush
 * thread switches to SHUTDOWN when it leaves its main loop. */
enum {
  RUNNING,		/* normal operation */
  FLUSHING,		/* flushing remaining values */
  SHUTDOWN		/* shutting down */
} state = RUNNING;

static pthread_t *queue_threads;
/* signalled when an item is put on the write queue */
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

static pthread_t flush_thread;
/* broadcast by the signal handlers to wake the flush thread */
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;
/* Cache stuff */
static GTree          *cache_tree = NULL;        /* file name -> cache_item_t */
static cache_item_t   *cache_queue_head = NULL;  /* next item to be written */
static cache_item_t   *cache_queue_tail = NULL;  /* most recently appended item */
/* protects the tree, the queue and the cache items */
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

static int config_write_interval = 300;
static int config_write_jitter   = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;
static size_t config_alloc_chunk = 1;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Counters reported by the STATS handler; guarded by stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

static int opt_no_overwrite = 0; /* default for the daemon */
/* Journaled updates */
/* A NULL socket marks a command that is being replayed from the journal. */
#define JOURNAL_REPLAY(s) ((s) == NULL)
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL;		/* current journal file handle */
static long  journal_size = 0;		/* current journal size */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
297 /*
298 * Functions
299 */
300 static void sig_common (const char *sig) /* {{{ */
301 {
302 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
303 state = FLUSHING;
304 pthread_cond_broadcast(&flush_cond);
305 pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
/* SIGINT: start the common shutdown sequence. */
static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
  sig_common ("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: start the common shutdown sequence. */
static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
  sig_common ("TERM");
} /* }}} void sig_term_handler */
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
319 {
320 config_flush_at_shutdown = 1;
321 sig_common("USR1");
322 } /* }}} void sig_usr1_handler */
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
325 {
326 config_flush_at_shutdown = 0;
327 sig_common("USR2");
328 } /* }}} void sig_usr2_handler */
330 static void install_signal_handlers(void) /* {{{ */
331 {
332 /* These structures are static, because `sigaction' behaves weird if the are
333 * overwritten.. */
334 static struct sigaction sa_int;
335 static struct sigaction sa_term;
336 static struct sigaction sa_pipe;
337 static struct sigaction sa_usr1;
338 static struct sigaction sa_usr2;
340 /* Install signal handlers */
341 memset (&sa_int, 0, sizeof (sa_int));
342 sa_int.sa_handler = sig_int_handler;
343 sigaction (SIGINT, &sa_int, NULL);
345 memset (&sa_term, 0, sizeof (sa_term));
346 sa_term.sa_handler = sig_term_handler;
347 sigaction (SIGTERM, &sa_term, NULL);
349 memset (&sa_pipe, 0, sizeof (sa_pipe));
350 sa_pipe.sa_handler = SIG_IGN;
351 sigaction (SIGPIPE, &sa_pipe, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_usr1));
354 sa_usr1.sa_handler = sig_usr1_handler;
355 sigaction (SIGUSR1, &sa_usr1, NULL);
357 memset (&sa_usr2, 0, sizeof (sa_usr2));
358 sa_usr2.sa_handler = sig_usr2_handler;
359 sigaction (SIGUSR2, &sa_usr2, NULL);
361 } /* }}} void install_signal_handlers */
363 static int open_pidfile(char *action, int oflag) /* {{{ */
364 {
365 int fd;
366 const char *file;
367 char *file_copy, *dir;
369 file = (config_pid_file != NULL)
370 ? config_pid_file
371 : LOCALSTATEDIR "/run/rrdcached.pid";
373 /* dirname may modify its argument */
374 file_copy = strdup(file);
375 if (file_copy == NULL)
376 {
377 fprintf(stderr, "rrdcached: strdup(): %s\n",
378 rrd_strerror(errno));
379 return -1;
380 }
382 dir = dirname(file_copy);
383 if (rrd_mkdir_p(dir, 0777) != 0)
384 {
385 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386 dir, rrd_strerror(errno));
387 return -1;
388 }
390 free(file_copy);
392 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
393 if (fd < 0)
394 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395 action, file, rrd_strerror(errno));
397 return(fd);
398 } /* }}} static int open_pidfile */
/*
 * check_pidfile: check an existing pid file to see whether a daemon is
 * running.
 *
 * Returns a descriptor for the (now truncated) stale PID file, or a negative
 * value when the file cannot be opened or read, holds no valid PID, names a
 * process that can still be signalled, or cannot be truncated. The
 * descriptor is closed on every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: atoi() needs a proper C string */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile: write the PID of the calling process, followed by a
 * newline, to the already opened descriptor `fd'. The descriptor is closed
 * in every case. Returns 0 on success and -1 when no stream could be
 * attached to it.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
465 static int remove_pidfile (void) /* {{{ */
466 {
467 char *file;
468 int status;
470 file = (config_pid_file != NULL)
471 ? config_pid_file
472 : LOCALSTATEDIR "/run/rrdcached.pid";
474 status = unlink (file);
475 if (status == 0)
476 return (0);
477 return (errno);
478 } /* }}} int remove_pidfile */
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
481 {
482 char *eol;
484 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485 sock->next_read - sock->next_cmd);
487 if (eol == NULL)
488 {
489 /* no commands left, move remainder back to front of rbuf */
490 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491 sock->next_read - sock->next_cmd);
492 sock->next_read -= sock->next_cmd;
493 sock->next_cmd = 0;
494 *len = 0;
495 return NULL;
496 }
497 else
498 {
499 char *cmd = sock->rbuf + sock->next_cmd;
500 *eol = '\0';
502 sock->next_cmd = eol - sock->rbuf + 1;
504 if (eol > sock->rbuf && *(eol-1) == '\r')
505 *(--eol) = '\0'; /* handle "\r\n" EOL */
507 *len = eol - cmd;
509 return cmd;
510 }
512 /* NOTREACHED */
513 assert(1==0);
514 } /* }}} char *next_cmd */
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
518 {
519 char *new_buf;
521 assert(sock != NULL);
523 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
524 if (new_buf == NULL)
525 {
526 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527 return -1;
528 }
530 strncpy(new_buf + sock->wbuf_len, str, len + 1);
532 sock->wbuf = new_buf;
533 sock->wbuf_len += len;
535 return 0;
536 } /* }}} static int add_to_wbuf */
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
540 {
541 va_list argp;
542 char buffer[CMD_MAX];
543 int len;
545 if (JOURNAL_REPLAY(sock)) return 0;
546 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
548 va_start(argp, fmt);
549 #ifdef HAVE_VSNPRINTF
550 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
551 #else
552 len = vsprintf(buffer, fmt, argp);
553 #endif
554 va_end(argp);
555 if (len < 0)
556 {
557 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558 return -1;
559 }
561 return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
/*
 * count_lines: return the number of newline characters in `str'.
 * A NULL pointer counts as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
580 /* send the response back to the user.
581 * returns 0 on success, -1 on error
582 * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584 char *fmt, ...) /* {{{ */
585 {
586 va_list argp;
587 char buffer[CMD_MAX];
588 int lines;
589 ssize_t wrote;
590 int rclen, len;
592 if (JOURNAL_REPLAY(sock)) return rc;
594 if (sock->batch_start)
595 {
596 if (rc == RESP_OK)
597 return rc; /* no response on success during BATCH */
598 lines = sock->batch_cmd;
599 }
600 else if (rc == RESP_OK)
601 lines = count_lines(sock->wbuf);
602 else
603 lines = -1;
605 rclen = sprintf(buffer, "%d ", lines);
606 va_start(argp, fmt);
607 #ifdef HAVE_VSNPRINTF
608 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
609 #else
610 len = vsprintf(buffer+rclen, fmt, argp);
611 #endif
612 va_end(argp);
613 if (len < 0)
614 return -1;
616 len += rclen;
618 /* append the result to the wbuf, don't write to the user */
619 if (sock->batch_start)
620 return add_to_wbuf(sock, buffer, len);
622 /* first write must be complete */
623 if (len != write(sock->fd, buffer, len))
624 {
625 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626 return -1;
627 }
629 if (sock->wbuf != NULL && rc == RESP_OK)
630 {
631 wrote = 0;
632 while (wrote < sock->wbuf_len)
633 {
634 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
635 if (wb <= 0)
636 {
637 RRDD_LOG(LOG_INFO, "send_response: could not write results");
638 return -1;
639 }
640 wrote += wb;
641 }
642 }
644 free(sock->wbuf); sock->wbuf = NULL;
645 sock->wbuf_len = 0;
647 return 0;
648 } /* }}} */
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
651 {
652 ci->values = NULL;
653 ci->values_num = 0;
654 ci->values_alloc = 0;
656 ci->last_flush_time = when;
657 if (config_write_jitter > 0)
658 ci->last_flush_time += (rrd_random() % config_write_jitter);
659 }
661 /* remove_from_queue
662 * remove a "cache_item_t" item from the queue.
663 * must hold 'cache_lock' when calling this
664 */
665 static void remove_from_queue(cache_item_t *ci) /* {{{ */
666 {
667 if (ci == NULL) return;
668 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
670 if (ci->prev == NULL)
671 cache_queue_head = ci->next; /* reset head */
672 else
673 ci->prev->next = ci->next;
675 if (ci->next == NULL)
676 cache_queue_tail = ci->prev; /* reset the tail */
677 else
678 ci->next->prev = ci->prev;
680 ci->next = ci->prev = NULL;
681 ci->flags &= ~CI_FLAGS_IN_QUEUE;
683 pthread_mutex_lock (&stats_lock);
684 assert (stats_queue_length > 0);
685 stats_queue_length--;
686 pthread_mutex_unlock (&stats_lock);
688 } /* }}} static void remove_from_queue */
690 /* free the resources associated with the cache_item_t
691 * must hold cache_lock when calling this function
692 */
693 static void *free_cache_item(cache_item_t *ci) /* {{{ */
694 {
695 if (ci == NULL) return NULL;
697 remove_from_queue(ci);
699 for (size_t i=0; i < ci->values_num; i++)
700 free(ci->values[i]);
702 free (ci->values);
703 free (ci->file);
705 /* in case anyone is waiting */
706 pthread_cond_broadcast(&ci->flushed);
707 pthread_cond_destroy(&ci->flushed);
709 free (ci);
711 return NULL;
712 } /* }}} static void *free_cache_item */
714 /*
715 * enqueue_cache_item:
716 * `cache_lock' must be acquired before calling this function!
717 */
718 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
719 queue_side_t side)
720 {
721 if (ci == NULL)
722 return (-1);
724 if (ci->values_num == 0)
725 return (0);
727 if (side == HEAD)
728 {
729 if (cache_queue_head == ci)
730 return 0;
732 /* remove if further down in queue */
733 remove_from_queue(ci);
735 ci->prev = NULL;
736 ci->next = cache_queue_head;
737 if (ci->next != NULL)
738 ci->next->prev = ci;
739 cache_queue_head = ci;
741 if (cache_queue_tail == NULL)
742 cache_queue_tail = cache_queue_head;
743 }
744 else /* (side == TAIL) */
745 {
746 /* We don't move values back in the list.. */
747 if (ci->flags & CI_FLAGS_IN_QUEUE)
748 return (0);
750 assert (ci->next == NULL);
751 assert (ci->prev == NULL);
753 ci->prev = cache_queue_tail;
755 if (cache_queue_tail == NULL)
756 cache_queue_head = ci;
757 else
758 cache_queue_tail->next = ci;
760 cache_queue_tail = ci;
761 }
763 ci->flags |= CI_FLAGS_IN_QUEUE;
765 pthread_cond_signal(&queue_cond);
766 pthread_mutex_lock (&stats_lock);
767 stats_queue_length++;
768 pthread_mutex_unlock (&stats_lock);
770 return (0);
771 } /* }}} int enqueue_cache_item */
773 /*
774 * tree_callback_flush:
775 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
776 * while this is in progress.
777 */
778 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
779 gpointer data)
780 {
781 cache_item_t *ci;
782 callback_flush_data_t *cfd;
784 ci = (cache_item_t *) value;
785 cfd = (callback_flush_data_t *) data;
787 if (ci->flags & CI_FLAGS_IN_QUEUE)
788 return FALSE;
790 if (ci->values_num > 0
791 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
792 {
793 enqueue_cache_item (ci, TAIL);
794 }
795 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
796 && (ci->values_num <= 0))
797 {
798 assert ((char *) key == ci->file);
799 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
800 {
801 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
802 return (FALSE);
803 }
804 }
806 return (FALSE);
807 } /* }}} gboolean tree_callback_flush */
809 static int flush_old_values (int max_age)
810 {
811 callback_flush_data_t cfd;
812 size_t k;
814 memset (&cfd, 0, sizeof (cfd));
815 /* Pass the current time as user data so that we don't need to call
816 * `time' for each node. */
817 cfd.now = time (NULL);
818 cfd.keys = NULL;
819 cfd.keys_num = 0;
821 if (max_age > 0)
822 cfd.abs_timeout = cfd.now - max_age;
823 else
824 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
826 /* `tree_callback_flush' will return the keys of all values that haven't
827 * been touched in the last `config_flush_interval' seconds in `cfd'.
828 * The char*'s in this array point to the same memory as ci->file, so we
829 * don't need to free them separately. */
830 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
832 for (k = 0; k < cfd.keys_num; k++)
833 {
834 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
835 /* should never fail, since we have held the cache_lock
836 * the entire time */
837 assert(status == TRUE);
838 }
840 if (cfd.keys != NULL)
841 {
842 free (cfd.keys);
843 cfd.keys = NULL;
844 }
846 return (0);
847 } /* int flush_old_values */
/*
 * flush_thread_main:
 * Thread body of the periodic flusher. While the daemon is RUNNING it
 * queues old values and rotates the journal every `config_flush_interval'
 * seconds, sleeping on `flush_cond' in between. Once the state leaves
 * RUNNING it optionally queues everything that is left (when
 * `config_flush_at_shutdown' is set), switches the state to SHUTDOWN and
 * returns NULL. `cache_lock' is held except while the journal is rotated
 * and while waiting on the condition variable.
 */
static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int status;

  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock(&cache_lock);

  while (state == RUNNING)
  {
    gettimeofday (&now, NULL);
    /* has the deadline passed? (compare seconds first, then sub-seconds) */
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      RRDD_LOG(LOG_DEBUG, "flushing old values");

      /* Determine the time of the next cache flush. */
      next_flush.tv_sec = now.tv_sec + config_flush_interval;

      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* sleep until the next deadline or until a signal handler wakes us */
    status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
    if (status != 0 && status != ETIMEDOUT)
    {
      RRDD_LOG (LOG_ERR, "flush_thread_main: "
          "pthread_cond_timedwait returned %i.", status);
    }
  }

  if (config_flush_at_shutdown)
    flush_old_values (-1); /* flush everything */

  state = SHUTDOWN;

  pthread_mutex_unlock(&cache_lock);

  return NULL;
} /* void *flush_thread_main */
/*
 * queue_thread_main:
 * Thread body of a queue worker. Repeatedly takes the item at the head of
 * the write queue, detaches its pending values and writes them with
 * `rrd_update_r' while `cache_lock' is released. Afterwards a "wrote" entry
 * is added to the journal, threads waiting on the item's `flushed'
 * condition are woken and the statistics are updated. The loop ends once
 * the state is SHUTDOWN and, if `config_flush_at_shutdown' is set, the
 * queue is empty. Returns NULL.
 */
static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
{
  pthread_mutex_lock (&cache_lock);

  while (state != SHUTDOWN
         || (cache_queue_head != NULL && config_flush_at_shutdown))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    size_t values_num;
    int status;

    /* Now, check if there's something to store away. If not, wait until
     * something comes in. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_wait (&queue_cond, &cache_lock);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_wait returned %i.", status);
      }
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* take ownership of the value array; the item starts over empty */
    values = ci->values;
    values_num = ci->values_num;

    wipe_ci_values(ci, time(NULL));
    remove_from_queue(ci);

    /* do the disk I/O without holding the cache lock */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    journal_write("wrote", file);

    /* Search again in the tree.  It's possible someone issued a "FORGET"
     * while we were writing the update values. */
    pthread_mutex_lock(&cache_lock);
    ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
    if (ci)
      pthread_cond_broadcast(&ci->flushed);
    pthread_mutex_unlock(&cache_lock);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    rrd_free_ptrs((void ***) &values, &values_num);
    free(file);

    /* re-acquire the lock for the loop condition and the next round */
    pthread_mutex_lock (&cache_lock);
  }
  pthread_mutex_unlock (&cache_lock);

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Split the next field off a NUL terminated command buffer. Fields are
 * separated by a single space; a backslash makes the following character
 * part of the field. The field is unescaped and NUL terminated in place.
 *
 * On success `*field_ret' points to the field, `*buffer_ret' and
 * `*buffer_size_ret' are advanced past it and its separator, and 0 is
 * returned. Returns -1, leaving the arguments untouched, when the buffer is
 * empty or ends before the field is terminated.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  char *dst = *buffer_ret;  /* the field is rewritten over the input */
  size_t avail = *buffer_size_ret;
  size_t rd = 0;
  size_t wr = 0;

  if (avail <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[avail - 1] == '\0');

  while (rd < avail)
  {
    char c = src[rd];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
    {
      dst[wr] = 0;
      rd++;

      *buffer_ret = src + rd;
      *buffer_size_ret = avail - rd;
      *field_ret = dst;
      return (0);
    }

    /* Handle escaped characters: skip the backslash, keep what follows. */
    if (c == '\\')
    {
      if (rd >= (avail - 1))
        break;
      rd++;
    }

    dst[wr++] = src[rd++];
  } /* while (rd < avail) */

  /* ran off the end without finding a terminator */
  return (-1);
} /* }}} int buffer_get_field */
1053 /* if we're restricting writes to the base directory,
1054 * check whether the file falls within the dir
1055 * returns 1 if OK, otherwise 0
1056 */
1057 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1058 {
1059 assert(file != NULL);
1061 if (!config_write_base_only
1062 || JOURNAL_REPLAY(sock)
1063 || config_base_dir == NULL)
1064 return 1;
1066 if (strstr(file, "../") != NULL) goto err;
1068 /* relative paths without "../" are ok */
1069 if (*file != '/') return 1;
1071 /* file must be of the format base + "/" + <1+ char filename> */
1072 if (strlen(file) < _config_base_dir_len + 2) goto err;
1073 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1074 if (*(file + _config_base_dir_len) != '/') goto err;
1076 return 1;
1078 err:
1079 if (sock != NULL && sock->fd >= 0)
1080 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1082 return 0;
1083 } /* }}} static int check_file_access */
1085 /* when using a base dir, convert relative paths to absolute paths.
1086 * if necessary, modifies the "filename" pointer to point
1087 * to the new path created in "tmp". "tmp" is provided
1088 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1089 *
1090 * this allows us to optimize for the expected case (absolute path)
1091 * with a no-op.
1092 */
1093 static void get_abs_path(char **filename, char *tmp)
1094 {
1095 assert(tmp != NULL);
1096 assert(filename != NULL && *filename != NULL);
1098 if (config_base_dir == NULL || **filename == '/')
1099 return;
1101 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1102 *filename = tmp;
1103 } /* }}} static int get_abs_path */
1105 static int flush_file (const char *filename) /* {{{ */
1106 {
1107 cache_item_t *ci;
1109 pthread_mutex_lock (&cache_lock);
1111 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1112 if (ci == NULL)
1113 {
1114 pthread_mutex_unlock (&cache_lock);
1115 return (ENOENT);
1116 }
1118 if (ci->values_num > 0)
1119 {
1120 /* Enqueue at head */
1121 enqueue_cache_item (ci, HEAD);
1122 pthread_cond_wait(&ci->flushed, &cache_lock);
1123 }
1125 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1126 * may have been purged during our cond_wait() */
1128 pthread_mutex_unlock(&cache_lock);
1130 return (0);
1131 } /* }}} int flush_file */
1133 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1134 {
1135 char *err = "Syntax error.\n";
1137 if (cmd && cmd->syntax)
1138 err = cmd->syntax;
1140 return send_response(sock, RESP_ERR, "Usage: %s", err);
1141 } /* }}} static int syntax_error() */
1143 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1144 {
1145 uint64_t copy_queue_length;
1146 uint64_t copy_updates_received;
1147 uint64_t copy_flush_received;
1148 uint64_t copy_updates_written;
1149 uint64_t copy_data_sets_written;
1150 uint64_t copy_journal_bytes;
1151 uint64_t copy_journal_rotate;
1153 uint64_t tree_nodes_number;
1154 uint64_t tree_depth;
1156 pthread_mutex_lock (&stats_lock);
1157 copy_queue_length = stats_queue_length;
1158 copy_updates_received = stats_updates_received;
1159 copy_flush_received = stats_flush_received;
1160 copy_updates_written = stats_updates_written;
1161 copy_data_sets_written = stats_data_sets_written;
1162 copy_journal_bytes = stats_journal_bytes;
1163 copy_journal_rotate = stats_journal_rotate;
1164 pthread_mutex_unlock (&stats_lock);
1166 pthread_mutex_lock (&cache_lock);
1167 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1168 tree_depth = (uint64_t) g_tree_height (cache_tree);
1169 pthread_mutex_unlock (&cache_lock);
1171 add_response_info(sock,
1172 "QueueLength: %"PRIu64"\n", copy_queue_length);
1173 add_response_info(sock,
1174 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1175 add_response_info(sock,
1176 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1177 add_response_info(sock,
1178 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1179 add_response_info(sock,
1180 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1181 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1182 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1183 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1184 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1186 send_response(sock, RESP_OK, "Statistics follow\n");
1188 return (0);
1189 } /* }}} int handle_request_stats */
1191 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1192 {
1193 char *file, file_tmp[PATH_MAX];
1194 int status;
1196 status = buffer_get_field (&buffer, &buffer_size, &file);
1197 if (status != 0)
1198 {
1199 return syntax_error(sock,cmd);
1200 }
1201 else
1202 {
1203 pthread_mutex_lock(&stats_lock);
1204 stats_flush_received++;
1205 pthread_mutex_unlock(&stats_lock);
1207 get_abs_path(&file, file_tmp);
1208 if (!check_file_access(file, sock)) return 0;
1210 status = flush_file (file);
1211 if (status == 0)
1212 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1213 else if (status == ENOENT)
1214 {
1215 /* no file in our tree; see whether it exists at all */
1216 struct stat statbuf;
1218 memset(&statbuf, 0, sizeof(statbuf));
1219 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1220 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1221 else
1222 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1223 }
1224 else if (status < 0)
1225 return send_response(sock, RESP_ERR, "Internal error.\n");
1226 else
1227 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1228 }
1230 /* NOTREACHED */
1231 assert(1==0);
1232 } /* }}} int handle_request_flush */
1234 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1235 {
1236 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1238 pthread_mutex_lock(&cache_lock);
1239 flush_old_values(-1);
1240 pthread_mutex_unlock(&cache_lock);
1242 return send_response(sock, RESP_OK, "Started flush.\n");
1243 } /* }}} static int handle_request_flushall */
1245 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1246 {
1247 int status;
1248 char *file, file_tmp[PATH_MAX];
1249 cache_item_t *ci;
1251 status = buffer_get_field(&buffer, &buffer_size, &file);
1252 if (status != 0)
1253 return syntax_error(sock,cmd);
1255 get_abs_path(&file, file_tmp);
1257 pthread_mutex_lock(&cache_lock);
1258 ci = g_tree_lookup(cache_tree, file);
1259 if (ci == NULL)
1260 {
1261 pthread_mutex_unlock(&cache_lock);
1262 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1263 }
1265 for (size_t i=0; i < ci->values_num; i++)
1266 add_response_info(sock, "%s\n", ci->values[i]);
1268 pthread_mutex_unlock(&cache_lock);
1269 return send_response(sock, RESP_OK, "updates pending\n");
1270 } /* }}} static int handle_request_pending */
1272 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1273 {
1274 int status;
1275 gboolean found;
1276 char *file, file_tmp[PATH_MAX];
1278 status = buffer_get_field(&buffer, &buffer_size, &file);
1279 if (status != 0)
1280 return syntax_error(sock,cmd);
1282 get_abs_path(&file, file_tmp);
1283 if (!check_file_access(file, sock)) return 0;
1285 pthread_mutex_lock(&cache_lock);
1286 found = g_tree_remove(cache_tree, file);
1287 pthread_mutex_unlock(&cache_lock);
1289 if (found == TRUE)
1290 {
1291 if (!JOURNAL_REPLAY(sock))
1292 journal_write("forget", file);
1294 return send_response(sock, RESP_OK, "Gone!\n");
1295 }
1296 else
1297 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1299 /* NOTREACHED */
1300 assert(1==0);
1301 } /* }}} static int handle_request_forget */
1303 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1304 {
1305 cache_item_t *ci;
1307 pthread_mutex_lock(&cache_lock);
1309 ci = cache_queue_head;
1310 while (ci != NULL)
1311 {
1312 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1313 ci = ci->next;
1314 }
1316 pthread_mutex_unlock(&cache_lock);
1318 return send_response(sock, RESP_OK, "in queue.\n");
1319 } /* }}} int handle_request_queue */
1321 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1322 {
1323 char *file, file_tmp[PATH_MAX];
1324 int values_num = 0;
1325 int status;
1326 char orig_buf[CMD_MAX];
1328 cache_item_t *ci;
1330 /* save it for the journal later */
1331 if (!JOURNAL_REPLAY(sock))
1332 strncpy(orig_buf, buffer, buffer_size);
1334 status = buffer_get_field (&buffer, &buffer_size, &file);
1335 if (status != 0)
1336 return syntax_error(sock,cmd);
1338 pthread_mutex_lock(&stats_lock);
1339 stats_updates_received++;
1340 pthread_mutex_unlock(&stats_lock);
1342 get_abs_path(&file, file_tmp);
1343 if (!check_file_access(file, sock)) return 0;
1345 pthread_mutex_lock (&cache_lock);
1346 ci = g_tree_lookup (cache_tree, file);
1348 if (ci == NULL) /* {{{ */
1349 {
1350 struct stat statbuf;
1351 cache_item_t *tmp;
1353 /* don't hold the lock while we setup; stat(2) might block */
1354 pthread_mutex_unlock(&cache_lock);
1356 memset (&statbuf, 0, sizeof (statbuf));
1357 status = stat (file, &statbuf);
1358 if (status != 0)
1359 {
1360 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1362 status = errno;
1363 if (status == ENOENT)
1364 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1365 else
1366 return send_response(sock, RESP_ERR,
1367 "stat failed with error %i.\n", status);
1368 }
1369 if (!S_ISREG (statbuf.st_mode))
1370 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1372 if (access(file, R_OK|W_OK) != 0)
1373 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1374 file, rrd_strerror(errno));
1376 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1377 if (ci == NULL)
1378 {
1379 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1381 return send_response(sock, RESP_ERR, "malloc failed.\n");
1382 }
1383 memset (ci, 0, sizeof (cache_item_t));
1385 ci->file = strdup (file);
1386 if (ci->file == NULL)
1387 {
1388 free (ci);
1389 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1391 return send_response(sock, RESP_ERR, "strdup failed.\n");
1392 }
1394 wipe_ci_values(ci, now);
1395 ci->flags = CI_FLAGS_IN_TREE;
1396 pthread_cond_init(&ci->flushed, NULL);
1398 pthread_mutex_lock(&cache_lock);
1400 /* another UPDATE might have added this entry in the meantime */
1401 tmp = g_tree_lookup (cache_tree, file);
1402 if (tmp == NULL)
1403 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1404 else
1405 {
1406 free_cache_item (ci);
1407 ci = tmp;
1408 }
1410 /* state may have changed while we were unlocked */
1411 if (state == SHUTDOWN)
1412 return -1;
1413 } /* }}} */
1414 assert (ci != NULL);
1416 /* don't re-write updates in replay mode */
1417 if (!JOURNAL_REPLAY(sock))
1418 journal_write("update", orig_buf);
1420 while (buffer_size > 0)
1421 {
1422 char *value;
1423 time_t stamp;
1424 char *eostamp;
1426 status = buffer_get_field (&buffer, &buffer_size, &value);
1427 if (status != 0)
1428 {
1429 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1430 break;
1431 }
1433 /* make sure update time is always moving forward */
1434 stamp = strtol(value, &eostamp, 10);
1435 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1436 {
1437 pthread_mutex_unlock(&cache_lock);
1438 return send_response(sock, RESP_ERR,
1439 "Cannot find timestamp in '%s'!\n", value);
1440 }
1441 else if (stamp <= ci->last_update_stamp)
1442 {
1443 pthread_mutex_unlock(&cache_lock);
1444 return send_response(sock, RESP_ERR,
1445 "illegal attempt to update using time %ld when last"
1446 " update time is %ld (minimum one second step)\n",
1447 stamp, ci->last_update_stamp);
1448 }
1449 else
1450 ci->last_update_stamp = stamp;
1452 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1453 &ci->values_alloc, config_alloc_chunk))
1454 {
1455 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1456 continue;
1457 }
1459 values_num++;
1460 }
1462 if (((now - ci->last_flush_time) >= config_write_interval)
1463 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1464 && (ci->values_num > 0))
1465 {
1466 enqueue_cache_item (ci, TAIL);
1467 }
1469 pthread_mutex_unlock (&cache_lock);
1471 if (values_num < 1)
1472 return send_response(sock, RESP_ERR, "No values updated.\n");
1473 else
1474 return send_response(sock, RESP_OK,
1475 "errors, enqueued %i value(s).\n", values_num);
1477 /* NOTREACHED */
1478 assert(1==0);
1480 } /* }}} int handle_request_update */
1482 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1483 {
1484 char *file, file_tmp[PATH_MAX];
1485 char *cf;
1487 char *start_str;
1488 char *end_str;
1489 time_t start_tm;
1490 time_t end_tm;
1492 unsigned long step;
1493 unsigned long ds_cnt;
1494 char **ds_namv;
1495 rrd_value_t *data;
1497 int status;
1498 unsigned long i;
1499 time_t t;
1500 rrd_value_t *data_ptr;
1502 file = NULL;
1503 cf = NULL;
1504 start_str = NULL;
1505 end_str = NULL;
1507 /* Read the arguments */
1508 do /* while (0) */
1509 {
1510 status = buffer_get_field (&buffer, &buffer_size, &file);
1511 if (status != 0)
1512 break;
1514 status = buffer_get_field (&buffer, &buffer_size, &cf);
1515 if (status != 0)
1516 break;
1518 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1519 if (status != 0)
1520 {
1521 start_str = NULL;
1522 status = 0;
1523 break;
1524 }
1526 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1527 if (status != 0)
1528 {
1529 end_str = NULL;
1530 status = 0;
1531 break;
1532 }
1533 } while (0);
1535 if (status != 0)
1536 return (syntax_error(sock,cmd));
1538 get_abs_path(&file, file_tmp);
1539 if (!check_file_access(file, sock)) return 0;
1541 status = flush_file (file);
1542 if ((status != 0) && (status != ENOENT))
1543 return (send_response (sock, RESP_ERR,
1544 "flush_file (%s) failed with status %i.\n", file, status));
1546 t = time (NULL); /* "now" */
1548 /* Parse start time */
1549 if (start_str != NULL)
1550 {
1551 char *endptr;
1552 long value;
1554 endptr = NULL;
1555 errno = 0;
1556 value = strtol (start_str, &endptr, /* base = */ 0);
1557 if ((endptr == start_str) || (errno != 0))
1558 return (send_response(sock, RESP_ERR,
1559 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1560 start_str));
1562 if (value > 0)
1563 start_tm = (time_t) value;
1564 else
1565 start_tm = (time_t) (t + value);
1566 }
1567 else
1568 {
1569 start_tm = t - 86400;
1570 }
1572 /* Parse end time */
1573 if (end_str != NULL)
1574 {
1575 char *endptr;
1576 long value;
1578 endptr = NULL;
1579 errno = 0;
1580 value = strtol (end_str, &endptr, /* base = */ 0);
1581 if ((endptr == end_str) || (errno != 0))
1582 return (send_response(sock, RESP_ERR,
1583 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1584 end_str));
1586 if (value > 0)
1587 end_tm = (time_t) value;
1588 else
1589 end_tm = (time_t) (t + value);
1590 }
1591 else
1592 {
1593 end_tm = t;
1594 }
1596 step = -1;
1597 ds_cnt = 0;
1598 ds_namv = NULL;
1599 data = NULL;
1601 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1602 &ds_cnt, &ds_namv, &data);
1603 if (status != 0)
1604 return (send_response(sock, RESP_ERR,
1605 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1607 add_response_info (sock, "FlushVersion: %lu\n", 1);
1608 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1609 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1610 add_response_info (sock, "Step: %lu\n", step);
1611 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1613 #define SSTRCAT(buffer,str,buffer_fill) do { \
1614 size_t str_len = strlen (str); \
1615 if ((buffer_fill + str_len) > sizeof (buffer)) \
1616 str_len = sizeof (buffer) - buffer_fill; \
1617 if (str_len > 0) { \
1618 strncpy (buffer + buffer_fill, str, str_len); \
1619 buffer_fill += str_len; \
1620 assert (buffer_fill <= sizeof (buffer)); \
1621 if (buffer_fill == sizeof (buffer)) \
1622 buffer[buffer_fill - 1] = 0; \
1623 else \
1624 buffer[buffer_fill] = 0; \
1625 } \
1626 } while (0)
1628 { /* Add list of DS names */
1629 char linebuf[1024];
1630 size_t linebuf_fill;
1632 memset (linebuf, 0, sizeof (linebuf));
1633 linebuf_fill = 0;
1634 for (i = 0; i < ds_cnt; i++)
1635 {
1636 if (i > 0)
1637 SSTRCAT (linebuf, " ", linebuf_fill);
1638 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1639 rrd_freemem(ds_namv[i]);
1640 }
1641 rrd_freemem(ds_namv);
1642 add_response_info (sock, "DSName: %s\n", linebuf);
1643 }
1645 /* Add the actual data */
1646 assert (step > 0);
1647 data_ptr = data;
1648 for (t = start_tm + step; t <= end_tm; t += step)
1649 {
1650 char linebuf[1024];
1651 size_t linebuf_fill;
1652 char tmp[128];
1654 memset (linebuf, 0, sizeof (linebuf));
1655 linebuf_fill = 0;
1656 for (i = 0; i < ds_cnt; i++)
1657 {
1658 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1659 tmp[sizeof (tmp) - 1] = 0;
1660 SSTRCAT (linebuf, tmp, linebuf_fill);
1662 data_ptr++;
1663 }
1665 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1666 } /* for (t) */
1667 rrd_freemem(data);
1669 return (send_response (sock, RESP_OK, "Success\n"));
1670 #undef SSTRCAT
1671 } /* }}} int handle_request_fetch */
1673 /* we came across a "WROTE" entry during journal replay.
1674 * throw away any values that we have accumulated for this file
1675 */
1676 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1677 {
1678 cache_item_t *ci;
1679 const char *file = buffer;
1681 pthread_mutex_lock(&cache_lock);
1683 ci = g_tree_lookup(cache_tree, file);
1684 if (ci == NULL)
1685 {
1686 pthread_mutex_unlock(&cache_lock);
1687 return (0);
1688 }
1690 if (ci->values)
1691 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1693 wipe_ci_values(ci, now);
1694 remove_from_queue(ci);
1696 pthread_mutex_unlock(&cache_lock);
1697 return (0);
1698 } /* }}} int handle_request_wrote */
1700 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1701 {
1702 char *file, file_tmp[PATH_MAX];
1703 int status;
1704 rrd_info_t *data;
1706 /* obtain filename */
1707 status = buffer_get_field(&buffer, &buffer_size, &file);
1708 if (status != 0)
1709 return syntax_error(sock,cmd);
1710 /* get full pathname */
1711 get_abs_path(&file, file_tmp);
1712 if (!check_file_access(file, sock)) {
1713 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1714 }
1715 /* get data */
1716 rrd_clear_error ();
1717 data = rrd_info_r(file);
1718 if(!data) {
1719 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1720 }
1721 while (data) {
1722 switch (data->type) {
1723 case RD_I_VAL:
1724 if (isnan(data->value.u_val))
1725 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1726 else
1727 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1728 break;
1729 case RD_I_CNT:
1730 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1731 break;
1732 case RD_I_INT:
1733 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1734 break;
1735 case RD_I_STR:
1736 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1737 break;
1738 case RD_I_BLO:
1739 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1740 break;
1741 }
1742 data = data->next;
1743 }
1744 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1745 } /* }}} static int handle_request_info */
1747 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1748 {
1749 char *i, *file, file_tmp[PATH_MAX];
1750 int status;
1751 int idx;
1752 time_t t;
1754 /* obtain filename */
1755 status = buffer_get_field(&buffer, &buffer_size, &file);
1756 if (status != 0)
1757 return syntax_error(sock,cmd);
1758 /* get full pathname */
1759 get_abs_path(&file, file_tmp);
1760 if (!check_file_access(file, sock)) {
1761 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1762 }
1764 status = buffer_get_field(&buffer, &buffer_size, &i);
1765 if (status != 0)
1766 return syntax_error(sock,cmd);
1767 idx = atoi(i);
1768 if(idx<0) {
1769 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1770 }
1772 /* get data */
1773 rrd_clear_error ();
1774 t = rrd_first_r(file,idx);
1775 if(t<1) {
1776 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1777 }
1778 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1779 } /* }}} static int handle_request_first */
1782 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1783 {
1784 char *file, file_tmp[PATH_MAX];
1785 int status;
1786 time_t t, from_file, step;
1787 rrd_file_t * rrd_file;
1788 cache_item_t * ci;
1789 rrd_t rrd;
1791 /* obtain filename */
1792 status = buffer_get_field(&buffer, &buffer_size, &file);
1793 if (status != 0)
1794 return syntax_error(sock,cmd);
1795 /* get full pathname */
1796 get_abs_path(&file, file_tmp);
1797 if (!check_file_access(file, sock)) {
1798 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1799 }
1800 rrd_clear_error();
1801 rrd_init(&rrd);
1802 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1803 if(!rrd_file) {
1804 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1805 }
1806 from_file = rrd.live_head->last_up;
1807 step = rrd.stat_head->pdp_step;
1808 rrd_close(rrd_file);
1809 pthread_mutex_lock(&cache_lock);
1810 ci = g_tree_lookup(cache_tree, file);
1811 if (ci)
1812 t = ci->last_update_stamp;
1813 else
1814 t = from_file;
1815 pthread_mutex_unlock(&cache_lock);
1816 t -= t % step;
1817 rrd_free(&rrd);
1818 if(t<1) {
1819 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1820 }
1821 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1822 } /* }}} static int handle_request_last */
1824 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1825 {
1826 char *file, file_tmp[PATH_MAX];
1827 char *tok;
1828 int ac = 0;
1829 char *av[128];
1830 int status;
1831 unsigned long step = 300;
1832 time_t last_up = time(NULL)-10;
1833 int no_overwrite = opt_no_overwrite;
1836 /* obtain filename */
1837 status = buffer_get_field(&buffer, &buffer_size, &file);
1838 if (status != 0)
1839 return syntax_error(sock,cmd);
1840 /* get full pathname */
1841 get_abs_path(&file, file_tmp);
1842 if (!check_file_access(file, sock)) {
1843 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1844 }
1845 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1847 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1848 if( ! strncmp(tok,"-b",2) ) {
1849 status = buffer_get_field(&buffer, &buffer_size, &tok );
1850 if (status != 0) return syntax_error(sock,cmd);
1851 last_up = (time_t) atol(tok);
1852 continue;
1853 }
1854 if( ! strncmp(tok,"-s",2) ) {
1855 status = buffer_get_field(&buffer, &buffer_size, &tok );
1856 if (status != 0) return syntax_error(sock,cmd);
1857 step = atol(tok);
1858 continue;
1859 }
1860 if( ! strncmp(tok,"-O",2) ) {
1861 no_overwrite = 1;
1862 continue;
1863 }
1864 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1865 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1866 return syntax_error(sock,cmd);
1867 }
1868 if(step<1) {
1869 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1870 }
1871 if (last_up < 3600 * 24 * 365 * 10) {
1872 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1873 }
1875 rrd_clear_error ();
1876 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1878 if(!status) {
1879 return send_response(sock, RESP_OK, "RRD created OK\n");
1880 }
1881 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1882 } /* }}} static int handle_request_create */
1884 /* start "BATCH" processing */
1885 static int batch_start (HANDLER_PROTO) /* {{{ */
1886 {
1887 int status;
1888 if (sock->batch_start)
1889 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1891 status = send_response(sock, RESP_OK,
1892 "Go ahead. End with dot '.' on its own line.\n");
1893 sock->batch_start = time(NULL);
1894 sock->batch_cmd = 0;
1896 return status;
1897 } /* }}} static int batch_start */
1899 /* finish "BATCH" processing and return results to the client */
1900 static int batch_done (HANDLER_PROTO) /* {{{ */
1901 {
1902 assert(sock->batch_start);
1903 sock->batch_start = 0;
1904 sock->batch_cmd = 0;
1905 return send_response(sock, RESP_OK, "errors\n");
1906 } /* }}} static int batch_done */
/*
 * QUIT
 *
 * Does nothing but return -1.  NOTE(review): presumably the connection
 * handler treats a negative status as "drop this client" -- confirm against
 * the caller of handle_request().
 */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int quit_status = -1;

  return quit_status;
} /* }}} static int handle_request_quit */
1913 static command_t list_of_commands[] = { /* {{{ */
1914 {
1915 "UPDATE",
1916 handle_request_update,
1917 CMD_CONTEXT_ANY,
1918 "UPDATE <filename> <values> [<values> ...]\n"
1919 ,
1920 "Adds the given file to the internal cache if it is not yet known and\n"
1921 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1922 "for details.\n"
1923 "\n"
1924 "Each <values> has the following form:\n"
1925 " <values> = <time>:<value>[:<value>[...]]\n"
1926 "See the rrdupdate(1) manpage for details.\n"
1927 },
1928 {
1929 "WROTE",
1930 handle_request_wrote,
1931 CMD_CONTEXT_JOURNAL,
1932 NULL,
1933 NULL
1934 },
1935 {
1936 "FLUSH",
1937 handle_request_flush,
1938 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1939 "FLUSH <filename>\n"
1940 ,
1941 "Adds the given filename to the head of the update queue and returns\n"
1942 "after it has been dequeued.\n"
1943 },
1944 {
1945 "FLUSHALL",
1946 handle_request_flushall,
1947 CMD_CONTEXT_CLIENT,
1948 "FLUSHALL\n"
1949 ,
1950 "Triggers writing of all pending updates. Returns immediately.\n"
1951 },
1952 {
1953 "PENDING",
1954 handle_request_pending,
1955 CMD_CONTEXT_CLIENT,
1956 "PENDING <filename>\n"
1957 ,
1958 "Shows any 'pending' updates for a file, in order.\n"
1959 "The updates shown have not yet been written to the underlying RRD file.\n"
1960 },
1961 {
1962 "FORGET",
1963 handle_request_forget,
1964 CMD_CONTEXT_ANY,
1965 "FORGET <filename>\n"
1966 ,
1967 "Removes the file completely from the cache.\n"
1968 "Any pending updates for the file will be lost.\n"
1969 },
1970 {
1971 "QUEUE",
1972 handle_request_queue,
1973 CMD_CONTEXT_CLIENT,
1974 "QUEUE\n"
1975 ,
1976 "Shows all files in the output queue.\n"
1977 "The output is zero or more lines in the following format:\n"
1978 "(where <num_vals> is the number of values to be written)\n"
1979 "\n"
1980 "<num_vals> <filename>\n"
1981 },
1982 {
1983 "STATS",
1984 handle_request_stats,
1985 CMD_CONTEXT_CLIENT,
1986 "STATS\n"
1987 ,
1988 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1989 "a description of the values.\n"
1990 },
1991 {
1992 "HELP",
1993 handle_request_help,
1994 CMD_CONTEXT_CLIENT,
1995 "HELP [<command>]\n",
1996 NULL, /* special! */
1997 },
1998 {
1999 "BATCH",
2000 batch_start,
2001 CMD_CONTEXT_CLIENT,
2002 "BATCH\n"
2003 ,
2004 "The 'BATCH' command permits the client to initiate a bulk load\n"
2005 " of commands to rrdcached.\n"
2006 "\n"
2007 "Usage:\n"
2008 "\n"
2009 " client: BATCH\n"
2010 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2011 " client: command #1\n"
2012 " client: command #2\n"
2013 " client: ... and so on\n"
2014 " client: .\n"
2015 " server: 2 errors\n"
2016 " server: 7 message for command #7\n"
2017 " server: 9 message for command #9\n"
2018 "\n"
2019 "For more information, consult the rrdcached(1) documentation.\n"
2020 },
2021 {
2022 ".", /* BATCH terminator */
2023 batch_done,
2024 CMD_CONTEXT_BATCH,
2025 NULL,
2026 NULL
2027 },
2028 {
2029 "FETCH",
2030 handle_request_fetch,
2031 CMD_CONTEXT_CLIENT,
2032 "FETCH <file> <CF> [<start> [<end>]]\n"
2033 ,
2034 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2035 },
2036 {
2037 "INFO",
2038 handle_request_info,
2039 CMD_CONTEXT_CLIENT,
2040 "INFO <filename>\n",
2041 "The INFO command retrieves information about a specified RRD file.\n"
2042 "This is returned in standard rrdinfo format, a sequence of lines\n"
2043 "with the format <keyname> = <value>\n"
2044 "Note that this is the data as of the last update of the RRD file itself,\n"
2045 "not the last time data was received via rrdcached, so there may be pending\n"
2046 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2047 },
2048 {
2049 "FIRST",
2050 handle_request_first,
2051 CMD_CONTEXT_CLIENT,
2052 "FIRST <filename> <rra index>\n",
2053 "The FIRST command retrieves the first data time for a specified RRA in\n"
2054 "an RRD file.\n"
2055 },
2056 {
2057 "LAST",
2058 handle_request_last,
2059 CMD_CONTEXT_CLIENT,
2060 "LAST <filename>\n",
2061 "The LAST command retrieves the last update time for a specified RRD file.\n"
2062 "Note that this is the time of the last update of the RRD file itself, not\n"
2063 "the last time data was received via rrdcached, so there may be pending\n"
2064 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2065 },
2066 {
2067 "CREATE",
2068 handle_request_create,
2069 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2070 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2071 "The CREATE command will create an RRD file, overwriting any existing file\n"
2072 "unless the -O option is given or rrdcached was started with the -O option.\n"
2073 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2074 "not acceptable) and the step is in seconds (default is 300).\n"
2075 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2076 },
2077 {
2078 "QUIT",
2079 handle_request_quit,
2080 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2081 "QUIT\n"
2082 ,
2083 "Disconnect from rrdcached.\n"
2084 }
2085 }; /* }}} command_t list_of_commands[] */
2086 static size_t list_of_commands_len = sizeof (list_of_commands)
2087 / sizeof (list_of_commands[0]);
2089 static command_t *find_command(char *cmd)
2090 {
2091 size_t i;
2093 for (i = 0; i < list_of_commands_len; i++)
2094 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2095 return (&list_of_commands[i]);
2096 return NULL;
2097 }
2099 /* We currently use the index in the `list_of_commands' array as a bit position
2100 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2101 * outside these functions so that switching to a more elegant storage method
2102 * is easily possible. */
2103 static ssize_t find_command_index (const char *cmd) /* {{{ */
2104 {
2105 size_t i;
2107 for (i = 0; i < list_of_commands_len; i++)
2108 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2109 return ((ssize_t) i);
2110 return (-1);
2111 } /* }}} ssize_t find_command_index */
2113 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2114 const char *cmd)
2115 {
2116 ssize_t i;
2118 if (JOURNAL_REPLAY(sock))
2119 return (1);
2121 if (cmd == NULL)
2122 return (-1);
2124 if ((strcasecmp ("QUIT", cmd) == 0)
2125 || (strcasecmp ("HELP", cmd) == 0))
2126 return (1);
2127 else if (strcmp (".", cmd) == 0)
2128 cmd = "BATCH";
2130 i = find_command_index (cmd);
2131 if (i < 0)
2132 return (-1);
2133 assert (i < 32);
2135 if ((sock->permissions & (1 << i)) != 0)
2136 return (1);
2137 return (0);
2138 } /* }}} int socket_permission_check */
2140 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2141 const char *cmd)
2142 {
2143 ssize_t i;
2145 i = find_command_index (cmd);
2146 if (i < 0)
2147 return (-1);
2148 assert (i < 32);
2150 sock->permissions |= (1 << i);
2151 return (0);
2152 } /* }}} int socket_permission_add */
2154 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2155 {
2156 sock->permissions = 0;
2157 } /* }}} socket_permission_clear */
2159 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2160 listen_socket_t *src)
2161 {
2162 dest->permissions = src->permissions;
2163 } /* }}} socket_permission_copy */
2165 /* check whether commands are received in the expected context */
2166 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2167 {
2168 if (JOURNAL_REPLAY(sock))
2169 return (cmd->context & CMD_CONTEXT_JOURNAL);
2170 else if (sock->batch_start)
2171 return (cmd->context & CMD_CONTEXT_BATCH);
2172 else
2173 return (cmd->context & CMD_CONTEXT_CLIENT);
2175 /* NOTREACHED */
2176 assert(1==0);
2177 }
2179 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2180 {
2181 int status;
2182 char *cmd_str;
2183 char *resp_txt;
2184 command_t *help = NULL;
2186 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2187 if (status == 0)
2188 help = find_command(cmd_str);
2190 if (help && (help->syntax || help->help))
2191 {
2192 char tmp[CMD_MAX];
2194 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2195 resp_txt = tmp;
2197 if (help->syntax)
2198 add_response_info(sock, "Usage: %s\n", help->syntax);
2200 if (help->help)
2201 add_response_info(sock, "%s\n", help->help);
2202 }
2203 else
2204 {
2205 size_t i;
2207 resp_txt = "Command overview\n";
2209 for (i = 0; i < list_of_commands_len; i++)
2210 {
2211 if (list_of_commands[i].syntax == NULL)
2212 continue;
2213 add_response_info (sock, "%s", list_of_commands[i].syntax);
2214 }
2215 }
2217 return send_response(sock, RESP_OK, resp_txt);
2218 } /* }}} int handle_request_help */
2220 static int handle_request (DISPATCH_PROTO) /* {{{ */
2221 {
2222 char *buffer_ptr = buffer;
2223 char *cmd_str = NULL;
2224 command_t *cmd = NULL;
2225 int status;
2227 assert (buffer[buffer_size - 1] == '\0');
2229 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2230 if (status != 0)
2231 {
2232 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2233 return (-1);
2234 }
2236 if (sock != NULL && sock->batch_start)
2237 sock->batch_cmd++;
2239 cmd = find_command(cmd_str);
2240 if (!cmd)
2241 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2243 if (!socket_permission_check (sock, cmd->cmd))
2244 return send_response(sock, RESP_ERR, "Permission denied.\n");
2246 if (!command_check_context(sock, cmd))
2247 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2249 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2250 } /* }}} int handle_request */
2252 static void journal_set_free (journal_set *js) /* {{{ */
2253 {
2254 if (js == NULL)
2255 return;
2257 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2259 free(js);
2260 } /* }}} journal_set_free */
2262 static void journal_set_remove (journal_set *js) /* {{{ */
2263 {
2264 if (js == NULL)
2265 return;
2267 for (uint i=0; i < js->files_num; i++)
2268 {
2269 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2270 unlink(js->files[i]);
2271 }
2272 } /* }}} journal_set_remove */
2274 /* close current journal file handle.
2275 * MUST hold journal_lock before calling */
2276 static void journal_close(void) /* {{{ */
2277 {
2278 if (journal_fh != NULL)
2279 {
2280 if (fclose(journal_fh) != 0)
2281 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2282 }
2284 journal_fh = NULL;
2285 journal_size = 0;
2286 } /* }}} journal_close */
2288 /* MUST hold journal_lock before calling */
2289 static void journal_new_file(void) /* {{{ */
2290 {
2291 struct timeval now;
2292 int new_fd;
2293 char new_file[PATH_MAX + 1];
2295 assert(journal_dir != NULL);
2296 assert(journal_cur != NULL);
2298 journal_close();
2300 gettimeofday(&now, NULL);
2301 /* this format assures that the files sort in strcmp() order */
2302 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2303 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2305 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2306 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2307 if (new_fd < 0)
2308 goto error;
2310 journal_fh = fdopen(new_fd, "a");
2311 if (journal_fh == NULL)
2312 goto error;
2314 journal_size = ftell(journal_fh);
2315 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2317 /* record the file in the journal set */
2318 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2320 return;
2322 error:
2323 RRDD_LOG(LOG_CRIT,
2324 "JOURNALING DISABLED: Error while trying to create %s : %s",
2325 new_file, rrd_strerror(errno));
2326 RRDD_LOG(LOG_CRIT,
2327 "JOURNALING DISABLED: All values will be flushed at shutdown");
2329 close(new_fd);
2330 config_flush_at_shutdown = 1;
2332 } /* }}} journal_new_file */
2334 /* MUST NOT hold journal_lock before calling this */
2335 static void journal_rotate(void) /* {{{ */
2336 {
2337 journal_set *old_js = NULL;
2339 if (journal_dir == NULL)
2340 return;
2342 RRDD_LOG(LOG_DEBUG, "rotating journals");
2344 pthread_mutex_lock(&stats_lock);
2345 ++stats_journal_rotate;
2346 pthread_mutex_unlock(&stats_lock);
2348 pthread_mutex_lock(&journal_lock);
2350 journal_close();
2352 /* rotate the journal sets */
2353 old_js = journal_old;
2354 journal_old = journal_cur;
2355 journal_cur = calloc(1, sizeof(journal_set));
2357 if (journal_cur != NULL)
2358 journal_new_file();
2359 else
2360 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2362 pthread_mutex_unlock(&journal_lock);
2364 journal_set_remove(old_js);
2365 journal_set_free (old_js);
2367 } /* }}} static void journal_rotate */
2369 /* MUST hold journal_lock when calling */
2370 static void journal_done(void) /* {{{ */
2371 {
2372 if (journal_cur == NULL)
2373 return;
2375 journal_close();
2377 if (config_flush_at_shutdown)
2378 {
2379 RRDD_LOG(LOG_INFO, "removing journals");
2380 journal_set_remove(journal_old);
2381 journal_set_remove(journal_cur);
2382 }
2383 else
2384 {
2385 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2386 "journals will be used at next startup");
2387 }
2389 journal_set_free(journal_cur);
2390 journal_set_free(journal_old);
2391 free(journal_dir);
2393 } /* }}} static void journal_done */
2395 static int journal_write(char *cmd, char *args) /* {{{ */
2396 {
2397 int chars;
2399 if (journal_fh == NULL)
2400 return 0;
2402 pthread_mutex_lock(&journal_lock);
2403 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2404 journal_size += chars;
2406 if (journal_size > JOURNAL_MAX)
2407 journal_new_file();
2409 pthread_mutex_unlock(&journal_lock);
2411 if (chars > 0)
2412 {
2413 pthread_mutex_lock(&stats_lock);
2414 stats_journal_bytes += chars;
2415 pthread_mutex_unlock(&stats_lock);
2416 }
2418 return chars;
2419 } /* }}} static int journal_write */
2421 static int journal_replay (const char *file) /* {{{ */
2422 {
2423 FILE *fh;
2424 int entry_cnt = 0;
2425 int fail_cnt = 0;
2426 uint64_t line = 0;
2427 char entry[CMD_MAX];
2428 time_t now;
2430 if (file == NULL) return 0;
2432 {
2433 char *reason = "unknown error";
2434 int status = 0;
2435 struct stat statbuf;
2437 memset(&statbuf, 0, sizeof(statbuf));
2438 if (stat(file, &statbuf) != 0)
2439 {
2440 reason = "stat error";
2441 status = errno;
2442 }
2443 else if (!S_ISREG(statbuf.st_mode))
2444 {
2445 reason = "not a regular file";
2446 status = EPERM;
2447 }
2448 if (statbuf.st_uid != daemon_uid)
2449 {
2450 reason = "not owned by daemon user";
2451 status = EACCES;
2452 }
2453 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2454 {
2455 reason = "must not be user/group writable";
2456 status = EACCES;
2457 }
2459 if (status != 0)
2460 {
2461 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2462 file, rrd_strerror(status), reason);
2463 return 0;
2464 }
2465 }
2467 fh = fopen(file, "r");
2468 if (fh == NULL)
2469 {
2470 if (errno != ENOENT)
2471 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2472 file, rrd_strerror(errno));
2473 return 0;
2474 }
2475 else
2476 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2478 now = time(NULL);
2480 while(!feof(fh))
2481 {
2482 size_t entry_len;
2484 ++line;
2485 if (fgets(entry, sizeof(entry), fh) == NULL)
2486 break;
2487 entry_len = strlen(entry);
2489 /* check \n termination in case journal writing crashed mid-line */
2490 if (entry_len == 0)
2491 continue;
2492 else if (entry[entry_len - 1] != '\n')
2493 {
2494 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2495 ++fail_cnt;
2496 continue;
2497 }
2499 entry[entry_len - 1] = '\0';
2501 if (handle_request(NULL, now, entry, entry_len) == 0)
2502 ++entry_cnt;
2503 else
2504 ++fail_cnt;
2505 }
2507 fclose(fh);
2509 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2510 entry_cnt, fail_cnt);
2512 return entry_cnt > 0 ? 1 : 0;
2513 } /* }}} static int journal_replay */
/*
 * qsort(3) comparison callback for an array of C strings: both arguments
 * point at array elements (i.e. at "char *" values) and are ordered by
 * strcmp() of the strings they refer to.
 */
static int journal_sort(const void *v1, const void *v2)
{
  const char *lhs = *(const char * const *) v1;
  const char *rhs = *(const char * const *) v2;

  return strcmp(lhs, rhs);
}
2523 static void journal_init(void) /* {{{ */
2524 {
2525 int had_journal = 0;
2526 DIR *dir;
2527 struct dirent *dent;
2528 char path[PATH_MAX+1];
2530 if (journal_dir == NULL) return;
2532 pthread_mutex_lock(&journal_lock);
2534 journal_cur = calloc(1, sizeof(journal_set));
2535 if (journal_cur == NULL)
2536 {
2537 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2538 return;
2539 }
2541 RRDD_LOG(LOG_INFO, "checking for journal files");
2543 /* Handle old journal files during transition. This gives them the
2544 * correct sort order. TODO: remove after first release
2545 */
2546 {
2547 char old_path[PATH_MAX+1];
2548 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2549 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2550 rename(old_path, path);
2552 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2553 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2554 rename(old_path, path);
2555 }
2557 dir = opendir(journal_dir);
2558 if (!dir) {
2559 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2560 return;
2561 }
2562 while ((dent = readdir(dir)) != NULL)
2563 {
2564 /* looks like a journal file? */
2565 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2566 continue;
2568 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2570 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2571 {
2572 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2573 dent->d_name);
2574 break;
2575 }
2576 }
2577 closedir(dir);
2579 qsort(journal_cur->files, journal_cur->files_num,
2580 sizeof(journal_cur->files[0]), journal_sort);
2582 for (uint i=0; i < journal_cur->files_num; i++)
2583 had_journal += journal_replay(journal_cur->files[i]);
2585 journal_new_file();
2587 /* it must have been a crash. start a flush */
2588 if (had_journal && config_flush_at_shutdown)
2589 flush_old_values(-1);
2591 pthread_mutex_unlock(&journal_lock);
2593 RRDD_LOG(LOG_INFO, "journal processing complete");
2595 } /* }}} static void journal_init */
2597 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2598 {
2599 assert(sock != NULL);
2601 free(sock->rbuf); sock->rbuf = NULL;
2602 free(sock->wbuf); sock->wbuf = NULL;
2603 free(sock);
2604 } /* }}} void free_listen_socket */
2606 static void close_connection(listen_socket_t *sock) /* {{{ */
2607 {
2608 if (sock->fd >= 0)
2609 {
2610 close(sock->fd);
2611 sock->fd = -1;
2612 }
2614 free_listen_socket(sock);
2616 } /* }}} void close_connection */
2618 static void *connection_thread_main (void *args) /* {{{ */
2619 {
2620 listen_socket_t *sock;
2621 int fd;
2623 sock = (listen_socket_t *) args;
2624 fd = sock->fd;
2626 /* init read buffers */
2627 sock->next_read = sock->next_cmd = 0;
2628 sock->rbuf = malloc(RBUF_SIZE);
2629 if (sock->rbuf == NULL)
2630 {
2631 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2632 close_connection(sock);
2633 return NULL;
2634 }
2636 pthread_mutex_lock (&connection_threads_lock);
2637 connection_threads_num++;
2638 pthread_mutex_unlock (&connection_threads_lock);
2640 while (state == RUNNING)
2641 {
2642 char *cmd;
2643 ssize_t cmd_len;
2644 ssize_t rbytes;
2645 time_t now;
2647 struct pollfd pollfd;
2648 int status;
2650 pollfd.fd = fd;
2651 pollfd.events = POLLIN | POLLPRI;
2652 pollfd.revents = 0;
2654 status = poll (&pollfd, 1, /* timeout = */ 500);
2655 if (state != RUNNING)
2656 break;
2657 else if (status == 0) /* timeout */
2658 continue;
2659 else if (status < 0) /* error */
2660 {
2661 status = errno;
2662 if (status != EINTR)
2663 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2664 continue;
2665 }
2667 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2668 break;
2669 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2670 {
2671 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2672 "poll(2) returned something unexpected: %#04hx",
2673 pollfd.revents);
2674 break;
2675 }
2677 rbytes = read(fd, sock->rbuf + sock->next_read,
2678 RBUF_SIZE - sock->next_read);
2679 if (rbytes < 0)
2680 {
2681 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2682 break;
2683 }
2684 else if (rbytes == 0)
2685 break; /* eof */
2687 sock->next_read += rbytes;
2689 if (sock->batch_start)
2690 now = sock->batch_start;
2691 else
2692 now = time(NULL);
2694 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2695 {
2696 status = handle_request (sock, now, cmd, cmd_len+1);
2697 if (status != 0)
2698 goto out_close;
2699 }
2700 }
2702 out_close:
2703 close_connection(sock);
2705 /* Remove this thread from the connection threads list */
2706 pthread_mutex_lock (&connection_threads_lock);
2707 connection_threads_num--;
2708 if (connection_threads_num <= 0)
2709 pthread_cond_broadcast(&connection_threads_done);
2710 pthread_mutex_unlock (&connection_threads_lock);
2712 return (NULL);
2713 } /* }}} void *connection_thread_main */
2715 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2716 {
2717 int fd;
2718 struct sockaddr_un sa;
2719 listen_socket_t *temp;
2720 int status;
2721 const char *path;
2722 char *path_copy, *dir;
2724 path = sock->addr;
2725 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2726 path += strlen("unix:");
2728 /* dirname may modify its argument */
2729 path_copy = strdup(path);
2730 if (path_copy == NULL)
2731 {
2732 fprintf(stderr, "rrdcached: strdup(): %s\n",
2733 rrd_strerror(errno));
2734 return (-1);
2735 }
2737 dir = dirname(path_copy);
2738 if (rrd_mkdir_p(dir, 0777) != 0)
2739 {
2740 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2741 dir, rrd_strerror(errno));
2742 return (-1);
2743 }
2745 free(path_copy);
2747 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2748 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2749 if (temp == NULL)
2750 {
2751 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2752 return (-1);
2753 }
2754 listen_fds = temp;
2755 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2757 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2758 if (fd < 0)
2759 {
2760 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2761 rrd_strerror(errno));
2762 return (-1);
2763 }
2765 memset (&sa, 0, sizeof (sa));
2766 sa.sun_family = AF_UNIX;
2767 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2769 /* if we've gotten this far, we own the pid file. any daemon started
2770 * with the same args must not be alive. therefore, ensure that we can
2771 * create the socket...
2772 */
2773 unlink(path);
2775 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2776 if (status != 0)
2777 {
2778 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2779 path, rrd_strerror(errno));
2780 close (fd);
2781 return (-1);
2782 }
2784 /* tweak the sockets group ownership */
2785 if (sock->socket_group != (gid_t)-1)
2786 {
2787 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2788 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2789 {
2790 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2791 }
2792 }
2794 if (sock->socket_permissions != (mode_t)-1)
2795 {
2796 if (chmod(path, sock->socket_permissions) != 0)
2797 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2798 (unsigned int)sock->socket_permissions, strerror(errno));
2799 }
2801 status = listen (fd, /* backlog = */ 10);
2802 if (status != 0)
2803 {
2804 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2805 path, rrd_strerror(errno));
2806 close (fd);
2807 unlink (path);
2808 return (-1);
2809 }
2811 listen_fds[listen_fds_num].fd = fd;
2812 listen_fds[listen_fds_num].family = PF_UNIX;
2813 strncpy(listen_fds[listen_fds_num].addr, path,
2814 sizeof (listen_fds[listen_fds_num].addr) - 1);
2815 listen_fds_num++;
2817 return (0);
2818 } /* }}} int open_listen_socket_unix */
/*
 * Open listening TCP sockets for a network address and append them to the
 * global listen_fds array.
 *
 * sock->addr is "host", "host:port" or "[ipv6-address]" / "[ipv6-address]:port".
 * Without a port RRDCACHED_DEFAULT_PORT is used.  A socket is opened for
 * every address returned by getaddrinfo(); addresses for which socket() or
 * bind() fails are skipped.
 *
 * Returns -1 for a malformed address, a getaddrinfo() failure or a failing
 * listen(); otherwise 0 (even if no socket could be opened; the caller
 * has to check listen_fds_num).
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* split "host:port" at the last colon (strrchr is the standard
     * replacement for the legacy rindex) */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    /* reserve a slot and seed it with the template socket */
    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    /* the slot only counts once the socket is actually listening */
    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2942 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2943 {
2944 assert(sock != NULL);
2945 assert(sock->addr != NULL);
2947 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2948 || sock->addr[0] == '/')
2949 return (open_listen_socket_unix(sock));
2950 else
2951 return (open_listen_socket_network(sock));
2952 } /* }}} int open_listen_socket */
2954 static int close_listen_sockets (void) /* {{{ */
2955 {
2956 size_t i;
2958 for (i = 0; i < listen_fds_num; i++)
2959 {
2960 close (listen_fds[i].fd);
2962 if (listen_fds[i].family == PF_UNIX)
2963 unlink(listen_fds[i].addr);
2964 }
2966 free (listen_fds);
2967 listen_fds = NULL;
2968 listen_fds_num = 0;
2970 return (0);
2971 } /* }}} int close_listen_sockets */
2973 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2974 {
2975 struct pollfd *pollfds;
2976 int pollfds_num;
2977 int status;
2978 int i;
2980 if (listen_fds_num < 1)
2981 {
2982 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2983 return (NULL);
2984 }
2986 pollfds_num = listen_fds_num;
2987 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2988 if (pollfds == NULL)
2989 {
2990 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2991 return (NULL);
2992 }
2993 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2995 RRDD_LOG(LOG_INFO, "listening for connections");
2997 while (state == RUNNING)
2998 {
2999 for (i = 0; i < pollfds_num; i++)
3000 {
3001 pollfds[i].fd = listen_fds[i].fd;
3002 pollfds[i].events = POLLIN | POLLPRI;
3003 pollfds[i].revents = 0;
3004 }
3006 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3007 if (state != RUNNING)
3008 break;
3009 else if (status == 0) /* timeout */
3010 continue;
3011 else if (status < 0) /* error */
3012 {
3013 status = errno;
3014 if (status != EINTR)
3015 {
3016 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3017 }
3018 continue;
3019 }
3021 for (i = 0; i < pollfds_num; i++)
3022 {
3023 listen_socket_t *client_sock;
3024 struct sockaddr_storage client_sa;
3025 socklen_t client_sa_size;
3026 pthread_t tid;
3027 pthread_attr_t attr;
3029 if (pollfds[i].revents == 0)
3030 continue;
3032 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3033 {
3034 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3035 "poll(2) returned something unexpected for listen FD #%i.",
3036 pollfds[i].fd);
3037 continue;
3038 }
3040 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3041 if (client_sock == NULL)
3042 {
3043 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3044 continue;
3045 }
3046 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3048 client_sa_size = sizeof (client_sa);
3049 client_sock->fd = accept (pollfds[i].fd,
3050 (struct sockaddr *) &client_sa, &client_sa_size);
3051 if (client_sock->fd < 0)
3052 {
3053 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3054 free(client_sock);
3055 continue;
3056 }
3058 pthread_attr_init (&attr);
3059 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3061 status = pthread_create (&tid, &attr, connection_thread_main,
3062 client_sock);
3063 if (status != 0)
3064 {
3065 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3066 close_connection(client_sock);
3067 continue;
3068 }
3069 } /* for (pollfds_num) */
3070 } /* while (state == RUNNING) */
3072 RRDD_LOG(LOG_INFO, "starting shutdown");
3074 close_listen_sockets ();
3076 pthread_mutex_lock (&connection_threads_lock);
3077 while (connection_threads_num > 0)
3078 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3079 pthread_mutex_unlock (&connection_threads_lock);
3081 free(pollfds);
3083 return (NULL);
3084 } /* }}} void *listen_thread_main */
3086 static int daemonize (void) /* {{{ */
3087 {
3088 int pid_fd;
3089 char *base_dir;
3091 daemon_uid = geteuid();
3093 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3094 if (pid_fd < 0)
3095 pid_fd = check_pidfile();
3096 if (pid_fd < 0)
3097 return pid_fd;
3099 /* open all the listen sockets */
3100 if (config_listen_address_list_len > 0)
3101 {
3102 for (size_t i = 0; i < config_listen_address_list_len; i++)
3103 open_listen_socket (config_listen_address_list[i]);
3105 rrd_free_ptrs((void ***) &config_listen_address_list,
3106 &config_listen_address_list_len);
3107 }
3108 else
3109 {
3110 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3111 sizeof(default_socket.addr) - 1);
3112 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3113 open_listen_socket (&default_socket);
3114 }
3116 if (listen_fds_num < 1)
3117 {
3118 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3119 goto error;
3120 }
3122 if (!stay_foreground)
3123 {
3124 pid_t child;
3126 child = fork ();
3127 if (child < 0)
3128 {
3129 fprintf (stderr, "daemonize: fork(2) failed.\n");
3130 goto error;
3131 }
3132 else if (child > 0)
3133 exit(0);
3135 /* Become session leader */
3136 setsid ();
3138 /* Open the first three file descriptors to /dev/null */
3139 close (2);
3140 close (1);
3141 close (0);
3143 open ("/dev/null", O_RDWR);
3144 if (dup(0) == -1 || dup(0) == -1){
3145 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3146 }
3147 } /* if (!stay_foreground) */
3149 /* Change into the /tmp directory. */
3150 base_dir = (config_base_dir != NULL)
3151 ? config_base_dir
3152 : "/tmp";
3154 if (chdir (base_dir) != 0)
3155 {
3156 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3157 goto error;
3158 }
3160 install_signal_handlers();
3162 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3163 RRDD_LOG(LOG_INFO, "starting up");
3165 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3166 (GDestroyNotify) free_cache_item);
3167 if (cache_tree == NULL)
3168 {
3169 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3170 goto error;
3171 }
3173 return write_pidfile (pid_fd);
3175 error:
3176 remove_pidfile();
3177 return -1;
3178 } /* }}} int daemonize */
3180 static int cleanup (void) /* {{{ */
3181 {
3182 pthread_cond_broadcast (&flush_cond);
3183 pthread_join (flush_thread, NULL);
3185 pthread_cond_broadcast (&queue_cond);
3186 for (int i = 0; i < config_queue_threads; i++)
3187 pthread_join (queue_threads[i], NULL);
3189 if (config_flush_at_shutdown)
3190 {
3191 assert(cache_queue_head == NULL);
3192 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3193 }
3195 free(queue_threads);
3196 free(config_base_dir);
3198 pthread_mutex_lock(&cache_lock);
3199 g_tree_destroy(cache_tree);
3201 pthread_mutex_lock(&journal_lock);
3202 journal_done();
3204 RRDD_LOG(LOG_INFO, "goodbye");
3205 closelog ();
3207 remove_pidfile ();
3208 free(config_pid_file);
3210 return (0);
3211 } /* }}} int cleanup */
3213 static int read_options (int argc, char **argv) /* {{{ */
3214 {
3215 int option;
3216 int status = 0;
3218 socket_permission_clear (&default_socket);
3220 default_socket.socket_group = (gid_t)-1;
3221 default_socket.socket_permissions = (mode_t)-1;
3223 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3224 {
3225 switch (option)
3226 {
3227 case 'O':
3228 opt_no_overwrite = 1;
3229 break;
3231 case 'g':
3232 stay_foreground=1;
3233 break;
3235 case 'l':
3236 {
3237 listen_socket_t *new;
3239 new = malloc(sizeof(listen_socket_t));
3240 if (new == NULL)
3241 {
3242 fprintf(stderr, "read_options: malloc failed.\n");
3243 return(2);
3244 }
3245 memset(new, 0, sizeof(listen_socket_t));
3247 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3249 /* Add permissions to the socket {{{ */
3250 if (default_socket.permissions != 0)
3251 {
3252 socket_permission_copy (new, &default_socket);
3253 }
3254 else /* if (default_socket.permissions == 0) */
3255 {
3256 /* Add permission for ALL commands to the socket. */
3257 size_t i;
3258 for (i = 0; i < list_of_commands_len; i++)
3259 {
3260 status = socket_permission_add (new, list_of_commands[i].cmd);
3261 if (status != 0)
3262 {
3263 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3264 "socket failed. This should never happen, ever! Sorry.\n",
3265 list_of_commands[i].cmd);
3266 status = 4;
3267 }
3268 }
3269 }
3270 /* }}} Done adding permissions. */
3272 new->socket_group = default_socket.socket_group;
3273 new->socket_permissions = default_socket.socket_permissions;
3275 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3276 &config_listen_address_list_len, new))
3277 {
3278 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3279 return (2);
3280 }
3281 }
3282 break;
3284 /* set socket group permissions */
3285 case 's':
3286 {
3287 gid_t group_gid;
3288 struct group *grp;
3290 group_gid = strtoul(optarg, NULL, 10);
3291 if (errno != EINVAL && group_gid>0)
3292 {
3293 /* we were passed a number */
3294 grp = getgrgid(group_gid);
3295 }
3296 else
3297 {
3298 grp = getgrnam(optarg);
3299 }
3301 if (grp)
3302 {
3303 default_socket.socket_group = grp->gr_gid;
3304 }
3305 else
3306 {
3307 /* no idea what the user wanted... */
3308 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3309 return (5);
3310 }
3311 }
3312 break;
3314 /* set socket file permissions */
3315 case 'm':
3316 {
3317 long tmp;
3318 char *endptr = NULL;
3320 tmp = strtol (optarg, &endptr, 8);
3321 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3322 || (tmp > 07777) || (tmp < 0)) {
3323 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3324 optarg);
3325 return (5);
3326 }
3328 default_socket.socket_permissions = (mode_t)tmp;
3329 }
3330 break;
3332 case 'P':
3333 {
3334 char *optcopy;
3335 char *saveptr;
3336 char *dummy;
3337 char *ptr;
3339 socket_permission_clear (&default_socket);
3341 optcopy = strdup (optarg);
3342 dummy = optcopy;
3343 saveptr = NULL;
3344 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3345 {
3346 dummy = NULL;
3347 status = socket_permission_add (&default_socket, ptr);
3348 if (status != 0)
3349 {
3350 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3351 "socket failed. Most likely, this permission doesn't "
3352 "exist. Check your command line.\n", ptr);
3353 status = 4;
3354 }
3355 }
3357 free (optcopy);
3358 }
3359 break;
3361 case 'f':
3362 {
3363 int temp;
3365 temp = atoi (optarg);
3366 if (temp > 0)
3367 config_flush_interval = temp;
3368 else
3369 {
3370 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3371 status = 3;
3372 }
3373 }
3374 break;
3376 case 'w':
3377 {
3378 int temp;
3380 temp = atoi (optarg);
3381 if (temp > 0)
3382 config_write_interval = temp;
3383 else
3384 {
3385 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3386 status = 2;
3387 }
3388 }
3389 break;
3391 case 'z':
3392 {
3393 int temp;
3395 temp = atoi(optarg);
3396 if (temp > 0)
3397 config_write_jitter = temp;
3398 else
3399 {
3400 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3401 status = 2;
3402 }
3404 break;
3405 }
3407 case 't':
3408 {
3409 int threads;
3410 threads = atoi(optarg);
3411 if (threads >= 1)
3412 config_queue_threads = threads;
3413 else
3414 {
3415 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3416 return 1;
3417 }
3418 }
3419 break;
3421 case 'B':
3422 config_write_base_only = 1;
3423 break;
3425 case 'b':
3426 {
3427 size_t len;
3428 char base_realpath[PATH_MAX];
3430 if (config_base_dir != NULL)
3431 free (config_base_dir);
3432 config_base_dir = strdup (optarg);
3433 if (config_base_dir == NULL)
3434 {
3435 fprintf (stderr, "read_options: strdup failed.\n");
3436 return (3);
3437 }
3439 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3440 {
3441 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3442 config_base_dir, rrd_strerror (errno));
3443 return (3);
3444 }
3446 /* make sure that the base directory is not resolved via
3447 * symbolic links. this makes some performance-enhancing
3448 * assumptions possible (we don't have to resolve paths
3449 * that start with a "/")
3450 */
3451 if (realpath(config_base_dir, base_realpath) == NULL)
3452 {
3453 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3454 "%s\n", config_base_dir, rrd_strerror(errno));
3455 return 5;
3456 }
3458 len = strlen (config_base_dir);
3459 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3460 {
3461 config_base_dir[len - 1] = 0;
3462 len--;
3463 }
3465 if (len < 1)
3466 {
3467 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3468 return (4);
3469 }
3471 _config_base_dir_len = len;
3473 len = strlen (base_realpath);
3474 while ((len > 0) && (base_realpath[len - 1] == '/'))
3475 {
3476 base_realpath[len - 1] = '\0';
3477 len--;
3478 }
3480 if (strncmp(config_base_dir,
3481 base_realpath, sizeof(base_realpath)) != 0)
3482 {
3483 fprintf(stderr,
3484 "Base directory (-b) resolved via file system links!\n"
3485 "Please consult rrdcached '-b' documentation!\n"
3486 "Consider specifying the real directory (%s)\n",
3487 base_realpath);
3488 return 5;
3489 }
3490 }
3491 break;
3493 case 'p':
3494 {
3495 if (config_pid_file != NULL)
3496 free (config_pid_file);
3497 config_pid_file = strdup (optarg);
3498 if (config_pid_file == NULL)
3499 {
3500 fprintf (stderr, "read_options: strdup failed.\n");
3501 return (3);
3502 }
3503 }
3504 break;
3506 case 'F':
3507 config_flush_at_shutdown = 1;
3508 break;
3510 case 'j':
3511 {
3512 char journal_dir_actual[PATH_MAX];
3513 const char *dir;
3514 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3516 status = rrd_mkdir_p(dir, 0777);
3517 if (status != 0)
3518 {
3519 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3520 dir, rrd_strerror(errno));
3521 return 6;
3522 }
3524 if (access(dir, R_OK|W_OK|X_OK) != 0)
3525 {
3526 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3527 errno ? rrd_strerror(errno) : "");
3528 return 6;
3529 }
3530 }
3531 break;
3533 case 'a':
3534 {
3535 int temp = atoi(optarg);
3536 if (temp > 0)
3537 config_alloc_chunk = temp;
3538 else
3539 {
3540 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3541 return 10;
3542 }
3543 }
3544 break;
3546 case 'h':
3547 case '?':
3548 printf ("RRDCacheD %s\n"
3549 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3550 "\n"
3551 "Usage: rrdcached [options]\n"
3552 "\n"
3553 "Valid options are:\n"
3554 " -l <address> Socket address to listen to.\n"
3555 " -P <perms> Sets the permissions to assign to all following "
3556 "sockets\n"
3557 " -w <seconds> Interval in which to write data.\n"
3558 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3559 " -t <threads> Number of write threads.\n"
3560 " -f <seconds> Interval in which to flush dead data.\n"
3561 " -p <file> Location of the PID-file.\n"
3562 " -b <dir> Base directory to change to.\n"
3563 " -B Restrict file access to paths within -b <dir>\n"
3564 " -g Do not fork and run in the foreground.\n"
3565 " -j <dir> Directory in which to create the journal files.\n"
3566 " -F Always flush all updates at shutdown\n"
3567 " -s <id|name> Group owner of all following UNIX sockets\n"
3568 " (the socket will also have read/write permissions "
3569 "for that group)\n"
3570 " -m <mode> File permissions (octal) of all following UNIX "
3571 "sockets\n"
3572 " -a <size> Memory allocation chunk size. Default is 1.\n"
3573 " -O Do not allow CREATE commands to overwrite existing\n"
3574 " files, even if asked to.\n"
3575 "\n"
3576 "For more information and a detailed description of all options "
3577 "please refer\n"
3578 "to the rrdcached(1) manual page.\n",
3579 VERSION);
3580 if (option == 'h')
3581 status = -1;
3582 else
3583 status = 1;
3584 break;
3585 } /* switch (option) */
3586 } /* while (getopt) */
3588 /* advise the user when values are not sane */
3589 if (config_flush_interval < 2 * config_write_interval)
3590 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3591 " 2x write interval (-w) !\n");
3592 if (config_write_jitter > config_write_interval)
3593 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3594 " write interval (-w) !\n");
3596 if (config_write_base_only && config_base_dir == NULL)
3597 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3598 " Consult the rrdcached documentation\n");
3600 if (journal_dir == NULL)
3601 config_flush_at_shutdown = 1;
3603 return (status);
3604 } /* }}} int read_options */
3606 int main (int argc, char **argv)
3607 {
3608 int status;
3610 status = read_options (argc, argv);
3611 if (status != 0)
3612 {
3613 if (status < 0)
3614 status = 0;
3615 return (status);
3616 }
3618 status = daemonize ();
3619 if (status != 0)
3620 {
3621 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3622 return (1);
3623 }
3625 journal_init();
3627 /* start the queue threads */
3628 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3629 if (queue_threads == NULL)
3630 {
3631 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3632 cleanup();
3633 return (1);
3634 }
3635 for (int i = 0; i < config_queue_threads; i++)
3636 {
3637 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3638 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3639 if (status != 0)
3640 {
3641 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3642 cleanup();
3643 return (1);
3644 }
3645 }
3647 /* start the flush thread */
3648 memset(&flush_thread, 0, sizeof(flush_thread));
3649 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3650 if (status != 0)
3651 {
3652 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3653 cleanup();
3654 return (1);
3655 }
3657 listen_thread_main (NULL);
3658 cleanup ();
3660 return (0);
3661 } /* int main */
3663 /*
3664 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3665 */