1a20974d2e46c848da9d12580110b7c6dac00627
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
/*
 * RRDD_LOG(severity, fmt, ...): log a printf-style message.
 *
 * The message is always handed to syslog() with the given severity. When
 * `stay_foreground' is non-zero it is additionally printed to stderr,
 * followed by a newline.
 *
 * The variadic arguments are expanded once for fprintf() and once for
 * syslog(), so they must not have side effects.
 */
#define RRDD_LOG(severity, ...) \
  do { \
    if (stay_foreground) { \
      fprintf(stderr, __VA_ARGS__); \
      fprintf(stderr, "\n"); } \
    syslog ((severity), __VA_ARGS__); \
  } while (0)
127 /*
128 * Types
129 */
/* Result code passed to send_response(): RESP_ERR (-1) or RESP_OK (0). */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/*
 * Per-socket state. The helpers in this file use it both for the
 * buffered command input (rbuf) and for the pending response (wbuf).
 */
struct listen_socket_s
{
  int fd;                  /* descriptor responses are write()n to */
  char addr[PATH_MAX + 1]; /* presumably the textual socket address - TODO confirm */
  int family;              /* presumably the address family - TODO confirm */

  /* state for BATCH processing */
  time_t batch_start;      /* non-zero while the connection is in BATCH mode */
  int batch_cmd;           /* reported as the line number in BATCH error replies */

  /* buffered IO */
  char *rbuf;              /* read buffer scanned by next_cmd() */
  off_t next_cmd;          /* offset in rbuf of the next unparsed command */
  off_t next_read;         /* offset in rbuf one past the last byte read */

  char *wbuf;              /* heap-allocated response text, see add_to_wbuf() */
  ssize_t wbuf_len;        /* bytes in wbuf, not counting the trailing NUL */

  uint32_t permissions;    /* NOTE(review): usage not visible in this part of the file */

  gid_t socket_group;        /* NOTE(review): usage not visible in this part of the file */
  mode_t socket_permissions; /* NOTE(review): usage not visible in this part of the file */
};
typedef struct listen_socket_s listen_socket_t;
struct command_s;
typedef struct command_s command_t;
/*
 * Parameter lists shared by all command handlers.
 *
 * DISPATCH_PROTO expands to: the socket, the current time, the argument
 * buffer and its size. HANDLER_PROTO prepends the command_t being handled.
 * note: every parameter is wrapped in UNUSED() to guard against "unused"
 * warnings in the handlers
 */
#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
                       time_t UNUSED(now),\
                       char UNUSED(*buffer),\
                       size_t UNUSED(buffer_size)

#define HANDLER_PROTO command_t UNUSED(*cmd),\
                      DISPATCH_PROTO
/* Description of one protocol command and the handler implementing it. */
struct command_s {
  char *cmd;                     /* command name */
  int (*handler)(HANDLER_PROTO); /* function invoked for this command */

  char context; /* where we expect to see it (bitmask of CMD_CONTEXT_*) */
#define CMD_CONTEXT_CLIENT (1<<0)
#define CMD_CONTEXT_BATCH (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY (0x7f)

  char *syntax; /* usage text; syntax_error() sends it after "Usage: " */
  char *help;   /* help text for the command */
};
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/*
 * One cached RRD file: the pending update strings plus the links that
 * place the item in the doubly linked write queue
 * (cache_queue_head .. cache_queue_tail).
 */
struct cache_item_s
{
  char *file;               /* file name; also used as the key in cache_tree */
  char **values;            /* pending update strings */
  size_t values_num; /* number of valid pointers */
  size_t values_alloc; /* number of allocated pointers */
  time_t last_flush_time;   /* set by wipe_ci_values(), optionally with jitter added */
  time_t last_update_stamp; /* NOTE(review): usage not visible in this part of the file */
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;                /* bitmask of CI_FLAGS_* */
  pthread_cond_t flushed;   /* broadcast once the item's values were written */
  cache_item_t *prev;       /* previous item in the write queue, or NULL */
  cache_item_t *next;       /* next item in the write queue, or NULL */
};
/* User data handed to tree_callback_flush() by flush_old_values(). */
struct callback_flush_data_s
{
  time_t now;         /* time at which the flush pass started */
  time_t abs_timeout; /* items last flushed at or before this are enqueued */
  char **keys;        /* keys of idle, empty items to remove from the tree */
  size_t keys_num;    /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Selects the end of the write queue used by enqueue_cache_item(). */
enum queue_side_e
{
  HEAD, /* insert at the front of the queue */
  TAIL  /* append at the end of the queue */
};
typedef enum queue_side_e queue_side_t;
/* describe a set of journal files */
typedef struct {
  char **files;     /* array of journal file names */
  size_t files_num; /* number of entries in `files' */
} journal_set;
/* max length of socket command or response; also the size of the
 * formatting buffers in add_response_info() and send_response() */
#define CMD_MAX 4096
/* RBUF_SIZE is twice CMD_MAX; presumably the size of a socket's read
 * buffer - TODO confirm at the allocation site */
#define RBUF_SIZE (CMD_MAX*2)
226 /*
227 * Variables
228 */
/* non-zero: RRDD_LOG also prints to stderr */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

static listen_socket_t default_socket;

/* Daemon life cycle. The signal handlers switch RUNNING to FLUSHING,
 * flush_thread_main() later switches to SHUTDOWN.
 * NOTE(review): unlike its neighbours this variable is not `static'. */
enum {
  RUNNING, /* normal operation */
  FLUSHING, /* flushing remaining values */
  SHUTDOWN /* shutting down */
} state = RUNNING;

/* queue_cond is signalled whenever an item is enqueued for writing */
static pthread_t *queue_threads;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

/* flush_cond wakes flush_thread_main() early, e.g. on a signal */
static pthread_t flush_thread;
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff: cache_lock protects the tree and the write queue */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

/* Configuration. The intervals are compared against time_t differences,
 * i.e. they are in seconds. */
static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;
static size_t config_alloc_chunk = 1;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Statistics counters, protected by stats_lock */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

static int opt_no_overwrite = 0; /* default for the daemon */
/* Journaled updates */
/* A NULL socket pointer means the command comes from journal replay and
 * not from a client; helpers then skip sending any response. */
#define JOURNAL_REPLAY(s) ((s) == NULL)
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL; /* current journal file handle */
static long journal_size = 0; /* current journal size */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
301 /*
302 * Functions
303 */
/*
 * Common part of all termination signal handlers: log which signal was
 * caught, switch the daemon to the FLUSHING state and wake up the flush
 * thread and the queue threads so they notice the state change.
 *
 * `sig' is the signal name without the "SIG" prefix.
 *
 * NOTE(review): this runs in signal-handler context, yet RRDD_LOG (fprintf,
 * syslog) and pthread_cond_broadcast are not on the POSIX list of
 * async-signal-safe functions - confirm whether this is acceptable.
 */
static void sig_common (const char *sig) /* {{{ */
{
  RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
  state = FLUSHING;
  pthread_cond_broadcast(&flush_cond);
  pthread_cond_broadcast(&queue_cond);
} /* }}} void sig_common */
/* SIGINT handler: start the shutdown sequence via sig_common(). */
static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM handler: start the shutdown sequence via sig_common(). */
static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
/* SIGUSR1 handler: shut down like SIGTERM, but force
 * `config_flush_at_shutdown' on so that flush_thread_main() flushes all
 * cached values before the daemon exits. */
static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
{
  config_flush_at_shutdown = 1;
  sig_common("USR1");
} /* }}} void sig_usr1_handler */
/* SIGUSR2 handler: shut down like SIGTERM, but force
 * `config_flush_at_shutdown' off so that the final flush of all cached
 * values in flush_thread_main() is skipped. */
static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
{
  config_flush_at_shutdown = 0;
  sig_common("USR2");
} /* }}} void sig_usr2_handler */
334 static void install_signal_handlers(void) /* {{{ */
335 {
336 /* These structures are static, because `sigaction' behaves weird if the are
337 * overwritten.. */
338 static struct sigaction sa_int;
339 static struct sigaction sa_term;
340 static struct sigaction sa_pipe;
341 static struct sigaction sa_usr1;
342 static struct sigaction sa_usr2;
344 /* Install signal handlers */
345 memset (&sa_int, 0, sizeof (sa_int));
346 sa_int.sa_handler = sig_int_handler;
347 sigaction (SIGINT, &sa_int, NULL);
349 memset (&sa_term, 0, sizeof (sa_term));
350 sa_term.sa_handler = sig_term_handler;
351 sigaction (SIGTERM, &sa_term, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_pipe));
354 sa_pipe.sa_handler = SIG_IGN;
355 sigaction (SIGPIPE, &sa_pipe, NULL);
357 memset (&sa_pipe, 0, sizeof (sa_usr1));
358 sa_usr1.sa_handler = sig_usr1_handler;
359 sigaction (SIGUSR1, &sa_usr1, NULL);
361 memset (&sa_usr2, 0, sizeof (sa_usr2));
362 sa_usr2.sa_handler = sig_usr2_handler;
363 sigaction (SIGUSR2, &sa_usr2, NULL);
365 } /* }}} void install_signal_handlers */
367 static int open_pidfile(char *action, int oflag) /* {{{ */
368 {
369 int fd;
370 const char *file;
371 char *file_copy, *dir;
373 file = (config_pid_file != NULL)
374 ? config_pid_file
375 : LOCALSTATEDIR "/run/rrdcached.pid";
377 /* dirname may modify its argument */
378 file_copy = strdup(file);
379 if (file_copy == NULL)
380 {
381 fprintf(stderr, "rrdcached: strdup(): %s\n",
382 rrd_strerror(errno));
383 return -1;
384 }
386 dir = dirname(file_copy);
387 if (rrd_mkdir_p(dir, 0777) != 0)
388 {
389 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
390 dir, rrd_strerror(errno));
391 return -1;
392 }
394 free(file_copy);
396 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
397 if (fd < 0)
398 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
399 action, file, rrd_strerror(errno));
401 return(fd);
402 } /* }}} static int open_pidfile */
/*
 * check existing pid file to see whether a daemon is running
 *
 * Returns a descriptor for the (now truncated) stale pid file, ready to be
 * handed to write_pidfile(), or a negative value if the file cannot be
 * opened or read, holds no valid pid, or names a process other than this
 * one that can still be signalled. The descriptor is closed on every
 * failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not NUL-terminate */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * Write the pid of the calling process, followed by a newline, to the
 * already opened pid file `fd'. The descriptor is closed in every case.
 * Returns 0 on success and -1 if no stdio stream could be attached to `fd'.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream != NULL)
  {
    fprintf (stream, "%i\n", (int) getpid ());
    fclose (stream);
    return (0);
  }

  RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
  close(fd);
  return (-1);
} /* }}} int write_pidfile */
469 static int remove_pidfile (void) /* {{{ */
470 {
471 char *file;
472 int status;
474 file = (config_pid_file != NULL)
475 ? config_pid_file
476 : LOCALSTATEDIR "/run/rrdcached.pid";
478 status = unlink (file);
479 if (status == 0)
480 return (0);
481 return (errno);
482 } /* }}} int remove_pidfile */
484 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
485 {
486 char *eol;
488 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
489 sock->next_read - sock->next_cmd);
491 if (eol == NULL)
492 {
493 /* no commands left, move remainder back to front of rbuf */
494 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
495 sock->next_read - sock->next_cmd);
496 sock->next_read -= sock->next_cmd;
497 sock->next_cmd = 0;
498 *len = 0;
499 return NULL;
500 }
501 else
502 {
503 char *cmd = sock->rbuf + sock->next_cmd;
504 *eol = '\0';
506 sock->next_cmd = eol - sock->rbuf + 1;
508 if (eol > sock->rbuf && *(eol-1) == '\r')
509 *(--eol) = '\0'; /* handle "\r\n" EOL */
511 *len = eol - cmd;
513 return cmd;
514 }
516 /* NOTREACHED */
517 assert(1==0);
518 } /* }}} char *next_cmd */
520 /* add the characters directly to the write buffer */
521 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
522 {
523 char *new_buf;
525 assert(sock != NULL);
527 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
528 if (new_buf == NULL)
529 {
530 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
531 return -1;
532 }
534 strncpy(new_buf + sock->wbuf_len, str, len + 1);
536 sock->wbuf = new_buf;
537 sock->wbuf_len += len;
539 return 0;
540 } /* }}} static int add_to_wbuf */
542 /* add the text to the "extra" info that's sent after the status line */
543 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
544 {
545 va_list argp;
546 char buffer[CMD_MAX];
547 int len;
549 if (JOURNAL_REPLAY(sock)) return 0;
550 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
552 va_start(argp, fmt);
553 #ifdef HAVE_VSNPRINTF
554 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
555 #else
556 len = vsprintf(buffer, fmt, argp);
557 #endif
558 va_end(argp);
559 if (len < 0)
560 {
561 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
562 return -1;
563 }
565 return add_to_wbuf(sock, buffer, len);
566 } /* }}} static int add_response_info */
/* Return the number of '\n' characters in `str'; a NULL pointer counts
 * as an empty string. */
static int count_lines(char *str) /* {{{ */
{
  int newline_count = 0;
  const char *cursor;

  if (str == NULL)
    return 0;

  for (cursor = str; *cursor != '\0'; cursor++)
  {
    if (*cursor == '\n')
      newline_count++;
  }

  return newline_count;
} /* }}} static int count_lines */
584 /* send the response back to the user.
585 * returns 0 on success, -1 on error
586 * write buffer is always zeroed after this call */
587 static int send_response (listen_socket_t *sock, response_code rc,
588 char *fmt, ...) /* {{{ */
589 {
590 va_list argp;
591 char buffer[CMD_MAX];
592 int lines;
593 ssize_t wrote;
594 int rclen, len;
596 if (JOURNAL_REPLAY(sock)) return rc;
598 if (sock->batch_start)
599 {
600 if (rc == RESP_OK)
601 return rc; /* no response on success during BATCH */
602 lines = sock->batch_cmd;
603 }
604 else if (rc == RESP_OK)
605 lines = count_lines(sock->wbuf);
606 else
607 lines = -1;
609 rclen = sprintf(buffer, "%d ", lines);
610 va_start(argp, fmt);
611 #ifdef HAVE_VSNPRINTF
612 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
613 #else
614 len = vsprintf(buffer+rclen, fmt, argp);
615 #endif
616 va_end(argp);
617 if (len < 0)
618 return -1;
620 len += rclen;
622 /* append the result to the wbuf, don't write to the user */
623 if (sock->batch_start)
624 return add_to_wbuf(sock, buffer, len);
626 /* first write must be complete */
627 if (len != write(sock->fd, buffer, len))
628 {
629 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
630 return -1;
631 }
633 if (sock->wbuf != NULL && rc == RESP_OK)
634 {
635 wrote = 0;
636 while (wrote < sock->wbuf_len)
637 {
638 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
639 if (wb <= 0)
640 {
641 RRDD_LOG(LOG_INFO, "send_response: could not write results");
642 return -1;
643 }
644 wrote += wb;
645 }
646 }
648 free(sock->wbuf); sock->wbuf = NULL;
649 sock->wbuf_len = 0;
651 return 0;
652 } /* }}} */
654 static void wipe_ci_values(cache_item_t *ci, time_t when)
655 {
656 ci->values = NULL;
657 ci->values_num = 0;
658 ci->values_alloc = 0;
660 ci->last_flush_time = when;
661 if (config_write_jitter > 0)
662 ci->last_flush_time += (rrd_random() % config_write_jitter);
663 }
665 /* remove_from_queue
666 * remove a "cache_item_t" item from the queue.
667 * must hold 'cache_lock' when calling this
668 */
669 static void remove_from_queue(cache_item_t *ci) /* {{{ */
670 {
671 if (ci == NULL) return;
672 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
674 if (ci->prev == NULL)
675 cache_queue_head = ci->next; /* reset head */
676 else
677 ci->prev->next = ci->next;
679 if (ci->next == NULL)
680 cache_queue_tail = ci->prev; /* reset the tail */
681 else
682 ci->next->prev = ci->prev;
684 ci->next = ci->prev = NULL;
685 ci->flags &= ~CI_FLAGS_IN_QUEUE;
687 pthread_mutex_lock (&stats_lock);
688 assert (stats_queue_length > 0);
689 stats_queue_length--;
690 pthread_mutex_unlock (&stats_lock);
692 } /* }}} static void remove_from_queue */
694 /* free the resources associated with the cache_item_t
695 * must hold cache_lock when calling this function
696 */
697 static void *free_cache_item(cache_item_t *ci) /* {{{ */
698 {
699 if (ci == NULL) return NULL;
701 remove_from_queue(ci);
703 for (size_t i=0; i < ci->values_num; i++)
704 free(ci->values[i]);
706 free (ci->values);
707 free (ci->file);
709 /* in case anyone is waiting */
710 pthread_cond_broadcast(&ci->flushed);
711 pthread_cond_destroy(&ci->flushed);
713 free (ci);
715 return NULL;
716 } /* }}} static void *free_cache_item */
718 /*
719 * enqueue_cache_item:
720 * `cache_lock' must be acquired before calling this function!
721 */
722 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
723 queue_side_t side)
724 {
725 if (ci == NULL)
726 return (-1);
728 if (ci->values_num == 0)
729 return (0);
731 if (side == HEAD)
732 {
733 if (cache_queue_head == ci)
734 return 0;
736 /* remove if further down in queue */
737 remove_from_queue(ci);
739 ci->prev = NULL;
740 ci->next = cache_queue_head;
741 if (ci->next != NULL)
742 ci->next->prev = ci;
743 cache_queue_head = ci;
745 if (cache_queue_tail == NULL)
746 cache_queue_tail = cache_queue_head;
747 }
748 else /* (side == TAIL) */
749 {
750 /* We don't move values back in the list.. */
751 if (ci->flags & CI_FLAGS_IN_QUEUE)
752 return (0);
754 assert (ci->next == NULL);
755 assert (ci->prev == NULL);
757 ci->prev = cache_queue_tail;
759 if (cache_queue_tail == NULL)
760 cache_queue_head = ci;
761 else
762 cache_queue_tail->next = ci;
764 cache_queue_tail = ci;
765 }
767 ci->flags |= CI_FLAGS_IN_QUEUE;
769 pthread_cond_signal(&queue_cond);
770 pthread_mutex_lock (&stats_lock);
771 stats_queue_length++;
772 pthread_mutex_unlock (&stats_lock);
774 return (0);
775 } /* }}} int enqueue_cache_item */
777 /*
778 * tree_callback_flush:
779 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
780 * while this is in progress.
781 */
782 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
783 gpointer data)
784 {
785 cache_item_t *ci;
786 callback_flush_data_t *cfd;
788 ci = (cache_item_t *) value;
789 cfd = (callback_flush_data_t *) data;
791 if (ci->flags & CI_FLAGS_IN_QUEUE)
792 return FALSE;
794 if (ci->values_num > 0
795 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
796 {
797 enqueue_cache_item (ci, TAIL);
798 }
799 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
800 && (ci->values_num <= 0))
801 {
802 assert ((char *) key == ci->file);
803 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
804 {
805 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
806 return (FALSE);
807 }
808 }
810 return (FALSE);
811 } /* }}} gboolean tree_callback_flush */
813 static int flush_old_values (int max_age)
814 {
815 callback_flush_data_t cfd;
816 size_t k;
818 memset (&cfd, 0, sizeof (cfd));
819 /* Pass the current time as user data so that we don't need to call
820 * `time' for each node. */
821 cfd.now = time (NULL);
822 cfd.keys = NULL;
823 cfd.keys_num = 0;
825 if (max_age > 0)
826 cfd.abs_timeout = cfd.now - max_age;
827 else
828 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
830 /* `tree_callback_flush' will return the keys of all values that haven't
831 * been touched in the last `config_flush_interval' seconds in `cfd'.
832 * The char*'s in this array point to the same memory as ci->file, so we
833 * don't need to free them separately. */
834 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
836 for (k = 0; k < cfd.keys_num; k++)
837 {
838 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
839 /* should never fail, since we have held the cache_lock
840 * the entire time */
841 assert(status == TRUE);
842 }
844 if (cfd.keys != NULL)
845 {
846 free (cfd.keys);
847 cfd.keys = NULL;
848 }
850 return (0);
851 } /* int flush_old_values */
/*
 * Body of the flush thread.
 *
 * While the daemon is RUNNING, every `config_flush_interval' seconds it
 * enqueues values older than `config_write_interval' and rotates the
 * journal. Between passes it sleeps on `flush_cond', which the signal
 * handlers broadcast to end the loop early.
 *
 * Once the state is no longer RUNNING it optionally enqueues everything
 * (config_flush_at_shutdown) and then sets the state to SHUTDOWN.
 * cache_lock is held throughout, except while waiting and while the
 * journal is rotated. Always returns NULL.
 */
static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int status;

  /* absolute time of the first flush pass */
  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock(&cache_lock);

  while (state == RUNNING)
  {
    gettimeofday (&now, NULL);
    /* has `next_flush' passed? */
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      RRDD_LOG(LOG_DEBUG, "flushing old values");

      /* Determine the time of the next cache flush. */
      next_flush.tv_sec = now.tv_sec + config_flush_interval;

      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* sleep until the next pass is due or flush_cond is broadcast */
    status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
    if (status != 0 && status != ETIMEDOUT)
    {
      RRDD_LOG (LOG_ERR, "flush_thread_main: "
          "pthread_cond_timedwait returned %i.", status);
    }
  }

  if (config_flush_at_shutdown)
    flush_old_values (-1); /* flush everything */

  /* lets the queue threads leave their main loop */
  state = SHUTDOWN;

  pthread_mutex_unlock(&cache_lock);

  return NULL;
} /* }}} void *flush_thread_main */
906 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
907 {
908 pthread_mutex_lock (&cache_lock);
910 while (state != SHUTDOWN
911 || (cache_queue_head != NULL && config_flush_at_shutdown))
912 {
913 cache_item_t *ci;
914 char *file;
915 char **values;
916 size_t values_num;
917 int status;
919 /* Now, check if there's something to store away. If not, wait until
920 * something comes in. */
921 if (cache_queue_head == NULL)
922 {
923 status = pthread_cond_wait (&queue_cond, &cache_lock);
924 if ((status != 0) && (status != ETIMEDOUT))
925 {
926 RRDD_LOG (LOG_ERR, "queue_thread_main: "
927 "pthread_cond_wait returned %i.", status);
928 }
929 }
931 /* Check if a value has arrived. This may be NULL if we timed out or there
932 * was an interrupt such as a signal. */
933 if (cache_queue_head == NULL)
934 continue;
936 ci = cache_queue_head;
938 /* copy the relevant parts */
939 file = strdup (ci->file);
940 if (file == NULL)
941 {
942 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
943 continue;
944 }
946 assert(ci->values != NULL);
947 assert(ci->values_num > 0);
949 values = ci->values;
950 values_num = ci->values_num;
952 wipe_ci_values(ci, time(NULL));
953 remove_from_queue(ci);
955 pthread_mutex_unlock (&cache_lock);
957 rrd_clear_error ();
958 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
959 if (status != 0)
960 {
961 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
962 "rrd_update_r (%s) failed with status %i. (%s)",
963 file, status, rrd_get_error());
964 }
966 journal_write("wrote", file);
968 /* Search again in the tree. It's possible someone issued a "FORGET"
969 * while we were writing the update values. */
970 pthread_mutex_lock(&cache_lock);
971 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
972 if (ci)
973 pthread_cond_broadcast(&ci->flushed);
974 pthread_mutex_unlock(&cache_lock);
976 if (status == 0)
977 {
978 pthread_mutex_lock (&stats_lock);
979 stats_updates_written++;
980 stats_data_sets_written += values_num;
981 pthread_mutex_unlock (&stats_lock);
982 }
984 rrd_free_ptrs((void ***) &values, &values_num);
985 free(file);
987 pthread_mutex_lock (&cache_lock);
988 }
989 pthread_mutex_unlock (&cache_lock);
991 return (NULL);
992 } /* }}} void *queue_thread_main */
/*
 * Split the next space-separated field off a NUL-terminated buffer.
 *
 * The field is unescaped in place: a backslash makes the following
 * character literal (so "\ " yields a space inside a field). The field
 * ends at the first unescaped space or at the terminating NUL.
 *
 * On success 0 is returned, *field_ret points to the NUL-terminated field
 * and *buffer_ret / *buffer_size_ret are advanced past the consumed input.
 * On failure (empty buffer, or a backslash as the very last character) -1
 * is returned and the three output arguments are left unchanged.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const src = *buffer_ret;
  char *const dst = *buffer_ret; /* unescaped output overwrites the input */
  const size_t src_len = *buffer_size_ret;
  size_t rd = 0;
  size_t wr = 0;
  int terminated = 0;

  if (src_len == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[src_len - 1] == '\0');

  while (!terminated && rd < src_len)
  {
    const char c = src[rd];

    if (c == ' ' || c == '\0')
    {
      /* end-of-field or end-of-buffer */
      dst[wr++] = '\0';
      rd++;
      terminated = 1;
    }
    else if (c == '\\')
    {
      /* escaped character: needs at least one more byte after it */
      if (rd >= (src_len - 1))
        break;
      rd++;
      dst[wr++] = src[rd++];
    }
    else
    {
      dst[wr++] = src[rd++];
    }
  }

  if (!terminated)
    return (-1);

  *buffer_ret = src + rd;
  *buffer_size_ret = src_len - rd;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
1057 /* if we're restricting writes to the base directory,
1058 * check whether the file falls within the dir
1059 * returns 1 if OK, otherwise 0
1060 */
1061 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1062 {
1063 assert(file != NULL);
1065 if (!config_write_base_only
1066 || JOURNAL_REPLAY(sock)
1067 || config_base_dir == NULL)
1068 return 1;
1070 if (strstr(file, "../") != NULL) goto err;
1072 /* relative paths without "../" are ok */
1073 if (*file != '/') return 1;
1075 /* file must be of the format base + "/" + <1+ char filename> */
1076 if (strlen(file) < _config_base_dir_len + 2) goto err;
1077 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1078 if (*(file + _config_base_dir_len) != '/') goto err;
1080 return 1;
1082 err:
1083 if (sock != NULL && sock->fd >= 0)
1084 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1086 return 0;
1087 } /* }}} static int check_file_access */
1089 /* when using a base dir, convert relative paths to absolute paths.
1090 * if necessary, modifies the "filename" pointer to point
1091 * to the new path created in "tmp". "tmp" is provided
1092 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1093 *
1094 * this allows us to optimize for the expected case (absolute path)
1095 * with a no-op.
1096 */
1097 static void get_abs_path(char **filename, char *tmp)
1098 {
1099 assert(tmp != NULL);
1100 assert(filename != NULL && *filename != NULL);
1102 if (config_base_dir == NULL || **filename == '/')
1103 return;
1105 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1106 *filename = tmp;
1107 } /* }}} static int get_abs_path */
1109 static int flush_file (const char *filename) /* {{{ */
1110 {
1111 cache_item_t *ci;
1113 pthread_mutex_lock (&cache_lock);
1115 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1116 if (ci == NULL)
1117 {
1118 pthread_mutex_unlock (&cache_lock);
1119 return (ENOENT);
1120 }
1122 if (ci->values_num > 0)
1123 {
1124 /* Enqueue at head */
1125 enqueue_cache_item (ci, HEAD);
1126 pthread_cond_wait(&ci->flushed, &cache_lock);
1127 }
1129 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1130 * may have been purged during our cond_wait() */
1132 pthread_mutex_unlock(&cache_lock);
1134 return (0);
1135 } /* }}} int flush_file */
1137 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1138 {
1139 char *err = "Syntax error.\n";
1141 if (cmd && cmd->syntax)
1142 err = cmd->syntax;
1144 return send_response(sock, RESP_ERR, "Usage: %s", err);
1145 } /* }}} static int syntax_error() */
1147 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1148 {
1149 uint64_t copy_queue_length;
1150 uint64_t copy_updates_received;
1151 uint64_t copy_flush_received;
1152 uint64_t copy_updates_written;
1153 uint64_t copy_data_sets_written;
1154 uint64_t copy_journal_bytes;
1155 uint64_t copy_journal_rotate;
1157 uint64_t tree_nodes_number;
1158 uint64_t tree_depth;
1160 pthread_mutex_lock (&stats_lock);
1161 copy_queue_length = stats_queue_length;
1162 copy_updates_received = stats_updates_received;
1163 copy_flush_received = stats_flush_received;
1164 copy_updates_written = stats_updates_written;
1165 copy_data_sets_written = stats_data_sets_written;
1166 copy_journal_bytes = stats_journal_bytes;
1167 copy_journal_rotate = stats_journal_rotate;
1168 pthread_mutex_unlock (&stats_lock);
1170 pthread_mutex_lock (&cache_lock);
1171 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1172 tree_depth = (uint64_t) g_tree_height (cache_tree);
1173 pthread_mutex_unlock (&cache_lock);
1175 add_response_info(sock,
1176 "QueueLength: %"PRIu64"\n", copy_queue_length);
1177 add_response_info(sock,
1178 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1179 add_response_info(sock,
1180 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1181 add_response_info(sock,
1182 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1183 add_response_info(sock,
1184 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1185 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1186 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1187 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1188 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1190 send_response(sock, RESP_OK, "Statistics follow\n");
1192 return (0);
1193 } /* }}} int handle_request_stats */
1195 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1196 {
1197 char *file, file_tmp[PATH_MAX];
1198 int status;
1200 status = buffer_get_field (&buffer, &buffer_size, &file);
1201 if (status != 0)
1202 {
1203 return syntax_error(sock,cmd);
1204 }
1205 else
1206 {
1207 pthread_mutex_lock(&stats_lock);
1208 stats_flush_received++;
1209 pthread_mutex_unlock(&stats_lock);
1211 get_abs_path(&file, file_tmp);
1212 if (!check_file_access(file, sock)) return 0;
1214 status = flush_file (file);
1215 if (status == 0)
1216 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1217 else if (status == ENOENT)
1218 {
1219 /* no file in our tree; see whether it exists at all */
1220 struct stat statbuf;
1222 memset(&statbuf, 0, sizeof(statbuf));
1223 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1224 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1225 else
1226 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1227 }
1228 else if (status < 0)
1229 return send_response(sock, RESP_ERR, "Internal error.\n");
1230 else
1231 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1232 }
1234 /* NOTREACHED */
1235 assert(1==0);
1236 } /* }}} int handle_request_flush */
1238 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1239 {
1240 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1242 pthread_mutex_lock(&cache_lock);
1243 flush_old_values(-1);
1244 pthread_mutex_unlock(&cache_lock);
1246 return send_response(sock, RESP_OK, "Started flush.\n");
1247 } /* }}} static int handle_request_flushall */
1249 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1250 {
1251 int status;
1252 char *file, file_tmp[PATH_MAX];
1253 cache_item_t *ci;
1255 status = buffer_get_field(&buffer, &buffer_size, &file);
1256 if (status != 0)
1257 return syntax_error(sock,cmd);
1259 get_abs_path(&file, file_tmp);
1261 pthread_mutex_lock(&cache_lock);
1262 ci = g_tree_lookup(cache_tree, file);
1263 if (ci == NULL)
1264 {
1265 pthread_mutex_unlock(&cache_lock);
1266 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267 }
1269 for (size_t i=0; i < ci->values_num; i++)
1270 add_response_info(sock, "%s\n", ci->values[i]);
1272 pthread_mutex_unlock(&cache_lock);
1273 return send_response(sock, RESP_OK, "updates pending\n");
1274 } /* }}} static int handle_request_pending */
1276 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1277 {
1278 int status;
1279 gboolean found;
1280 char *file, file_tmp[PATH_MAX];
1282 status = buffer_get_field(&buffer, &buffer_size, &file);
1283 if (status != 0)
1284 return syntax_error(sock,cmd);
1286 get_abs_path(&file, file_tmp);
1287 if (!check_file_access(file, sock)) return 0;
1289 pthread_mutex_lock(&cache_lock);
1290 found = g_tree_remove(cache_tree, file);
1291 pthread_mutex_unlock(&cache_lock);
1293 if (found == TRUE)
1294 {
1295 if (!JOURNAL_REPLAY(sock))
1296 journal_write("forget", file);
1298 return send_response(sock, RESP_OK, "Gone!\n");
1299 }
1300 else
1301 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1303 /* NOTREACHED */
1304 assert(1==0);
1305 } /* }}} static int handle_request_forget */
1307 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1308 {
1309 cache_item_t *ci;
1311 pthread_mutex_lock(&cache_lock);
1313 ci = cache_queue_head;
1314 while (ci != NULL)
1315 {
1316 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1317 ci = ci->next;
1318 }
1320 pthread_mutex_unlock(&cache_lock);
1322 return send_response(sock, RESP_OK, "in queue.\n");
1323 } /* }}} int handle_request_queue */
1325 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1326 {
1327 char *file, file_tmp[PATH_MAX];
1328 int values_num = 0;
1329 int status;
1330 char orig_buf[CMD_MAX];
1332 cache_item_t *ci;
1334 /* save it for the journal later */
1335 if (!JOURNAL_REPLAY(sock))
1336 strncpy(orig_buf, buffer, buffer_size);
1338 status = buffer_get_field (&buffer, &buffer_size, &file);
1339 if (status != 0)
1340 return syntax_error(sock,cmd);
1342 pthread_mutex_lock(&stats_lock);
1343 stats_updates_received++;
1344 pthread_mutex_unlock(&stats_lock);
1346 get_abs_path(&file, file_tmp);
1347 if (!check_file_access(file, sock)) return 0;
1349 pthread_mutex_lock (&cache_lock);
1350 ci = g_tree_lookup (cache_tree, file);
1352 if (ci == NULL) /* {{{ */
1353 {
1354 struct stat statbuf;
1355 cache_item_t *tmp;
1357 /* don't hold the lock while we setup; stat(2) might block */
1358 pthread_mutex_unlock(&cache_lock);
1360 memset (&statbuf, 0, sizeof (statbuf));
1361 status = stat (file, &statbuf);
1362 if (status != 0)
1363 {
1364 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1366 status = errno;
1367 if (status == ENOENT)
1368 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1369 else
1370 return send_response(sock, RESP_ERR,
1371 "stat failed with error %i.\n", status);
1372 }
1373 if (!S_ISREG (statbuf.st_mode))
1374 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1376 if (access(file, R_OK|W_OK) != 0)
1377 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1378 file, rrd_strerror(errno));
1380 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1381 if (ci == NULL)
1382 {
1383 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1385 return send_response(sock, RESP_ERR, "malloc failed.\n");
1386 }
1387 memset (ci, 0, sizeof (cache_item_t));
1389 ci->file = strdup (file);
1390 if (ci->file == NULL)
1391 {
1392 free (ci);
1393 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1395 return send_response(sock, RESP_ERR, "strdup failed.\n");
1396 }
1398 wipe_ci_values(ci, now);
1399 ci->flags = CI_FLAGS_IN_TREE;
1400 pthread_cond_init(&ci->flushed, NULL);
1402 pthread_mutex_lock(&cache_lock);
1404 /* another UPDATE might have added this entry in the meantime */
1405 tmp = g_tree_lookup (cache_tree, file);
1406 if (tmp == NULL)
1407 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1408 else
1409 {
1410 free_cache_item (ci);
1411 ci = tmp;
1412 }
1414 /* state may have changed while we were unlocked */
1415 if (state == SHUTDOWN)
1416 return -1;
1417 } /* }}} */
1418 assert (ci != NULL);
1420 /* don't re-write updates in replay mode */
1421 if (!JOURNAL_REPLAY(sock))
1422 journal_write("update", orig_buf);
1424 while (buffer_size > 0)
1425 {
1426 char *value;
1427 time_t stamp;
1428 char *eostamp;
1430 status = buffer_get_field (&buffer, &buffer_size, &value);
1431 if (status != 0)
1432 {
1433 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1434 break;
1435 }
1437 /* make sure update time is always moving forward */
1438 stamp = strtol(value, &eostamp, 10);
1439 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1440 {
1441 pthread_mutex_unlock(&cache_lock);
1442 return send_response(sock, RESP_ERR,
1443 "Cannot find timestamp in '%s'!\n", value);
1444 }
1445 else if (stamp <= ci->last_update_stamp)
1446 {
1447 pthread_mutex_unlock(&cache_lock);
1448 return send_response(sock, RESP_ERR,
1449 "illegal attempt to update using time %ld when last"
1450 " update time is %ld (minimum one second step)\n",
1451 stamp, ci->last_update_stamp);
1452 }
1453 else
1454 ci->last_update_stamp = stamp;
1456 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1457 &ci->values_alloc, config_alloc_chunk))
1458 {
1459 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1460 continue;
1461 }
1463 values_num++;
1464 }
1466 if (((now - ci->last_flush_time) >= config_write_interval)
1467 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1468 && (ci->values_num > 0))
1469 {
1470 enqueue_cache_item (ci, TAIL);
1471 }
1473 pthread_mutex_unlock (&cache_lock);
1475 if (values_num < 1)
1476 return send_response(sock, RESP_ERR, "No values updated.\n");
1477 else
1478 return send_response(sock, RESP_OK,
1479 "errors, enqueued %i value(s).\n", values_num);
1481 /* NOTREACHED */
1482 assert(1==0);
1484 } /* }}} int handle_request_update */
1486 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1487 {
1488 char *file, file_tmp[PATH_MAX];
1489 char *cf;
1491 char *start_str;
1492 char *end_str;
1493 time_t start_tm;
1494 time_t end_tm;
1496 unsigned long step;
1497 unsigned long ds_cnt;
1498 char **ds_namv;
1499 rrd_value_t *data;
1501 int status;
1502 unsigned long i;
1503 time_t t;
1504 rrd_value_t *data_ptr;
1506 file = NULL;
1507 cf = NULL;
1508 start_str = NULL;
1509 end_str = NULL;
1511 /* Read the arguments */
1512 do /* while (0) */
1513 {
1514 status = buffer_get_field (&buffer, &buffer_size, &file);
1515 if (status != 0)
1516 break;
1518 status = buffer_get_field (&buffer, &buffer_size, &cf);
1519 if (status != 0)
1520 break;
1522 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1523 if (status != 0)
1524 {
1525 start_str = NULL;
1526 status = 0;
1527 break;
1528 }
1530 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1531 if (status != 0)
1532 {
1533 end_str = NULL;
1534 status = 0;
1535 break;
1536 }
1537 } while (0);
1539 if (status != 0)
1540 return (syntax_error(sock,cmd));
1542 get_abs_path(&file, file_tmp);
1543 if (!check_file_access(file, sock)) return 0;
1545 status = flush_file (file);
1546 if ((status != 0) && (status != ENOENT))
1547 return (send_response (sock, RESP_ERR,
1548 "flush_file (%s) failed with status %i.\n", file, status));
1550 t = time (NULL); /* "now" */
1552 /* Parse start time */
1553 if (start_str != NULL)
1554 {
1555 char *endptr;
1556 long value;
1558 endptr = NULL;
1559 errno = 0;
1560 value = strtol (start_str, &endptr, /* base = */ 0);
1561 if ((endptr == start_str) || (errno != 0))
1562 return (send_response(sock, RESP_ERR,
1563 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1564 start_str));
1566 if (value > 0)
1567 start_tm = (time_t) value;
1568 else
1569 start_tm = (time_t) (t + value);
1570 }
1571 else
1572 {
1573 start_tm = t - 86400;
1574 }
1576 /* Parse end time */
1577 if (end_str != NULL)
1578 {
1579 char *endptr;
1580 long value;
1582 endptr = NULL;
1583 errno = 0;
1584 value = strtol (end_str, &endptr, /* base = */ 0);
1585 if ((endptr == end_str) || (errno != 0))
1586 return (send_response(sock, RESP_ERR,
1587 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1588 end_str));
1590 if (value > 0)
1591 end_tm = (time_t) value;
1592 else
1593 end_tm = (time_t) (t + value);
1594 }
1595 else
1596 {
1597 end_tm = t;
1598 }
1600 step = -1;
1601 ds_cnt = 0;
1602 ds_namv = NULL;
1603 data = NULL;
1605 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1606 &ds_cnt, &ds_namv, &data);
1607 if (status != 0)
1608 return (send_response(sock, RESP_ERR,
1609 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1611 add_response_info (sock, "FlushVersion: %lu\n", 1);
1612 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1613 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1614 add_response_info (sock, "Step: %lu\n", step);
1615 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1617 #define SSTRCAT(buffer,str,buffer_fill) do { \
1618 size_t str_len = strlen (str); \
1619 if ((buffer_fill + str_len) > sizeof (buffer)) \
1620 str_len = sizeof (buffer) - buffer_fill; \
1621 if (str_len > 0) { \
1622 strncpy (buffer + buffer_fill, str, str_len); \
1623 buffer_fill += str_len; \
1624 assert (buffer_fill <= sizeof (buffer)); \
1625 if (buffer_fill == sizeof (buffer)) \
1626 buffer[buffer_fill - 1] = 0; \
1627 else \
1628 buffer[buffer_fill] = 0; \
1629 } \
1630 } while (0)
1632 { /* Add list of DS names */
1633 char linebuf[1024];
1634 size_t linebuf_fill;
1636 memset (linebuf, 0, sizeof (linebuf));
1637 linebuf_fill = 0;
1638 for (i = 0; i < ds_cnt; i++)
1639 {
1640 if (i > 0)
1641 SSTRCAT (linebuf, " ", linebuf_fill);
1642 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1643 rrd_freemem(ds_namv[i]);
1644 }
1645 rrd_freemem(ds_namv);
1646 add_response_info (sock, "DSName: %s\n", linebuf);
1647 }
1649 /* Add the actual data */
1650 assert (step > 0);
1651 data_ptr = data;
1652 for (t = start_tm + step; t <= end_tm; t += step)
1653 {
1654 char linebuf[1024];
1655 size_t linebuf_fill;
1656 char tmp[128];
1658 memset (linebuf, 0, sizeof (linebuf));
1659 linebuf_fill = 0;
1660 for (i = 0; i < ds_cnt; i++)
1661 {
1662 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1663 tmp[sizeof (tmp) - 1] = 0;
1664 SSTRCAT (linebuf, tmp, linebuf_fill);
1666 data_ptr++;
1667 }
1669 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1670 } /* for (t) */
1671 rrd_freemem(data);
1673 return (send_response (sock, RESP_OK, "Success\n"));
1674 #undef SSTRCAT
1675 } /* }}} int handle_request_fetch */
1677 /* we came across a "WROTE" entry during journal replay.
1678 * throw away any values that we have accumulated for this file
1679 */
1680 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1681 {
1682 cache_item_t *ci;
1683 const char *file = buffer;
1685 pthread_mutex_lock(&cache_lock);
1687 ci = g_tree_lookup(cache_tree, file);
1688 if (ci == NULL)
1689 {
1690 pthread_mutex_unlock(&cache_lock);
1691 return (0);
1692 }
1694 if (ci->values)
1695 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1697 wipe_ci_values(ci, now);
1698 remove_from_queue(ci);
1700 pthread_mutex_unlock(&cache_lock);
1701 return (0);
1702 } /* }}} int handle_request_wrote */
1704 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1705 {
1706 char *file, file_tmp[PATH_MAX];
1707 int status;
1708 rrd_info_t *info;
1710 /* obtain filename */
1711 status = buffer_get_field(&buffer, &buffer_size, &file);
1712 if (status != 0)
1713 return syntax_error(sock,cmd);
1714 /* get full pathname */
1715 get_abs_path(&file, file_tmp);
1716 if (!check_file_access(file, sock)) {
1717 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1718 }
1719 /* get data */
1720 rrd_clear_error ();
1721 info = rrd_info_r(file);
1722 if(!info) {
1723 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1724 }
1725 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1726 switch (data->type) {
1727 case RD_I_VAL:
1728 if (isnan(data->value.u_val))
1729 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1730 else
1731 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1732 break;
1733 case RD_I_CNT:
1734 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1735 break;
1736 case RD_I_INT:
1737 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1738 break;
1739 case RD_I_STR:
1740 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1741 break;
1742 case RD_I_BLO:
1743 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1744 break;
1745 }
1746 }
1748 rrd_info_free(info);
1750 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1751 } /* }}} static int handle_request_info */
1753 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1754 {
1755 char *i, *file, file_tmp[PATH_MAX];
1756 int status;
1757 int idx;
1758 time_t t;
1760 /* obtain filename */
1761 status = buffer_get_field(&buffer, &buffer_size, &file);
1762 if (status != 0)
1763 return syntax_error(sock,cmd);
1764 /* get full pathname */
1765 get_abs_path(&file, file_tmp);
1766 if (!check_file_access(file, sock)) {
1767 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1768 }
1770 status = buffer_get_field(&buffer, &buffer_size, &i);
1771 if (status != 0)
1772 return syntax_error(sock,cmd);
1773 idx = atoi(i);
1774 if(idx<0) {
1775 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1776 }
1778 /* get data */
1779 rrd_clear_error ();
1780 t = rrd_first_r(file,idx);
1781 if(t<1) {
1782 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1783 }
1784 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1785 } /* }}} static int handle_request_first */
1788 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1789 {
1790 char *file, file_tmp[PATH_MAX];
1791 int status;
1792 time_t t, from_file, step;
1793 rrd_file_t * rrd_file;
1794 cache_item_t * ci;
1795 rrd_t rrd;
1797 /* obtain filename */
1798 status = buffer_get_field(&buffer, &buffer_size, &file);
1799 if (status != 0)
1800 return syntax_error(sock,cmd);
1801 /* get full pathname */
1802 get_abs_path(&file, file_tmp);
1803 if (!check_file_access(file, sock)) {
1804 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1805 }
1806 rrd_clear_error();
1807 rrd_init(&rrd);
1808 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1809 if(!rrd_file) {
1810 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1811 }
1812 from_file = rrd.live_head->last_up;
1813 step = rrd.stat_head->pdp_step;
1814 rrd_close(rrd_file);
1815 pthread_mutex_lock(&cache_lock);
1816 ci = g_tree_lookup(cache_tree, file);
1817 if (ci)
1818 t = ci->last_update_stamp;
1819 else
1820 t = from_file;
1821 pthread_mutex_unlock(&cache_lock);
1822 t -= t % step;
1823 rrd_free(&rrd);
1824 if(t<1) {
1825 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1826 }
1827 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1828 } /* }}} static int handle_request_last */
1830 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1831 {
1832 char *file, file_tmp[PATH_MAX];
1833 char *tok;
1834 int ac = 0;
1835 char *av[128];
1836 int status;
1837 unsigned long step = 300;
1838 time_t last_up = time(NULL)-10;
1839 int no_overwrite = opt_no_overwrite;
1842 /* obtain filename */
1843 status = buffer_get_field(&buffer, &buffer_size, &file);
1844 if (status != 0)
1845 return syntax_error(sock,cmd);
1846 /* get full pathname */
1847 get_abs_path(&file, file_tmp);
1848 if (!check_file_access(file, sock)) {
1849 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1850 }
1851 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1853 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1854 if( ! strncmp(tok,"-b",2) ) {
1855 status = buffer_get_field(&buffer, &buffer_size, &tok );
1856 if (status != 0) return syntax_error(sock,cmd);
1857 last_up = (time_t) atol(tok);
1858 continue;
1859 }
1860 if( ! strncmp(tok,"-s",2) ) {
1861 status = buffer_get_field(&buffer, &buffer_size, &tok );
1862 if (status != 0) return syntax_error(sock,cmd);
1863 step = atol(tok);
1864 continue;
1865 }
1866 if( ! strncmp(tok,"-O",2) ) {
1867 no_overwrite = 1;
1868 continue;
1869 }
1870 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1871 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1872 return syntax_error(sock,cmd);
1873 }
1874 if(step<1) {
1875 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1876 }
1877 if (last_up < 3600 * 24 * 365 * 10) {
1878 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1879 }
1881 rrd_clear_error ();
1882 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1884 if(!status) {
1885 return send_response(sock, RESP_OK, "RRD created OK\n");
1886 }
1887 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1888 } /* }}} static int handle_request_create */
1890 /* start "BATCH" processing */
1891 static int batch_start (HANDLER_PROTO) /* {{{ */
1892 {
1893 int status;
1894 if (sock->batch_start)
1895 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1897 status = send_response(sock, RESP_OK,
1898 "Go ahead. End with dot '.' on its own line.\n");
1899 sock->batch_start = time(NULL);
1900 sock->batch_cmd = 0;
1902 return status;
1903 } /* }}} static int batch_start */
1905 /* finish "BATCH" processing and return results to the client */
1906 static int batch_done (HANDLER_PROTO) /* {{{ */
1907 {
1908 assert(sock->batch_start);
1909 sock->batch_start = 0;
1910 sock->batch_cmd = 0;
1911 return send_response(sock, RESP_OK, "errors\n");
1912 } /* }}} static int batch_done */
/*
 * QUIT: no reply is sent; the handler only returns -1.
 * NOTE(review): presumably the caller treats a negative status as
 * "close this connection" -- confirm in the connection loop.
 */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int quit_status = -1;

  return quit_status;
} /* }}} static int handle_request_quit */
1919 static command_t list_of_commands[] = { /* {{{ */
1920 {
1921 "UPDATE",
1922 handle_request_update,
1923 CMD_CONTEXT_ANY,
1924 "UPDATE <filename> <values> [<values> ...]\n"
1925 ,
1926 "Adds the given file to the internal cache if it is not yet known and\n"
1927 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1928 "for details.\n"
1929 "\n"
1930 "Each <values> has the following form:\n"
1931 " <values> = <time>:<value>[:<value>[...]]\n"
1932 "See the rrdupdate(1) manpage for details.\n"
1933 },
1934 {
1935 "WROTE",
1936 handle_request_wrote,
1937 CMD_CONTEXT_JOURNAL,
1938 NULL,
1939 NULL
1940 },
1941 {
1942 "FLUSH",
1943 handle_request_flush,
1944 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1945 "FLUSH <filename>\n"
1946 ,
1947 "Adds the given filename to the head of the update queue and returns\n"
1948 "after it has been dequeued.\n"
1949 },
1950 {
1951 "FLUSHALL",
1952 handle_request_flushall,
1953 CMD_CONTEXT_CLIENT,
1954 "FLUSHALL\n"
1955 ,
1956 "Triggers writing of all pending updates. Returns immediately.\n"
1957 },
1958 {
1959 "PENDING",
1960 handle_request_pending,
1961 CMD_CONTEXT_CLIENT,
1962 "PENDING <filename>\n"
1963 ,
1964 "Shows any 'pending' updates for a file, in order.\n"
1965 "The updates shown have not yet been written to the underlying RRD file.\n"
1966 },
1967 {
1968 "FORGET",
1969 handle_request_forget,
1970 CMD_CONTEXT_ANY,
1971 "FORGET <filename>\n"
1972 ,
1973 "Removes the file completely from the cache.\n"
1974 "Any pending updates for the file will be lost.\n"
1975 },
1976 {
1977 "QUEUE",
1978 handle_request_queue,
1979 CMD_CONTEXT_CLIENT,
1980 "QUEUE\n"
1981 ,
1982 "Shows all files in the output queue.\n"
1983 "The output is zero or more lines in the following format:\n"
1984 "(where <num_vals> is the number of values to be written)\n"
1985 "\n"
1986 "<num_vals> <filename>\n"
1987 },
1988 {
1989 "STATS",
1990 handle_request_stats,
1991 CMD_CONTEXT_CLIENT,
1992 "STATS\n"
1993 ,
1994 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1995 "a description of the values.\n"
1996 },
1997 {
1998 "HELP",
1999 handle_request_help,
2000 CMD_CONTEXT_CLIENT,
2001 "HELP [<command>]\n",
2002 NULL, /* special! */
2003 },
2004 {
2005 "BATCH",
2006 batch_start,
2007 CMD_CONTEXT_CLIENT,
2008 "BATCH\n"
2009 ,
2010 "The 'BATCH' command permits the client to initiate a bulk load\n"
2011 " of commands to rrdcached.\n"
2012 "\n"
2013 "Usage:\n"
2014 "\n"
2015 " client: BATCH\n"
2016 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2017 " client: command #1\n"
2018 " client: command #2\n"
2019 " client: ... and so on\n"
2020 " client: .\n"
2021 " server: 2 errors\n"
2022 " server: 7 message for command #7\n"
2023 " server: 9 message for command #9\n"
2024 "\n"
2025 "For more information, consult the rrdcached(1) documentation.\n"
2026 },
2027 {
2028 ".", /* BATCH terminator */
2029 batch_done,
2030 CMD_CONTEXT_BATCH,
2031 NULL,
2032 NULL
2033 },
2034 {
2035 "FETCH",
2036 handle_request_fetch,
2037 CMD_CONTEXT_CLIENT,
2038 "FETCH <file> <CF> [<start> [<end>]]\n"
2039 ,
2040 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2041 },
2042 {
2043 "INFO",
2044 handle_request_info,
2045 CMD_CONTEXT_CLIENT,
2046 "INFO <filename>\n",
2047 "The INFO command retrieves information about a specified RRD file.\n"
2048 "This is returned in standard rrdinfo format, a sequence of lines\n"
2049 "with the format <keyname> = <value>\n"
2050 "Note that this is the data as of the last update of the RRD file itself,\n"
2051 "not the last time data was received via rrdcached, so there may be pending\n"
2052 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2053 },
2054 {
2055 "FIRST",
2056 handle_request_first,
2057 CMD_CONTEXT_CLIENT,
2058 "FIRST <filename> <rra index>\n",
2059 "The FIRST command retrieves the first data time for a specified RRA in\n"
2060 "an RRD file.\n"
2061 },
2062 {
2063 "LAST",
2064 handle_request_last,
2065 CMD_CONTEXT_CLIENT,
2066 "LAST <filename>\n",
2067 "The LAST command retrieves the last update time for a specified RRD file.\n"
2068 "Note that this is the time of the last update of the RRD file itself, not\n"
2069 "the last time data was received via rrdcached, so there may be pending\n"
2070 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2071 },
2072 {
2073 "CREATE",
2074 handle_request_create,
2075 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2076 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2077 "The CREATE command will create an RRD file, overwriting any existing file\n"
2078 "unless the -O option is given or rrdcached was started with the -O option.\n"
2079 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2080 "not acceptable) and the step is in seconds (default is 300).\n"
2081 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2082 },
2083 {
2084 "QUIT",
2085 handle_request_quit,
2086 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2087 "QUIT\n"
2088 ,
2089 "Disconnect from rrdcached.\n"
2090 }
2091 }; /* }}} command_t list_of_commands[] */
2092 static size_t list_of_commands_len = sizeof (list_of_commands)
2093 / sizeof (list_of_commands[0]);
2095 static command_t *find_command(char *cmd)
2096 {
2097 size_t i;
2099 for (i = 0; i < list_of_commands_len; i++)
2100 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2101 return (&list_of_commands[i]);
2102 return NULL;
2103 }
2105 /* We currently use the index in the `list_of_commands' array as a bit position
2106 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2107 * outside these functions so that switching to a more elegant storage method
2108 * is easily possible. */
2109 static ssize_t find_command_index (const char *cmd) /* {{{ */
2110 {
2111 size_t i;
2113 for (i = 0; i < list_of_commands_len; i++)
2114 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2115 return ((ssize_t) i);
2116 return (-1);
2117 } /* }}} ssize_t find_command_index */
2119 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2120 const char *cmd)
2121 {
2122 ssize_t i;
2124 if (JOURNAL_REPLAY(sock))
2125 return (1);
2127 if (cmd == NULL)
2128 return (-1);
2130 if ((strcasecmp ("QUIT", cmd) == 0)
2131 || (strcasecmp ("HELP", cmd) == 0))
2132 return (1);
2133 else if (strcmp (".", cmd) == 0)
2134 cmd = "BATCH";
2136 i = find_command_index (cmd);
2137 if (i < 0)
2138 return (-1);
2139 assert (i < 32);
2141 if ((sock->permissions & (1 << i)) != 0)
2142 return (1);
2143 return (0);
2144 } /* }}} int socket_permission_check */
2146 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2147 const char *cmd)
2148 {
2149 ssize_t i;
2151 i = find_command_index (cmd);
2152 if (i < 0)
2153 return (-1);
2154 assert (i < 32);
2156 sock->permissions |= (1 << i);
2157 return (0);
2158 } /* }}} int socket_permission_add */
2160 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2161 {
2162 sock->permissions = 0;
2163 } /* }}} socket_permission_clear */
2165 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2166 listen_socket_t *src)
2167 {
2168 dest->permissions = src->permissions;
2169 } /* }}} socket_permission_copy */
2171 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2172 {
2173 size_t i;
2175 sock->permissions = 0;
2176 for (i = 0; i < list_of_commands_len; i++)
2177 sock->permissions |= (1 << i);
2178 } /* }}} void socket_permission_set_all */
2180 /* check whether commands are received in the expected context */
2181 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2182 {
2183 if (JOURNAL_REPLAY(sock))
2184 return (cmd->context & CMD_CONTEXT_JOURNAL);
2185 else if (sock->batch_start)
2186 return (cmd->context & CMD_CONTEXT_BATCH);
2187 else
2188 return (cmd->context & CMD_CONTEXT_CLIENT);
2190 /* NOTREACHED */
2191 assert(1==0);
2192 }
2194 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2195 {
2196 int status;
2197 char *cmd_str;
2198 char *resp_txt;
2199 command_t *help = NULL;
2201 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2202 if (status == 0)
2203 help = find_command(cmd_str);
2205 if (help && (help->syntax || help->help))
2206 {
2207 char tmp[CMD_MAX];
2209 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2210 resp_txt = tmp;
2212 if (help->syntax)
2213 add_response_info(sock, "Usage: %s\n", help->syntax);
2215 if (help->help)
2216 add_response_info(sock, "%s\n", help->help);
2217 }
2218 else
2219 {
2220 size_t i;
2222 resp_txt = "Command overview\n";
2224 for (i = 0; i < list_of_commands_len; i++)
2225 {
2226 if (list_of_commands[i].syntax == NULL)
2227 continue;
2228 add_response_info (sock, "%s", list_of_commands[i].syntax);
2229 }
2230 }
2232 return send_response(sock, RESP_OK, resp_txt);
2233 } /* }}} int handle_request_help */
2235 static int handle_request (DISPATCH_PROTO) /* {{{ */
2236 {
2237 char *buffer_ptr = buffer;
2238 char *cmd_str = NULL;
2239 command_t *cmd = NULL;
2240 int status;
2242 assert (buffer[buffer_size - 1] == '\0');
2244 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2245 if (status != 0)
2246 {
2247 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2248 return (-1);
2249 }
2251 if (sock != NULL && sock->batch_start)
2252 sock->batch_cmd++;
2254 cmd = find_command(cmd_str);
2255 if (!cmd)
2256 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2258 if (!socket_permission_check (sock, cmd->cmd))
2259 return send_response(sock, RESP_ERR, "Permission denied.\n");
2261 if (!command_check_context(sock, cmd))
2262 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2264 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2265 } /* }}} int handle_request */
2267 static void journal_set_free (journal_set *js) /* {{{ */
2268 {
2269 if (js == NULL)
2270 return;
2272 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2274 free(js);
2275 } /* }}} journal_set_free */
2277 static void journal_set_remove (journal_set *js) /* {{{ */
2278 {
2279 if (js == NULL)
2280 return;
2282 for (uint i=0; i < js->files_num; i++)
2283 {
2284 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2285 unlink(js->files[i]);
2286 }
2287 } /* }}} journal_set_remove */
2289 /* close current journal file handle.
2290 * MUST hold journal_lock before calling */
2291 static void journal_close(void) /* {{{ */
2292 {
2293 if (journal_fh != NULL)
2294 {
2295 if (fclose(journal_fh) != 0)
2296 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2297 }
2299 journal_fh = NULL;
2300 journal_size = 0;
2301 } /* }}} journal_close */
2303 /* MUST hold journal_lock before calling */
2304 static void journal_new_file(void) /* {{{ */
2305 {
2306 struct timeval now;
2307 int new_fd;
2308 char new_file[PATH_MAX + 1];
2310 assert(journal_dir != NULL);
2311 assert(journal_cur != NULL);
2313 journal_close();
2315 gettimeofday(&now, NULL);
2316 /* this format assures that the files sort in strcmp() order */
2317 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2318 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2320 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2321 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2322 if (new_fd < 0)
2323 goto error;
2325 journal_fh = fdopen(new_fd, "a");
2326 if (journal_fh == NULL)
2327 goto error;
2329 journal_size = ftell(journal_fh);
2330 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2332 /* record the file in the journal set */
2333 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2335 return;
2337 error:
2338 RRDD_LOG(LOG_CRIT,
2339 "JOURNALING DISABLED: Error while trying to create %s : %s",
2340 new_file, rrd_strerror(errno));
2341 RRDD_LOG(LOG_CRIT,
2342 "JOURNALING DISABLED: All values will be flushed at shutdown");
2344 close(new_fd);
2345 config_flush_at_shutdown = 1;
2347 } /* }}} journal_new_file */
2349 /* MUST NOT hold journal_lock before calling this */
2350 static void journal_rotate(void) /* {{{ */
2351 {
2352 journal_set *old_js = NULL;
2354 if (journal_dir == NULL)
2355 return;
2357 RRDD_LOG(LOG_DEBUG, "rotating journals");
2359 pthread_mutex_lock(&stats_lock);
2360 ++stats_journal_rotate;
2361 pthread_mutex_unlock(&stats_lock);
2363 pthread_mutex_lock(&journal_lock);
2365 journal_close();
2367 /* rotate the journal sets */
2368 old_js = journal_old;
2369 journal_old = journal_cur;
2370 journal_cur = calloc(1, sizeof(journal_set));
2372 if (journal_cur != NULL)
2373 journal_new_file();
2374 else
2375 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2377 pthread_mutex_unlock(&journal_lock);
2379 journal_set_remove(old_js);
2380 journal_set_free (old_js);
2382 } /* }}} static void journal_rotate */
2384 /* MUST hold journal_lock when calling */
2385 static void journal_done(void) /* {{{ */
2386 {
2387 if (journal_cur == NULL)
2388 return;
2390 journal_close();
2392 if (config_flush_at_shutdown)
2393 {
2394 RRDD_LOG(LOG_INFO, "removing journals");
2395 journal_set_remove(journal_old);
2396 journal_set_remove(journal_cur);
2397 }
2398 else
2399 {
2400 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2401 "journals will be used at next startup");
2402 }
2404 journal_set_free(journal_cur);
2405 journal_set_free(journal_old);
2406 free(journal_dir);
2408 } /* }}} static void journal_done */
2410 static int journal_write(char *cmd, char *args) /* {{{ */
2411 {
2412 int chars;
2414 if (journal_fh == NULL)
2415 return 0;
2417 pthread_mutex_lock(&journal_lock);
2418 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2419 journal_size += chars;
2421 if (journal_size > JOURNAL_MAX)
2422 journal_new_file();
2424 pthread_mutex_unlock(&journal_lock);
2426 if (chars > 0)
2427 {
2428 pthread_mutex_lock(&stats_lock);
2429 stats_journal_bytes += chars;
2430 pthread_mutex_unlock(&stats_lock);
2431 }
2433 return chars;
2434 } /* }}} static int journal_write */
2436 static int journal_replay (const char *file) /* {{{ */
2437 {
2438 FILE *fh;
2439 int entry_cnt = 0;
2440 int fail_cnt = 0;
2441 uint64_t line = 0;
2442 char entry[CMD_MAX];
2443 time_t now;
2445 if (file == NULL) return 0;
2447 {
2448 char *reason = "unknown error";
2449 int status = 0;
2450 struct stat statbuf;
2452 memset(&statbuf, 0, sizeof(statbuf));
2453 if (stat(file, &statbuf) != 0)
2454 {
2455 reason = "stat error";
2456 status = errno;
2457 }
2458 else if (!S_ISREG(statbuf.st_mode))
2459 {
2460 reason = "not a regular file";
2461 status = EPERM;
2462 }
2463 if (statbuf.st_uid != daemon_uid)
2464 {
2465 reason = "not owned by daemon user";
2466 status = EACCES;
2467 }
2468 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2469 {
2470 reason = "must not be user/group writable";
2471 status = EACCES;
2472 }
2474 if (status != 0)
2475 {
2476 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2477 file, rrd_strerror(status), reason);
2478 return 0;
2479 }
2480 }
2482 fh = fopen(file, "r");
2483 if (fh == NULL)
2484 {
2485 if (errno != ENOENT)
2486 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2487 file, rrd_strerror(errno));
2488 return 0;
2489 }
2490 else
2491 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2493 now = time(NULL);
2495 while(!feof(fh))
2496 {
2497 size_t entry_len;
2499 ++line;
2500 if (fgets(entry, sizeof(entry), fh) == NULL)
2501 break;
2502 entry_len = strlen(entry);
2504 /* check \n termination in case journal writing crashed mid-line */
2505 if (entry_len == 0)
2506 continue;
2507 else if (entry[entry_len - 1] != '\n')
2508 {
2509 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2510 ++fail_cnt;
2511 continue;
2512 }
2514 entry[entry_len - 1] = '\0';
2516 if (handle_request(NULL, now, entry, entry_len) == 0)
2517 ++entry_cnt;
2518 else
2519 ++fail_cnt;
2520 }
2522 fclose(fh);
2524 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2525 entry_cnt, fail_cnt);
2527 return entry_cnt > 0 ? 1 : 0;
2528 } /* }}} static int journal_replay */
/*
 * qsort() comparison callback for an array of C strings: each argument
 * points at one `char *' element, and the strings are ordered by strcmp().
 */
static int journal_sort(const void *v1, const void *v2)
{
  const char *lhs = *(const char * const *) v1;
  const char *rhs = *(const char * const *) v2;

  return strcmp(lhs, rhs);
}
2538 static void journal_init(void) /* {{{ */
2539 {
2540 int had_journal = 0;
2541 DIR *dir;
2542 struct dirent *dent;
2543 char path[PATH_MAX+1];
2545 if (journal_dir == NULL) return;
2547 pthread_mutex_lock(&journal_lock);
2549 journal_cur = calloc(1, sizeof(journal_set));
2550 if (journal_cur == NULL)
2551 {
2552 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2553 return;
2554 }
2556 RRDD_LOG(LOG_INFO, "checking for journal files");
2558 /* Handle old journal files during transition. This gives them the
2559 * correct sort order. TODO: remove after first release
2560 */
2561 {
2562 char old_path[PATH_MAX+1];
2563 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2564 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2565 rename(old_path, path);
2567 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2568 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2569 rename(old_path, path);
2570 }
2572 dir = opendir(journal_dir);
2573 if (!dir) {
2574 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2575 return;
2576 }
2577 while ((dent = readdir(dir)) != NULL)
2578 {
2579 /* looks like a journal file? */
2580 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2581 continue;
2583 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2585 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2586 {
2587 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2588 dent->d_name);
2589 break;
2590 }
2591 }
2592 closedir(dir);
2594 qsort(journal_cur->files, journal_cur->files_num,
2595 sizeof(journal_cur->files[0]), journal_sort);
2597 for (uint i=0; i < journal_cur->files_num; i++)
2598 had_journal += journal_replay(journal_cur->files[i]);
2600 journal_new_file();
2602 /* it must have been a crash. start a flush */
2603 if (had_journal && config_flush_at_shutdown)
2604 flush_old_values(-1);
2606 pthread_mutex_unlock(&journal_lock);
2608 RRDD_LOG(LOG_INFO, "journal processing complete");
2610 } /* }}} static void journal_init */
2612 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2613 {
2614 assert(sock != NULL);
2616 free(sock->rbuf); sock->rbuf = NULL;
2617 free(sock->wbuf); sock->wbuf = NULL;
2618 free(sock);
2619 } /* }}} void free_listen_socket */
2621 static void close_connection(listen_socket_t *sock) /* {{{ */
2622 {
2623 if (sock->fd >= 0)
2624 {
2625 close(sock->fd);
2626 sock->fd = -1;
2627 }
2629 free_listen_socket(sock);
2631 } /* }}} void close_connection */
2633 static void *connection_thread_main (void *args) /* {{{ */
2634 {
2635 listen_socket_t *sock;
2636 int fd;
2638 sock = (listen_socket_t *) args;
2639 fd = sock->fd;
2641 /* init read buffers */
2642 sock->next_read = sock->next_cmd = 0;
2643 sock->rbuf = malloc(RBUF_SIZE);
2644 if (sock->rbuf == NULL)
2645 {
2646 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2647 close_connection(sock);
2648 return NULL;
2649 }
2651 pthread_mutex_lock (&connection_threads_lock);
2652 #ifdef HAVE_LIBWRAP
2653 /* LIBWRAP does not support multiple threads! By putting this code
2654 inside pthread_mutex_lock we do not have to worry about request_info
2655 getting overwritten by another thread.
2656 */
2657 struct request_info req;
2658 request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2659 fromhost(&req);
2660 if(!hosts_access(&req)) {
2661 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2662 pthread_mutex_unlock (&connection_threads_lock);
2663 close_connection(sock);
2664 return NULL;
2665 }
2666 #endif /* HAVE_LIBWRAP */
2667 connection_threads_num++;
2668 pthread_mutex_unlock (&connection_threads_lock);
2670 while (state == RUNNING)
2671 {
2672 char *cmd;
2673 ssize_t cmd_len;
2674 ssize_t rbytes;
2675 time_t now;
2677 struct pollfd pollfd;
2678 int status;
2680 pollfd.fd = fd;
2681 pollfd.events = POLLIN | POLLPRI;
2682 pollfd.revents = 0;
2684 status = poll (&pollfd, 1, /* timeout = */ 500);
2685 if (state != RUNNING)
2686 break;
2687 else if (status == 0) /* timeout */
2688 continue;
2689 else if (status < 0) /* error */
2690 {
2691 status = errno;
2692 if (status != EINTR)
2693 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2694 continue;
2695 }
2697 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2698 break;
2699 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2700 {
2701 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2702 "poll(2) returned something unexpected: %#04hx",
2703 pollfd.revents);
2704 break;
2705 }
2707 rbytes = read(fd, sock->rbuf + sock->next_read,
2708 RBUF_SIZE - sock->next_read);
2709 if (rbytes < 0)
2710 {
2711 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2712 break;
2713 }
2714 else if (rbytes == 0)
2715 break; /* eof */
2717 sock->next_read += rbytes;
2719 if (sock->batch_start)
2720 now = sock->batch_start;
2721 else
2722 now = time(NULL);
2724 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2725 {
2726 status = handle_request (sock, now, cmd, cmd_len+1);
2727 if (status != 0)
2728 goto out_close;
2729 }
2730 }
2732 out_close:
2733 close_connection(sock);
2735 /* Remove this thread from the connection threads list */
2736 pthread_mutex_lock (&connection_threads_lock);
2737 connection_threads_num--;
2738 if (connection_threads_num <= 0)
2739 pthread_cond_broadcast(&connection_threads_done);
2740 pthread_mutex_unlock (&connection_threads_lock);
2742 return (NULL);
2743 } /* }}} void *connection_thread_main */
2745 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2746 {
2747 int fd;
2748 struct sockaddr_un sa;
2749 listen_socket_t *temp;
2750 int status;
2751 const char *path;
2752 char *path_copy, *dir;
2754 path = sock->addr;
2755 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2756 path += strlen("unix:");
2758 /* dirname may modify its argument */
2759 path_copy = strdup(path);
2760 if (path_copy == NULL)
2761 {
2762 fprintf(stderr, "rrdcached: strdup(): %s\n",
2763 rrd_strerror(errno));
2764 return (-1);
2765 }
2767 dir = dirname(path_copy);
2768 if (rrd_mkdir_p(dir, 0777) != 0)
2769 {
2770 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2771 dir, rrd_strerror(errno));
2772 return (-1);
2773 }
2775 free(path_copy);
2777 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2778 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2779 if (temp == NULL)
2780 {
2781 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2782 return (-1);
2783 }
2784 listen_fds = temp;
2785 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2787 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2788 if (fd < 0)
2789 {
2790 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2791 rrd_strerror(errno));
2792 return (-1);
2793 }
2795 memset (&sa, 0, sizeof (sa));
2796 sa.sun_family = AF_UNIX;
2797 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2799 /* if we've gotten this far, we own the pid file. any daemon started
2800 * with the same args must not be alive. therefore, ensure that we can
2801 * create the socket...
2802 */
2803 unlink(path);
2805 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2806 if (status != 0)
2807 {
2808 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2809 path, rrd_strerror(errno));
2810 close (fd);
2811 return (-1);
2812 }
2814 /* tweak the sockets group ownership */
2815 if (sock->socket_group != (gid_t)-1)
2816 {
2817 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2818 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2819 {
2820 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2821 }
2822 }
2824 if (sock->socket_permissions != (mode_t)-1)
2825 {
2826 if (chmod(path, sock->socket_permissions) != 0)
2827 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2828 (unsigned int)sock->socket_permissions, strerror(errno));
2829 }
2831 status = listen (fd, /* backlog = */ 10);
2832 if (status != 0)
2833 {
2834 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2835 path, rrd_strerror(errno));
2836 close (fd);
2837 unlink (path);
2838 return (-1);
2839 }
2841 listen_fds[listen_fds_num].fd = fd;
2842 listen_fds[listen_fds_num].family = PF_UNIX;
2843 strncpy(listen_fds[listen_fds_num].addr, path,
2844 sizeof (listen_fds[listen_fds_num].addr) - 1);
2845 listen_fds_num++;
2847 return (0);
2848 } /* }}} int open_listen_socket_unix */
/*
 * Open listening TCP sockets for a network address and append them to the
 * global listen_fds array.
 *
 * sock->addr is either "host", "host:port" or "[ipv6-address]:port" (the
 * port is optional in all forms; RRDCACHED_DEFAULT_PORT is used when it is
 * missing).  The address is resolved with getaddrinfo() and one socket is
 * opened for every result that can be bound.
 *
 * Returns 0 when address resolution succeeded (even if no result could be
 * bound; the caller checks listen_fds_num), and -1 on a malformed address,
 * a resolution failure or a listen(2) failure.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* the last colon separates host and port */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* the new entry starts as a copy of the template (permissions etc.) */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    /* the slot only counts once the socket is actually listening */
    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2972 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2973 {
2974 assert(sock != NULL);
2975 assert(sock->addr != NULL);
2977 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2978 || sock->addr[0] == '/')
2979 return (open_listen_socket_unix(sock));
2980 else
2981 return (open_listen_socket_network(sock));
2982 } /* }}} int open_listen_socket */
2984 static int close_listen_sockets (void) /* {{{ */
2985 {
2986 size_t i;
2988 for (i = 0; i < listen_fds_num; i++)
2989 {
2990 close (listen_fds[i].fd);
2992 if (listen_fds[i].family == PF_UNIX)
2993 unlink(listen_fds[i].addr);
2994 }
2996 free (listen_fds);
2997 listen_fds = NULL;
2998 listen_fds_num = 0;
3000 return (0);
3001 } /* }}} int close_listen_sockets */
3003 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3004 {
3005 struct pollfd *pollfds;
3006 int pollfds_num;
3007 int status;
3008 int i;
3010 if (listen_fds_num < 1)
3011 {
3012 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3013 return (NULL);
3014 }
3016 pollfds_num = listen_fds_num;
3017 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3018 if (pollfds == NULL)
3019 {
3020 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3021 return (NULL);
3022 }
3023 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3025 RRDD_LOG(LOG_INFO, "listening for connections");
3027 while (state == RUNNING)
3028 {
3029 for (i = 0; i < pollfds_num; i++)
3030 {
3031 pollfds[i].fd = listen_fds[i].fd;
3032 pollfds[i].events = POLLIN | POLLPRI;
3033 pollfds[i].revents = 0;
3034 }
3036 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3037 if (state != RUNNING)
3038 break;
3039 else if (status == 0) /* timeout */
3040 continue;
3041 else if (status < 0) /* error */
3042 {
3043 status = errno;
3044 if (status != EINTR)
3045 {
3046 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3047 }
3048 continue;
3049 }
3051 for (i = 0; i < pollfds_num; i++)
3052 {
3053 listen_socket_t *client_sock;
3054 struct sockaddr_storage client_sa;
3055 socklen_t client_sa_size;
3056 pthread_t tid;
3057 pthread_attr_t attr;
3059 if (pollfds[i].revents == 0)
3060 continue;
3062 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3063 {
3064 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3065 "poll(2) returned something unexpected for listen FD #%i.",
3066 pollfds[i].fd);
3067 continue;
3068 }
3070 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3071 if (client_sock == NULL)
3072 {
3073 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3074 continue;
3075 }
3076 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3078 client_sa_size = sizeof (client_sa);
3079 client_sock->fd = accept (pollfds[i].fd,
3080 (struct sockaddr *) &client_sa, &client_sa_size);
3081 if (client_sock->fd < 0)
3082 {
3083 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3084 free(client_sock);
3085 continue;
3086 }
3088 pthread_attr_init (&attr);
3089 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3091 status = pthread_create (&tid, &attr, connection_thread_main,
3092 client_sock);
3093 if (status != 0)
3094 {
3095 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3096 close_connection(client_sock);
3097 continue;
3098 }
3099 } /* for (pollfds_num) */
3100 } /* while (state == RUNNING) */
3102 RRDD_LOG(LOG_INFO, "starting shutdown");
3104 close_listen_sockets ();
3106 pthread_mutex_lock (&connection_threads_lock);
3107 while (connection_threads_num > 0)
3108 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3109 pthread_mutex_unlock (&connection_threads_lock);
3111 free(pollfds);
3113 return (NULL);
3114 } /* }}} void *listen_thread_main */
3116 static int daemonize (void) /* {{{ */
3117 {
3118 int pid_fd;
3119 char *base_dir;
3121 daemon_uid = geteuid();
3123 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3124 if (pid_fd < 0)
3125 pid_fd = check_pidfile();
3126 if (pid_fd < 0)
3127 return pid_fd;
3129 /* open all the listen sockets */
3130 if (config_listen_address_list_len > 0)
3131 {
3132 for (size_t i = 0; i < config_listen_address_list_len; i++)
3133 open_listen_socket (config_listen_address_list[i]);
3135 rrd_free_ptrs((void ***) &config_listen_address_list,
3136 &config_listen_address_list_len);
3137 }
3138 else
3139 {
3140 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3141 sizeof(default_socket.addr) - 1);
3142 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3144 if (default_socket.permissions == 0)
3145 socket_permission_set_all (&default_socket);
3147 open_listen_socket (&default_socket);
3148 }
3150 if (listen_fds_num < 1)
3151 {
3152 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3153 goto error;
3154 }
3156 if (!stay_foreground)
3157 {
3158 pid_t child;
3160 child = fork ();
3161 if (child < 0)
3162 {
3163 fprintf (stderr, "daemonize: fork(2) failed.\n");
3164 goto error;
3165 }
3166 else if (child > 0)
3167 exit(0);
3169 /* Become session leader */
3170 setsid ();
3172 /* Open the first three file descriptors to /dev/null */
3173 close (2);
3174 close (1);
3175 close (0);
3177 open ("/dev/null", O_RDWR);
3178 if (dup(0) == -1 || dup(0) == -1){
3179 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3180 }
3181 } /* if (!stay_foreground) */
3183 /* Change into the /tmp directory. */
3184 base_dir = (config_base_dir != NULL)
3185 ? config_base_dir
3186 : "/tmp";
3188 if (chdir (base_dir) != 0)
3189 {
3190 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3191 goto error;
3192 }
3194 install_signal_handlers();
3196 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3197 RRDD_LOG(LOG_INFO, "starting up");
3199 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3200 (GDestroyNotify) free_cache_item);
3201 if (cache_tree == NULL)
3202 {
3203 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3204 goto error;
3205 }
3207 return write_pidfile (pid_fd);
3209 error:
3210 remove_pidfile();
3211 return -1;
3212 } /* }}} int daemonize */
3214 static int cleanup (void) /* {{{ */
3215 {
3216 pthread_cond_broadcast (&flush_cond);
3217 pthread_join (flush_thread, NULL);
3219 pthread_cond_broadcast (&queue_cond);
3220 for (int i = 0; i < config_queue_threads; i++)
3221 pthread_join (queue_threads[i], NULL);
3223 if (config_flush_at_shutdown)
3224 {
3225 assert(cache_queue_head == NULL);
3226 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3227 }
3229 free(queue_threads);
3230 free(config_base_dir);
3232 pthread_mutex_lock(&cache_lock);
3233 g_tree_destroy(cache_tree);
3235 pthread_mutex_lock(&journal_lock);
3236 journal_done();
3238 RRDD_LOG(LOG_INFO, "goodbye");
3239 closelog ();
3241 remove_pidfile ();
3242 free(config_pid_file);
3244 return (0);
3245 } /* }}} int cleanup */
3247 static int read_options (int argc, char **argv) /* {{{ */
3248 {
3249 int option;
3250 int status = 0;
3252 socket_permission_clear (&default_socket);
3254 default_socket.socket_group = (gid_t)-1;
3255 default_socket.socket_permissions = (mode_t)-1;
3257 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3258 {
3259 switch (option)
3260 {
3261 case 'O':
3262 opt_no_overwrite = 1;
3263 break;
3265 case 'g':
3266 stay_foreground=1;
3267 break;
3269 case 'l':
3270 {
3271 listen_socket_t *new;
3273 new = malloc(sizeof(listen_socket_t));
3274 if (new == NULL)
3275 {
3276 fprintf(stderr, "read_options: malloc failed.\n");
3277 return(2);
3278 }
3279 memset(new, 0, sizeof(listen_socket_t));
3281 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3283 /* Add permissions to the socket {{{ */
3284 if (default_socket.permissions != 0)
3285 {
3286 socket_permission_copy (new, &default_socket);
3287 }
3288 else /* if (default_socket.permissions == 0) */
3289 {
3290 /* Add permission for ALL commands to the socket. */
3291 socket_permission_set_all (new);
3292 }
3293 /* }}} Done adding permissions. */
3295 new->socket_group = default_socket.socket_group;
3296 new->socket_permissions = default_socket.socket_permissions;
3298 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3299 &config_listen_address_list_len, new))
3300 {
3301 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3302 return (2);
3303 }
3304 }
3305 break;
3307 /* set socket group permissions */
3308 case 's':
3309 {
3310 gid_t group_gid;
3311 struct group *grp;
3313 group_gid = strtoul(optarg, NULL, 10);
3314 if (errno != EINVAL && group_gid>0)
3315 {
3316 /* we were passed a number */
3317 grp = getgrgid(group_gid);
3318 }
3319 else
3320 {
3321 grp = getgrnam(optarg);
3322 }
3324 if (grp)
3325 {
3326 default_socket.socket_group = grp->gr_gid;
3327 }
3328 else
3329 {
3330 /* no idea what the user wanted... */
3331 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3332 return (5);
3333 }
3334 }
3335 break;
3337 /* set socket file permissions */
3338 case 'm':
3339 {
3340 long tmp;
3341 char *endptr = NULL;
3343 tmp = strtol (optarg, &endptr, 8);
3344 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3345 || (tmp > 07777) || (tmp < 0)) {
3346 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3347 optarg);
3348 return (5);
3349 }
3351 default_socket.socket_permissions = (mode_t)tmp;
3352 }
3353 break;
3355 case 'P':
3356 {
3357 char *optcopy;
3358 char *saveptr;
3359 char *dummy;
3360 char *ptr;
3362 socket_permission_clear (&default_socket);
3364 optcopy = strdup (optarg);
3365 dummy = optcopy;
3366 saveptr = NULL;
3367 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3368 {
3369 dummy = NULL;
3370 status = socket_permission_add (&default_socket, ptr);
3371 if (status != 0)
3372 {
3373 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3374 "socket failed. Most likely, this permission doesn't "
3375 "exist. Check your command line.\n", ptr);
3376 status = 4;
3377 }
3378 }
3380 free (optcopy);
3381 }
3382 break;
3384 case 'f':
3385 {
3386 int temp;
3388 temp = atoi (optarg);
3389 if (temp > 0)
3390 config_flush_interval = temp;
3391 else
3392 {
3393 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3394 status = 3;
3395 }
3396 }
3397 break;
3399 case 'w':
3400 {
3401 int temp;
3403 temp = atoi (optarg);
3404 if (temp > 0)
3405 config_write_interval = temp;
3406 else
3407 {
3408 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3409 status = 2;
3410 }
3411 }
3412 break;
3414 case 'z':
3415 {
3416 int temp;
3418 temp = atoi(optarg);
3419 if (temp > 0)
3420 config_write_jitter = temp;
3421 else
3422 {
3423 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3424 status = 2;
3425 }
3427 break;
3428 }
3430 case 't':
3431 {
3432 int threads;
3433 threads = atoi(optarg);
3434 if (threads >= 1)
3435 config_queue_threads = threads;
3436 else
3437 {
3438 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3439 return 1;
3440 }
3441 }
3442 break;
3444 case 'B':
3445 config_write_base_only = 1;
3446 break;
3448 case 'b':
3449 {
3450 size_t len;
3451 char base_realpath[PATH_MAX];
3453 if (config_base_dir != NULL)
3454 free (config_base_dir);
3455 config_base_dir = strdup (optarg);
3456 if (config_base_dir == NULL)
3457 {
3458 fprintf (stderr, "read_options: strdup failed.\n");
3459 return (3);
3460 }
3462 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3463 {
3464 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3465 config_base_dir, rrd_strerror (errno));
3466 return (3);
3467 }
3469 /* make sure that the base directory is not resolved via
3470 * symbolic links. this makes some performance-enhancing
3471 * assumptions possible (we don't have to resolve paths
3472 * that start with a "/")
3473 */
3474 if (realpath(config_base_dir, base_realpath) == NULL)
3475 {
3476 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3477 "%s\n", config_base_dir, rrd_strerror(errno));
3478 return 5;
3479 }
3481 len = strlen (config_base_dir);
3482 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3483 {
3484 config_base_dir[len - 1] = 0;
3485 len--;
3486 }
3488 if (len < 1)
3489 {
3490 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3491 return (4);
3492 }
3494 _config_base_dir_len = len;
3496 len = strlen (base_realpath);
3497 while ((len > 0) && (base_realpath[len - 1] == '/'))
3498 {
3499 base_realpath[len - 1] = '\0';
3500 len--;
3501 }
3503 if (strncmp(config_base_dir,
3504 base_realpath, sizeof(base_realpath)) != 0)
3505 {
3506 fprintf(stderr,
3507 "Base directory (-b) resolved via file system links!\n"
3508 "Please consult rrdcached '-b' documentation!\n"
3509 "Consider specifying the real directory (%s)\n",
3510 base_realpath);
3511 return 5;
3512 }
3513 }
3514 break;
3516 case 'p':
3517 {
3518 if (config_pid_file != NULL)
3519 free (config_pid_file);
3520 config_pid_file = strdup (optarg);
3521 if (config_pid_file == NULL)
3522 {
3523 fprintf (stderr, "read_options: strdup failed.\n");
3524 return (3);
3525 }
3526 }
3527 break;
3529 case 'F':
3530 config_flush_at_shutdown = 1;
3531 break;
3533 case 'j':
3534 {
3535 char journal_dir_actual[PATH_MAX];
3536 const char *dir;
3537 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3539 status = rrd_mkdir_p(dir, 0777);
3540 if (status != 0)
3541 {
3542 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3543 dir, rrd_strerror(errno));
3544 return 6;
3545 }
3547 if (access(dir, R_OK|W_OK|X_OK) != 0)
3548 {
3549 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3550 errno ? rrd_strerror(errno) : "");
3551 return 6;
3552 }
3553 }
3554 break;
3556 case 'a':
3557 {
3558 int temp = atoi(optarg);
3559 if (temp > 0)
3560 config_alloc_chunk = temp;
3561 else
3562 {
3563 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3564 return 10;
3565 }
3566 }
3567 break;
3569 case 'h':
3570 case '?':
3571 printf ("RRDCacheD %s\n"
3572 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3573 "\n"
3574 "Usage: rrdcached [options]\n"
3575 "\n"
3576 "Valid options are:\n"
3577 " -l <address> Socket address to listen to.\n"
3578 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3579 " -P <perms> Sets the permissions to assign to all following "
3580 "sockets\n"
3581 " -w <seconds> Interval in which to write data.\n"
3582 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3583 " -t <threads> Number of write threads.\n"
3584 " -f <seconds> Interval in which to flush dead data.\n"
3585 " -p <file> Location of the PID-file.\n"
3586 " -b <dir> Base directory to change to.\n"
3587 " -B Restrict file access to paths within -b <dir>\n"
3588 " -g Do not fork and run in the foreground.\n"
3589 " -j <dir> Directory in which to create the journal files.\n"
3590 " -F Always flush all updates at shutdown\n"
3591 " -s <id|name> Group owner of all following UNIX sockets\n"
3592 " (the socket will also have read/write permissions "
3593 "for that group)\n"
3594 " -m <mode> File permissions (octal) of all following UNIX "
3595 "sockets\n"
3596 " -a <size> Memory allocation chunk size. Default is 1.\n"
3597 " -O Do not allow CREATE commands to overwrite existing\n"
3598 " files, even if asked to.\n"
3599 "\n"
3600 "For more information and a detailed description of all options "
3601 "please refer\n"
3602 "to the rrdcached(1) manual page.\n",
3603 VERSION);
3604 if (option == 'h')
3605 status = -1;
3606 else
3607 status = 1;
3608 break;
3609 } /* switch (option) */
3610 } /* while (getopt) */
3612 /* advise the user when values are not sane */
3613 if (config_flush_interval < 2 * config_write_interval)
3614 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3615 " 2x write interval (-w) !\n");
3616 if (config_write_jitter > config_write_interval)
3617 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3618 " write interval (-w) !\n");
3620 if (config_write_base_only && config_base_dir == NULL)
3621 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3622 " Consult the rrdcached documentation\n");
3624 if (journal_dir == NULL)
3625 config_flush_at_shutdown = 1;
3627 return (status);
3628 } /* }}} int read_options */
3630 int main (int argc, char **argv)
3631 {
3632 int status;
3634 status = read_options (argc, argv);
3635 if (status != 0)
3636 {
3637 if (status < 0)
3638 status = 0;
3639 return (status);
3640 }
3642 status = daemonize ();
3643 if (status != 0)
3644 {
3645 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3646 return (1);
3647 }
3649 journal_init();
3651 /* start the queue threads */
3652 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3653 if (queue_threads == NULL)
3654 {
3655 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3656 cleanup();
3657 return (1);
3658 }
3659 for (int i = 0; i < config_queue_threads; i++)
3660 {
3661 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3662 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3663 if (status != 0)
3664 {
3665 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3666 cleanup();
3667 return (1);
3668 }
3669 }
3671 /* start the flush thread */
3672 memset(&flush_thread, 0, sizeof(flush_thread));
3673 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3674 if (status != 0)
3675 {
3676 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3677 cleanup();
3678 return (1);
3679 }
3681 listen_thread_main (NULL);
3682 cleanup ();
3684 return (0);
3685 } /* int main */
3687 /*
3688 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3689 */