0dc8e0b33f28c7f38be82cc513e727b26b92366e
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
/*
 * RRDD_LOG(severity, fmt, ...):
 * Send a printf-style message to syslog() with the given severity. When
 * `stay_foreground' is non-zero the same message, followed by a newline, is
 * printed to stderr as well.
 * The variadic arguments are expanded twice, so avoid arguments with side
 * effects.
 */
#define RRDD_LOG(severity, ...) \
  do { \
    if (stay_foreground) { \
      fprintf(stderr, __VA_ARGS__); \
      fprintf(stderr, "\n"); } \
    syslog ((severity), __VA_ARGS__); \
  } while (0)
123 /*
124 * Types
125 */
/* Outcome of a command, as passed to send_response(). */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/*
 * State kept for a socket. The same type is used for the `listen_fds' array
 * and for the `sock' argument handed to the command handlers.
 */
struct listen_socket_s
{
  int fd;                   /* socket file descriptor */
  char addr[PATH_MAX + 1];  /* NOTE(review): presumably the configured address string -- confirm */
  int family;

  /* state for BATCH processing */
  time_t batch_start;       /* non-zero while in BATCH mode (tested by send_response) */
  int batch_cmd;            /* used as the leading number of error lines in BATCH mode */

  /* buffered IO */
  char *rbuf;               /* read buffer scanned by next_cmd() */
  off_t next_cmd;           /* offset in rbuf where the next command starts */
  off_t next_read;          /* offset in rbuf just past the data read so far */

  char *wbuf;               /* pending response text, grown by add_to_wbuf() */
  ssize_t wbuf_len;         /* number of bytes in wbuf, excluding the trailing NUL */

  uint32_t permissions;

  gid_t socket_group;
  mode_t socket_permissions;
};
typedef struct listen_socket_s listen_socket_t;
struct command_s;
typedef struct command_s command_t;
/* note: guard against "unused" warnings in the handlers */
/* Parameter list shared by the dispatch functions: the socket the request
 * came from, a timestamp, and the command buffer together with its size. */
#define DISPATCH_PROTO	listen_socket_t UNUSED(*sock),\
			time_t UNUSED(now),\
			char  UNUSED(*buffer),\
			size_t UNUSED(buffer_size)

/* Parameter list of a command handler: the command table entry followed by
 * everything in DISPATCH_PROTO. */
#define HANDLER_PROTO	command_t UNUSED(*cmd),\
			DISPATCH_PROTO
/* Description of one command understood by the daemon. */
struct command_s {
  char   *cmd;                      /* NOTE(review): presumably the command keyword -- confirm at the lookup code */
  int (*handler)(HANDLER_PROTO);    /* function that executes the command */

  char  context;		/* where we expect to see it */
  /* bit values for `context' */
#define CMD_CONTEXT_CLIENT	(1<<0)
#define CMD_CONTEXT_BATCH	(1<<1)
#define CMD_CONTEXT_JOURNAL	(1<<2)
#define CMD_CONTEXT_ANY		(0x7f)

  char *syntax;                     /* usage text; sent by syntax_error() when set */
  char *help;
};
/*
 * One cached RRD file: the update strings waiting to be written plus the
 * links that place the item in the write queue.
 */
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;               /* file name; also the key pointer stored in cache_tree */
  char **values;            /* pending update strings, handed to rrd_update_r() */
  size_t values_num;		/* number of valid pointers */
  size_t values_alloc;		/* number of allocated pointers */
  time_t last_flush_time;   /* set by wipe_ci_values(), optionally with jitter added */
  time_t last_update_stamp;
  /* bit values for `flags' */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;
  pthread_cond_t  flushed;  /* broadcast after the item's values were written; flush_file() waits on it */
  cache_item_t *prev;       /* neighbours in the doubly linked write queue */
  cache_item_t *next;
};
/* User data passed to tree_callback_flush() by flush_old_values(). */
struct callback_flush_data_s
{
  time_t now;           /* time at which the tree walk started */
  time_t abs_timeout;   /* items last flushed at or before this time get enqueued */
  char **keys;          /* keys of idle, empty items to remove after the walk */
  size_t keys_num;      /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* End of the write queue at which enqueue_cache_item() inserts an item. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;
/* describe a set of journal files */
typedef struct {
  char **files;       /* array of file names */
  size_t files_num;   /* number of entries in `files' */
} journal_set;
/* max length of socket command or response */
#define CMD_MAX 4096
/* NOTE(review): presumably the size of listen_socket_t.rbuf -- confirm at the allocation site */
#define RBUF_SIZE (CMD_MAX*2)
222 /*
223 * Variables
224 */
/* non-zero: RRDD_LOG also prints every message to stderr */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

static listen_socket_t default_socket;

/* Life-cycle state of the daemon. The signal handlers set FLUSHING and
 * flush_thread_main() sets SHUTDOWN once it has left its loop. */
enum {
  RUNNING,		/* normal operation */
  FLUSHING,		/* flushing remaining values */
  SHUTDOWN		/* shutting down */
} state = RUNNING;

static pthread_t *queue_threads;
/* signalled when an item is enqueued; queue_thread_main() waits on it */
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

static pthread_t flush_thread;
/* flush_thread_main() sleeps on this; broadcast by the signal handlers */
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
static GTree          *cache_tree = NULL;
static cache_item_t   *cache_queue_head = NULL;
static cache_item_t   *cache_queue_tail = NULL;
/* protects cache_tree and the cache queue */
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

static int config_write_interval = 300;
static int config_write_jitter   = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;
static size_t config_alloc_chunk = 1;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* counters reported by the STATS handler; guarded by stats_lock */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

static int opt_no_overwrite = 0; /* default for the daemon */

/* Journaled updates */
/* a NULL socket means the command is being replayed from the journal */
#define JOURNAL_REPLAY(s) ((s) == NULL)
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL;		/* current journal file handle */
static long  journal_size = 0;		/* current journal size */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
297 /*
298 * Functions
299 */
300 static void sig_common (const char *sig) /* {{{ */
301 {
302 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
303 state = FLUSHING;
304 pthread_cond_broadcast(&flush_cond);
305 pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
/* SIGINT: start the shutdown sequence; the flush-at-shutdown setting is
 * left as configured. */
static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
  sig_common ("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: start the shutdown sequence; the flush-at-shutdown setting is
 * left as configured. */
static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
  sig_common ("TERM");
} /* }}} void sig_term_handler */
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
319 {
320 config_flush_at_shutdown = 1;
321 sig_common("USR1");
322 } /* }}} void sig_usr1_handler */
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
325 {
326 config_flush_at_shutdown = 0;
327 sig_common("USR2");
328 } /* }}} void sig_usr2_handler */
330 static void install_signal_handlers(void) /* {{{ */
331 {
332 /* These structures are static, because `sigaction' behaves weird if the are
333 * overwritten.. */
334 static struct sigaction sa_int;
335 static struct sigaction sa_term;
336 static struct sigaction sa_pipe;
337 static struct sigaction sa_usr1;
338 static struct sigaction sa_usr2;
340 /* Install signal handlers */
341 memset (&sa_int, 0, sizeof (sa_int));
342 sa_int.sa_handler = sig_int_handler;
343 sigaction (SIGINT, &sa_int, NULL);
345 memset (&sa_term, 0, sizeof (sa_term));
346 sa_term.sa_handler = sig_term_handler;
347 sigaction (SIGTERM, &sa_term, NULL);
349 memset (&sa_pipe, 0, sizeof (sa_pipe));
350 sa_pipe.sa_handler = SIG_IGN;
351 sigaction (SIGPIPE, &sa_pipe, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_usr1));
354 sa_usr1.sa_handler = sig_usr1_handler;
355 sigaction (SIGUSR1, &sa_usr1, NULL);
357 memset (&sa_usr2, 0, sizeof (sa_usr2));
358 sa_usr2.sa_handler = sig_usr2_handler;
359 sigaction (SIGUSR2, &sa_usr2, NULL);
361 } /* }}} void install_signal_handlers */
363 static int open_pidfile(char *action, int oflag) /* {{{ */
364 {
365 int fd;
366 const char *file;
367 char *file_copy, *dir;
369 file = (config_pid_file != NULL)
370 ? config_pid_file
371 : LOCALSTATEDIR "/run/rrdcached.pid";
373 /* dirname may modify its argument */
374 file_copy = strdup(file);
375 if (file_copy == NULL)
376 {
377 fprintf(stderr, "rrdcached: strdup(): %s\n",
378 rrd_strerror(errno));
379 return -1;
380 }
382 dir = dirname(file_copy);
383 if (rrd_mkdir_p(dir, 0777) != 0)
384 {
385 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386 dir, rrd_strerror(errno));
387 return -1;
388 }
390 free(file_copy);
392 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
393 if (fd < 0)
394 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395 action, file, rrd_strerror(errno));
397 return(fd);
398 } /* }}} static int open_pidfile */
/*
 * check_pidfile:
 * check existing pid file to see whether a daemon is running
 *
 * Reads the PID stored in the file and probes that process with kill(pid, 0).
 * If the process cannot be signalled (or it is this process), the file is
 * considered stale and is truncated so it can be rewritten.
 *
 * Returns the open, truncated descriptor on success. Returns a negative
 * value if the file cannot be opened, holds no usable PID, another process
 * answers on that PID, or the truncate fails; the descriptor is closed on
 * every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  long pid_value;
  ssize_t nread;
  char pid_str[16];

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not NUL-terminate */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  errno = 0;
  pid_value = strtol(pid_str, NULL, 10);
  pid = (pid_t) pid_value;
  /* reject parse errors, non-positive values and values pid_t cannot hold */
  if (errno != 0 || pid_value <= 0 || (long) pid != pid_value)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile:
 * Write the PID of the calling process, followed by a newline, to the
 * already opened descriptor `fd'. The descriptor is closed in every case.
 * Returns 0 on success and -1 if no stream could be attached to `fd'.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
465 static int remove_pidfile (void) /* {{{ */
466 {
467 char *file;
468 int status;
470 file = (config_pid_file != NULL)
471 ? config_pid_file
472 : LOCALSTATEDIR "/run/rrdcached.pid";
474 status = unlink (file);
475 if (status == 0)
476 return (0);
477 return (errno);
478 } /* }}} int remove_pidfile */
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
481 {
482 char *eol;
484 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485 sock->next_read - sock->next_cmd);
487 if (eol == NULL)
488 {
489 /* no commands left, move remainder back to front of rbuf */
490 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491 sock->next_read - sock->next_cmd);
492 sock->next_read -= sock->next_cmd;
493 sock->next_cmd = 0;
494 *len = 0;
495 return NULL;
496 }
497 else
498 {
499 char *cmd = sock->rbuf + sock->next_cmd;
500 *eol = '\0';
502 sock->next_cmd = eol - sock->rbuf + 1;
504 if (eol > sock->rbuf && *(eol-1) == '\r')
505 *(--eol) = '\0'; /* handle "\r\n" EOL */
507 *len = eol - cmd;
509 return cmd;
510 }
512 /* NOTREACHED */
513 assert(1==0);
514 } /* }}} char *next_cmd */
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
518 {
519 char *new_buf;
521 assert(sock != NULL);
523 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
524 if (new_buf == NULL)
525 {
526 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527 return -1;
528 }
530 strncpy(new_buf + sock->wbuf_len, str, len + 1);
532 sock->wbuf = new_buf;
533 sock->wbuf_len += len;
535 return 0;
536 } /* }}} static int add_to_wbuf */
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
540 {
541 va_list argp;
542 char buffer[CMD_MAX];
543 int len;
545 if (JOURNAL_REPLAY(sock)) return 0;
546 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
548 va_start(argp, fmt);
549 #ifdef HAVE_VSNPRINTF
550 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
551 #else
552 len = vsprintf(buffer, fmt, argp);
553 #endif
554 va_end(argp);
555 if (len < 0)
556 {
557 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558 return -1;
559 }
561 return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
/*
 * count_lines:
 * Return the number of newline characters in `str'; a NULL pointer counts
 * as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *cursor;

  if (str == NULL)
    return 0;

  for (cursor = strchr(str, '\n');
       cursor != NULL;
       cursor = strchr(cursor + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
580 /* send the response back to the user.
581 * returns 0 on success, -1 on error
582 * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584 char *fmt, ...) /* {{{ */
585 {
586 va_list argp;
587 char buffer[CMD_MAX];
588 int lines;
589 ssize_t wrote;
590 int rclen, len;
592 if (JOURNAL_REPLAY(sock)) return rc;
594 if (sock->batch_start)
595 {
596 if (rc == RESP_OK)
597 return rc; /* no response on success during BATCH */
598 lines = sock->batch_cmd;
599 }
600 else if (rc == RESP_OK)
601 lines = count_lines(sock->wbuf);
602 else
603 lines = -1;
605 rclen = sprintf(buffer, "%d ", lines);
606 va_start(argp, fmt);
607 #ifdef HAVE_VSNPRINTF
608 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
609 #else
610 len = vsprintf(buffer+rclen, fmt, argp);
611 #endif
612 va_end(argp);
613 if (len < 0)
614 return -1;
616 len += rclen;
618 /* append the result to the wbuf, don't write to the user */
619 if (sock->batch_start)
620 return add_to_wbuf(sock, buffer, len);
622 /* first write must be complete */
623 if (len != write(sock->fd, buffer, len))
624 {
625 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626 return -1;
627 }
629 if (sock->wbuf != NULL && rc == RESP_OK)
630 {
631 wrote = 0;
632 while (wrote < sock->wbuf_len)
633 {
634 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
635 if (wb <= 0)
636 {
637 RRDD_LOG(LOG_INFO, "send_response: could not write results");
638 return -1;
639 }
640 wrote += wb;
641 }
642 }
644 free(sock->wbuf); sock->wbuf = NULL;
645 sock->wbuf_len = 0;
647 return 0;
648 } /* }}} */
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
651 {
652 ci->values = NULL;
653 ci->values_num = 0;
654 ci->values_alloc = 0;
656 ci->last_flush_time = when;
657 if (config_write_jitter > 0)
658 ci->last_flush_time += (rrd_random() % config_write_jitter);
659 }
661 /* remove_from_queue
662 * remove a "cache_item_t" item from the queue.
663 * must hold 'cache_lock' when calling this
664 */
665 static void remove_from_queue(cache_item_t *ci) /* {{{ */
666 {
667 if (ci == NULL) return;
668 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
670 if (ci->prev == NULL)
671 cache_queue_head = ci->next; /* reset head */
672 else
673 ci->prev->next = ci->next;
675 if (ci->next == NULL)
676 cache_queue_tail = ci->prev; /* reset the tail */
677 else
678 ci->next->prev = ci->prev;
680 ci->next = ci->prev = NULL;
681 ci->flags &= ~CI_FLAGS_IN_QUEUE;
683 pthread_mutex_lock (&stats_lock);
684 assert (stats_queue_length > 0);
685 stats_queue_length--;
686 pthread_mutex_unlock (&stats_lock);
688 } /* }}} static void remove_from_queue */
690 /* free the resources associated with the cache_item_t
691 * must hold cache_lock when calling this function
692 */
693 static void *free_cache_item(cache_item_t *ci) /* {{{ */
694 {
695 if (ci == NULL) return NULL;
697 remove_from_queue(ci);
699 for (size_t i=0; i < ci->values_num; i++)
700 free(ci->values[i]);
702 free (ci->values);
703 free (ci->file);
705 /* in case anyone is waiting */
706 pthread_cond_broadcast(&ci->flushed);
707 pthread_cond_destroy(&ci->flushed);
709 free (ci);
711 return NULL;
712 } /* }}} static void *free_cache_item */
714 /*
715 * enqueue_cache_item:
716 * `cache_lock' must be acquired before calling this function!
717 */
718 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
719 queue_side_t side)
720 {
721 if (ci == NULL)
722 return (-1);
724 if (ci->values_num == 0)
725 return (0);
727 if (side == HEAD)
728 {
729 if (cache_queue_head == ci)
730 return 0;
732 /* remove if further down in queue */
733 remove_from_queue(ci);
735 ci->prev = NULL;
736 ci->next = cache_queue_head;
737 if (ci->next != NULL)
738 ci->next->prev = ci;
739 cache_queue_head = ci;
741 if (cache_queue_tail == NULL)
742 cache_queue_tail = cache_queue_head;
743 }
744 else /* (side == TAIL) */
745 {
746 /* We don't move values back in the list.. */
747 if (ci->flags & CI_FLAGS_IN_QUEUE)
748 return (0);
750 assert (ci->next == NULL);
751 assert (ci->prev == NULL);
753 ci->prev = cache_queue_tail;
755 if (cache_queue_tail == NULL)
756 cache_queue_head = ci;
757 else
758 cache_queue_tail->next = ci;
760 cache_queue_tail = ci;
761 }
763 ci->flags |= CI_FLAGS_IN_QUEUE;
765 pthread_cond_signal(&queue_cond);
766 pthread_mutex_lock (&stats_lock);
767 stats_queue_length++;
768 pthread_mutex_unlock (&stats_lock);
770 return (0);
771 } /* }}} int enqueue_cache_item */
773 /*
774 * tree_callback_flush:
775 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
776 * while this is in progress.
777 */
778 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
779 gpointer data)
780 {
781 cache_item_t *ci;
782 callback_flush_data_t *cfd;
784 ci = (cache_item_t *) value;
785 cfd = (callback_flush_data_t *) data;
787 if (ci->flags & CI_FLAGS_IN_QUEUE)
788 return FALSE;
790 if (ci->values_num > 0
791 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
792 {
793 enqueue_cache_item (ci, TAIL);
794 }
795 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
796 && (ci->values_num <= 0))
797 {
798 assert ((char *) key == ci->file);
799 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
800 {
801 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
802 return (FALSE);
803 }
804 }
806 return (FALSE);
807 } /* }}} gboolean tree_callback_flush */
809 static int flush_old_values (int max_age)
810 {
811 callback_flush_data_t cfd;
812 size_t k;
814 memset (&cfd, 0, sizeof (cfd));
815 /* Pass the current time as user data so that we don't need to call
816 * `time' for each node. */
817 cfd.now = time (NULL);
818 cfd.keys = NULL;
819 cfd.keys_num = 0;
821 if (max_age > 0)
822 cfd.abs_timeout = cfd.now - max_age;
823 else
824 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
826 /* `tree_callback_flush' will return the keys of all values that haven't
827 * been touched in the last `config_flush_interval' seconds in `cfd'.
828 * The char*'s in this array point to the same memory as ci->file, so we
829 * don't need to free them separately. */
830 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
832 for (k = 0; k < cfd.keys_num; k++)
833 {
834 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
835 /* should never fail, since we have held the cache_lock
836 * the entire time */
837 assert(status == TRUE);
838 }
840 if (cfd.keys != NULL)
841 {
842 free (cfd.keys);
843 cfd.keys = NULL;
844 }
846 return (0);
847 } /* int flush_old_values */
/*
 * flush_thread_main:
 * Body of the periodic flush thread. While the daemon is RUNNING it wakes up
 * every `config_flush_interval' seconds (or when `flush_cond' is signalled),
 * queues the values older than `config_write_interval' and rotates the
 * journal. When the state leaves RUNNING it optionally queues everything
 * (`config_flush_at_shutdown') and then sets the state to SHUTDOWN.
 * `cache_lock' is held for the whole loop except during journal_rotate().
 * Always returns NULL.
 */
static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int status;

  /* the first flush is due one full interval from now */
  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock(&cache_lock);

  while (state == RUNNING)
  {
    gettimeofday (&now, NULL);
    /* has the deadline passed? (compare seconds, then the sub-second part) */
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      RRDD_LOG(LOG_DEBUG, "flushing old values");

      /* Determine the time of the next cache flush. */
      next_flush.tv_sec = now.tv_sec + config_flush_interval;

      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* sleep until the deadline or until a signal handler wakes us;
     * a timeout is the normal case and is not reported */
    status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
    if (status != 0 && status != ETIMEDOUT)
    {
      RRDD_LOG (LOG_ERR, "flush_thread_main: "
                "pthread_cond_timedwait returned %i.", status);
    }
  }

  if (config_flush_at_shutdown)
    flush_old_values (-1); /* flush everything */

  /* tell the queue threads that no further items will be queued from here */
  state = SHUTDOWN;

  pthread_mutex_unlock(&cache_lock);

  return NULL;
} /* void *flush_thread_main */
/*
 * queue_thread_main:
 * Body of a writer thread. Repeatedly takes the item at the head of the
 * write queue, detaches its pending values and writes them to the RRD file
 * with rrd_update_r(). `cache_lock' is held except while the update, the
 * journal write and the statistics update are in progress.
 * The loop ends once `state' is SHUTDOWN and, when `config_flush_at_shutdown'
 * is set, the queue is empty. Always returns NULL.
 */
static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
{
  pthread_mutex_lock (&cache_lock);

  while (state != SHUTDOWN
         || (cache_queue_head != NULL && config_flush_at_shutdown))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    size_t values_num;
    int status;

    /* Now, check if there's something to store away. If not, wait until
     * something comes in. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_wait (&queue_cond, &cache_lock);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_wait returned %i.", status);
      }
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      /* NOTE(review): the item stays at the head and the lock stays held,
       * so the next iteration retries the strdup immediately */
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* take over the value array; wipe_ci_values() only clears the item's
     * pointers, the array itself is freed at the end of this iteration */
    values = ci->values;
    values_num = ci->values_num;

    wipe_ci_values(ci, time(NULL));
    remove_from_queue(ci);

    /* do the disk I/O without holding the cache lock */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    /* the journal entry is written whether or not the update succeeded */
    journal_write("wrote", file);

    /* Search again in the tree.  It's possible someone issued a "FORGET"
     * while we were writing the update values. */
    pthread_mutex_lock(&cache_lock);
    ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
    if (ci)
      pthread_cond_broadcast(&ci->flushed);
    pthread_mutex_unlock(&cache_lock);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    rrd_free_ptrs((void ***) &values, &values_num);
    free(file);

    /* re-acquire the lock for the loop condition and the next iteration */
    pthread_mutex_lock (&cache_lock);
  }
  pthread_mutex_unlock (&cache_lock);

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Split the next field off a NUL-terminated command buffer.
 *
 * Fields are separated by a single space; a backslash makes the following
 * character part of the field. The field is unescaped and NUL-terminated in
 * place, so the buffer is modified.
 *
 * buffer_ret:      in: start of the buffer; out: first byte after the field.
 * buffer_size_ret: in: bytes in the buffer including the final NUL;
 *                  out: bytes remaining after the field.
 * field_ret:       out: start of the extracted field.
 *
 * Returns 0 on success. Returns -1, leaving the three output arguments
 * untouched, if the buffer is empty or ends in a lone backslash.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *text = *buffer_ret;
  size_t text_size = *buffer_size_ret;
  size_t rd = 0; /* next byte to read */
  size_t wr = 0; /* next byte to write; never ahead of `rd' */

  if (text_size == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (text[text_size - 1] == '\0');

  for (;;)
  {
    char c;

    if (rd >= text_size)
      return (-1);

    c = text[rd];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
    {
      text[wr] = 0;
      rd++;
      break;
    }

    /* Handle escaped characters. */
    if (c == '\\')
    {
      if (rd >= (text_size - 1))
        return (-1);
      rd++;
      c = text[rd];
    }

    text[wr] = c;
    wr++;
    rd++;
  }

  *buffer_ret = text + rd;
  *buffer_size_ret = text_size - rd;
  *field_ret = text;

  return (0);
} /* }}} int buffer_get_field */
1053 /* if we're restricting writes to the base directory,
1054 * check whether the file falls within the dir
1055 * returns 1 if OK, otherwise 0
1056 */
1057 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1058 {
1059 assert(file != NULL);
1061 if (!config_write_base_only
1062 || JOURNAL_REPLAY(sock)
1063 || config_base_dir == NULL)
1064 return 1;
1066 if (strstr(file, "../") != NULL) goto err;
1068 /* relative paths without "../" are ok */
1069 if (*file != '/') return 1;
1071 /* file must be of the format base + "/" + <1+ char filename> */
1072 if (strlen(file) < _config_base_dir_len + 2) goto err;
1073 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1074 if (*(file + _config_base_dir_len) != '/') goto err;
1076 return 1;
1078 err:
1079 if (sock != NULL && sock->fd >= 0)
1080 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1082 return 0;
1083 } /* }}} static int check_file_access */
1085 /* when using a base dir, convert relative paths to absolute paths.
1086 * if necessary, modifies the "filename" pointer to point
1087 * to the new path created in "tmp". "tmp" is provided
1088 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1089 *
1090 * this allows us to optimize for the expected case (absolute path)
1091 * with a no-op.
1092 */
1093 static void get_abs_path(char **filename, char *tmp)
1094 {
1095 assert(tmp != NULL);
1096 assert(filename != NULL && *filename != NULL);
1098 if (config_base_dir == NULL || **filename == '/')
1099 return;
1101 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1102 *filename = tmp;
1103 } /* }}} static int get_abs_path */
1105 static int flush_file (const char *filename) /* {{{ */
1106 {
1107 cache_item_t *ci;
1109 pthread_mutex_lock (&cache_lock);
1111 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1112 if (ci == NULL)
1113 {
1114 pthread_mutex_unlock (&cache_lock);
1115 return (ENOENT);
1116 }
1118 if (ci->values_num > 0)
1119 {
1120 /* Enqueue at head */
1121 enqueue_cache_item (ci, HEAD);
1122 pthread_cond_wait(&ci->flushed, &cache_lock);
1123 }
1125 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1126 * may have been purged during our cond_wait() */
1128 pthread_mutex_unlock(&cache_lock);
1130 return (0);
1131 } /* }}} int flush_file */
1133 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1134 {
1135 char *err = "Syntax error.\n";
1137 if (cmd && cmd->syntax)
1138 err = cmd->syntax;
1140 return send_response(sock, RESP_ERR, "Usage: %s", err);
1141 } /* }}} static int syntax_error() */
1143 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1144 {
1145 uint64_t copy_queue_length;
1146 uint64_t copy_updates_received;
1147 uint64_t copy_flush_received;
1148 uint64_t copy_updates_written;
1149 uint64_t copy_data_sets_written;
1150 uint64_t copy_journal_bytes;
1151 uint64_t copy_journal_rotate;
1153 uint64_t tree_nodes_number;
1154 uint64_t tree_depth;
1156 pthread_mutex_lock (&stats_lock);
1157 copy_queue_length = stats_queue_length;
1158 copy_updates_received = stats_updates_received;
1159 copy_flush_received = stats_flush_received;
1160 copy_updates_written = stats_updates_written;
1161 copy_data_sets_written = stats_data_sets_written;
1162 copy_journal_bytes = stats_journal_bytes;
1163 copy_journal_rotate = stats_journal_rotate;
1164 pthread_mutex_unlock (&stats_lock);
1166 pthread_mutex_lock (&cache_lock);
1167 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1168 tree_depth = (uint64_t) g_tree_height (cache_tree);
1169 pthread_mutex_unlock (&cache_lock);
1171 add_response_info(sock,
1172 "QueueLength: %"PRIu64"\n", copy_queue_length);
1173 add_response_info(sock,
1174 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1175 add_response_info(sock,
1176 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1177 add_response_info(sock,
1178 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1179 add_response_info(sock,
1180 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1181 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1182 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1183 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1184 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1186 send_response(sock, RESP_OK, "Statistics follow\n");
1188 return (0);
1189 } /* }}} int handle_request_stats */
1191 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1192 {
1193 char *file, file_tmp[PATH_MAX];
1194 int status;
1196 status = buffer_get_field (&buffer, &buffer_size, &file);
1197 if (status != 0)
1198 {
1199 return syntax_error(sock,cmd);
1200 }
1201 else
1202 {
1203 pthread_mutex_lock(&stats_lock);
1204 stats_flush_received++;
1205 pthread_mutex_unlock(&stats_lock);
1207 get_abs_path(&file, file_tmp);
1208 if (!check_file_access(file, sock)) return 0;
1210 status = flush_file (file);
1211 if (status == 0)
1212 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1213 else if (status == ENOENT)
1214 {
1215 /* no file in our tree; see whether it exists at all */
1216 struct stat statbuf;
1218 memset(&statbuf, 0, sizeof(statbuf));
1219 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1220 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1221 else
1222 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1223 }
1224 else if (status < 0)
1225 return send_response(sock, RESP_ERR, "Internal error.\n");
1226 else
1227 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1228 }
1230 /* NOTREACHED */
1231 assert(1==0);
1232 } /* }}} int handle_request_flush */
1234 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1235 {
1236 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1238 pthread_mutex_lock(&cache_lock);
1239 flush_old_values(-1);
1240 pthread_mutex_unlock(&cache_lock);
1242 return send_response(sock, RESP_OK, "Started flush.\n");
1243 } /* }}} static int handle_request_flushall */
1245 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1246 {
1247 int status;
1248 char *file, file_tmp[PATH_MAX];
1249 cache_item_t *ci;
1251 status = buffer_get_field(&buffer, &buffer_size, &file);
1252 if (status != 0)
1253 return syntax_error(sock,cmd);
1255 get_abs_path(&file, file_tmp);
1257 pthread_mutex_lock(&cache_lock);
1258 ci = g_tree_lookup(cache_tree, file);
1259 if (ci == NULL)
1260 {
1261 pthread_mutex_unlock(&cache_lock);
1262 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1263 }
1265 for (size_t i=0; i < ci->values_num; i++)
1266 add_response_info(sock, "%s\n", ci->values[i]);
1268 pthread_mutex_unlock(&cache_lock);
1269 return send_response(sock, RESP_OK, "updates pending\n");
1270 } /* }}} static int handle_request_pending */
1272 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1273 {
1274 int status;
1275 gboolean found;
1276 char *file, file_tmp[PATH_MAX];
1278 status = buffer_get_field(&buffer, &buffer_size, &file);
1279 if (status != 0)
1280 return syntax_error(sock,cmd);
1282 get_abs_path(&file, file_tmp);
1283 if (!check_file_access(file, sock)) return 0;
1285 pthread_mutex_lock(&cache_lock);
1286 found = g_tree_remove(cache_tree, file);
1287 pthread_mutex_unlock(&cache_lock);
1289 if (found == TRUE)
1290 {
1291 if (!JOURNAL_REPLAY(sock))
1292 journal_write("forget", file);
1294 return send_response(sock, RESP_OK, "Gone!\n");
1295 }
1296 else
1297 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1299 /* NOTREACHED */
1300 assert(1==0);
1301 } /* }}} static int handle_request_forget */
1303 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1304 {
1305 cache_item_t *ci;
1307 pthread_mutex_lock(&cache_lock);
1309 ci = cache_queue_head;
1310 while (ci != NULL)
1311 {
1312 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1313 ci = ci->next;
1314 }
1316 pthread_mutex_unlock(&cache_lock);
1318 return send_response(sock, RESP_OK, "in queue.\n");
1319 } /* }}} int handle_request_queue */
1321 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1322 {
1323 char *file, file_tmp[PATH_MAX];
1324 int values_num = 0;
1325 int status;
1326 char orig_buf[CMD_MAX];
1328 cache_item_t *ci;
1330 /* save it for the journal later */
1331 if (!JOURNAL_REPLAY(sock))
1332 strncpy(orig_buf, buffer, buffer_size);
1334 status = buffer_get_field (&buffer, &buffer_size, &file);
1335 if (status != 0)
1336 return syntax_error(sock,cmd);
1338 pthread_mutex_lock(&stats_lock);
1339 stats_updates_received++;
1340 pthread_mutex_unlock(&stats_lock);
1342 get_abs_path(&file, file_tmp);
1343 if (!check_file_access(file, sock)) return 0;
1345 pthread_mutex_lock (&cache_lock);
1346 ci = g_tree_lookup (cache_tree, file);
1348 if (ci == NULL) /* {{{ */
1349 {
1350 struct stat statbuf;
1351 cache_item_t *tmp;
1353 /* don't hold the lock while we setup; stat(2) might block */
1354 pthread_mutex_unlock(&cache_lock);
1356 memset (&statbuf, 0, sizeof (statbuf));
1357 status = stat (file, &statbuf);
1358 if (status != 0)
1359 {
1360 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1362 status = errno;
1363 if (status == ENOENT)
1364 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1365 else
1366 return send_response(sock, RESP_ERR,
1367 "stat failed with error %i.\n", status);
1368 }
1369 if (!S_ISREG (statbuf.st_mode))
1370 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1372 if (access(file, R_OK|W_OK) != 0)
1373 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1374 file, rrd_strerror(errno));
1376 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1377 if (ci == NULL)
1378 {
1379 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1381 return send_response(sock, RESP_ERR, "malloc failed.\n");
1382 }
1383 memset (ci, 0, sizeof (cache_item_t));
1385 ci->file = strdup (file);
1386 if (ci->file == NULL)
1387 {
1388 free (ci);
1389 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1391 return send_response(sock, RESP_ERR, "strdup failed.\n");
1392 }
1394 wipe_ci_values(ci, now);
1395 ci->flags = CI_FLAGS_IN_TREE;
1396 pthread_cond_init(&ci->flushed, NULL);
1398 pthread_mutex_lock(&cache_lock);
1400 /* another UPDATE might have added this entry in the meantime */
1401 tmp = g_tree_lookup (cache_tree, file);
1402 if (tmp == NULL)
1403 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1404 else
1405 {
1406 free_cache_item (ci);
1407 ci = tmp;
1408 }
1410 /* state may have changed while we were unlocked */
1411 if (state == SHUTDOWN)
1412 return -1;
1413 } /* }}} */
1414 assert (ci != NULL);
1416 /* don't re-write updates in replay mode */
1417 if (!JOURNAL_REPLAY(sock))
1418 journal_write("update", orig_buf);
1420 while (buffer_size > 0)
1421 {
1422 char *value;
1423 time_t stamp;
1424 char *eostamp;
1426 status = buffer_get_field (&buffer, &buffer_size, &value);
1427 if (status != 0)
1428 {
1429 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1430 break;
1431 }
1433 /* make sure update time is always moving forward */
1434 stamp = strtol(value, &eostamp, 10);
1435 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1436 {
1437 pthread_mutex_unlock(&cache_lock);
1438 return send_response(sock, RESP_ERR,
1439 "Cannot find timestamp in '%s'!\n", value);
1440 }
1441 else if (stamp <= ci->last_update_stamp)
1442 {
1443 pthread_mutex_unlock(&cache_lock);
1444 return send_response(sock, RESP_ERR,
1445 "illegal attempt to update using time %ld when last"
1446 " update time is %ld (minimum one second step)\n",
1447 stamp, ci->last_update_stamp);
1448 }
1449 else
1450 ci->last_update_stamp = stamp;
1452 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1453 &ci->values_alloc, config_alloc_chunk))
1454 {
1455 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1456 continue;
1457 }
1459 values_num++;
1460 }
1462 if (((now - ci->last_flush_time) >= config_write_interval)
1463 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1464 && (ci->values_num > 0))
1465 {
1466 enqueue_cache_item (ci, TAIL);
1467 }
1469 pthread_mutex_unlock (&cache_lock);
1471 if (values_num < 1)
1472 return send_response(sock, RESP_ERR, "No values updated.\n");
1473 else
1474 return send_response(sock, RESP_OK,
1475 "errors, enqueued %i value(s).\n", values_num);
1477 /* NOTREACHED */
1478 assert(1==0);
1480 } /* }}} int handle_request_update */
1482 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1483 {
1484 char *file, file_tmp[PATH_MAX];
1485 char *cf;
1487 char *start_str;
1488 char *end_str;
1489 time_t start_tm;
1490 time_t end_tm;
1492 unsigned long step;
1493 unsigned long ds_cnt;
1494 char **ds_namv;
1495 rrd_value_t *data;
1497 int status;
1498 unsigned long i;
1499 time_t t;
1500 rrd_value_t *data_ptr;
1502 file = NULL;
1503 cf = NULL;
1504 start_str = NULL;
1505 end_str = NULL;
1507 /* Read the arguments */
1508 do /* while (0) */
1509 {
1510 status = buffer_get_field (&buffer, &buffer_size, &file);
1511 if (status != 0)
1512 break;
1514 status = buffer_get_field (&buffer, &buffer_size, &cf);
1515 if (status != 0)
1516 break;
1518 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1519 if (status != 0)
1520 {
1521 start_str = NULL;
1522 status = 0;
1523 break;
1524 }
1526 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1527 if (status != 0)
1528 {
1529 end_str = NULL;
1530 status = 0;
1531 break;
1532 }
1533 } while (0);
1535 if (status != 0)
1536 return (syntax_error(sock,cmd));
1538 get_abs_path(&file, file_tmp);
1539 if (!check_file_access(file, sock)) return 0;
1541 status = flush_file (file);
1542 if ((status != 0) && (status != ENOENT))
1543 return (send_response (sock, RESP_ERR,
1544 "flush_file (%s) failed with status %i.\n", file, status));
1546 t = time (NULL); /* "now" */
1548 /* Parse start time */
1549 if (start_str != NULL)
1550 {
1551 char *endptr;
1552 long value;
1554 endptr = NULL;
1555 errno = 0;
1556 value = strtol (start_str, &endptr, /* base = */ 0);
1557 if ((endptr == start_str) || (errno != 0))
1558 return (send_response(sock, RESP_ERR,
1559 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1560 start_str));
1562 if (value > 0)
1563 start_tm = (time_t) value;
1564 else
1565 start_tm = (time_t) (t + value);
1566 }
1567 else
1568 {
1569 start_tm = t - 86400;
1570 }
1572 /* Parse end time */
1573 if (end_str != NULL)
1574 {
1575 char *endptr;
1576 long value;
1578 endptr = NULL;
1579 errno = 0;
1580 value = strtol (end_str, &endptr, /* base = */ 0);
1581 if ((endptr == end_str) || (errno != 0))
1582 return (send_response(sock, RESP_ERR,
1583 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1584 end_str));
1586 if (value > 0)
1587 end_tm = (time_t) value;
1588 else
1589 end_tm = (time_t) (t + value);
1590 }
1591 else
1592 {
1593 end_tm = t;
1594 }
1596 step = -1;
1597 ds_cnt = 0;
1598 ds_namv = NULL;
1599 data = NULL;
1601 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1602 &ds_cnt, &ds_namv, &data);
1603 if (status != 0)
1604 return (send_response(sock, RESP_ERR,
1605 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1607 add_response_info (sock, "FlushVersion: %lu\n", 1);
1608 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1609 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1610 add_response_info (sock, "Step: %lu\n", step);
1611 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1613 #define SSTRCAT(buffer,str,buffer_fill) do { \
1614 size_t str_len = strlen (str); \
1615 if ((buffer_fill + str_len) > sizeof (buffer)) \
1616 str_len = sizeof (buffer) - buffer_fill; \
1617 if (str_len > 0) { \
1618 strncpy (buffer + buffer_fill, str, str_len); \
1619 buffer_fill += str_len; \
1620 assert (buffer_fill <= sizeof (buffer)); \
1621 if (buffer_fill == sizeof (buffer)) \
1622 buffer[buffer_fill - 1] = 0; \
1623 else \
1624 buffer[buffer_fill] = 0; \
1625 } \
1626 } while (0)
1628 { /* Add list of DS names */
1629 char linebuf[1024];
1630 size_t linebuf_fill;
1632 memset (linebuf, 0, sizeof (linebuf));
1633 linebuf_fill = 0;
1634 for (i = 0; i < ds_cnt; i++)
1635 {
1636 if (i > 0)
1637 SSTRCAT (linebuf, " ", linebuf_fill);
1638 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1639 rrd_freemem(ds_namv[i]);
1640 }
1641 rrd_freemem(ds_namv);
1642 add_response_info (sock, "DSName: %s\n", linebuf);
1643 }
1645 /* Add the actual data */
1646 assert (step > 0);
1647 data_ptr = data;
1648 for (t = start_tm + step; t <= end_tm; t += step)
1649 {
1650 char linebuf[1024];
1651 size_t linebuf_fill;
1652 char tmp[128];
1654 memset (linebuf, 0, sizeof (linebuf));
1655 linebuf_fill = 0;
1656 for (i = 0; i < ds_cnt; i++)
1657 {
1658 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1659 tmp[sizeof (tmp) - 1] = 0;
1660 SSTRCAT (linebuf, tmp, linebuf_fill);
1662 data_ptr++;
1663 }
1665 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1666 } /* for (t) */
1667 rrd_freemem(data);
1669 return (send_response (sock, RESP_OK, "Success\n"));
1670 #undef SSTRCAT
1671 } /* }}} int handle_request_fetch */
1673 /* we came across a "WROTE" entry during journal replay.
1674 * throw away any values that we have accumulated for this file
1675 */
1676 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1677 {
1678 cache_item_t *ci;
1679 const char *file = buffer;
1681 pthread_mutex_lock(&cache_lock);
1683 ci = g_tree_lookup(cache_tree, file);
1684 if (ci == NULL)
1685 {
1686 pthread_mutex_unlock(&cache_lock);
1687 return (0);
1688 }
1690 if (ci->values)
1691 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1693 wipe_ci_values(ci, now);
1694 remove_from_queue(ci);
1696 pthread_mutex_unlock(&cache_lock);
1697 return (0);
1698 } /* }}} int handle_request_wrote */
1700 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1701 {
1702 char *file, file_tmp[PATH_MAX];
1703 int status;
1704 rrd_info_t *info;
1706 /* obtain filename */
1707 status = buffer_get_field(&buffer, &buffer_size, &file);
1708 if (status != 0)
1709 return syntax_error(sock,cmd);
1710 /* get full pathname */
1711 get_abs_path(&file, file_tmp);
1712 if (!check_file_access(file, sock)) {
1713 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1714 }
1715 /* get data */
1716 rrd_clear_error ();
1717 info = rrd_info_r(file);
1718 if(!info) {
1719 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1720 }
1721 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1722 switch (data->type) {
1723 case RD_I_VAL:
1724 if (isnan(data->value.u_val))
1725 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1726 else
1727 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1728 break;
1729 case RD_I_CNT:
1730 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1731 break;
1732 case RD_I_INT:
1733 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1734 break;
1735 case RD_I_STR:
1736 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1737 break;
1738 case RD_I_BLO:
1739 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1740 break;
1741 }
1742 }
1744 rrd_info_free(info);
1746 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1747 } /* }}} static int handle_request_info */
1749 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1750 {
1751 char *i, *file, file_tmp[PATH_MAX];
1752 int status;
1753 int idx;
1754 time_t t;
1756 /* obtain filename */
1757 status = buffer_get_field(&buffer, &buffer_size, &file);
1758 if (status != 0)
1759 return syntax_error(sock,cmd);
1760 /* get full pathname */
1761 get_abs_path(&file, file_tmp);
1762 if (!check_file_access(file, sock)) {
1763 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1764 }
1766 status = buffer_get_field(&buffer, &buffer_size, &i);
1767 if (status != 0)
1768 return syntax_error(sock,cmd);
1769 idx = atoi(i);
1770 if(idx<0) {
1771 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1772 }
1774 /* get data */
1775 rrd_clear_error ();
1776 t = rrd_first_r(file,idx);
1777 if(t<1) {
1778 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1779 }
1780 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1781 } /* }}} static int handle_request_first */
1784 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1785 {
1786 char *file, file_tmp[PATH_MAX];
1787 int status;
1788 time_t t, from_file, step;
1789 rrd_file_t * rrd_file;
1790 cache_item_t * ci;
1791 rrd_t rrd;
1793 /* obtain filename */
1794 status = buffer_get_field(&buffer, &buffer_size, &file);
1795 if (status != 0)
1796 return syntax_error(sock,cmd);
1797 /* get full pathname */
1798 get_abs_path(&file, file_tmp);
1799 if (!check_file_access(file, sock)) {
1800 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1801 }
1802 rrd_clear_error();
1803 rrd_init(&rrd);
1804 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1805 if(!rrd_file) {
1806 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1807 }
1808 from_file = rrd.live_head->last_up;
1809 step = rrd.stat_head->pdp_step;
1810 rrd_close(rrd_file);
1811 pthread_mutex_lock(&cache_lock);
1812 ci = g_tree_lookup(cache_tree, file);
1813 if (ci)
1814 t = ci->last_update_stamp;
1815 else
1816 t = from_file;
1817 pthread_mutex_unlock(&cache_lock);
1818 t -= t % step;
1819 rrd_free(&rrd);
1820 if(t<1) {
1821 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1822 }
1823 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1824 } /* }}} static int handle_request_last */
1826 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1827 {
1828 char *file, file_tmp[PATH_MAX];
1829 char *tok;
1830 int ac = 0;
1831 char *av[128];
1832 int status;
1833 unsigned long step = 300;
1834 time_t last_up = time(NULL)-10;
1835 int no_overwrite = opt_no_overwrite;
1838 /* obtain filename */
1839 status = buffer_get_field(&buffer, &buffer_size, &file);
1840 if (status != 0)
1841 return syntax_error(sock,cmd);
1842 /* get full pathname */
1843 get_abs_path(&file, file_tmp);
1844 if (!check_file_access(file, sock)) {
1845 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1846 }
1847 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1849 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1850 if( ! strncmp(tok,"-b",2) ) {
1851 status = buffer_get_field(&buffer, &buffer_size, &tok );
1852 if (status != 0) return syntax_error(sock,cmd);
1853 last_up = (time_t) atol(tok);
1854 continue;
1855 }
1856 if( ! strncmp(tok,"-s",2) ) {
1857 status = buffer_get_field(&buffer, &buffer_size, &tok );
1858 if (status != 0) return syntax_error(sock,cmd);
1859 step = atol(tok);
1860 continue;
1861 }
1862 if( ! strncmp(tok,"-O",2) ) {
1863 no_overwrite = 1;
1864 continue;
1865 }
1866 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1867 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1868 return syntax_error(sock,cmd);
1869 }
1870 if(step<1) {
1871 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1872 }
1873 if (last_up < 3600 * 24 * 365 * 10) {
1874 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1875 }
1877 rrd_clear_error ();
1878 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1880 if(!status) {
1881 return send_response(sock, RESP_OK, "RRD created OK\n");
1882 }
1883 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1884 } /* }}} static int handle_request_create */
1886 /* start "BATCH" processing */
1887 static int batch_start (HANDLER_PROTO) /* {{{ */
1888 {
1889 int status;
1890 if (sock->batch_start)
1891 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1893 status = send_response(sock, RESP_OK,
1894 "Go ahead. End with dot '.' on its own line.\n");
1895 sock->batch_start = time(NULL);
1896 sock->batch_cmd = 0;
1898 return status;
1899 } /* }}} static int batch_start */
1901 /* finish "BATCH" processing and return results to the client */
1902 static int batch_done (HANDLER_PROTO) /* {{{ */
1903 {
1904 assert(sock->batch_start);
1905 sock->batch_start = 0;
1906 sock->batch_cmd = 0;
1907 return send_response(sock, RESP_OK, "errors\n");
1908 } /* }}} static int batch_done */
/* QUIT: does no work itself; it only hands -1 back to the dispatcher. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int quit_status = -1;

  return (quit_status);
} /* }}} static int handle_request_quit */
1915 static command_t list_of_commands[] = { /* {{{ */
1916 {
1917 "UPDATE",
1918 handle_request_update,
1919 CMD_CONTEXT_ANY,
1920 "UPDATE <filename> <values> [<values> ...]\n"
1921 ,
1922 "Adds the given file to the internal cache if it is not yet known and\n"
1923 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1924 "for details.\n"
1925 "\n"
1926 "Each <values> has the following form:\n"
1927 " <values> = <time>:<value>[:<value>[...]]\n"
1928 "See the rrdupdate(1) manpage for details.\n"
1929 },
1930 {
1931 "WROTE",
1932 handle_request_wrote,
1933 CMD_CONTEXT_JOURNAL,
1934 NULL,
1935 NULL
1936 },
1937 {
1938 "FLUSH",
1939 handle_request_flush,
1940 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1941 "FLUSH <filename>\n"
1942 ,
1943 "Adds the given filename to the head of the update queue and returns\n"
1944 "after it has been dequeued.\n"
1945 },
1946 {
1947 "FLUSHALL",
1948 handle_request_flushall,
1949 CMD_CONTEXT_CLIENT,
1950 "FLUSHALL\n"
1951 ,
1952 "Triggers writing of all pending updates. Returns immediately.\n"
1953 },
1954 {
1955 "PENDING",
1956 handle_request_pending,
1957 CMD_CONTEXT_CLIENT,
1958 "PENDING <filename>\n"
1959 ,
1960 "Shows any 'pending' updates for a file, in order.\n"
1961 "The updates shown have not yet been written to the underlying RRD file.\n"
1962 },
1963 {
1964 "FORGET",
1965 handle_request_forget,
1966 CMD_CONTEXT_ANY,
1967 "FORGET <filename>\n"
1968 ,
1969 "Removes the file completely from the cache.\n"
1970 "Any pending updates for the file will be lost.\n"
1971 },
1972 {
1973 "QUEUE",
1974 handle_request_queue,
1975 CMD_CONTEXT_CLIENT,
1976 "QUEUE\n"
1977 ,
1978 "Shows all files in the output queue.\n"
1979 "The output is zero or more lines in the following format:\n"
1980 "(where <num_vals> is the number of values to be written)\n"
1981 "\n"
1982 "<num_vals> <filename>\n"
1983 },
1984 {
1985 "STATS",
1986 handle_request_stats,
1987 CMD_CONTEXT_CLIENT,
1988 "STATS\n"
1989 ,
1990 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1991 "a description of the values.\n"
1992 },
1993 {
1994 "HELP",
1995 handle_request_help,
1996 CMD_CONTEXT_CLIENT,
1997 "HELP [<command>]\n",
1998 NULL, /* special! */
1999 },
2000 {
2001 "BATCH",
2002 batch_start,
2003 CMD_CONTEXT_CLIENT,
2004 "BATCH\n"
2005 ,
2006 "The 'BATCH' command permits the client to initiate a bulk load\n"
2007 " of commands to rrdcached.\n"
2008 "\n"
2009 "Usage:\n"
2010 "\n"
2011 " client: BATCH\n"
2012 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2013 " client: command #1\n"
2014 " client: command #2\n"
2015 " client: ... and so on\n"
2016 " client: .\n"
2017 " server: 2 errors\n"
2018 " server: 7 message for command #7\n"
2019 " server: 9 message for command #9\n"
2020 "\n"
2021 "For more information, consult the rrdcached(1) documentation.\n"
2022 },
2023 {
2024 ".", /* BATCH terminator */
2025 batch_done,
2026 CMD_CONTEXT_BATCH,
2027 NULL,
2028 NULL
2029 },
2030 {
2031 "FETCH",
2032 handle_request_fetch,
2033 CMD_CONTEXT_CLIENT,
2034 "FETCH <file> <CF> [<start> [<end>]]\n"
2035 ,
2036 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2037 },
2038 {
2039 "INFO",
2040 handle_request_info,
2041 CMD_CONTEXT_CLIENT,
2042 "INFO <filename>\n",
2043 "The INFO command retrieves information about a specified RRD file.\n"
2044 "This is returned in standard rrdinfo format, a sequence of lines\n"
2045 "with the format <keyname> = <value>\n"
2046 "Note that this is the data as of the last update of the RRD file itself,\n"
2047 "not the last time data was received via rrdcached, so there may be pending\n"
2048 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2049 },
2050 {
2051 "FIRST",
2052 handle_request_first,
2053 CMD_CONTEXT_CLIENT,
2054 "FIRST <filename> <rra index>\n",
2055 "The FIRST command retrieves the first data time for a specified RRA in\n"
2056 "an RRD file.\n"
2057 },
2058 {
2059 "LAST",
2060 handle_request_last,
2061 CMD_CONTEXT_CLIENT,
2062 "LAST <filename>\n",
2063 "The LAST command retrieves the last update time for a specified RRD file.\n"
2064 "Note that this is the time of the last update of the RRD file itself, not\n"
2065 "the last time data was received via rrdcached, so there may be pending\n"
2066 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2067 },
2068 {
2069 "CREATE",
2070 handle_request_create,
2071 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2072 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2073 "The CREATE command will create an RRD file, overwriting any existing file\n"
2074 "unless the -O option is given or rrdcached was started with the -O option.\n"
2075 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2076 "not acceptable) and the step is in seconds (default is 300).\n"
2077 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2078 },
2079 {
2080 "QUIT",
2081 handle_request_quit,
2082 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2083 "QUIT\n"
2084 ,
2085 "Disconnect from rrdcached.\n"
2086 }
2087 }; /* }}} command_t list_of_commands[] */
2088 static size_t list_of_commands_len = sizeof (list_of_commands)
2089 / sizeof (list_of_commands[0]);
2091 static command_t *find_command(char *cmd)
2092 {
2093 size_t i;
2095 for (i = 0; i < list_of_commands_len; i++)
2096 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2097 return (&list_of_commands[i]);
2098 return NULL;
2099 }
2101 /* We currently use the index in the `list_of_commands' array as a bit position
2102 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2103 * outside these functions so that switching to a more elegant storage method
2104 * is easily possible. */
2105 static ssize_t find_command_index (const char *cmd) /* {{{ */
2106 {
2107 size_t i;
2109 for (i = 0; i < list_of_commands_len; i++)
2110 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2111 return ((ssize_t) i);
2112 return (-1);
2113 } /* }}} ssize_t find_command_index */
2115 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2116 const char *cmd)
2117 {
2118 ssize_t i;
2120 if (JOURNAL_REPLAY(sock))
2121 return (1);
2123 if (cmd == NULL)
2124 return (-1);
2126 if ((strcasecmp ("QUIT", cmd) == 0)
2127 || (strcasecmp ("HELP", cmd) == 0))
2128 return (1);
2129 else if (strcmp (".", cmd) == 0)
2130 cmd = "BATCH";
2132 i = find_command_index (cmd);
2133 if (i < 0)
2134 return (-1);
2135 assert (i < 32);
2137 if ((sock->permissions & (1 << i)) != 0)
2138 return (1);
2139 return (0);
2140 } /* }}} int socket_permission_check */
2142 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2143 const char *cmd)
2144 {
2145 ssize_t i;
2147 i = find_command_index (cmd);
2148 if (i < 0)
2149 return (-1);
2150 assert (i < 32);
2152 sock->permissions |= (1 << i);
2153 return (0);
2154 } /* }}} int socket_permission_add */
2156 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2157 {
2158 sock->permissions = 0;
2159 } /* }}} socket_permission_clear */
2161 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2162 listen_socket_t *src)
2163 {
2164 dest->permissions = src->permissions;
2165 } /* }}} socket_permission_copy */
2167 /* check whether commands are received in the expected context */
2168 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2169 {
2170 if (JOURNAL_REPLAY(sock))
2171 return (cmd->context & CMD_CONTEXT_JOURNAL);
2172 else if (sock->batch_start)
2173 return (cmd->context & CMD_CONTEXT_BATCH);
2174 else
2175 return (cmd->context & CMD_CONTEXT_CLIENT);
2177 /* NOTREACHED */
2178 assert(1==0);
2179 }
2181 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2182 {
2183 int status;
2184 char *cmd_str;
2185 char *resp_txt;
2186 command_t *help = NULL;
2188 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2189 if (status == 0)
2190 help = find_command(cmd_str);
2192 if (help && (help->syntax || help->help))
2193 {
2194 char tmp[CMD_MAX];
2196 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2197 resp_txt = tmp;
2199 if (help->syntax)
2200 add_response_info(sock, "Usage: %s\n", help->syntax);
2202 if (help->help)
2203 add_response_info(sock, "%s\n", help->help);
2204 }
2205 else
2206 {
2207 size_t i;
2209 resp_txt = "Command overview\n";
2211 for (i = 0; i < list_of_commands_len; i++)
2212 {
2213 if (list_of_commands[i].syntax == NULL)
2214 continue;
2215 add_response_info (sock, "%s", list_of_commands[i].syntax);
2216 }
2217 }
2219 return send_response(sock, RESP_OK, resp_txt);
2220 } /* }}} int handle_request_help */
2222 static int handle_request (DISPATCH_PROTO) /* {{{ */
2223 {
2224 char *buffer_ptr = buffer;
2225 char *cmd_str = NULL;
2226 command_t *cmd = NULL;
2227 int status;
2229 assert (buffer[buffer_size - 1] == '\0');
2231 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2232 if (status != 0)
2233 {
2234 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2235 return (-1);
2236 }
2238 if (sock != NULL && sock->batch_start)
2239 sock->batch_cmd++;
2241 cmd = find_command(cmd_str);
2242 if (!cmd)
2243 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2245 if (!socket_permission_check (sock, cmd->cmd))
2246 return send_response(sock, RESP_ERR, "Permission denied.\n");
2248 if (!command_check_context(sock, cmd))
2249 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2251 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2252 } /* }}} int handle_request */
2254 static void journal_set_free (journal_set *js) /* {{{ */
2255 {
2256 if (js == NULL)
2257 return;
2259 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2261 free(js);
2262 } /* }}} journal_set_free */
2264 static void journal_set_remove (journal_set *js) /* {{{ */
2265 {
2266 if (js == NULL)
2267 return;
2269 for (uint i=0; i < js->files_num; i++)
2270 {
2271 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2272 unlink(js->files[i]);
2273 }
2274 } /* }}} journal_set_remove */
2276 /* close current journal file handle.
2277 * MUST hold journal_lock before calling */
2278 static void journal_close(void) /* {{{ */
2279 {
2280 if (journal_fh != NULL)
2281 {
2282 if (fclose(journal_fh) != 0)
2283 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2284 }
2286 journal_fh = NULL;
2287 journal_size = 0;
2288 } /* }}} journal_close */
2290 /* MUST hold journal_lock before calling */
2291 static void journal_new_file(void) /* {{{ */
2292 {
2293 struct timeval now;
2294 int new_fd;
2295 char new_file[PATH_MAX + 1];
2297 assert(journal_dir != NULL);
2298 assert(journal_cur != NULL);
2300 journal_close();
2302 gettimeofday(&now, NULL);
2303 /* this format assures that the files sort in strcmp() order */
2304 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2305 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2307 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2308 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2309 if (new_fd < 0)
2310 goto error;
2312 journal_fh = fdopen(new_fd, "a");
2313 if (journal_fh == NULL)
2314 goto error;
2316 journal_size = ftell(journal_fh);
2317 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2319 /* record the file in the journal set */
2320 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2322 return;
2324 error:
2325 RRDD_LOG(LOG_CRIT,
2326 "JOURNALING DISABLED: Error while trying to create %s : %s",
2327 new_file, rrd_strerror(errno));
2328 RRDD_LOG(LOG_CRIT,
2329 "JOURNALING DISABLED: All values will be flushed at shutdown");
2331 close(new_fd);
2332 config_flush_at_shutdown = 1;
2334 } /* }}} journal_new_file */
2336 /* MUST NOT hold journal_lock before calling this */
2337 static void journal_rotate(void) /* {{{ */
2338 {
2339 journal_set *old_js = NULL;
2341 if (journal_dir == NULL)
2342 return;
2344 RRDD_LOG(LOG_DEBUG, "rotating journals");
2346 pthread_mutex_lock(&stats_lock);
2347 ++stats_journal_rotate;
2348 pthread_mutex_unlock(&stats_lock);
2350 pthread_mutex_lock(&journal_lock);
2352 journal_close();
2354 /* rotate the journal sets */
2355 old_js = journal_old;
2356 journal_old = journal_cur;
2357 journal_cur = calloc(1, sizeof(journal_set));
2359 if (journal_cur != NULL)
2360 journal_new_file();
2361 else
2362 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2364 pthread_mutex_unlock(&journal_lock);
2366 journal_set_remove(old_js);
2367 journal_set_free (old_js);
2369 } /* }}} static void journal_rotate */
2371 /* MUST hold journal_lock when calling */
2372 static void journal_done(void) /* {{{ */
2373 {
2374 if (journal_cur == NULL)
2375 return;
2377 journal_close();
2379 if (config_flush_at_shutdown)
2380 {
2381 RRDD_LOG(LOG_INFO, "removing journals");
2382 journal_set_remove(journal_old);
2383 journal_set_remove(journal_cur);
2384 }
2385 else
2386 {
2387 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2388 "journals will be used at next startup");
2389 }
2391 journal_set_free(journal_cur);
2392 journal_set_free(journal_old);
2393 free(journal_dir);
2395 } /* }}} static void journal_done */
2397 static int journal_write(char *cmd, char *args) /* {{{ */
2398 {
2399 int chars;
2401 if (journal_fh == NULL)
2402 return 0;
2404 pthread_mutex_lock(&journal_lock);
2405 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2406 journal_size += chars;
2408 if (journal_size > JOURNAL_MAX)
2409 journal_new_file();
2411 pthread_mutex_unlock(&journal_lock);
2413 if (chars > 0)
2414 {
2415 pthread_mutex_lock(&stats_lock);
2416 stats_journal_bytes += chars;
2417 pthread_mutex_unlock(&stats_lock);
2418 }
2420 return chars;
2421 } /* }}} static int journal_write */
2423 static int journal_replay (const char *file) /* {{{ */
2424 {
2425 FILE *fh;
2426 int entry_cnt = 0;
2427 int fail_cnt = 0;
2428 uint64_t line = 0;
2429 char entry[CMD_MAX];
2430 time_t now;
2432 if (file == NULL) return 0;
2434 {
2435 char *reason = "unknown error";
2436 int status = 0;
2437 struct stat statbuf;
2439 memset(&statbuf, 0, sizeof(statbuf));
2440 if (stat(file, &statbuf) != 0)
2441 {
2442 reason = "stat error";
2443 status = errno;
2444 }
2445 else if (!S_ISREG(statbuf.st_mode))
2446 {
2447 reason = "not a regular file";
2448 status = EPERM;
2449 }
2450 if (statbuf.st_uid != daemon_uid)
2451 {
2452 reason = "not owned by daemon user";
2453 status = EACCES;
2454 }
2455 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2456 {
2457 reason = "must not be user/group writable";
2458 status = EACCES;
2459 }
2461 if (status != 0)
2462 {
2463 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2464 file, rrd_strerror(status), reason);
2465 return 0;
2466 }
2467 }
2469 fh = fopen(file, "r");
2470 if (fh == NULL)
2471 {
2472 if (errno != ENOENT)
2473 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2474 file, rrd_strerror(errno));
2475 return 0;
2476 }
2477 else
2478 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2480 now = time(NULL);
2482 while(!feof(fh))
2483 {
2484 size_t entry_len;
2486 ++line;
2487 if (fgets(entry, sizeof(entry), fh) == NULL)
2488 break;
2489 entry_len = strlen(entry);
2491 /* check \n termination in case journal writing crashed mid-line */
2492 if (entry_len == 0)
2493 continue;
2494 else if (entry[entry_len - 1] != '\n')
2495 {
2496 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2497 ++fail_cnt;
2498 continue;
2499 }
2501 entry[entry_len - 1] = '\0';
2503 if (handle_request(NULL, now, entry, entry_len) == 0)
2504 ++entry_cnt;
2505 else
2506 ++fail_cnt;
2507 }
2509 fclose(fh);
2511 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2512 entry_cnt, fail_cnt);
2514 return entry_cnt > 0 ? 1 : 0;
2515 } /* }}} static int journal_replay */
/*
 * qsort() comparator for an array of C strings (journal file names):
 * both arguments point at `char *' elements, which are ordered by strcmp().
 */
static int journal_sort(const void *v1, const void *v2)
{
  const char *lhs = *(char * const *) v1;
  const char *rhs = *(char * const *) v2;

  return strcmp(lhs, rhs);
}
2525 static void journal_init(void) /* {{{ */
2526 {
2527 int had_journal = 0;
2528 DIR *dir;
2529 struct dirent *dent;
2530 char path[PATH_MAX+1];
2532 if (journal_dir == NULL) return;
2534 pthread_mutex_lock(&journal_lock);
2536 journal_cur = calloc(1, sizeof(journal_set));
2537 if (journal_cur == NULL)
2538 {
2539 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2540 return;
2541 }
2543 RRDD_LOG(LOG_INFO, "checking for journal files");
2545 /* Handle old journal files during transition. This gives them the
2546 * correct sort order. TODO: remove after first release
2547 */
2548 {
2549 char old_path[PATH_MAX+1];
2550 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2551 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2552 rename(old_path, path);
2554 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2555 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2556 rename(old_path, path);
2557 }
2559 dir = opendir(journal_dir);
2560 if (!dir) {
2561 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2562 return;
2563 }
2564 while ((dent = readdir(dir)) != NULL)
2565 {
2566 /* looks like a journal file? */
2567 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2568 continue;
2570 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2572 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2573 {
2574 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2575 dent->d_name);
2576 break;
2577 }
2578 }
2579 closedir(dir);
2581 qsort(journal_cur->files, journal_cur->files_num,
2582 sizeof(journal_cur->files[0]), journal_sort);
2584 for (uint i=0; i < journal_cur->files_num; i++)
2585 had_journal += journal_replay(journal_cur->files[i]);
2587 journal_new_file();
2589 /* it must have been a crash. start a flush */
2590 if (had_journal && config_flush_at_shutdown)
2591 flush_old_values(-1);
2593 pthread_mutex_unlock(&journal_lock);
2595 RRDD_LOG(LOG_INFO, "journal processing complete");
2597 } /* }}} static void journal_init */
2599 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2600 {
2601 assert(sock != NULL);
2603 free(sock->rbuf); sock->rbuf = NULL;
2604 free(sock->wbuf); sock->wbuf = NULL;
2605 free(sock);
2606 } /* }}} void free_listen_socket */
2608 static void close_connection(listen_socket_t *sock) /* {{{ */
2609 {
2610 if (sock->fd >= 0)
2611 {
2612 close(sock->fd);
2613 sock->fd = -1;
2614 }
2616 free_listen_socket(sock);
2618 } /* }}} void close_connection */
2620 static void *connection_thread_main (void *args) /* {{{ */
2621 {
2622 listen_socket_t *sock;
2623 int fd;
2625 sock = (listen_socket_t *) args;
2626 fd = sock->fd;
2628 /* init read buffers */
2629 sock->next_read = sock->next_cmd = 0;
2630 sock->rbuf = malloc(RBUF_SIZE);
2631 if (sock->rbuf == NULL)
2632 {
2633 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2634 close_connection(sock);
2635 return NULL;
2636 }
2638 pthread_mutex_lock (&connection_threads_lock);
2639 connection_threads_num++;
2640 pthread_mutex_unlock (&connection_threads_lock);
2642 while (state == RUNNING)
2643 {
2644 char *cmd;
2645 ssize_t cmd_len;
2646 ssize_t rbytes;
2647 time_t now;
2649 struct pollfd pollfd;
2650 int status;
2652 pollfd.fd = fd;
2653 pollfd.events = POLLIN | POLLPRI;
2654 pollfd.revents = 0;
2656 status = poll (&pollfd, 1, /* timeout = */ 500);
2657 if (state != RUNNING)
2658 break;
2659 else if (status == 0) /* timeout */
2660 continue;
2661 else if (status < 0) /* error */
2662 {
2663 status = errno;
2664 if (status != EINTR)
2665 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2666 continue;
2667 }
2669 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2670 break;
2671 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2672 {
2673 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2674 "poll(2) returned something unexpected: %#04hx",
2675 pollfd.revents);
2676 break;
2677 }
2679 rbytes = read(fd, sock->rbuf + sock->next_read,
2680 RBUF_SIZE - sock->next_read);
2681 if (rbytes < 0)
2682 {
2683 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2684 break;
2685 }
2686 else if (rbytes == 0)
2687 break; /* eof */
2689 sock->next_read += rbytes;
2691 if (sock->batch_start)
2692 now = sock->batch_start;
2693 else
2694 now = time(NULL);
2696 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2697 {
2698 status = handle_request (sock, now, cmd, cmd_len+1);
2699 if (status != 0)
2700 goto out_close;
2701 }
2702 }
2704 out_close:
2705 close_connection(sock);
2707 /* Remove this thread from the connection threads list */
2708 pthread_mutex_lock (&connection_threads_lock);
2709 connection_threads_num--;
2710 if (connection_threads_num <= 0)
2711 pthread_cond_broadcast(&connection_threads_done);
2712 pthread_mutex_unlock (&connection_threads_lock);
2714 return (NULL);
2715 } /* }}} void *connection_thread_main */
2717 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2718 {
2719 int fd;
2720 struct sockaddr_un sa;
2721 listen_socket_t *temp;
2722 int status;
2723 const char *path;
2724 char *path_copy, *dir;
2726 path = sock->addr;
2727 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2728 path += strlen("unix:");
2730 /* dirname may modify its argument */
2731 path_copy = strdup(path);
2732 if (path_copy == NULL)
2733 {
2734 fprintf(stderr, "rrdcached: strdup(): %s\n",
2735 rrd_strerror(errno));
2736 return (-1);
2737 }
2739 dir = dirname(path_copy);
2740 if (rrd_mkdir_p(dir, 0777) != 0)
2741 {
2742 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2743 dir, rrd_strerror(errno));
2744 return (-1);
2745 }
2747 free(path_copy);
2749 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2750 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2751 if (temp == NULL)
2752 {
2753 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2754 return (-1);
2755 }
2756 listen_fds = temp;
2757 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2759 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2760 if (fd < 0)
2761 {
2762 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2763 rrd_strerror(errno));
2764 return (-1);
2765 }
2767 memset (&sa, 0, sizeof (sa));
2768 sa.sun_family = AF_UNIX;
2769 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2771 /* if we've gotten this far, we own the pid file. any daemon started
2772 * with the same args must not be alive. therefore, ensure that we can
2773 * create the socket...
2774 */
2775 unlink(path);
2777 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2778 if (status != 0)
2779 {
2780 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2781 path, rrd_strerror(errno));
2782 close (fd);
2783 return (-1);
2784 }
2786 /* tweak the sockets group ownership */
2787 if (sock->socket_group != (gid_t)-1)
2788 {
2789 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2790 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2791 {
2792 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2793 }
2794 }
2796 if (sock->socket_permissions != (mode_t)-1)
2797 {
2798 if (chmod(path, sock->socket_permissions) != 0)
2799 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2800 (unsigned int)sock->socket_permissions, strerror(errno));
2801 }
2803 status = listen (fd, /* backlog = */ 10);
2804 if (status != 0)
2805 {
2806 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2807 path, rrd_strerror(errno));
2808 close (fd);
2809 unlink (path);
2810 return (-1);
2811 }
2813 listen_fds[listen_fds_num].fd = fd;
2814 listen_fds[listen_fds_num].family = PF_UNIX;
2815 strncpy(listen_fds[listen_fds_num].addr, path,
2816 sizeof (listen_fds[listen_fds_num].addr) - 1);
2817 listen_fds_num++;
2819 return (0);
2820 } /* }}} int open_listen_socket_unix */
/*
 * Open listening TCP sockets for a network address and append them to
 * listen_fds.
 *
 * sock->addr is either "[ipv6-address]" / "[ipv6-address]:port" or
 * "host" / "host:port"; without a port RRDCACHED_DEFAULT_PORT is used. One
 * socket is opened for every address returned by getaddrinfo(); addresses
 * for which socket() or bind() fails are skipped.
 *
 * Returns 0 when address resolution succeeded (even if no socket could be
 * bound), and -1 on a malformed address, a getaddrinfo() failure or a
 * listen() failure.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* everything after the last ':' is the port */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* start from a copy of the template; fd/family are set below */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2944 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2945 {
2946 assert(sock != NULL);
2947 assert(sock->addr != NULL);
2949 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2950 || sock->addr[0] == '/')
2951 return (open_listen_socket_unix(sock));
2952 else
2953 return (open_listen_socket_network(sock));
2954 } /* }}} int open_listen_socket */
2956 static int close_listen_sockets (void) /* {{{ */
2957 {
2958 size_t i;
2960 for (i = 0; i < listen_fds_num; i++)
2961 {
2962 close (listen_fds[i].fd);
2964 if (listen_fds[i].family == PF_UNIX)
2965 unlink(listen_fds[i].addr);
2966 }
2968 free (listen_fds);
2969 listen_fds = NULL;
2970 listen_fds_num = 0;
2972 return (0);
2973 } /* }}} int close_listen_sockets */
2975 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2976 {
2977 struct pollfd *pollfds;
2978 int pollfds_num;
2979 int status;
2980 int i;
2982 if (listen_fds_num < 1)
2983 {
2984 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2985 return (NULL);
2986 }
2988 pollfds_num = listen_fds_num;
2989 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2990 if (pollfds == NULL)
2991 {
2992 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2993 return (NULL);
2994 }
2995 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2997 RRDD_LOG(LOG_INFO, "listening for connections");
2999 while (state == RUNNING)
3000 {
3001 for (i = 0; i < pollfds_num; i++)
3002 {
3003 pollfds[i].fd = listen_fds[i].fd;
3004 pollfds[i].events = POLLIN | POLLPRI;
3005 pollfds[i].revents = 0;
3006 }
3008 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3009 if (state != RUNNING)
3010 break;
3011 else if (status == 0) /* timeout */
3012 continue;
3013 else if (status < 0) /* error */
3014 {
3015 status = errno;
3016 if (status != EINTR)
3017 {
3018 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3019 }
3020 continue;
3021 }
3023 for (i = 0; i < pollfds_num; i++)
3024 {
3025 listen_socket_t *client_sock;
3026 struct sockaddr_storage client_sa;
3027 socklen_t client_sa_size;
3028 pthread_t tid;
3029 pthread_attr_t attr;
3031 if (pollfds[i].revents == 0)
3032 continue;
3034 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3035 {
3036 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3037 "poll(2) returned something unexpected for listen FD #%i.",
3038 pollfds[i].fd);
3039 continue;
3040 }
3042 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3043 if (client_sock == NULL)
3044 {
3045 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3046 continue;
3047 }
3048 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3050 client_sa_size = sizeof (client_sa);
3051 client_sock->fd = accept (pollfds[i].fd,
3052 (struct sockaddr *) &client_sa, &client_sa_size);
3053 if (client_sock->fd < 0)
3054 {
3055 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3056 free(client_sock);
3057 continue;
3058 }
3060 pthread_attr_init (&attr);
3061 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3063 status = pthread_create (&tid, &attr, connection_thread_main,
3064 client_sock);
3065 if (status != 0)
3066 {
3067 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3068 close_connection(client_sock);
3069 continue;
3070 }
3071 } /* for (pollfds_num) */
3072 } /* while (state == RUNNING) */
3074 RRDD_LOG(LOG_INFO, "starting shutdown");
3076 close_listen_sockets ();
3078 pthread_mutex_lock (&connection_threads_lock);
3079 while (connection_threads_num > 0)
3080 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3081 pthread_mutex_unlock (&connection_threads_lock);
3083 free(pollfds);
3085 return (NULL);
3086 } /* }}} void *listen_thread_main */
3088 static int daemonize (void) /* {{{ */
3089 {
3090 int pid_fd;
3091 char *base_dir;
3093 daemon_uid = geteuid();
3095 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3096 if (pid_fd < 0)
3097 pid_fd = check_pidfile();
3098 if (pid_fd < 0)
3099 return pid_fd;
3101 /* open all the listen sockets */
3102 if (config_listen_address_list_len > 0)
3103 {
3104 for (size_t i = 0; i < config_listen_address_list_len; i++)
3105 open_listen_socket (config_listen_address_list[i]);
3107 rrd_free_ptrs((void ***) &config_listen_address_list,
3108 &config_listen_address_list_len);
3109 }
3110 else
3111 {
3112 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3113 sizeof(default_socket.addr) - 1);
3114 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3115 open_listen_socket (&default_socket);
3116 }
3118 if (listen_fds_num < 1)
3119 {
3120 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3121 goto error;
3122 }
3124 if (!stay_foreground)
3125 {
3126 pid_t child;
3128 child = fork ();
3129 if (child < 0)
3130 {
3131 fprintf (stderr, "daemonize: fork(2) failed.\n");
3132 goto error;
3133 }
3134 else if (child > 0)
3135 exit(0);
3137 /* Become session leader */
3138 setsid ();
3140 /* Open the first three file descriptors to /dev/null */
3141 close (2);
3142 close (1);
3143 close (0);
3145 open ("/dev/null", O_RDWR);
3146 if (dup(0) == -1 || dup(0) == -1){
3147 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3148 }
3149 } /* if (!stay_foreground) */
3151 /* Change into the /tmp directory. */
3152 base_dir = (config_base_dir != NULL)
3153 ? config_base_dir
3154 : "/tmp";
3156 if (chdir (base_dir) != 0)
3157 {
3158 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3159 goto error;
3160 }
3162 install_signal_handlers();
3164 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3165 RRDD_LOG(LOG_INFO, "starting up");
3167 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3168 (GDestroyNotify) free_cache_item);
3169 if (cache_tree == NULL)
3170 {
3171 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3172 goto error;
3173 }
3175 return write_pidfile (pid_fd);
3177 error:
3178 remove_pidfile();
3179 return -1;
3180 } /* }}} int daemonize */
3182 static int cleanup (void) /* {{{ */
3183 {
3184 pthread_cond_broadcast (&flush_cond);
3185 pthread_join (flush_thread, NULL);
3187 pthread_cond_broadcast (&queue_cond);
3188 for (int i = 0; i < config_queue_threads; i++)
3189 pthread_join (queue_threads[i], NULL);
3191 if (config_flush_at_shutdown)
3192 {
3193 assert(cache_queue_head == NULL);
3194 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3195 }
3197 free(queue_threads);
3198 free(config_base_dir);
3200 pthread_mutex_lock(&cache_lock);
3201 g_tree_destroy(cache_tree);
3203 pthread_mutex_lock(&journal_lock);
3204 journal_done();
3206 RRDD_LOG(LOG_INFO, "goodbye");
3207 closelog ();
3209 remove_pidfile ();
3210 free(config_pid_file);
3212 return (0);
3213 } /* }}} int cleanup */
3215 static int read_options (int argc, char **argv) /* {{{ */
3216 {
3217 int option;
3218 int status = 0;
3220 socket_permission_clear (&default_socket);
3222 default_socket.socket_group = (gid_t)-1;
3223 default_socket.socket_permissions = (mode_t)-1;
3225 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3226 {
3227 switch (option)
3228 {
3229 case 'O':
3230 opt_no_overwrite = 1;
3231 break;
3233 case 'g':
3234 stay_foreground=1;
3235 break;
3237 case 'l':
3238 {
3239 listen_socket_t *new;
3241 new = malloc(sizeof(listen_socket_t));
3242 if (new == NULL)
3243 {
3244 fprintf(stderr, "read_options: malloc failed.\n");
3245 return(2);
3246 }
3247 memset(new, 0, sizeof(listen_socket_t));
3249 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3251 /* Add permissions to the socket {{{ */
3252 if (default_socket.permissions != 0)
3253 {
3254 socket_permission_copy (new, &default_socket);
3255 }
3256 else /* if (default_socket.permissions == 0) */
3257 {
3258 /* Add permission for ALL commands to the socket. */
3259 size_t i;
3260 for (i = 0; i < list_of_commands_len; i++)
3261 {
3262 status = socket_permission_add (new, list_of_commands[i].cmd);
3263 if (status != 0)
3264 {
3265 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3266 "socket failed. This should never happen, ever! Sorry.\n",
3267 list_of_commands[i].cmd);
3268 status = 4;
3269 }
3270 }
3271 }
3272 /* }}} Done adding permissions. */
3274 new->socket_group = default_socket.socket_group;
3275 new->socket_permissions = default_socket.socket_permissions;
3277 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3278 &config_listen_address_list_len, new))
3279 {
3280 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3281 return (2);
3282 }
3283 }
3284 break;
3286 /* set socket group permissions */
3287 case 's':
3288 {
3289 gid_t group_gid;
3290 struct group *grp;
3292 group_gid = strtoul(optarg, NULL, 10);
3293 if (errno != EINVAL && group_gid>0)
3294 {
3295 /* we were passed a number */
3296 grp = getgrgid(group_gid);
3297 }
3298 else
3299 {
3300 grp = getgrnam(optarg);
3301 }
3303 if (grp)
3304 {
3305 default_socket.socket_group = grp->gr_gid;
3306 }
3307 else
3308 {
3309 /* no idea what the user wanted... */
3310 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3311 return (5);
3312 }
3313 }
3314 break;
3316 /* set socket file permissions */
3317 case 'm':
3318 {
3319 long tmp;
3320 char *endptr = NULL;
3322 tmp = strtol (optarg, &endptr, 8);
3323 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3324 || (tmp > 07777) || (tmp < 0)) {
3325 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3326 optarg);
3327 return (5);
3328 }
3330 default_socket.socket_permissions = (mode_t)tmp;
3331 }
3332 break;
3334 case 'P':
3335 {
3336 char *optcopy;
3337 char *saveptr;
3338 char *dummy;
3339 char *ptr;
3341 socket_permission_clear (&default_socket);
3343 optcopy = strdup (optarg);
3344 dummy = optcopy;
3345 saveptr = NULL;
3346 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3347 {
3348 dummy = NULL;
3349 status = socket_permission_add (&default_socket, ptr);
3350 if (status != 0)
3351 {
3352 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3353 "socket failed. Most likely, this permission doesn't "
3354 "exist. Check your command line.\n", ptr);
3355 status = 4;
3356 }
3357 }
3359 free (optcopy);
3360 }
3361 break;
3363 case 'f':
3364 {
3365 int temp;
3367 temp = atoi (optarg);
3368 if (temp > 0)
3369 config_flush_interval = temp;
3370 else
3371 {
3372 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3373 status = 3;
3374 }
3375 }
3376 break;
3378 case 'w':
3379 {
3380 int temp;
3382 temp = atoi (optarg);
3383 if (temp > 0)
3384 config_write_interval = temp;
3385 else
3386 {
3387 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3388 status = 2;
3389 }
3390 }
3391 break;
3393 case 'z':
3394 {
3395 int temp;
3397 temp = atoi(optarg);
3398 if (temp > 0)
3399 config_write_jitter = temp;
3400 else
3401 {
3402 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3403 status = 2;
3404 }
3406 break;
3407 }
3409 case 't':
3410 {
3411 int threads;
3412 threads = atoi(optarg);
3413 if (threads >= 1)
3414 config_queue_threads = threads;
3415 else
3416 {
3417 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3418 return 1;
3419 }
3420 }
3421 break;
3423 case 'B':
3424 config_write_base_only = 1;
3425 break;
3427 case 'b':
3428 {
3429 size_t len;
3430 char base_realpath[PATH_MAX];
3432 if (config_base_dir != NULL)
3433 free (config_base_dir);
3434 config_base_dir = strdup (optarg);
3435 if (config_base_dir == NULL)
3436 {
3437 fprintf (stderr, "read_options: strdup failed.\n");
3438 return (3);
3439 }
3441 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3442 {
3443 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3444 config_base_dir, rrd_strerror (errno));
3445 return (3);
3446 }
3448 /* make sure that the base directory is not resolved via
3449 * symbolic links. this makes some performance-enhancing
3450 * assumptions possible (we don't have to resolve paths
3451 * that start with a "/")
3452 */
3453 if (realpath(config_base_dir, base_realpath) == NULL)
3454 {
3455 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3456 "%s\n", config_base_dir, rrd_strerror(errno));
3457 return 5;
3458 }
3460 len = strlen (config_base_dir);
3461 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3462 {
3463 config_base_dir[len - 1] = 0;
3464 len--;
3465 }
3467 if (len < 1)
3468 {
3469 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3470 return (4);
3471 }
3473 _config_base_dir_len = len;
3475 len = strlen (base_realpath);
3476 while ((len > 0) && (base_realpath[len - 1] == '/'))
3477 {
3478 base_realpath[len - 1] = '\0';
3479 len--;
3480 }
3482 if (strncmp(config_base_dir,
3483 base_realpath, sizeof(base_realpath)) != 0)
3484 {
3485 fprintf(stderr,
3486 "Base directory (-b) resolved via file system links!\n"
3487 "Please consult rrdcached '-b' documentation!\n"
3488 "Consider specifying the real directory (%s)\n",
3489 base_realpath);
3490 return 5;
3491 }
3492 }
3493 break;
3495 case 'p':
3496 {
3497 if (config_pid_file != NULL)
3498 free (config_pid_file);
3499 config_pid_file = strdup (optarg);
3500 if (config_pid_file == NULL)
3501 {
3502 fprintf (stderr, "read_options: strdup failed.\n");
3503 return (3);
3504 }
3505 }
3506 break;
3508 case 'F':
3509 config_flush_at_shutdown = 1;
3510 break;
3512 case 'j':
3513 {
3514 char journal_dir_actual[PATH_MAX];
3515 const char *dir;
3516 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3518 status = rrd_mkdir_p(dir, 0777);
3519 if (status != 0)
3520 {
3521 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3522 dir, rrd_strerror(errno));
3523 return 6;
3524 }
3526 if (access(dir, R_OK|W_OK|X_OK) != 0)
3527 {
3528 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3529 errno ? rrd_strerror(errno) : "");
3530 return 6;
3531 }
3532 }
3533 break;
3535 case 'a':
3536 {
3537 int temp = atoi(optarg);
3538 if (temp > 0)
3539 config_alloc_chunk = temp;
3540 else
3541 {
3542 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3543 return 10;
3544 }
3545 }
3546 break;
3548 case 'h':
3549 case '?':
3550 printf ("RRDCacheD %s\n"
3551 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3552 "\n"
3553 "Usage: rrdcached [options]\n"
3554 "\n"
3555 "Valid options are:\n"
3556 " -l <address> Socket address to listen to.\n"
3557 " -P <perms> Sets the permissions to assign to all following "
3558 "sockets\n"
3559 " -w <seconds> Interval in which to write data.\n"
3560 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3561 " -t <threads> Number of write threads.\n"
3562 " -f <seconds> Interval in which to flush dead data.\n"
3563 " -p <file> Location of the PID-file.\n"
3564 " -b <dir> Base directory to change to.\n"
3565 " -B Restrict file access to paths within -b <dir>\n"
3566 " -g Do not fork and run in the foreground.\n"
3567 " -j <dir> Directory in which to create the journal files.\n"
3568 " -F Always flush all updates at shutdown\n"
3569 " -s <id|name> Group owner of all following UNIX sockets\n"
3570 " (the socket will also have read/write permissions "
3571 "for that group)\n"
3572 " -m <mode> File permissions (octal) of all following UNIX "
3573 "sockets\n"
3574 " -a <size> Memory allocation chunk size. Default is 1.\n"
3575 " -O Do not allow CREATE commands to overwrite existing\n"
3576 " files, even if asked to.\n"
3577 "\n"
3578 "For more information and a detailed description of all options "
3579 "please refer\n"
3580 "to the rrdcached(1) manual page.\n",
3581 VERSION);
3582 if (option == 'h')
3583 status = -1;
3584 else
3585 status = 1;
3586 break;
3587 } /* switch (option) */
3588 } /* while (getopt) */
3590 /* advise the user when values are not sane */
3591 if (config_flush_interval < 2 * config_write_interval)
3592 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3593 " 2x write interval (-w) !\n");
3594 if (config_write_jitter > config_write_interval)
3595 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3596 " write interval (-w) !\n");
3598 if (config_write_base_only && config_base_dir == NULL)
3599 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3600 " Consult the rrdcached documentation\n");
3602 if (journal_dir == NULL)
3603 config_flush_at_shutdown = 1;
3605 return (status);
3606 } /* }}} int read_options */
3608 int main (int argc, char **argv)
3609 {
3610 int status;
3612 status = read_options (argc, argv);
3613 if (status != 0)
3614 {
3615 if (status < 0)
3616 status = 0;
3617 return (status);
3618 }
3620 status = daemonize ();
3621 if (status != 0)
3622 {
3623 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3624 return (1);
3625 }
3627 journal_init();
3629 /* start the queue threads */
3630 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3631 if (queue_threads == NULL)
3632 {
3633 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3634 cleanup();
3635 return (1);
3636 }
3637 for (int i = 0; i < config_queue_threads; i++)
3638 {
3639 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3640 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3641 if (status != 0)
3642 {
3643 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3644 cleanup();
3645 return (1);
3646 }
3647 }
3649 /* start the flush thread */
3650 memset(&flush_thread, 0, sizeof(flush_thread));
3651 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3652 if (status != 0)
3653 {
3654 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3655 cleanup();
3656 return (1);
3657 }
3659 listen_thread_main (NULL);
3660 cleanup ();
3662 return (0);
3663 } /* int main */
3665 /*
3666 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3667 */