0586a8de7efbf82db37bd8a07ee8ef4f97bfd6cb
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
/*
 * RRDD_LOG: send a printf-style message to syslog() with the given severity.
 * When `stay_foreground' is non-zero the same message, followed by a newline,
 * is also printed to stderr first.  Note that the variadic arguments are
 * expanded twice in that case, so they must not have side effects.
 */
#define RRDD_LOG(severity, ...)          \
  do {                                   \
    if (stay_foreground)                 \
    {                                    \
      fprintf (stderr, __VA_ARGS__);     \
      fputc ('\n', stderr);              \
    }                                    \
    syslog ((severity), __VA_ARGS__);    \
  } while (0)
127 /*
128 * Types
129 */
/* Result code handed to send_response(). */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK  = 0
} response_code;
132 struct listen_socket_s
133 {
134 int fd;
135 char addr[PATH_MAX + 1];
136 int family;
138 /* state for BATCH processing */
139 time_t batch_start;
140 int batch_cmd;
142 /* buffered IO */
143 char *rbuf;
144 off_t next_cmd;
145 off_t next_read;
147 char *wbuf;
148 ssize_t wbuf_len;
150 uint32_t permissions;
152 gid_t socket_group;
153 mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
161 time_t UNUSED(now),\
162 char UNUSED(*buffer),\
163 size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO command_t UNUSED(*cmd),\
166 DISPATCH_PROTO
168 struct command_s {
169 char *cmd;
170 int (*handler)(HANDLER_PROTO);
172 char context; /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT (1<<0)
174 #define CMD_CONTEXT_BATCH (1<<1)
175 #define CMD_CONTEXT_JOURNAL (1<<2)
176 #define CMD_CONTEXT_ANY (0x7f)
178 char *syntax;
179 char *help;
180 };
struct cache_item_s;
typedef struct cache_item_s cache_item_t;

/* Bit values for cache_item_s.flags */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/* Cached updates for one file, plus the links used while the item sits in
 * the doubly linked write queue. */
struct cache_item_s
{
  char           *file;
  char          **values;
  size_t          values_num;         /* number of valid pointers */
  size_t          values_alloc;       /* number of allocated pointers */
  time_t          last_flush_time;
  time_t          last_update_stamp;
  int             flags;              /* CI_FLAGS_* */
  pthread_cond_t  flushed;            /* broadcast after the item was written */
  cache_item_t   *prev;
  cache_item_t   *next;
};
/* User data handed to tree_callback_flush() while walking the cache tree. */
typedef struct callback_flush_data_s
{
  time_t   now;          /* time at which the walk was started */
  time_t   abs_timeout;  /* items last flushed at or before this get queued */
  char   **keys;         /* keys of the nodes to remove after the walk */
  size_t   keys_num;
} callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should insert at. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* describe a set of journal files */
typedef struct
{
  char  **files;      /* array of file names */
  size_t  files_num;  /* number of entries in `files' */
} journal_set;
/* max length of socket command or response */
#define CMD_MAX   4096
/* the read buffer is sized to twice CMD_MAX */
#define RBUF_SIZE (CMD_MAX*2)
226 /*
227 * Variables
228 */
229 static int stay_foreground = 0;
230 static uid_t daemon_uid;
232 static listen_socket_t *listen_fds = NULL;
233 static size_t listen_fds_num = 0;
235 static listen_socket_t default_socket;
237 enum {
238 RUNNING, /* normal operation */
239 FLUSHING, /* flushing remaining values */
240 SHUTDOWN /* shutting down */
241 } state = RUNNING;
243 static pthread_t *queue_threads;
244 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
245 static int config_queue_threads = 4;
247 static pthread_t flush_thread;
248 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
250 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
251 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
252 static int connection_threads_num = 0;
254 /* Cache stuff */
255 static GTree *cache_tree = NULL;
256 static cache_item_t *cache_queue_head = NULL;
257 static cache_item_t *cache_queue_tail = NULL;
258 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
260 static int config_write_interval = 300;
261 static int config_write_jitter = 0;
262 static int config_flush_interval = 3600;
263 static int config_flush_at_shutdown = 0;
264 static char *config_pid_file = NULL;
265 static char *config_base_dir = NULL;
266 static size_t _config_base_dir_len = 0;
267 static int config_write_base_only = 0;
268 static size_t config_alloc_chunk = 1;
270 static listen_socket_t **config_listen_address_list = NULL;
271 static size_t config_listen_address_list_len = 0;
273 static uint64_t stats_queue_length = 0;
274 static uint64_t stats_updates_received = 0;
275 static uint64_t stats_flush_received = 0;
276 static uint64_t stats_updates_written = 0;
277 static uint64_t stats_data_sets_written = 0;
278 static uint64_t stats_journal_bytes = 0;
279 static uint64_t stats_journal_rotate = 0;
280 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
282 static int opt_no_overwrite = 0; /* default for the daemon */
284 /* Journaled updates */
285 #define JOURNAL_REPLAY(s) ((s) == NULL)
286 #define JOURNAL_BASE "rrd.journal"
287 static journal_set *journal_cur = NULL;
288 static journal_set *journal_old = NULL;
289 static char *journal_dir = NULL;
290 static FILE *journal_fh = NULL; /* current journal file handle */
291 static long journal_size = 0; /* current journal size */
292 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
293 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
294 static int journal_write(char *cmd, char *args);
295 static void journal_done(void);
296 static void journal_rotate(void);
/* prototypes for forward references */
299 static int handle_request_help (HANDLER_PROTO);
301 /*
302 * Functions
303 */
304 static void sig_common (const char *sig) /* {{{ */
305 {
306 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
307 state = FLUSHING;
308 pthread_cond_broadcast(&flush_cond);
309 pthread_cond_broadcast(&queue_cond);
310 } /* }}} void sig_common */
/* SIGINT: begin shutdown, keeping the configured flush-at-shutdown setting. */
static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: begin shutdown, keeping the configured flush-at-shutdown setting. */
static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
322 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
323 {
324 config_flush_at_shutdown = 1;
325 sig_common("USR1");
326 } /* }}} void sig_usr1_handler */
328 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
329 {
330 config_flush_at_shutdown = 0;
331 sig_common("USR2");
332 } /* }}} void sig_usr2_handler */
334 static void install_signal_handlers(void) /* {{{ */
335 {
336 /* These structures are static, because `sigaction' behaves weird if the are
337 * overwritten.. */
338 static struct sigaction sa_int;
339 static struct sigaction sa_term;
340 static struct sigaction sa_pipe;
341 static struct sigaction sa_usr1;
342 static struct sigaction sa_usr2;
344 /* Install signal handlers */
345 memset (&sa_int, 0, sizeof (sa_int));
346 sa_int.sa_handler = sig_int_handler;
347 sigaction (SIGINT, &sa_int, NULL);
349 memset (&sa_term, 0, sizeof (sa_term));
350 sa_term.sa_handler = sig_term_handler;
351 sigaction (SIGTERM, &sa_term, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_pipe));
354 sa_pipe.sa_handler = SIG_IGN;
355 sigaction (SIGPIPE, &sa_pipe, NULL);
357 memset (&sa_pipe, 0, sizeof (sa_usr1));
358 sa_usr1.sa_handler = sig_usr1_handler;
359 sigaction (SIGUSR1, &sa_usr1, NULL);
361 memset (&sa_usr2, 0, sizeof (sa_usr2));
362 sa_usr2.sa_handler = sig_usr2_handler;
363 sigaction (SIGUSR2, &sa_usr2, NULL);
365 } /* }}} void install_signal_handlers */
367 static int open_pidfile(char *action, int oflag) /* {{{ */
368 {
369 int fd;
370 const char *file;
371 char *file_copy, *dir;
373 file = (config_pid_file != NULL)
374 ? config_pid_file
375 : LOCALSTATEDIR "/run/rrdcached.pid";
377 /* dirname may modify its argument */
378 file_copy = strdup(file);
379 if (file_copy == NULL)
380 {
381 fprintf(stderr, "rrdcached: strdup(): %s\n",
382 rrd_strerror(errno));
383 return -1;
384 }
386 dir = dirname(file_copy);
387 if (rrd_mkdir_p(dir, 0777) != 0)
388 {
389 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
390 dir, rrd_strerror(errno));
391 return -1;
392 }
394 free(file_copy);
396 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
397 if (fd < 0)
398 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
399 action, file, rrd_strerror(errno));
401 return(fd);
402 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns an open descriptor for the (now truncated) PID file when the
 * recorded process is this process or cannot be signalled.  Returns a
 * negative value when the file cannot be opened or read, holds no positive
 * PID, cannot be truncated, or names another process that can be signalled.
 * The descriptor is closed on every error path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not add one and atoi()
   * needs a proper string */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile:
 * Writes the PID of the calling process, followed by a newline, to the
 * already opened descriptor `fd'.  The descriptor is always closed.
 *
 * Returns 0 on success and -1 if the stream cannot be set up or if writing
 * or closing it fails (previously such failures were silently ignored and an
 * empty PID file was reported as success).
 */
static int write_pidfile (int fd) /* {{{ */
{
  pid_t pid;
  FILE *fh;
  int status = 0;

  pid = getpid ();

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) pid) < 0)
    status = -1;

  /* fclose() flushes the buffered data, so a full disk shows up here */
  if (fclose (fh) != 0)
    status = -1;

  if (status != 0)
    RRDD_LOG (LOG_ERR, "write_pidfile: writing the PID file failed.");

  return (status);
} /* }}} int write_pidfile */
469 static int remove_pidfile (void) /* {{{ */
470 {
471 char *file;
472 int status;
474 file = (config_pid_file != NULL)
475 ? config_pid_file
476 : LOCALSTATEDIR "/run/rrdcached.pid";
478 status = unlink (file);
479 if (status == 0)
480 return (0);
481 return (errno);
482 } /* }}} int remove_pidfile */
484 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
485 {
486 char *eol;
488 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
489 sock->next_read - sock->next_cmd);
491 if (eol == NULL)
492 {
493 /* no commands left, move remainder back to front of rbuf */
494 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
495 sock->next_read - sock->next_cmd);
496 sock->next_read -= sock->next_cmd;
497 sock->next_cmd = 0;
498 *len = 0;
499 return NULL;
500 }
501 else
502 {
503 char *cmd = sock->rbuf + sock->next_cmd;
504 *eol = '\0';
506 sock->next_cmd = eol - sock->rbuf + 1;
508 if (eol > sock->rbuf && *(eol-1) == '\r')
509 *(--eol) = '\0'; /* handle "\r\n" EOL */
511 *len = eol - cmd;
513 return cmd;
514 }
516 /* NOTREACHED */
517 assert(1==0);
518 } /* }}} char *next_cmd */
520 /* add the characters directly to the write buffer */
521 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
522 {
523 char *new_buf;
525 assert(sock != NULL);
527 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
528 if (new_buf == NULL)
529 {
530 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
531 return -1;
532 }
534 strncpy(new_buf + sock->wbuf_len, str, len + 1);
536 sock->wbuf = new_buf;
537 sock->wbuf_len += len;
539 return 0;
540 } /* }}} static int add_to_wbuf */
542 /* add the text to the "extra" info that's sent after the status line */
543 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
544 {
545 va_list argp;
546 char buffer[CMD_MAX];
547 int len;
549 if (JOURNAL_REPLAY(sock)) return 0;
550 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
552 va_start(argp, fmt);
553 #ifdef HAVE_VSNPRINTF
554 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
555 #else
556 len = vsprintf(buffer, fmt, argp);
557 #endif
558 va_end(argp);
559 if (len < 0)
560 {
561 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
562 return -1;
563 }
565 return add_to_wbuf(sock, buffer, len);
566 } /* }}} static int add_response_info */
/*
 * count_lines:
 * Returns the number of '\n' characters in `str'; a NULL pointer counts as
 * zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *pos;

  if (str == NULL)
    return 0;

  for (pos = strchr(str, '\n'); pos != NULL; pos = strchr(pos + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
584 /* send the response back to the user.
585 * returns 0 on success, -1 on error
586 * write buffer is always zeroed after this call */
587 static int send_response (listen_socket_t *sock, response_code rc,
588 char *fmt, ...) /* {{{ */
589 {
590 va_list argp;
591 char buffer[CMD_MAX];
592 int lines;
593 ssize_t wrote;
594 int rclen, len;
596 if (JOURNAL_REPLAY(sock)) return rc;
598 if (sock->batch_start)
599 {
600 if (rc == RESP_OK)
601 return rc; /* no response on success during BATCH */
602 lines = sock->batch_cmd;
603 }
604 else if (rc == RESP_OK)
605 lines = count_lines(sock->wbuf);
606 else
607 lines = -1;
609 rclen = sprintf(buffer, "%d ", lines);
610 va_start(argp, fmt);
611 #ifdef HAVE_VSNPRINTF
612 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
613 #else
614 len = vsprintf(buffer+rclen, fmt, argp);
615 #endif
616 va_end(argp);
617 if (len < 0)
618 return -1;
620 len += rclen;
622 /* append the result to the wbuf, don't write to the user */
623 if (sock->batch_start)
624 return add_to_wbuf(sock, buffer, len);
626 /* first write must be complete */
627 if (len != write(sock->fd, buffer, len))
628 {
629 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
630 return -1;
631 }
633 if (sock->wbuf != NULL && rc == RESP_OK)
634 {
635 wrote = 0;
636 while (wrote < sock->wbuf_len)
637 {
638 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
639 if (wb <= 0)
640 {
641 RRDD_LOG(LOG_INFO, "send_response: could not write results");
642 return -1;
643 }
644 wrote += wb;
645 }
646 }
648 free(sock->wbuf); sock->wbuf = NULL;
649 sock->wbuf_len = 0;
651 return 0;
652 } /* }}} */
654 static void wipe_ci_values(cache_item_t *ci, time_t when)
655 {
656 ci->values = NULL;
657 ci->values_num = 0;
658 ci->values_alloc = 0;
660 ci->last_flush_time = when;
661 if (config_write_jitter > 0)
662 ci->last_flush_time += (rrd_random() % config_write_jitter);
663 }
665 /* remove_from_queue
666 * remove a "cache_item_t" item from the queue.
667 * must hold 'cache_lock' when calling this
668 */
669 static void remove_from_queue(cache_item_t *ci) /* {{{ */
670 {
671 if (ci == NULL) return;
672 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
674 if (ci->prev == NULL)
675 cache_queue_head = ci->next; /* reset head */
676 else
677 ci->prev->next = ci->next;
679 if (ci->next == NULL)
680 cache_queue_tail = ci->prev; /* reset the tail */
681 else
682 ci->next->prev = ci->prev;
684 ci->next = ci->prev = NULL;
685 ci->flags &= ~CI_FLAGS_IN_QUEUE;
687 pthread_mutex_lock (&stats_lock);
688 assert (stats_queue_length > 0);
689 stats_queue_length--;
690 pthread_mutex_unlock (&stats_lock);
692 } /* }}} static void remove_from_queue */
694 /* free the resources associated with the cache_item_t
695 * must hold cache_lock when calling this function
696 */
697 static void *free_cache_item(cache_item_t *ci) /* {{{ */
698 {
699 if (ci == NULL) return NULL;
701 remove_from_queue(ci);
703 for (size_t i=0; i < ci->values_num; i++)
704 free(ci->values[i]);
706 free (ci->values);
707 free (ci->file);
709 /* in case anyone is waiting */
710 pthread_cond_broadcast(&ci->flushed);
711 pthread_cond_destroy(&ci->flushed);
713 free (ci);
715 return NULL;
716 } /* }}} static void *free_cache_item */
718 /*
719 * enqueue_cache_item:
720 * `cache_lock' must be acquired before calling this function!
721 */
722 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
723 queue_side_t side)
724 {
725 if (ci == NULL)
726 return (-1);
728 if (ci->values_num == 0)
729 return (0);
731 if (side == HEAD)
732 {
733 if (cache_queue_head == ci)
734 return 0;
736 /* remove if further down in queue */
737 remove_from_queue(ci);
739 ci->prev = NULL;
740 ci->next = cache_queue_head;
741 if (ci->next != NULL)
742 ci->next->prev = ci;
743 cache_queue_head = ci;
745 if (cache_queue_tail == NULL)
746 cache_queue_tail = cache_queue_head;
747 }
748 else /* (side == TAIL) */
749 {
750 /* We don't move values back in the list.. */
751 if (ci->flags & CI_FLAGS_IN_QUEUE)
752 return (0);
754 assert (ci->next == NULL);
755 assert (ci->prev == NULL);
757 ci->prev = cache_queue_tail;
759 if (cache_queue_tail == NULL)
760 cache_queue_head = ci;
761 else
762 cache_queue_tail->next = ci;
764 cache_queue_tail = ci;
765 }
767 ci->flags |= CI_FLAGS_IN_QUEUE;
769 pthread_cond_signal(&queue_cond);
770 pthread_mutex_lock (&stats_lock);
771 stats_queue_length++;
772 pthread_mutex_unlock (&stats_lock);
774 return (0);
775 } /* }}} int enqueue_cache_item */
777 /*
778 * tree_callback_flush:
779 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
780 * while this is in progress.
781 */
782 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
783 gpointer data)
784 {
785 cache_item_t *ci;
786 callback_flush_data_t *cfd;
788 ci = (cache_item_t *) value;
789 cfd = (callback_flush_data_t *) data;
791 if (ci->flags & CI_FLAGS_IN_QUEUE)
792 return FALSE;
794 if (ci->values_num > 0
795 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
796 {
797 enqueue_cache_item (ci, TAIL);
798 }
799 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
800 && (ci->values_num <= 0))
801 {
802 assert ((char *) key == ci->file);
803 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
804 {
805 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
806 return (FALSE);
807 }
808 }
810 return (FALSE);
811 } /* }}} gboolean tree_callback_flush */
813 static int flush_old_values (int max_age)
814 {
815 callback_flush_data_t cfd;
816 size_t k;
818 memset (&cfd, 0, sizeof (cfd));
819 /* Pass the current time as user data so that we don't need to call
820 * `time' for each node. */
821 cfd.now = time (NULL);
822 cfd.keys = NULL;
823 cfd.keys_num = 0;
825 if (max_age > 0)
826 cfd.abs_timeout = cfd.now - max_age;
827 else
828 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
830 /* `tree_callback_flush' will return the keys of all values that haven't
831 * been touched in the last `config_flush_interval' seconds in `cfd'.
832 * The char*'s in this array point to the same memory as ci->file, so we
833 * don't need to free them separately. */
834 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
836 for (k = 0; k < cfd.keys_num; k++)
837 {
838 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
839 /* should never fail, since we have held the cache_lock
840 * the entire time */
841 assert(status == TRUE);
842 }
844 if (cfd.keys != NULL)
845 {
846 free (cfd.keys);
847 cfd.keys = NULL;
848 }
850 return (0);
851 } /* int flush_old_values */
853 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
854 {
855 struct timeval now;
856 struct timespec next_flush;
857 int status;
859 gettimeofday (&now, NULL);
860 next_flush.tv_sec = now.tv_sec + config_flush_interval;
861 next_flush.tv_nsec = 1000 * now.tv_usec;
863 pthread_mutex_lock(&cache_lock);
865 while (state == RUNNING)
866 {
867 gettimeofday (&now, NULL);
868 if ((now.tv_sec > next_flush.tv_sec)
869 || ((now.tv_sec == next_flush.tv_sec)
870 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
871 {
872 RRDD_LOG(LOG_DEBUG, "flushing old values");
874 /* Determine the time of the next cache flush. */
875 next_flush.tv_sec = now.tv_sec + config_flush_interval;
877 /* Flush all values that haven't been written in the last
878 * `config_write_interval' seconds. */
879 flush_old_values (config_write_interval);
881 /* unlock the cache while we rotate so we don't block incoming
882 * updates if the fsync() blocks on disk I/O */
883 pthread_mutex_unlock(&cache_lock);
884 journal_rotate();
885 pthread_mutex_lock(&cache_lock);
886 }
888 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
889 if (status != 0 && status != ETIMEDOUT)
890 {
891 RRDD_LOG (LOG_ERR, "flush_thread_main: "
892 "pthread_cond_timedwait returned %i.", status);
893 }
894 }
896 if (config_flush_at_shutdown)
897 flush_old_values (-1); /* flush everything */
899 state = SHUTDOWN;
901 pthread_mutex_unlock(&cache_lock);
903 return NULL;
904 } /* void *flush_thread_main */
906 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
907 {
908 pthread_mutex_lock (&cache_lock);
910 while (state != SHUTDOWN
911 || (cache_queue_head != NULL && config_flush_at_shutdown))
912 {
913 cache_item_t *ci;
914 char *file;
915 char **values;
916 size_t values_num;
917 int status;
919 /* Now, check if there's something to store away. If not, wait until
920 * something comes in. */
921 if (cache_queue_head == NULL)
922 {
923 status = pthread_cond_wait (&queue_cond, &cache_lock);
924 if ((status != 0) && (status != ETIMEDOUT))
925 {
926 RRDD_LOG (LOG_ERR, "queue_thread_main: "
927 "pthread_cond_wait returned %i.", status);
928 }
929 }
931 /* Check if a value has arrived. This may be NULL if we timed out or there
932 * was an interrupt such as a signal. */
933 if (cache_queue_head == NULL)
934 continue;
936 ci = cache_queue_head;
938 /* copy the relevant parts */
939 file = strdup (ci->file);
940 if (file == NULL)
941 {
942 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
943 continue;
944 }
946 assert(ci->values != NULL);
947 assert(ci->values_num > 0);
949 values = ci->values;
950 values_num = ci->values_num;
952 wipe_ci_values(ci, time(NULL));
953 remove_from_queue(ci);
955 pthread_mutex_unlock (&cache_lock);
957 rrd_clear_error ();
958 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
959 if (status != 0)
960 {
961 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
962 "rrd_update_r (%s) failed with status %i. (%s)",
963 file, status, rrd_get_error());
964 }
966 journal_write("wrote", file);
968 /* Search again in the tree. It's possible someone issued a "FORGET"
969 * while we were writing the update values. */
970 pthread_mutex_lock(&cache_lock);
971 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
972 if (ci)
973 pthread_cond_broadcast(&ci->flushed);
974 pthread_mutex_unlock(&cache_lock);
976 if (status == 0)
977 {
978 pthread_mutex_lock (&stats_lock);
979 stats_updates_written++;
980 stats_data_sets_written += values_num;
981 pthread_mutex_unlock (&stats_lock);
982 }
984 rrd_free_ptrs((void ***) &values, &values_num);
985 free(file);
987 pthread_mutex_lock (&cache_lock);
988 }
989 pthread_mutex_unlock (&cache_lock);
991 return (NULL);
992 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Splits the next field off the front of a NUL-terminated buffer.  A field
 * ends at an unescaped space or at the NUL byte; a backslash makes the
 * following character part of the field.  The field is unescaped in place,
 * so it starts where the buffer started.
 *
 * On success 0 is returned, `*field_ret' points at the field and
 * `*buffer_ret' / `*buffer_size_ret' are advanced past it.  On failure (an
 * empty buffer, or a backslash directly before the final byte) -1 is returned
 * and the output arguments are left untouched; the buffer contents may
 * already have been modified.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char  *src   = *buffer_ret;
  size_t avail = *buffer_size_ret;
  size_t rd = 0;  /* read position */
  size_t wr = 0;  /* write position, never ahead of `rd' */

  if (avail <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[avail - 1] == '\0');

  while (rd < avail)
  {
    char c = src[rd];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
    {
      src[wr] = 0;
      rd++;

      *buffer_ret = src + rd;
      *buffer_size_ret = avail - rd;
      *field_ret = src;
      return (0);
    }

    /* Handle escaped characters: skip the backslash, keep what follows. */
    if (c == '\\')
    {
      if (rd >= (avail - 1))
        break;
      rd++;
    }

    src[wr] = src[rd];
    wr++;
    rd++;
  } /* while (rd < avail) */

  return (-1);
} /* }}} int buffer_get_field */
1057 /* if we're restricting writes to the base directory,
1058 * check whether the file falls within the dir
1059 * returns 1 if OK, otherwise 0
1060 */
1061 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1062 {
1063 assert(file != NULL);
1065 if (!config_write_base_only
1066 || JOURNAL_REPLAY(sock)
1067 || config_base_dir == NULL)
1068 return 1;
1070 if (strstr(file, "../") != NULL) goto err;
1072 /* relative paths without "../" are ok */
1073 if (*file != '/') return 1;
1075 /* file must be of the format base + "/" + <1+ char filename> */
1076 if (strlen(file) < _config_base_dir_len + 2) goto err;
1077 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1078 if (*(file + _config_base_dir_len) != '/') goto err;
1080 return 1;
1082 err:
1083 if (sock != NULL && sock->fd >= 0)
1084 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1086 return 0;
1087 } /* }}} static int check_file_access */
1089 /* when using a base dir, convert relative paths to absolute paths.
1090 * if necessary, modifies the "filename" pointer to point
1091 * to the new path created in "tmp". "tmp" is provided
1092 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1093 *
1094 * this allows us to optimize for the expected case (absolute path)
1095 * with a no-op.
1096 */
1097 static void get_abs_path(char **filename, char *tmp)
1098 {
1099 assert(tmp != NULL);
1100 assert(filename != NULL && *filename != NULL);
1102 if (config_base_dir == NULL || **filename == '/')
1103 return;
1105 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1106 *filename = tmp;
1107 } /* }}} static int get_abs_path */
1109 static int flush_file (const char *filename) /* {{{ */
1110 {
1111 cache_item_t *ci;
1113 pthread_mutex_lock (&cache_lock);
1115 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1116 if (ci == NULL)
1117 {
1118 pthread_mutex_unlock (&cache_lock);
1119 return (ENOENT);
1120 }
1122 if (ci->values_num > 0)
1123 {
1124 /* Enqueue at head */
1125 enqueue_cache_item (ci, HEAD);
1126 pthread_cond_wait(&ci->flushed, &cache_lock);
1127 }
1129 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1130 * may have been purged during our cond_wait() */
1132 pthread_mutex_unlock(&cache_lock);
1134 return (0);
1135 } /* }}} int flush_file */
1137 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1138 {
1139 char *err = "Syntax error.\n";
1141 if (cmd && cmd->syntax)
1142 err = cmd->syntax;
1144 return send_response(sock, RESP_ERR, "Usage: %s", err);
1145 } /* }}} static int syntax_error() */
1147 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1148 {
1149 uint64_t copy_queue_length;
1150 uint64_t copy_updates_received;
1151 uint64_t copy_flush_received;
1152 uint64_t copy_updates_written;
1153 uint64_t copy_data_sets_written;
1154 uint64_t copy_journal_bytes;
1155 uint64_t copy_journal_rotate;
1157 uint64_t tree_nodes_number;
1158 uint64_t tree_depth;
1160 pthread_mutex_lock (&stats_lock);
1161 copy_queue_length = stats_queue_length;
1162 copy_updates_received = stats_updates_received;
1163 copy_flush_received = stats_flush_received;
1164 copy_updates_written = stats_updates_written;
1165 copy_data_sets_written = stats_data_sets_written;
1166 copy_journal_bytes = stats_journal_bytes;
1167 copy_journal_rotate = stats_journal_rotate;
1168 pthread_mutex_unlock (&stats_lock);
1170 pthread_mutex_lock (&cache_lock);
1171 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1172 tree_depth = (uint64_t) g_tree_height (cache_tree);
1173 pthread_mutex_unlock (&cache_lock);
1175 add_response_info(sock,
1176 "QueueLength: %"PRIu64"\n", copy_queue_length);
1177 add_response_info(sock,
1178 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1179 add_response_info(sock,
1180 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1181 add_response_info(sock,
1182 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1183 add_response_info(sock,
1184 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1185 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1186 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1187 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1188 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1190 send_response(sock, RESP_OK, "Statistics follow\n");
1192 return (0);
1193 } /* }}} int handle_request_stats */
1195 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1196 {
1197 char *file, file_tmp[PATH_MAX];
1198 int status;
1200 status = buffer_get_field (&buffer, &buffer_size, &file);
1201 if (status != 0)
1202 {
1203 return syntax_error(sock,cmd);
1204 }
1205 else
1206 {
1207 pthread_mutex_lock(&stats_lock);
1208 stats_flush_received++;
1209 pthread_mutex_unlock(&stats_lock);
1211 get_abs_path(&file, file_tmp);
1212 if (!check_file_access(file, sock)) return 0;
1214 status = flush_file (file);
1215 if (status == 0)
1216 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1217 else if (status == ENOENT)
1218 {
1219 /* no file in our tree; see whether it exists at all */
1220 struct stat statbuf;
1222 memset(&statbuf, 0, sizeof(statbuf));
1223 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1224 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1225 else
1226 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1227 }
1228 else if (status < 0)
1229 return send_response(sock, RESP_ERR, "Internal error.\n");
1230 else
1231 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1232 }
1234 /* NOTREACHED */
1235 assert(1==0);
1236 } /* }}} int handle_request_flush */
1238 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1239 {
1240 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1242 pthread_mutex_lock(&cache_lock);
1243 flush_old_values(-1);
1244 pthread_mutex_unlock(&cache_lock);
1246 return send_response(sock, RESP_OK, "Started flush.\n");
1247 } /* }}} static int handle_request_flushall */
1249 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1250 {
1251 int status;
1252 char *file, file_tmp[PATH_MAX];
1253 cache_item_t *ci;
1255 status = buffer_get_field(&buffer, &buffer_size, &file);
1256 if (status != 0)
1257 return syntax_error(sock,cmd);
1259 get_abs_path(&file, file_tmp);
1261 pthread_mutex_lock(&cache_lock);
1262 ci = g_tree_lookup(cache_tree, file);
1263 if (ci == NULL)
1264 {
1265 pthread_mutex_unlock(&cache_lock);
1266 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267 }
1269 for (size_t i=0; i < ci->values_num; i++)
1270 add_response_info(sock, "%s\n", ci->values[i]);
1272 pthread_mutex_unlock(&cache_lock);
1273 return send_response(sock, RESP_OK, "updates pending\n");
1274 } /* }}} static int handle_request_pending */
1276 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1277 {
1278 int status;
1279 gboolean found;
1280 char *file, file_tmp[PATH_MAX];
1282 status = buffer_get_field(&buffer, &buffer_size, &file);
1283 if (status != 0)
1284 return syntax_error(sock,cmd);
1286 get_abs_path(&file, file_tmp);
1287 if (!check_file_access(file, sock)) return 0;
1289 pthread_mutex_lock(&cache_lock);
1290 found = g_tree_remove(cache_tree, file);
1291 pthread_mutex_unlock(&cache_lock);
1293 if (found == TRUE)
1294 {
1295 if (!JOURNAL_REPLAY(sock))
1296 journal_write("forget", file);
1298 return send_response(sock, RESP_OK, "Gone!\n");
1299 }
1300 else
1301 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1303 /* NOTREACHED */
1304 assert(1==0);
1305 } /* }}} static int handle_request_forget */
1307 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1308 {
1309 cache_item_t *ci;
1311 pthread_mutex_lock(&cache_lock);
1313 ci = cache_queue_head;
1314 while (ci != NULL)
1315 {
1316 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1317 ci = ci->next;
1318 }
1320 pthread_mutex_unlock(&cache_lock);
1322 return send_response(sock, RESP_OK, "in queue.\n");
1323 } /* }}} int handle_request_queue */
1325 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1326 {
1327 char *file, file_tmp[PATH_MAX];
1328 int values_num = 0;
1329 int status;
1330 char orig_buf[CMD_MAX];
1332 cache_item_t *ci;
1334 /* save it for the journal later */
1335 if (!JOURNAL_REPLAY(sock))
1336 strncpy(orig_buf, buffer, buffer_size);
1338 status = buffer_get_field (&buffer, &buffer_size, &file);
1339 if (status != 0)
1340 return syntax_error(sock,cmd);
1342 pthread_mutex_lock(&stats_lock);
1343 stats_updates_received++;
1344 pthread_mutex_unlock(&stats_lock);
1346 get_abs_path(&file, file_tmp);
1347 if (!check_file_access(file, sock)) return 0;
1349 pthread_mutex_lock (&cache_lock);
1350 ci = g_tree_lookup (cache_tree, file);
1352 if (ci == NULL) /* {{{ */
1353 {
1354 struct stat statbuf;
1355 cache_item_t *tmp;
1357 /* don't hold the lock while we setup; stat(2) might block */
1358 pthread_mutex_unlock(&cache_lock);
1360 memset (&statbuf, 0, sizeof (statbuf));
1361 status = stat (file, &statbuf);
1362 if (status != 0)
1363 {
1364 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1366 status = errno;
1367 if (status == ENOENT)
1368 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1369 else
1370 return send_response(sock, RESP_ERR,
1371 "stat failed with error %i.\n", status);
1372 }
1373 if (!S_ISREG (statbuf.st_mode))
1374 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1376 if (access(file, R_OK|W_OK) != 0)
1377 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1378 file, rrd_strerror(errno));
1380 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1381 if (ci == NULL)
1382 {
1383 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1385 return send_response(sock, RESP_ERR, "malloc failed.\n");
1386 }
1387 memset (ci, 0, sizeof (cache_item_t));
1389 ci->file = strdup (file);
1390 if (ci->file == NULL)
1391 {
1392 free (ci);
1393 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1395 return send_response(sock, RESP_ERR, "strdup failed.\n");
1396 }
1398 wipe_ci_values(ci, now);
1399 ci->flags = CI_FLAGS_IN_TREE;
1400 pthread_cond_init(&ci->flushed, NULL);
1402 pthread_mutex_lock(&cache_lock);
1404 /* another UPDATE might have added this entry in the meantime */
1405 tmp = g_tree_lookup (cache_tree, file);
1406 if (tmp == NULL)
1407 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1408 else
1409 {
1410 free_cache_item (ci);
1411 ci = tmp;
1412 }
1414 /* state may have changed while we were unlocked */
1415 if (state == SHUTDOWN)
1416 return -1;
1417 } /* }}} */
1418 assert (ci != NULL);
1420 /* don't re-write updates in replay mode */
1421 if (!JOURNAL_REPLAY(sock))
1422 journal_write("update", orig_buf);
1424 while (buffer_size > 0)
1425 {
1426 char *value;
1427 time_t stamp;
1428 char *eostamp;
1430 status = buffer_get_field (&buffer, &buffer_size, &value);
1431 if (status != 0)
1432 {
1433 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1434 break;
1435 }
1437 /* make sure update time is always moving forward */
1438 stamp = strtol(value, &eostamp, 10);
1439 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1440 {
1441 pthread_mutex_unlock(&cache_lock);
1442 return send_response(sock, RESP_ERR,
1443 "Cannot find timestamp in '%s'!\n", value);
1444 }
1445 else if (stamp <= ci->last_update_stamp)
1446 {
1447 pthread_mutex_unlock(&cache_lock);
1448 return send_response(sock, RESP_ERR,
1449 "illegal attempt to update using time %ld when last"
1450 " update time is %ld (minimum one second step)\n",
1451 stamp, ci->last_update_stamp);
1452 }
1453 else
1454 ci->last_update_stamp = stamp;
1456 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1457 &ci->values_alloc, config_alloc_chunk))
1458 {
1459 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1460 continue;
1461 }
1463 values_num++;
1464 }
1466 if (((now - ci->last_flush_time) >= config_write_interval)
1467 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1468 && (ci->values_num > 0))
1469 {
1470 enqueue_cache_item (ci, TAIL);
1471 }
1473 pthread_mutex_unlock (&cache_lock);
1475 if (values_num < 1)
1476 return send_response(sock, RESP_ERR, "No values updated.\n");
1477 else
1478 return send_response(sock, RESP_OK,
1479 "errors, enqueued %i value(s).\n", values_num);
1481 /* NOTREACHED */
1482 assert(1==0);
1484 } /* }}} int handle_request_update */
1486 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1487 {
1488 char *file, file_tmp[PATH_MAX];
1489 char *cf;
1491 char *start_str;
1492 char *end_str;
1493 time_t start_tm;
1494 time_t end_tm;
1496 unsigned long step;
1497 unsigned long ds_cnt;
1498 char **ds_namv;
1499 rrd_value_t *data;
1501 int status;
1502 unsigned long i;
1503 time_t t;
1504 rrd_value_t *data_ptr;
1506 file = NULL;
1507 cf = NULL;
1508 start_str = NULL;
1509 end_str = NULL;
1511 /* Read the arguments */
1512 do /* while (0) */
1513 {
1514 status = buffer_get_field (&buffer, &buffer_size, &file);
1515 if (status != 0)
1516 break;
1518 status = buffer_get_field (&buffer, &buffer_size, &cf);
1519 if (status != 0)
1520 break;
1522 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1523 if (status != 0)
1524 {
1525 start_str = NULL;
1526 status = 0;
1527 break;
1528 }
1530 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1531 if (status != 0)
1532 {
1533 end_str = NULL;
1534 status = 0;
1535 break;
1536 }
1537 } while (0);
1539 if (status != 0)
1540 return (syntax_error(sock,cmd));
1542 get_abs_path(&file, file_tmp);
1543 if (!check_file_access(file, sock)) return 0;
1545 status = flush_file (file);
1546 if ((status != 0) && (status != ENOENT))
1547 return (send_response (sock, RESP_ERR,
1548 "flush_file (%s) failed with status %i.\n", file, status));
1550 t = time (NULL); /* "now" */
1552 /* Parse start time */
1553 if (start_str != NULL)
1554 {
1555 char *endptr;
1556 long value;
1558 endptr = NULL;
1559 errno = 0;
1560 value = strtol (start_str, &endptr, /* base = */ 0);
1561 if ((endptr == start_str) || (errno != 0))
1562 return (send_response(sock, RESP_ERR,
1563 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1564 start_str));
1566 if (value > 0)
1567 start_tm = (time_t) value;
1568 else
1569 start_tm = (time_t) (t + value);
1570 }
1571 else
1572 {
1573 start_tm = t - 86400;
1574 }
1576 /* Parse end time */
1577 if (end_str != NULL)
1578 {
1579 char *endptr;
1580 long value;
1582 endptr = NULL;
1583 errno = 0;
1584 value = strtol (end_str, &endptr, /* base = */ 0);
1585 if ((endptr == end_str) || (errno != 0))
1586 return (send_response(sock, RESP_ERR,
1587 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1588 end_str));
1590 if (value > 0)
1591 end_tm = (time_t) value;
1592 else
1593 end_tm = (time_t) (t + value);
1594 }
1595 else
1596 {
1597 end_tm = t;
1598 }
1600 step = -1;
1601 ds_cnt = 0;
1602 ds_namv = NULL;
1603 data = NULL;
1605 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1606 &ds_cnt, &ds_namv, &data);
1607 if (status != 0)
1608 return (send_response(sock, RESP_ERR,
1609 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1611 add_response_info (sock, "FlushVersion: %lu\n", 1);
1612 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1613 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1614 add_response_info (sock, "Step: %lu\n", step);
1615 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1617 #define SSTRCAT(buffer,str,buffer_fill) do { \
1618 size_t str_len = strlen (str); \
1619 if ((buffer_fill + str_len) > sizeof (buffer)) \
1620 str_len = sizeof (buffer) - buffer_fill; \
1621 if (str_len > 0) { \
1622 strncpy (buffer + buffer_fill, str, str_len); \
1623 buffer_fill += str_len; \
1624 assert (buffer_fill <= sizeof (buffer)); \
1625 if (buffer_fill == sizeof (buffer)) \
1626 buffer[buffer_fill - 1] = 0; \
1627 else \
1628 buffer[buffer_fill] = 0; \
1629 } \
1630 } while (0)
1632 { /* Add list of DS names */
1633 char linebuf[1024];
1634 size_t linebuf_fill;
1636 memset (linebuf, 0, sizeof (linebuf));
1637 linebuf_fill = 0;
1638 for (i = 0; i < ds_cnt; i++)
1639 {
1640 if (i > 0)
1641 SSTRCAT (linebuf, " ", linebuf_fill);
1642 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1643 rrd_freemem(ds_namv[i]);
1644 }
1645 rrd_freemem(ds_namv);
1646 add_response_info (sock, "DSName: %s\n", linebuf);
1647 }
1649 /* Add the actual data */
1650 assert (step > 0);
1651 data_ptr = data;
1652 for (t = start_tm + step; t <= end_tm; t += step)
1653 {
1654 char linebuf[1024];
1655 size_t linebuf_fill;
1656 char tmp[128];
1658 memset (linebuf, 0, sizeof (linebuf));
1659 linebuf_fill = 0;
1660 for (i = 0; i < ds_cnt; i++)
1661 {
1662 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1663 tmp[sizeof (tmp) - 1] = 0;
1664 SSTRCAT (linebuf, tmp, linebuf_fill);
1666 data_ptr++;
1667 }
1669 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1670 } /* for (t) */
1671 rrd_freemem(data);
1673 return (send_response (sock, RESP_OK, "Success\n"));
1674 #undef SSTRCAT
1675 } /* }}} int handle_request_fetch */
1677 /* we came across a "WROTE" entry during journal replay.
1678 * throw away any values that we have accumulated for this file
1679 */
1680 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1681 {
1682 cache_item_t *ci;
1683 const char *file = buffer;
1685 pthread_mutex_lock(&cache_lock);
1687 ci = g_tree_lookup(cache_tree, file);
1688 if (ci == NULL)
1689 {
1690 pthread_mutex_unlock(&cache_lock);
1691 return (0);
1692 }
1694 if (ci->values)
1695 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1697 wipe_ci_values(ci, now);
1698 remove_from_queue(ci);
1700 pthread_mutex_unlock(&cache_lock);
1701 return (0);
1702 } /* }}} int handle_request_wrote */
1704 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1705 {
1706 char *file, file_tmp[PATH_MAX];
1707 int status;
1708 rrd_info_t *info;
1710 /* obtain filename */
1711 status = buffer_get_field(&buffer, &buffer_size, &file);
1712 if (status != 0)
1713 return syntax_error(sock,cmd);
1714 /* get full pathname */
1715 get_abs_path(&file, file_tmp);
1716 if (!check_file_access(file, sock)) {
1717 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1718 }
1719 /* get data */
1720 rrd_clear_error ();
1721 info = rrd_info_r(file);
1722 if(!info) {
1723 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1724 }
1725 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1726 switch (data->type) {
1727 case RD_I_VAL:
1728 if (isnan(data->value.u_val))
1729 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1730 else
1731 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1732 break;
1733 case RD_I_CNT:
1734 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1735 break;
1736 case RD_I_INT:
1737 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1738 break;
1739 case RD_I_STR:
1740 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1741 break;
1742 case RD_I_BLO:
1743 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1744 break;
1745 }
1746 }
1748 rrd_info_free(info);
1750 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1751 } /* }}} static int handle_request_info */
1753 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1754 {
1755 char *i, *file, file_tmp[PATH_MAX];
1756 int status;
1757 int idx;
1758 time_t t;
1760 /* obtain filename */
1761 status = buffer_get_field(&buffer, &buffer_size, &file);
1762 if (status != 0)
1763 return syntax_error(sock,cmd);
1764 /* get full pathname */
1765 get_abs_path(&file, file_tmp);
1766 if (!check_file_access(file, sock)) {
1767 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1768 }
1770 status = buffer_get_field(&buffer, &buffer_size, &i);
1771 if (status != 0)
1772 return syntax_error(sock,cmd);
1773 idx = atoi(i);
1774 if(idx<0) {
1775 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1776 }
1778 /* get data */
1779 rrd_clear_error ();
1780 t = rrd_first_r(file,idx);
1781 if(t<1) {
1782 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1783 }
1784 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1785 } /* }}} static int handle_request_first */
1788 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1789 {
1790 char *file, file_tmp[PATH_MAX];
1791 int status;
1792 time_t t, from_file, step;
1793 rrd_file_t * rrd_file;
1794 cache_item_t * ci;
1795 rrd_t rrd;
1797 /* obtain filename */
1798 status = buffer_get_field(&buffer, &buffer_size, &file);
1799 if (status != 0)
1800 return syntax_error(sock,cmd);
1801 /* get full pathname */
1802 get_abs_path(&file, file_tmp);
1803 if (!check_file_access(file, sock)) {
1804 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1805 }
1806 rrd_clear_error();
1807 rrd_init(&rrd);
1808 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1809 if(!rrd_file) {
1810 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1811 }
1812 from_file = rrd.live_head->last_up;
1813 step = rrd.stat_head->pdp_step;
1814 rrd_close(rrd_file);
1815 pthread_mutex_lock(&cache_lock);
1816 ci = g_tree_lookup(cache_tree, file);
1817 if (ci)
1818 t = ci->last_update_stamp;
1819 else
1820 t = from_file;
1821 pthread_mutex_unlock(&cache_lock);
1822 t -= t % step;
1823 rrd_free(&rrd);
1824 if(t<1) {
1825 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1826 }
1827 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1828 } /* }}} static int handle_request_last */
1830 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1831 {
1832 char *file, file_tmp[PATH_MAX];
1833 char *tok;
1834 int ac = 0;
1835 char *av[128];
1836 int status;
1837 unsigned long step = 300;
1838 time_t last_up = time(NULL)-10;
1839 int no_overwrite = opt_no_overwrite;
1842 /* obtain filename */
1843 status = buffer_get_field(&buffer, &buffer_size, &file);
1844 if (status != 0)
1845 return syntax_error(sock,cmd);
1846 /* get full pathname */
1847 get_abs_path(&file, file_tmp);
1848 if (!check_file_access(file, sock)) {
1849 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1850 }
1851 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1853 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1854 if( ! strncmp(tok,"-b",2) ) {
1855 status = buffer_get_field(&buffer, &buffer_size, &tok );
1856 if (status != 0) return syntax_error(sock,cmd);
1857 last_up = (time_t) atol(tok);
1858 continue;
1859 }
1860 if( ! strncmp(tok,"-s",2) ) {
1861 status = buffer_get_field(&buffer, &buffer_size, &tok );
1862 if (status != 0) return syntax_error(sock,cmd);
1863 step = atol(tok);
1864 continue;
1865 }
1866 if( ! strncmp(tok,"-O",2) ) {
1867 no_overwrite = 1;
1868 continue;
1869 }
1870 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1871 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1872 return syntax_error(sock,cmd);
1873 }
1874 if(step<1) {
1875 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1876 }
1877 if (last_up < 3600 * 24 * 365 * 10) {
1878 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1879 }
1881 rrd_clear_error ();
1882 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1884 if(!status) {
1885 return send_response(sock, RESP_OK, "RRD created OK\n");
1886 }
1887 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1888 } /* }}} static int handle_request_create */
1890 /* start "BATCH" processing */
1891 static int batch_start (HANDLER_PROTO) /* {{{ */
1892 {
1893 int status;
1894 if (sock->batch_start)
1895 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1897 status = send_response(sock, RESP_OK,
1898 "Go ahead. End with dot '.' on its own line.\n");
1899 sock->batch_start = time(NULL);
1900 sock->batch_cmd = 0;
1902 return status;
1903 } /* }}} static int batch_start */
1905 /* finish "BATCH" processing and return results to the client */
1906 static int batch_done (HANDLER_PROTO) /* {{{ */
1907 {
1908 assert(sock->batch_start);
1909 sock->batch_start = 0;
1910 sock->batch_cmd = 0;
1911 return send_response(sock, RESP_OK, "errors\n");
1912 } /* }}} static int batch_done */
/* QUIT: sends nothing and simply returns -1.
 * NOTE(review): presumably the caller treats a negative handler result as
 * "close this connection" - confirm in the connection loop. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int quit_status = -1;

  return quit_status;
} /* }}} static int handle_request_quit */
1919 static command_t list_of_commands[] = { /* {{{ */
1920 {
1921 "UPDATE",
1922 handle_request_update,
1923 CMD_CONTEXT_ANY,
1924 "UPDATE <filename> <values> [<values> ...]\n"
1925 ,
1926 "Adds the given file to the internal cache if it is not yet known and\n"
1927 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1928 "for details.\n"
1929 "\n"
1930 "Each <values> has the following form:\n"
1931 " <values> = <time>:<value>[:<value>[...]]\n"
1932 "See the rrdupdate(1) manpage for details.\n"
1933 },
1934 {
1935 "WROTE",
1936 handle_request_wrote,
1937 CMD_CONTEXT_JOURNAL,
1938 NULL,
1939 NULL
1940 },
1941 {
1942 "FLUSH",
1943 handle_request_flush,
1944 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1945 "FLUSH <filename>\n"
1946 ,
1947 "Adds the given filename to the head of the update queue and returns\n"
1948 "after it has been dequeued.\n"
1949 },
1950 {
1951 "FLUSHALL",
1952 handle_request_flushall,
1953 CMD_CONTEXT_CLIENT,
1954 "FLUSHALL\n"
1955 ,
1956 "Triggers writing of all pending updates. Returns immediately.\n"
1957 },
1958 {
1959 "PENDING",
1960 handle_request_pending,
1961 CMD_CONTEXT_CLIENT,
1962 "PENDING <filename>\n"
1963 ,
1964 "Shows any 'pending' updates for a file, in order.\n"
1965 "The updates shown have not yet been written to the underlying RRD file.\n"
1966 },
1967 {
1968 "FORGET",
1969 handle_request_forget,
1970 CMD_CONTEXT_ANY,
1971 "FORGET <filename>\n"
1972 ,
1973 "Removes the file completely from the cache.\n"
1974 "Any pending updates for the file will be lost.\n"
1975 },
1976 {
1977 "QUEUE",
1978 handle_request_queue,
1979 CMD_CONTEXT_CLIENT,
1980 "QUEUE\n"
1981 ,
1982 "Shows all files in the output queue.\n"
1983 "The output is zero or more lines in the following format:\n"
1984 "(where <num_vals> is the number of values to be written)\n"
1985 "\n"
1986 "<num_vals> <filename>\n"
1987 },
1988 {
1989 "STATS",
1990 handle_request_stats,
1991 CMD_CONTEXT_CLIENT,
1992 "STATS\n"
1993 ,
1994 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1995 "a description of the values.\n"
1996 },
1997 {
1998 "HELP",
1999 handle_request_help,
2000 CMD_CONTEXT_CLIENT,
2001 "HELP [<command>]\n",
2002 NULL, /* special! */
2003 },
2004 {
2005 "BATCH",
2006 batch_start,
2007 CMD_CONTEXT_CLIENT,
2008 "BATCH\n"
2009 ,
2010 "The 'BATCH' command permits the client to initiate a bulk load\n"
2011 " of commands to rrdcached.\n"
2012 "\n"
2013 "Usage:\n"
2014 "\n"
2015 " client: BATCH\n"
2016 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2017 " client: command #1\n"
2018 " client: command #2\n"
2019 " client: ... and so on\n"
2020 " client: .\n"
2021 " server: 2 errors\n"
2022 " server: 7 message for command #7\n"
2023 " server: 9 message for command #9\n"
2024 "\n"
2025 "For more information, consult the rrdcached(1) documentation.\n"
2026 },
2027 {
2028 ".", /* BATCH terminator */
2029 batch_done,
2030 CMD_CONTEXT_BATCH,
2031 NULL,
2032 NULL
2033 },
2034 {
2035 "FETCH",
2036 handle_request_fetch,
2037 CMD_CONTEXT_CLIENT,
2038 "FETCH <file> <CF> [<start> [<end>]]\n"
2039 ,
2040 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2041 },
2042 {
2043 "INFO",
2044 handle_request_info,
2045 CMD_CONTEXT_CLIENT,
2046 "INFO <filename>\n",
2047 "The INFO command retrieves information about a specified RRD file.\n"
2048 "This is returned in standard rrdinfo format, a sequence of lines\n"
2049 "with the format <keyname> = <value>\n"
2050 "Note that this is the data as of the last update of the RRD file itself,\n"
2051 "not the last time data was received via rrdcached, so there may be pending\n"
2052 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2053 },
2054 {
2055 "FIRST",
2056 handle_request_first,
2057 CMD_CONTEXT_CLIENT,
2058 "FIRST <filename> <rra index>\n",
2059 "The FIRST command retrieves the first data time for a specified RRA in\n"
2060 "an RRD file.\n"
2061 },
2062 {
2063 "LAST",
2064 handle_request_last,
2065 CMD_CONTEXT_CLIENT,
2066 "LAST <filename>\n",
2067 "The LAST command retrieves the last update time for a specified RRD file.\n"
2068 "Note that this is the time of the last update of the RRD file itself, not\n"
2069 "the last time data was received via rrdcached, so there may be pending\n"
2070 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2071 },
2072 {
2073 "CREATE",
2074 handle_request_create,
2075 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2076 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2077 "The CREATE command will create an RRD file, overwriting any existing file\n"
2078 "unless the -O option is given or rrdcached was started with the -O option.\n"
2079 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2080 "not acceptable) and the step is in seconds (default is 300).\n"
2081 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2082 },
2083 {
2084 "QUIT",
2085 handle_request_quit,
2086 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2087 "QUIT\n"
2088 ,
2089 "Disconnect from rrdcached.\n"
2090 }
2091 }; /* }}} command_t list_of_commands[] */
2092 static size_t list_of_commands_len = sizeof (list_of_commands)
2093 / sizeof (list_of_commands[0]);
2095 static command_t *find_command(char *cmd)
2096 {
2097 size_t i;
2099 for (i = 0; i < list_of_commands_len; i++)
2100 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2101 return (&list_of_commands[i]);
2102 return NULL;
2103 }
2105 /* We currently use the index in the `list_of_commands' array as a bit position
2106 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2107 * outside these functions so that switching to a more elegant storage method
2108 * is easily possible. */
2109 static ssize_t find_command_index (const char *cmd) /* {{{ */
2110 {
2111 size_t i;
2113 for (i = 0; i < list_of_commands_len; i++)
2114 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2115 return ((ssize_t) i);
2116 return (-1);
2117 } /* }}} ssize_t find_command_index */
2119 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2120 const char *cmd)
2121 {
2122 ssize_t i;
2124 if (JOURNAL_REPLAY(sock))
2125 return (1);
2127 if (cmd == NULL)
2128 return (-1);
2130 if ((strcasecmp ("QUIT", cmd) == 0)
2131 || (strcasecmp ("HELP", cmd) == 0))
2132 return (1);
2133 else if (strcmp (".", cmd) == 0)
2134 cmd = "BATCH";
2136 i = find_command_index (cmd);
2137 if (i < 0)
2138 return (-1);
2139 assert (i < 32);
2141 if ((sock->permissions & (1 << i)) != 0)
2142 return (1);
2143 return (0);
2144 } /* }}} int socket_permission_check */
2146 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2147 const char *cmd)
2148 {
2149 ssize_t i;
2151 i = find_command_index (cmd);
2152 if (i < 0)
2153 return (-1);
2154 assert (i < 32);
2156 sock->permissions |= (1 << i);
2157 return (0);
2158 } /* }}} int socket_permission_add */
2160 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2161 {
2162 sock->permissions = 0;
2163 } /* }}} socket_permission_clear */
2165 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2166 listen_socket_t *src)
2167 {
2168 dest->permissions = src->permissions;
2169 } /* }}} socket_permission_copy */
2171 /* check whether commands are received in the expected context */
2172 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2173 {
2174 if (JOURNAL_REPLAY(sock))
2175 return (cmd->context & CMD_CONTEXT_JOURNAL);
2176 else if (sock->batch_start)
2177 return (cmd->context & CMD_CONTEXT_BATCH);
2178 else
2179 return (cmd->context & CMD_CONTEXT_CLIENT);
2181 /* NOTREACHED */
2182 assert(1==0);
2183 }
2185 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2186 {
2187 int status;
2188 char *cmd_str;
2189 char *resp_txt;
2190 command_t *help = NULL;
2192 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2193 if (status == 0)
2194 help = find_command(cmd_str);
2196 if (help && (help->syntax || help->help))
2197 {
2198 char tmp[CMD_MAX];
2200 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2201 resp_txt = tmp;
2203 if (help->syntax)
2204 add_response_info(sock, "Usage: %s\n", help->syntax);
2206 if (help->help)
2207 add_response_info(sock, "%s\n", help->help);
2208 }
2209 else
2210 {
2211 size_t i;
2213 resp_txt = "Command overview\n";
2215 for (i = 0; i < list_of_commands_len; i++)
2216 {
2217 if (list_of_commands[i].syntax == NULL)
2218 continue;
2219 add_response_info (sock, "%s", list_of_commands[i].syntax);
2220 }
2221 }
2223 return send_response(sock, RESP_OK, resp_txt);
2224 } /* }}} int handle_request_help */
2226 static int handle_request (DISPATCH_PROTO) /* {{{ */
2227 {
2228 char *buffer_ptr = buffer;
2229 char *cmd_str = NULL;
2230 command_t *cmd = NULL;
2231 int status;
2233 assert (buffer[buffer_size - 1] == '\0');
2235 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2236 if (status != 0)
2237 {
2238 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2239 return (-1);
2240 }
2242 if (sock != NULL && sock->batch_start)
2243 sock->batch_cmd++;
2245 cmd = find_command(cmd_str);
2246 if (!cmd)
2247 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2249 if (!socket_permission_check (sock, cmd->cmd))
2250 return send_response(sock, RESP_ERR, "Permission denied.\n");
2252 if (!command_check_context(sock, cmd))
2253 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2255 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2256 } /* }}} int handle_request */
2258 static void journal_set_free (journal_set *js) /* {{{ */
2259 {
2260 if (js == NULL)
2261 return;
2263 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2265 free(js);
2266 } /* }}} journal_set_free */
2268 static void journal_set_remove (journal_set *js) /* {{{ */
2269 {
2270 if (js == NULL)
2271 return;
2273 for (uint i=0; i < js->files_num; i++)
2274 {
2275 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2276 unlink(js->files[i]);
2277 }
2278 } /* }}} journal_set_remove */
2280 /* close current journal file handle.
2281 * MUST hold journal_lock before calling */
2282 static void journal_close(void) /* {{{ */
2283 {
2284 if (journal_fh != NULL)
2285 {
2286 if (fclose(journal_fh) != 0)
2287 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2288 }
2290 journal_fh = NULL;
2291 journal_size = 0;
2292 } /* }}} journal_close */
2294 /* MUST hold journal_lock before calling */
2295 static void journal_new_file(void) /* {{{ */
2296 {
2297 struct timeval now;
2298 int new_fd;
2299 char new_file[PATH_MAX + 1];
2301 assert(journal_dir != NULL);
2302 assert(journal_cur != NULL);
2304 journal_close();
2306 gettimeofday(&now, NULL);
2307 /* this format assures that the files sort in strcmp() order */
2308 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2309 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2311 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2312 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2313 if (new_fd < 0)
2314 goto error;
2316 journal_fh = fdopen(new_fd, "a");
2317 if (journal_fh == NULL)
2318 goto error;
2320 journal_size = ftell(journal_fh);
2321 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2323 /* record the file in the journal set */
2324 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2326 return;
2328 error:
2329 RRDD_LOG(LOG_CRIT,
2330 "JOURNALING DISABLED: Error while trying to create %s : %s",
2331 new_file, rrd_strerror(errno));
2332 RRDD_LOG(LOG_CRIT,
2333 "JOURNALING DISABLED: All values will be flushed at shutdown");
2335 close(new_fd);
2336 config_flush_at_shutdown = 1;
2338 } /* }}} journal_new_file */
2340 /* MUST NOT hold journal_lock before calling this */
2341 static void journal_rotate(void) /* {{{ */
2342 {
2343 journal_set *old_js = NULL;
2345 if (journal_dir == NULL)
2346 return;
2348 RRDD_LOG(LOG_DEBUG, "rotating journals");
2350 pthread_mutex_lock(&stats_lock);
2351 ++stats_journal_rotate;
2352 pthread_mutex_unlock(&stats_lock);
2354 pthread_mutex_lock(&journal_lock);
2356 journal_close();
2358 /* rotate the journal sets */
2359 old_js = journal_old;
2360 journal_old = journal_cur;
2361 journal_cur = calloc(1, sizeof(journal_set));
2363 if (journal_cur != NULL)
2364 journal_new_file();
2365 else
2366 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2368 pthread_mutex_unlock(&journal_lock);
2370 journal_set_remove(old_js);
2371 journal_set_free (old_js);
2373 } /* }}} static void journal_rotate */
2375 /* MUST hold journal_lock when calling */
2376 static void journal_done(void) /* {{{ */
2377 {
2378 if (journal_cur == NULL)
2379 return;
2381 journal_close();
2383 if (config_flush_at_shutdown)
2384 {
2385 RRDD_LOG(LOG_INFO, "removing journals");
2386 journal_set_remove(journal_old);
2387 journal_set_remove(journal_cur);
2388 }
2389 else
2390 {
2391 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2392 "journals will be used at next startup");
2393 }
2395 journal_set_free(journal_cur);
2396 journal_set_free(journal_old);
2397 free(journal_dir);
2399 } /* }}} static void journal_done */
2401 static int journal_write(char *cmd, char *args) /* {{{ */
2402 {
2403 int chars;
2405 if (journal_fh == NULL)
2406 return 0;
2408 pthread_mutex_lock(&journal_lock);
2409 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2410 journal_size += chars;
2412 if (journal_size > JOURNAL_MAX)
2413 journal_new_file();
2415 pthread_mutex_unlock(&journal_lock);
2417 if (chars > 0)
2418 {
2419 pthread_mutex_lock(&stats_lock);
2420 stats_journal_bytes += chars;
2421 pthread_mutex_unlock(&stats_lock);
2422 }
2424 return chars;
2425 } /* }}} static int journal_write */
2427 static int journal_replay (const char *file) /* {{{ */
2428 {
2429 FILE *fh;
2430 int entry_cnt = 0;
2431 int fail_cnt = 0;
2432 uint64_t line = 0;
2433 char entry[CMD_MAX];
2434 time_t now;
2436 if (file == NULL) return 0;
2438 {
2439 char *reason = "unknown error";
2440 int status = 0;
2441 struct stat statbuf;
2443 memset(&statbuf, 0, sizeof(statbuf));
2444 if (stat(file, &statbuf) != 0)
2445 {
2446 reason = "stat error";
2447 status = errno;
2448 }
2449 else if (!S_ISREG(statbuf.st_mode))
2450 {
2451 reason = "not a regular file";
2452 status = EPERM;
2453 }
2454 if (statbuf.st_uid != daemon_uid)
2455 {
2456 reason = "not owned by daemon user";
2457 status = EACCES;
2458 }
2459 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2460 {
2461 reason = "must not be user/group writable";
2462 status = EACCES;
2463 }
2465 if (status != 0)
2466 {
2467 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2468 file, rrd_strerror(status), reason);
2469 return 0;
2470 }
2471 }
2473 fh = fopen(file, "r");
2474 if (fh == NULL)
2475 {
2476 if (errno != ENOENT)
2477 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2478 file, rrd_strerror(errno));
2479 return 0;
2480 }
2481 else
2482 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2484 now = time(NULL);
2486 while(!feof(fh))
2487 {
2488 size_t entry_len;
2490 ++line;
2491 if (fgets(entry, sizeof(entry), fh) == NULL)
2492 break;
2493 entry_len = strlen(entry);
2495 /* check \n termination in case journal writing crashed mid-line */
2496 if (entry_len == 0)
2497 continue;
2498 else if (entry[entry_len - 1] != '\n')
2499 {
2500 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2501 ++fail_cnt;
2502 continue;
2503 }
2505 entry[entry_len - 1] = '\0';
2507 if (handle_request(NULL, now, entry, entry_len) == 0)
2508 ++entry_cnt;
2509 else
2510 ++fail_cnt;
2511 }
2513 fclose(fh);
2515 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2516 entry_cnt, fail_cnt);
2518 return entry_cnt > 0 ? 1 : 0;
2519 } /* }}} static int journal_replay */
/*
 * qsort() comparison callback for an array of C strings.
 *
 * v1 and v2 point at array elements (i.e. at "char *" values); the strings
 * they refer to are ordered with strcmp(). The pointers are only read, so
 * the const qualifier of the arguments is preserved.
 */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *jn1 = v1;
  const char *const *jn2 = v2;

  return strcmp(*jn1, *jn2);
}
2529 static void journal_init(void) /* {{{ */
2530 {
2531 int had_journal = 0;
2532 DIR *dir;
2533 struct dirent *dent;
2534 char path[PATH_MAX+1];
2536 if (journal_dir == NULL) return;
2538 pthread_mutex_lock(&journal_lock);
2540 journal_cur = calloc(1, sizeof(journal_set));
2541 if (journal_cur == NULL)
2542 {
2543 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2544 return;
2545 }
2547 RRDD_LOG(LOG_INFO, "checking for journal files");
2549 /* Handle old journal files during transition. This gives them the
2550 * correct sort order. TODO: remove after first release
2551 */
2552 {
2553 char old_path[PATH_MAX+1];
2554 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2555 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2556 rename(old_path, path);
2558 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2559 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2560 rename(old_path, path);
2561 }
2563 dir = opendir(journal_dir);
2564 if (!dir) {
2565 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2566 return;
2567 }
2568 while ((dent = readdir(dir)) != NULL)
2569 {
2570 /* looks like a journal file? */
2571 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2572 continue;
2574 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2576 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2577 {
2578 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2579 dent->d_name);
2580 break;
2581 }
2582 }
2583 closedir(dir);
2585 qsort(journal_cur->files, journal_cur->files_num,
2586 sizeof(journal_cur->files[0]), journal_sort);
2588 for (uint i=0; i < journal_cur->files_num; i++)
2589 had_journal += journal_replay(journal_cur->files[i]);
2591 journal_new_file();
2593 /* it must have been a crash. start a flush */
2594 if (had_journal && config_flush_at_shutdown)
2595 flush_old_values(-1);
2597 pthread_mutex_unlock(&journal_lock);
2599 RRDD_LOG(LOG_INFO, "journal processing complete");
2601 } /* }}} static void journal_init */
2603 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2604 {
2605 assert(sock != NULL);
2607 free(sock->rbuf); sock->rbuf = NULL;
2608 free(sock->wbuf); sock->wbuf = NULL;
2609 free(sock);
2610 } /* }}} void free_listen_socket */
2612 static void close_connection(listen_socket_t *sock) /* {{{ */
2613 {
2614 if (sock->fd >= 0)
2615 {
2616 close(sock->fd);
2617 sock->fd = -1;
2618 }
2620 free_listen_socket(sock);
2622 } /* }}} void close_connection */
2624 static void *connection_thread_main (void *args) /* {{{ */
2625 {
2626 listen_socket_t *sock;
2627 int fd;
2629 sock = (listen_socket_t *) args;
2630 fd = sock->fd;
2632 /* init read buffers */
2633 sock->next_read = sock->next_cmd = 0;
2634 sock->rbuf = malloc(RBUF_SIZE);
2635 if (sock->rbuf == NULL)
2636 {
2637 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2638 close_connection(sock);
2639 return NULL;
2640 }
2642 pthread_mutex_lock (&connection_threads_lock);
2643 #ifdef HAVE_LIBWRAP
2644 /* LIBWRAP does not support multiple threads! By putting this code
2645 inside pthread_mutex_lock we do not have to worry about request_info
2646 getting overwritten by another thread.
2647 */
2648 struct request_info req;
2649 request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2650 fromhost(&req);
2651 if(!hosts_access(&req)) {
2652 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2653 pthread_mutex_unlock (&connection_threads_lock);
2654 close_connection(sock);
2655 return NULL;
2656 }
2657 #endif /* HAVE_LIBWRAP */
2658 connection_threads_num++;
2659 pthread_mutex_unlock (&connection_threads_lock);
2661 while (state == RUNNING)
2662 {
2663 char *cmd;
2664 ssize_t cmd_len;
2665 ssize_t rbytes;
2666 time_t now;
2668 struct pollfd pollfd;
2669 int status;
2671 pollfd.fd = fd;
2672 pollfd.events = POLLIN | POLLPRI;
2673 pollfd.revents = 0;
2675 status = poll (&pollfd, 1, /* timeout = */ 500);
2676 if (state != RUNNING)
2677 break;
2678 else if (status == 0) /* timeout */
2679 continue;
2680 else if (status < 0) /* error */
2681 {
2682 status = errno;
2683 if (status != EINTR)
2684 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2685 continue;
2686 }
2688 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2689 break;
2690 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2691 {
2692 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2693 "poll(2) returned something unexpected: %#04hx",
2694 pollfd.revents);
2695 break;
2696 }
2698 rbytes = read(fd, sock->rbuf + sock->next_read,
2699 RBUF_SIZE - sock->next_read);
2700 if (rbytes < 0)
2701 {
2702 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2703 break;
2704 }
2705 else if (rbytes == 0)
2706 break; /* eof */
2708 sock->next_read += rbytes;
2710 if (sock->batch_start)
2711 now = sock->batch_start;
2712 else
2713 now = time(NULL);
2715 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2716 {
2717 status = handle_request (sock, now, cmd, cmd_len+1);
2718 if (status != 0)
2719 goto out_close;
2720 }
2721 }
2723 out_close:
2724 close_connection(sock);
2726 /* Remove this thread from the connection threads list */
2727 pthread_mutex_lock (&connection_threads_lock);
2728 connection_threads_num--;
2729 if (connection_threads_num <= 0)
2730 pthread_cond_broadcast(&connection_threads_done);
2731 pthread_mutex_unlock (&connection_threads_lock);
2733 return (NULL);
2734 } /* }}} void *connection_thread_main */
2736 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2737 {
2738 int fd;
2739 struct sockaddr_un sa;
2740 listen_socket_t *temp;
2741 int status;
2742 const char *path;
2743 char *path_copy, *dir;
2745 path = sock->addr;
2746 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2747 path += strlen("unix:");
2749 /* dirname may modify its argument */
2750 path_copy = strdup(path);
2751 if (path_copy == NULL)
2752 {
2753 fprintf(stderr, "rrdcached: strdup(): %s\n",
2754 rrd_strerror(errno));
2755 return (-1);
2756 }
2758 dir = dirname(path_copy);
2759 if (rrd_mkdir_p(dir, 0777) != 0)
2760 {
2761 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2762 dir, rrd_strerror(errno));
2763 return (-1);
2764 }
2766 free(path_copy);
2768 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2769 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2770 if (temp == NULL)
2771 {
2772 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2773 return (-1);
2774 }
2775 listen_fds = temp;
2776 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2778 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2779 if (fd < 0)
2780 {
2781 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2782 rrd_strerror(errno));
2783 return (-1);
2784 }
2786 memset (&sa, 0, sizeof (sa));
2787 sa.sun_family = AF_UNIX;
2788 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2790 /* if we've gotten this far, we own the pid file. any daemon started
2791 * with the same args must not be alive. therefore, ensure that we can
2792 * create the socket...
2793 */
2794 unlink(path);
2796 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2797 if (status != 0)
2798 {
2799 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2800 path, rrd_strerror(errno));
2801 close (fd);
2802 return (-1);
2803 }
2805 /* tweak the sockets group ownership */
2806 if (sock->socket_group != (gid_t)-1)
2807 {
2808 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2809 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2810 {
2811 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2812 }
2813 }
2815 if (sock->socket_permissions != (mode_t)-1)
2816 {
2817 if (chmod(path, sock->socket_permissions) != 0)
2818 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2819 (unsigned int)sock->socket_permissions, strerror(errno));
2820 }
2822 status = listen (fd, /* backlog = */ 10);
2823 if (status != 0)
2824 {
2825 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2826 path, rrd_strerror(errno));
2827 close (fd);
2828 unlink (path);
2829 return (-1);
2830 }
2832 listen_fds[listen_fds_num].fd = fd;
2833 listen_fds[listen_fds_num].family = PF_UNIX;
2834 strncpy(listen_fds[listen_fds_num].addr, path,
2835 sizeof (listen_fds[listen_fds_num].addr) - 1);
2836 listen_fds_num++;
2838 return (0);
2839 } /* }}} int open_listen_socket_unix */
/*
 * Open listening TCP sockets for a network address.
 *
 * sock->addr is either "[ipv6-address]" / "[ipv6-address]:port" or
 * "host" / "host:port"; without a port RRDCACHED_DEFAULT_PORT is used. One
 * socket is opened per address returned by getaddrinfo(); each successfully
 * listening socket is appended to listen_fds as a copy of sock with fd and
 * family filled in. Addresses for which socket() or bind() fails are
 * skipped.
 *
 * Returns 0 after the address list has been processed, and -1 when the
 * address is malformed, cannot be resolved, or listen() fails.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* everything after the last colon is the port */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    /* the slot only becomes part of the list once it is listening */
    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2963 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2964 {
2965 assert(sock != NULL);
2966 assert(sock->addr != NULL);
2968 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2969 || sock->addr[0] == '/')
2970 return (open_listen_socket_unix(sock));
2971 else
2972 return (open_listen_socket_network(sock));
2973 } /* }}} int open_listen_socket */
2975 static int close_listen_sockets (void) /* {{{ */
2976 {
2977 size_t i;
2979 for (i = 0; i < listen_fds_num; i++)
2980 {
2981 close (listen_fds[i].fd);
2983 if (listen_fds[i].family == PF_UNIX)
2984 unlink(listen_fds[i].addr);
2985 }
2987 free (listen_fds);
2988 listen_fds = NULL;
2989 listen_fds_num = 0;
2991 return (0);
2992 } /* }}} int close_listen_sockets */
2994 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2995 {
2996 struct pollfd *pollfds;
2997 int pollfds_num;
2998 int status;
2999 int i;
3001 if (listen_fds_num < 1)
3002 {
3003 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3004 return (NULL);
3005 }
3007 pollfds_num = listen_fds_num;
3008 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3009 if (pollfds == NULL)
3010 {
3011 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3012 return (NULL);
3013 }
3014 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3016 RRDD_LOG(LOG_INFO, "listening for connections");
3018 while (state == RUNNING)
3019 {
3020 for (i = 0; i < pollfds_num; i++)
3021 {
3022 pollfds[i].fd = listen_fds[i].fd;
3023 pollfds[i].events = POLLIN | POLLPRI;
3024 pollfds[i].revents = 0;
3025 }
3027 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3028 if (state != RUNNING)
3029 break;
3030 else if (status == 0) /* timeout */
3031 continue;
3032 else if (status < 0) /* error */
3033 {
3034 status = errno;
3035 if (status != EINTR)
3036 {
3037 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3038 }
3039 continue;
3040 }
3042 for (i = 0; i < pollfds_num; i++)
3043 {
3044 listen_socket_t *client_sock;
3045 struct sockaddr_storage client_sa;
3046 socklen_t client_sa_size;
3047 pthread_t tid;
3048 pthread_attr_t attr;
3050 if (pollfds[i].revents == 0)
3051 continue;
3053 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3054 {
3055 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3056 "poll(2) returned something unexpected for listen FD #%i.",
3057 pollfds[i].fd);
3058 continue;
3059 }
3061 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3062 if (client_sock == NULL)
3063 {
3064 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3065 continue;
3066 }
3067 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3069 client_sa_size = sizeof (client_sa);
3070 client_sock->fd = accept (pollfds[i].fd,
3071 (struct sockaddr *) &client_sa, &client_sa_size);
3072 if (client_sock->fd < 0)
3073 {
3074 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3075 free(client_sock);
3076 continue;
3077 }
3079 pthread_attr_init (&attr);
3080 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3082 status = pthread_create (&tid, &attr, connection_thread_main,
3083 client_sock);
3084 if (status != 0)
3085 {
3086 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3087 close_connection(client_sock);
3088 continue;
3089 }
3090 } /* for (pollfds_num) */
3091 } /* while (state == RUNNING) */
3093 RRDD_LOG(LOG_INFO, "starting shutdown");
3095 close_listen_sockets ();
3097 pthread_mutex_lock (&connection_threads_lock);
3098 while (connection_threads_num > 0)
3099 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3100 pthread_mutex_unlock (&connection_threads_lock);
3102 free(pollfds);
3104 return (NULL);
3105 } /* }}} void *listen_thread_main */
3107 static int daemonize (void) /* {{{ */
3108 {
3109 int pid_fd;
3110 char *base_dir;
3112 daemon_uid = geteuid();
3114 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3115 if (pid_fd < 0)
3116 pid_fd = check_pidfile();
3117 if (pid_fd < 0)
3118 return pid_fd;
3120 /* open all the listen sockets */
3121 if (config_listen_address_list_len > 0)
3122 {
3123 for (size_t i = 0; i < config_listen_address_list_len; i++)
3124 open_listen_socket (config_listen_address_list[i]);
3126 rrd_free_ptrs((void ***) &config_listen_address_list,
3127 &config_listen_address_list_len);
3128 }
3129 else
3130 {
3131 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3132 sizeof(default_socket.addr) - 1);
3133 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3134 open_listen_socket (&default_socket);
3135 }
3137 if (listen_fds_num < 1)
3138 {
3139 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3140 goto error;
3141 }
3143 if (!stay_foreground)
3144 {
3145 pid_t child;
3147 child = fork ();
3148 if (child < 0)
3149 {
3150 fprintf (stderr, "daemonize: fork(2) failed.\n");
3151 goto error;
3152 }
3153 else if (child > 0)
3154 exit(0);
3156 /* Become session leader */
3157 setsid ();
3159 /* Open the first three file descriptors to /dev/null */
3160 close (2);
3161 close (1);
3162 close (0);
3164 open ("/dev/null", O_RDWR);
3165 if (dup(0) == -1 || dup(0) == -1){
3166 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3167 }
3168 } /* if (!stay_foreground) */
3170 /* Change into the /tmp directory. */
3171 base_dir = (config_base_dir != NULL)
3172 ? config_base_dir
3173 : "/tmp";
3175 if (chdir (base_dir) != 0)
3176 {
3177 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3178 goto error;
3179 }
3181 install_signal_handlers();
3183 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3184 RRDD_LOG(LOG_INFO, "starting up");
3186 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3187 (GDestroyNotify) free_cache_item);
3188 if (cache_tree == NULL)
3189 {
3190 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3191 goto error;
3192 }
3194 return write_pidfile (pid_fd);
3196 error:
3197 remove_pidfile();
3198 return -1;
3199 } /* }}} int daemonize */
3201 static int cleanup (void) /* {{{ */
3202 {
3203 pthread_cond_broadcast (&flush_cond);
3204 pthread_join (flush_thread, NULL);
3206 pthread_cond_broadcast (&queue_cond);
3207 for (int i = 0; i < config_queue_threads; i++)
3208 pthread_join (queue_threads[i], NULL);
3210 if (config_flush_at_shutdown)
3211 {
3212 assert(cache_queue_head == NULL);
3213 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3214 }
3216 free(queue_threads);
3217 free(config_base_dir);
3219 pthread_mutex_lock(&cache_lock);
3220 g_tree_destroy(cache_tree);
3222 pthread_mutex_lock(&journal_lock);
3223 journal_done();
3225 RRDD_LOG(LOG_INFO, "goodbye");
3226 closelog ();
3228 remove_pidfile ();
3229 free(config_pid_file);
3231 return (0);
3232 } /* }}} int cleanup */
3234 static int read_options (int argc, char **argv) /* {{{ */
3235 {
3236 int option;
3237 int status = 0;
3239 socket_permission_clear (&default_socket);
3241 default_socket.socket_group = (gid_t)-1;
3242 default_socket.socket_permissions = (mode_t)-1;
3244 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3245 {
3246 switch (option)
3247 {
3248 case 'O':
3249 opt_no_overwrite = 1;
3250 break;
3252 case 'g':
3253 stay_foreground=1;
3254 break;
3256 case 'l':
3257 {
3258 listen_socket_t *new;
3260 new = malloc(sizeof(listen_socket_t));
3261 if (new == NULL)
3262 {
3263 fprintf(stderr, "read_options: malloc failed.\n");
3264 return(2);
3265 }
3266 memset(new, 0, sizeof(listen_socket_t));
3268 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3270 /* Add permissions to the socket {{{ */
3271 if (default_socket.permissions != 0)
3272 {
3273 socket_permission_copy (new, &default_socket);
3274 }
3275 else /* if (default_socket.permissions == 0) */
3276 {
3277 /* Add permission for ALL commands to the socket. */
3278 size_t i;
3279 for (i = 0; i < list_of_commands_len; i++)
3280 {
3281 status = socket_permission_add (new, list_of_commands[i].cmd);
3282 if (status != 0)
3283 {
3284 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3285 "socket failed. This should never happen, ever! Sorry.\n",
3286 list_of_commands[i].cmd);
3287 status = 4;
3288 }
3289 }
3290 }
3291 /* }}} Done adding permissions. */
3293 new->socket_group = default_socket.socket_group;
3294 new->socket_permissions = default_socket.socket_permissions;
3296 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3297 &config_listen_address_list_len, new))
3298 {
3299 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3300 return (2);
3301 }
3302 }
3303 break;
3305 /* set socket group permissions */
3306 case 's':
3307 {
3308 gid_t group_gid;
3309 struct group *grp;
3311 group_gid = strtoul(optarg, NULL, 10);
3312 if (errno != EINVAL && group_gid>0)
3313 {
3314 /* we were passed a number */
3315 grp = getgrgid(group_gid);
3316 }
3317 else
3318 {
3319 grp = getgrnam(optarg);
3320 }
3322 if (grp)
3323 {
3324 default_socket.socket_group = grp->gr_gid;
3325 }
3326 else
3327 {
3328 /* no idea what the user wanted... */
3329 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3330 return (5);
3331 }
3332 }
3333 break;
3335 /* set socket file permissions */
3336 case 'm':
3337 {
3338 long tmp;
3339 char *endptr = NULL;
3341 tmp = strtol (optarg, &endptr, 8);
3342 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3343 || (tmp > 07777) || (tmp < 0)) {
3344 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3345 optarg);
3346 return (5);
3347 }
3349 default_socket.socket_permissions = (mode_t)tmp;
3350 }
3351 break;
3353 case 'P':
3354 {
3355 char *optcopy;
3356 char *saveptr;
3357 char *dummy;
3358 char *ptr;
3360 socket_permission_clear (&default_socket);
3362 optcopy = strdup (optarg);
3363 dummy = optcopy;
3364 saveptr = NULL;
3365 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3366 {
3367 dummy = NULL;
3368 status = socket_permission_add (&default_socket, ptr);
3369 if (status != 0)
3370 {
3371 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3372 "socket failed. Most likely, this permission doesn't "
3373 "exist. Check your command line.\n", ptr);
3374 status = 4;
3375 }
3376 }
3378 free (optcopy);
3379 }
3380 break;
3382 case 'f':
3383 {
3384 int temp;
3386 temp = atoi (optarg);
3387 if (temp > 0)
3388 config_flush_interval = temp;
3389 else
3390 {
3391 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3392 status = 3;
3393 }
3394 }
3395 break;
3397 case 'w':
3398 {
3399 int temp;
3401 temp = atoi (optarg);
3402 if (temp > 0)
3403 config_write_interval = temp;
3404 else
3405 {
3406 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3407 status = 2;
3408 }
3409 }
3410 break;
3412 case 'z':
3413 {
3414 int temp;
3416 temp = atoi(optarg);
3417 if (temp > 0)
3418 config_write_jitter = temp;
3419 else
3420 {
3421 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3422 status = 2;
3423 }
3425 break;
3426 }
3428 case 't':
3429 {
3430 int threads;
3431 threads = atoi(optarg);
3432 if (threads >= 1)
3433 config_queue_threads = threads;
3434 else
3435 {
3436 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3437 return 1;
3438 }
3439 }
3440 break;
3442 case 'B':
3443 config_write_base_only = 1;
3444 break;
3446 case 'b':
3447 {
3448 size_t len;
3449 char base_realpath[PATH_MAX];
3451 if (config_base_dir != NULL)
3452 free (config_base_dir);
3453 config_base_dir = strdup (optarg);
3454 if (config_base_dir == NULL)
3455 {
3456 fprintf (stderr, "read_options: strdup failed.\n");
3457 return (3);
3458 }
3460 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3461 {
3462 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3463 config_base_dir, rrd_strerror (errno));
3464 return (3);
3465 }
3467 /* make sure that the base directory is not resolved via
3468 * symbolic links. this makes some performance-enhancing
3469 * assumptions possible (we don't have to resolve paths
3470 * that start with a "/")
3471 */
3472 if (realpath(config_base_dir, base_realpath) == NULL)
3473 {
3474 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3475 "%s\n", config_base_dir, rrd_strerror(errno));
3476 return 5;
3477 }
3479 len = strlen (config_base_dir);
3480 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3481 {
3482 config_base_dir[len - 1] = 0;
3483 len--;
3484 }
3486 if (len < 1)
3487 {
3488 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3489 return (4);
3490 }
3492 _config_base_dir_len = len;
3494 len = strlen (base_realpath);
3495 while ((len > 0) && (base_realpath[len - 1] == '/'))
3496 {
3497 base_realpath[len - 1] = '\0';
3498 len--;
3499 }
3501 if (strncmp(config_base_dir,
3502 base_realpath, sizeof(base_realpath)) != 0)
3503 {
3504 fprintf(stderr,
3505 "Base directory (-b) resolved via file system links!\n"
3506 "Please consult rrdcached '-b' documentation!\n"
3507 "Consider specifying the real directory (%s)\n",
3508 base_realpath);
3509 return 5;
3510 }
3511 }
3512 break;
3514 case 'p':
3515 {
3516 if (config_pid_file != NULL)
3517 free (config_pid_file);
3518 config_pid_file = strdup (optarg);
3519 if (config_pid_file == NULL)
3520 {
3521 fprintf (stderr, "read_options: strdup failed.\n");
3522 return (3);
3523 }
3524 }
3525 break;
3527 case 'F':
3528 config_flush_at_shutdown = 1;
3529 break;
3531 case 'j':
3532 {
3533 char journal_dir_actual[PATH_MAX];
3534 const char *dir;
3535 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3537 status = rrd_mkdir_p(dir, 0777);
3538 if (status != 0)
3539 {
3540 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3541 dir, rrd_strerror(errno));
3542 return 6;
3543 }
3545 if (access(dir, R_OK|W_OK|X_OK) != 0)
3546 {
3547 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3548 errno ? rrd_strerror(errno) : "");
3549 return 6;
3550 }
3551 }
3552 break;
3554 case 'a':
3555 {
3556 int temp = atoi(optarg);
3557 if (temp > 0)
3558 config_alloc_chunk = temp;
3559 else
3560 {
3561 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3562 return 10;
3563 }
3564 }
3565 break;
3567 case 'h':
3568 case '?':
3569 printf ("RRDCacheD %s\n"
3570 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3571 "\n"
3572 "Usage: rrdcached [options]\n"
3573 "\n"
3574 "Valid options are:\n"
3575 " -l <address> Socket address to listen to.\n"
3576 " -P <perms> Sets the permissions to assign to all following "
3577 "sockets\n"
3578 " -w <seconds> Interval in which to write data.\n"
3579 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3580 " -t <threads> Number of write threads.\n"
3581 " -f <seconds> Interval in which to flush dead data.\n"
3582 " -p <file> Location of the PID-file.\n"
3583 " -b <dir> Base directory to change to.\n"
3584 " -B Restrict file access to paths within -b <dir>\n"
3585 " -g Do not fork and run in the foreground.\n"
3586 " -j <dir> Directory in which to create the journal files.\n"
3587 " -F Always flush all updates at shutdown\n"
3588 " -s <id|name> Group owner of all following UNIX sockets\n"
3589 " (the socket will also have read/write permissions "
3590 "for that group)\n"
3591 " -m <mode> File permissions (octal) of all following UNIX "
3592 "sockets\n"
3593 " -a <size> Memory allocation chunk size. Default is 1.\n"
3594 " -O Do not allow CREATE commands to overwrite existing\n"
3595 " files, even if asked to.\n"
3596 "\n"
3597 "For more information and a detailed description of all options "
3598 "please refer\n"
3599 "to the rrdcached(1) manual page.\n",
3600 VERSION);
3601 if (option == 'h')
3602 status = -1;
3603 else
3604 status = 1;
3605 break;
3606 } /* switch (option) */
3607 } /* while (getopt) */
3609 /* advise the user when values are not sane */
3610 if (config_flush_interval < 2 * config_write_interval)
3611 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3612 " 2x write interval (-w) !\n");
3613 if (config_write_jitter > config_write_interval)
3614 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3615 " write interval (-w) !\n");
3617 if (config_write_base_only && config_base_dir == NULL)
3618 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3619 " Consult the rrdcached documentation\n");
3621 if (journal_dir == NULL)
3622 config_flush_at_shutdown = 1;
3624 return (status);
3625 } /* }}} int read_options */
3627 int main (int argc, char **argv)
3628 {
3629 int status;
3631 status = read_options (argc, argv);
3632 if (status != 0)
3633 {
3634 if (status < 0)
3635 status = 0;
3636 return (status);
3637 }
3639 status = daemonize ();
3640 if (status != 0)
3641 {
3642 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3643 return (1);
3644 }
3646 journal_init();
3648 /* start the queue threads */
3649 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3650 if (queue_threads == NULL)
3651 {
3652 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3653 cleanup();
3654 return (1);
3655 }
3656 for (int i = 0; i < config_queue_threads; i++)
3657 {
3658 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3659 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3660 if (status != 0)
3661 {
3662 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3663 cleanup();
3664 return (1);
3665 }
3666 }
3668 /* start the flush thread */
3669 memset(&flush_thread, 0, sizeof(flush_thread));
3670 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3671 if (status != 0)
3672 {
3673 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3674 cleanup();
3675 return (1);
3676 }
3678 listen_thread_main (NULL);
3679 cleanup ();
3681 return (0);
3682 } /* int main */
3684 /*
3685 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3686 */