1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120 do { \
121 if (stay_foreground) { \
122 fprintf(stderr, __VA_ARGS__); \
123 fprintf(stderr, "\n"); } \
124 syslog ((severity), __VA_ARGS__); \
125 } while (0)
127 /*
128 * Types
129 */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
133 {
134 int fd;
135 char addr[PATH_MAX + 1];
136 int family;
138 /* state for BATCH processing */
139 time_t batch_start;
140 int batch_cmd;
142 /* buffered IO */
143 char *rbuf;
144 off_t next_cmd;
145 off_t next_read;
147 char *wbuf;
148 ssize_t wbuf_len;
150 uint32_t permissions;
152 gid_t socket_group;
153 mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
161 time_t UNUSED(now),\
162 char UNUSED(*buffer),\
163 size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO command_t UNUSED(*cmd),\
166 DISPATCH_PROTO
168 struct command_s {
169 char *cmd;
170 int (*handler)(HANDLER_PROTO);
172 char context; /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT (1<<0)
174 #define CMD_CONTEXT_BATCH (1<<1)
175 #define CMD_CONTEXT_JOURNAL (1<<2)
176 #define CMD_CONTEXT_ANY (0x7f)
178 char *syntax;
179 char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
185 {
186 char *file;
187 char **values;
188 size_t values_num; /* number of valid pointers */
189 size_t values_alloc; /* number of allocated pointers */
190 time_t last_flush_time;
191 time_t last_update_stamp;
192 #define CI_FLAGS_IN_TREE (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194 int flags;
195 pthread_cond_t flushed;
196 cache_item_t *prev;
197 cache_item_t *next;
198 };
200 struct callback_flush_data_s
201 {
202 time_t now;
203 time_t abs_timeout;
204 char **keys;
205 size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
209 enum queue_side_e
210 {
211 HEAD,
212 TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
216 /* describe a set of journal files */
217 typedef struct {
218 char **files;
219 size_t files_num;
220 } journal_set;
222 #define RBUF_SIZE (RRD_CMD_MAX*2)
224 /*
225 * Variables
226 */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
233 static listen_socket_t default_socket;
235 enum {
236 RUNNING, /* normal operation */
237 FLUSHING, /* flushing remaining values */
238 SHUTDOWN /* shutting down */
239 } state = RUNNING;
241 static pthread_t *queue_threads;
242 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
243 static int config_queue_threads = 4;
245 static pthread_t flush_thread;
246 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
248 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
249 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
250 static int connection_threads_num = 0;
252 /* Cache stuff */
253 static GTree *cache_tree = NULL;
254 static cache_item_t *cache_queue_head = NULL;
255 static cache_item_t *cache_queue_tail = NULL;
256 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
258 static int config_write_interval = 300;
259 static int config_write_jitter = 0;
260 static int config_flush_interval = 3600;
261 static int config_flush_at_shutdown = 0;
262 static char *config_pid_file = NULL;
263 static char *config_base_dir = NULL;
264 static size_t _config_base_dir_len = 0;
265 static int config_write_base_only = 0;
266 static size_t config_alloc_chunk = 1;
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
280 static int opt_no_overwrite = 0; /* default for the daemon */
282 /* Journaled updates */
283 #define JOURNAL_REPLAY(s) ((s) == NULL)
284 #define JOURNAL_BASE "rrd.journal"
285 static journal_set *journal_cur = NULL;
286 static journal_set *journal_old = NULL;
287 static char *journal_dir = NULL;
288 static FILE *journal_fh = NULL; /* current journal file handle */
289 static long journal_size = 0; /* current journal size */
290 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
291 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
292 static int journal_write(char *cmd, char *args);
293 static void journal_done(void);
294 static void journal_rotate(void);
296 /* prototypes for forward refernces */
297 static int handle_request_help (HANDLER_PROTO);
299 /*
300 * Functions
301 */
302 static void sig_common (const char *sig) /* {{{ */
303 {
304 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
305 state = FLUSHING;
306 pthread_cond_broadcast(&flush_cond);
307 pthread_cond_broadcast(&queue_cond);
308 } /* }}} void sig_common */
310 static void sig_int_handler (int UNUSED(s)) /* {{{ */
311 {
312 sig_common("INT");
313 } /* }}} void sig_int_handler */
315 static void sig_term_handler (int UNUSED(s)) /* {{{ */
316 {
317 sig_common("TERM");
318 } /* }}} void sig_term_handler */
320 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
321 {
322 config_flush_at_shutdown = 1;
323 sig_common("USR1");
324 } /* }}} void sig_usr1_handler */
326 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
327 {
328 config_flush_at_shutdown = 0;
329 sig_common("USR2");
330 } /* }}} void sig_usr2_handler */
332 static void install_signal_handlers(void) /* {{{ */
333 {
334 /* These structures are static, because `sigaction' behaves weird if the are
335 * overwritten.. */
336 static struct sigaction sa_int;
337 static struct sigaction sa_term;
338 static struct sigaction sa_pipe;
339 static struct sigaction sa_usr1;
340 static struct sigaction sa_usr2;
342 /* Install signal handlers */
343 memset (&sa_int, 0, sizeof (sa_int));
344 sa_int.sa_handler = sig_int_handler;
345 sigaction (SIGINT, &sa_int, NULL);
347 memset (&sa_term, 0, sizeof (sa_term));
348 sa_term.sa_handler = sig_term_handler;
349 sigaction (SIGTERM, &sa_term, NULL);
351 memset (&sa_pipe, 0, sizeof (sa_pipe));
352 sa_pipe.sa_handler = SIG_IGN;
353 sigaction (SIGPIPE, &sa_pipe, NULL);
355 memset (&sa_pipe, 0, sizeof (sa_usr1));
356 sa_usr1.sa_handler = sig_usr1_handler;
357 sigaction (SIGUSR1, &sa_usr1, NULL);
359 memset (&sa_usr2, 0, sizeof (sa_usr2));
360 sa_usr2.sa_handler = sig_usr2_handler;
361 sigaction (SIGUSR2, &sa_usr2, NULL);
363 } /* }}} void install_signal_handlers */
365 static int open_pidfile(char *action, int oflag) /* {{{ */
366 {
367 int fd;
368 const char *file;
369 char *file_copy, *dir;
371 file = (config_pid_file != NULL)
372 ? config_pid_file
373 : LOCALSTATEDIR "/run/rrdcached.pid";
375 /* dirname may modify its argument */
376 file_copy = strdup(file);
377 if (file_copy == NULL)
378 {
379 fprintf(stderr, "rrdcached: strdup(): %s\n",
380 rrd_strerror(errno));
381 return -1;
382 }
384 dir = dirname(file_copy);
385 if (rrd_mkdir_p(dir, 0777) != 0)
386 {
387 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
388 dir, rrd_strerror(errno));
389 return -1;
390 }
392 free(file_copy);
394 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
395 if (fd < 0)
396 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
397 action, file, rrd_strerror(errno));
399 return(fd);
400 } /* }}} static int open_pidfile */
402 /* check existing pid file to see whether a daemon is running */
403 static int check_pidfile(void)
404 {
405 int pid_fd;
406 pid_t pid;
407 char pid_str[16];
409 pid_fd = open_pidfile("open", O_RDWR);
410 if (pid_fd < 0)
411 return pid_fd;
413 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
414 return -1;
416 pid = atoi(pid_str);
417 if (pid <= 0)
418 return -1;
420 /* another running process that we can signal COULD be
421 * a competing rrdcached */
422 if (pid != getpid() && kill(pid, 0) == 0)
423 {
424 fprintf(stderr,
425 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
426 close(pid_fd);
427 return -1;
428 }
430 lseek(pid_fd, 0, SEEK_SET);
431 if (ftruncate(pid_fd, 0) == -1)
432 {
433 fprintf(stderr,
434 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
435 close(pid_fd);
436 return -1;
437 }
439 fprintf(stderr,
440 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
441 "rrdcached: starting normally.\n", pid);
443 return pid_fd;
444 } /* }}} static int check_pidfile */
446 static int write_pidfile (int fd) /* {{{ */
447 {
448 pid_t pid;
449 FILE *fh;
451 pid = getpid ();
453 fh = fdopen (fd, "w");
454 if (fh == NULL)
455 {
456 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
457 close(fd);
458 return (-1);
459 }
461 fprintf (fh, "%i\n", (int) pid);
462 fclose (fh);
464 return (0);
465 } /* }}} int write_pidfile */
467 static int remove_pidfile (void) /* {{{ */
468 {
469 char *file;
470 int status;
472 file = (config_pid_file != NULL)
473 ? config_pid_file
474 : LOCALSTATEDIR "/run/rrdcached.pid";
476 status = unlink (file);
477 if (status == 0)
478 return (0);
479 return (errno);
480 } /* }}} int remove_pidfile */
482 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
483 {
484 char *eol;
486 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
487 sock->next_read - sock->next_cmd);
489 if (eol == NULL)
490 {
491 /* no commands left, move remainder back to front of rbuf */
492 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
493 sock->next_read - sock->next_cmd);
494 sock->next_read -= sock->next_cmd;
495 sock->next_cmd = 0;
496 *len = 0;
497 return NULL;
498 }
499 else
500 {
501 char *cmd = sock->rbuf + sock->next_cmd;
502 *eol = '\0';
504 sock->next_cmd = eol - sock->rbuf + 1;
506 if (eol > sock->rbuf && *(eol-1) == '\r')
507 *(--eol) = '\0'; /* handle "\r\n" EOL */
509 *len = eol - cmd;
511 return cmd;
512 }
514 /* NOTREACHED */
515 assert(1==0);
516 } /* }}} char *next_cmd */
518 /* add the characters directly to the write buffer */
519 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
520 {
521 char *new_buf;
523 assert(sock != NULL);
525 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
526 if (new_buf == NULL)
527 {
528 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
529 return -1;
530 }
532 strncpy(new_buf + sock->wbuf_len, str, len + 1);
534 sock->wbuf = new_buf;
535 sock->wbuf_len += len;
537 return 0;
538 } /* }}} static int add_to_wbuf */
540 /* add the text to the "extra" info that's sent after the status line */
541 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
542 {
543 va_list argp;
544 char buffer[RRD_CMD_MAX];
545 int len;
547 if (JOURNAL_REPLAY(sock)) return 0;
548 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
550 va_start(argp, fmt);
551 #ifdef HAVE_VSNPRINTF
552 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
553 #else
554 len = vsprintf(buffer, fmt, argp);
555 #endif
556 va_end(argp);
557 if (len < 0)
558 {
559 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
560 return -1;
561 }
563 return add_to_wbuf(sock, buffer, len);
564 } /* }}} static int add_response_info */
566 static int count_lines(char *str) /* {{{ */
567 {
568 int lines = 0;
570 if (str != NULL)
571 {
572 while ((str = strchr(str, '\n')) != NULL)
573 {
574 ++lines;
575 ++str;
576 }
577 }
579 return lines;
580 } /* }}} static int count_lines */
582 /* send the response back to the user.
583 * returns 0 on success, -1 on error
584 * write buffer is always zeroed after this call */
585 static int send_response (listen_socket_t *sock, response_code rc,
586 char *fmt, ...) /* {{{ */
587 {
588 va_list argp;
589 char buffer[RRD_CMD_MAX];
590 int lines;
591 ssize_t wrote;
592 int rclen, len;
594 if (JOURNAL_REPLAY(sock)) return rc;
596 if (sock->batch_start)
597 {
598 if (rc == RESP_OK)
599 return rc; /* no response on success during BATCH */
600 lines = sock->batch_cmd;
601 }
602 else if (rc == RESP_OK)
603 lines = count_lines(sock->wbuf);
604 else
605 lines = -1;
607 rclen = sprintf(buffer, "%d ", lines);
608 va_start(argp, fmt);
609 #ifdef HAVE_VSNPRINTF
610 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
611 #else
612 len = vsprintf(buffer+rclen, fmt, argp);
613 #endif
614 va_end(argp);
615 if (len < 0)
616 return -1;
618 len += rclen;
620 /* append the result to the wbuf, don't write to the user */
621 if (sock->batch_start)
622 return add_to_wbuf(sock, buffer, len);
624 /* first write must be complete */
625 if (len != write(sock->fd, buffer, len))
626 {
627 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
628 return -1;
629 }
631 if (sock->wbuf != NULL && rc == RESP_OK)
632 {
633 wrote = 0;
634 while (wrote < sock->wbuf_len)
635 {
636 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
637 if (wb <= 0)
638 {
639 RRDD_LOG(LOG_INFO, "send_response: could not write results");
640 return -1;
641 }
642 wrote += wb;
643 }
644 }
646 free(sock->wbuf); sock->wbuf = NULL;
647 sock->wbuf_len = 0;
649 return 0;
650 } /* }}} */
652 static void wipe_ci_values(cache_item_t *ci, time_t when)
653 {
654 ci->values = NULL;
655 ci->values_num = 0;
656 ci->values_alloc = 0;
658 ci->last_flush_time = when;
659 if (config_write_jitter > 0)
660 ci->last_flush_time += (rrd_random() % config_write_jitter);
661 }
663 /* remove_from_queue
664 * remove a "cache_item_t" item from the queue.
665 * must hold 'cache_lock' when calling this
666 */
667 static void remove_from_queue(cache_item_t *ci) /* {{{ */
668 {
669 if (ci == NULL) return;
670 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
672 if (ci->prev == NULL)
673 cache_queue_head = ci->next; /* reset head */
674 else
675 ci->prev->next = ci->next;
677 if (ci->next == NULL)
678 cache_queue_tail = ci->prev; /* reset the tail */
679 else
680 ci->next->prev = ci->prev;
682 ci->next = ci->prev = NULL;
683 ci->flags &= ~CI_FLAGS_IN_QUEUE;
685 pthread_mutex_lock (&stats_lock);
686 assert (stats_queue_length > 0);
687 stats_queue_length--;
688 pthread_mutex_unlock (&stats_lock);
690 } /* }}} static void remove_from_queue */
692 /* free the resources associated with the cache_item_t
693 * must hold cache_lock when calling this function
694 */
695 static void *free_cache_item(cache_item_t *ci) /* {{{ */
696 {
697 if (ci == NULL) return NULL;
699 remove_from_queue(ci);
701 for (size_t i=0; i < ci->values_num; i++)
702 free(ci->values[i]);
704 free (ci->values);
705 free (ci->file);
707 /* in case anyone is waiting */
708 pthread_cond_broadcast(&ci->flushed);
709 pthread_cond_destroy(&ci->flushed);
711 free (ci);
713 return NULL;
714 } /* }}} static void *free_cache_item */
716 /*
717 * enqueue_cache_item:
718 * `cache_lock' must be acquired before calling this function!
719 */
720 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
721 queue_side_t side)
722 {
723 if (ci == NULL)
724 return (-1);
726 if (ci->values_num == 0)
727 return (0);
729 if (side == HEAD)
730 {
731 if (cache_queue_head == ci)
732 return 0;
734 /* remove if further down in queue */
735 remove_from_queue(ci);
737 ci->prev = NULL;
738 ci->next = cache_queue_head;
739 if (ci->next != NULL)
740 ci->next->prev = ci;
741 cache_queue_head = ci;
743 if (cache_queue_tail == NULL)
744 cache_queue_tail = cache_queue_head;
745 }
746 else /* (side == TAIL) */
747 {
748 /* We don't move values back in the list.. */
749 if (ci->flags & CI_FLAGS_IN_QUEUE)
750 return (0);
752 assert (ci->next == NULL);
753 assert (ci->prev == NULL);
755 ci->prev = cache_queue_tail;
757 if (cache_queue_tail == NULL)
758 cache_queue_head = ci;
759 else
760 cache_queue_tail->next = ci;
762 cache_queue_tail = ci;
763 }
765 ci->flags |= CI_FLAGS_IN_QUEUE;
767 pthread_cond_signal(&queue_cond);
768 pthread_mutex_lock (&stats_lock);
769 stats_queue_length++;
770 pthread_mutex_unlock (&stats_lock);
772 return (0);
773 } /* }}} int enqueue_cache_item */
775 /*
776 * tree_callback_flush:
777 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
778 * while this is in progress.
779 */
780 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
781 gpointer data)
782 {
783 cache_item_t *ci;
784 callback_flush_data_t *cfd;
786 ci = (cache_item_t *) value;
787 cfd = (callback_flush_data_t *) data;
789 if (ci->flags & CI_FLAGS_IN_QUEUE)
790 return FALSE;
792 if (ci->values_num > 0
793 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
794 {
795 enqueue_cache_item (ci, TAIL);
796 }
797 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
798 && (ci->values_num <= 0))
799 {
800 assert ((char *) key == ci->file);
801 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
802 {
803 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
804 return (FALSE);
805 }
806 }
808 return (FALSE);
809 } /* }}} gboolean tree_callback_flush */
811 static int flush_old_values (int max_age)
812 {
813 callback_flush_data_t cfd;
814 size_t k;
816 memset (&cfd, 0, sizeof (cfd));
817 /* Pass the current time as user data so that we don't need to call
818 * `time' for each node. */
819 cfd.now = time (NULL);
820 cfd.keys = NULL;
821 cfd.keys_num = 0;
823 if (max_age > 0)
824 cfd.abs_timeout = cfd.now - max_age;
825 else
826 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
828 /* `tree_callback_flush' will return the keys of all values that haven't
829 * been touched in the last `config_flush_interval' seconds in `cfd'.
830 * The char*'s in this array point to the same memory as ci->file, so we
831 * don't need to free them separately. */
832 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
834 for (k = 0; k < cfd.keys_num; k++)
835 {
836 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
837 /* should never fail, since we have held the cache_lock
838 * the entire time */
839 assert(status == TRUE);
840 }
842 if (cfd.keys != NULL)
843 {
844 free (cfd.keys);
845 cfd.keys = NULL;
846 }
848 return (0);
849 } /* int flush_old_values */
851 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
852 {
853 struct timeval now;
854 struct timespec next_flush;
855 int status;
857 gettimeofday (&now, NULL);
858 next_flush.tv_sec = now.tv_sec + config_flush_interval;
859 next_flush.tv_nsec = 1000 * now.tv_usec;
861 pthread_mutex_lock(&cache_lock);
863 while (state == RUNNING)
864 {
865 gettimeofday (&now, NULL);
866 if ((now.tv_sec > next_flush.tv_sec)
867 || ((now.tv_sec == next_flush.tv_sec)
868 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
869 {
870 RRDD_LOG(LOG_DEBUG, "flushing old values");
872 /* Determine the time of the next cache flush. */
873 next_flush.tv_sec = now.tv_sec + config_flush_interval;
875 /* Flush all values that haven't been written in the last
876 * `config_write_interval' seconds. */
877 flush_old_values (config_write_interval);
879 /* unlock the cache while we rotate so we don't block incoming
880 * updates if the fsync() blocks on disk I/O */
881 pthread_mutex_unlock(&cache_lock);
882 journal_rotate();
883 pthread_mutex_lock(&cache_lock);
884 }
886 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
887 if (status != 0 && status != ETIMEDOUT)
888 {
889 RRDD_LOG (LOG_ERR, "flush_thread_main: "
890 "pthread_cond_timedwait returned %i.", status);
891 }
892 }
894 if (config_flush_at_shutdown)
895 flush_old_values (-1); /* flush everything */
897 state = SHUTDOWN;
899 pthread_mutex_unlock(&cache_lock);
901 return NULL;
902 } /* void *flush_thread_main */
904 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
905 {
906 pthread_mutex_lock (&cache_lock);
908 while (state != SHUTDOWN
909 || (cache_queue_head != NULL && config_flush_at_shutdown))
910 {
911 cache_item_t *ci;
912 char *file;
913 char **values;
914 size_t values_num;
915 int status;
917 /* Now, check if there's something to store away. If not, wait until
918 * something comes in. */
919 if (cache_queue_head == NULL)
920 {
921 status = pthread_cond_wait (&queue_cond, &cache_lock);
922 if ((status != 0) && (status != ETIMEDOUT))
923 {
924 RRDD_LOG (LOG_ERR, "queue_thread_main: "
925 "pthread_cond_wait returned %i.", status);
926 }
927 }
929 /* Check if a value has arrived. This may be NULL if we timed out or there
930 * was an interrupt such as a signal. */
931 if (cache_queue_head == NULL)
932 continue;
934 ci = cache_queue_head;
936 /* copy the relevant parts */
937 file = strdup (ci->file);
938 if (file == NULL)
939 {
940 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
941 continue;
942 }
944 assert(ci->values != NULL);
945 assert(ci->values_num > 0);
947 values = ci->values;
948 values_num = ci->values_num;
950 wipe_ci_values(ci, time(NULL));
951 remove_from_queue(ci);
953 pthread_mutex_unlock (&cache_lock);
955 rrd_clear_error ();
956 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
957 if (status != 0)
958 {
959 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
960 "rrd_update_r (%s) failed with status %i. (%s)",
961 file, status, rrd_get_error());
962 }
964 journal_write("wrote", file);
966 /* Search again in the tree. It's possible someone issued a "FORGET"
967 * while we were writing the update values. */
968 pthread_mutex_lock(&cache_lock);
969 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
970 if (ci)
971 pthread_cond_broadcast(&ci->flushed);
972 pthread_mutex_unlock(&cache_lock);
974 if (status == 0)
975 {
976 pthread_mutex_lock (&stats_lock);
977 stats_updates_written++;
978 stats_data_sets_written += values_num;
979 pthread_mutex_unlock (&stats_lock);
980 }
982 rrd_free_ptrs((void ***) &values, &values_num);
983 free(file);
985 pthread_mutex_lock (&cache_lock);
986 }
987 pthread_mutex_unlock (&cache_lock);
989 return (NULL);
990 } /* }}} void *queue_thread_main */
992 static int buffer_get_field (char **buffer_ret, /* {{{ */
993 size_t *buffer_size_ret, char **field_ret)
994 {
995 char *buffer;
996 size_t buffer_pos;
997 size_t buffer_size;
998 char *field;
999 size_t field_size;
1000 int status;
1002 buffer = *buffer_ret;
1003 buffer_pos = 0;
1004 buffer_size = *buffer_size_ret;
1005 field = *buffer_ret;
1006 field_size = 0;
1008 if (buffer_size <= 0)
1009 return (-1);
1011 /* This is ensured by `handle_request'. */
1012 assert (buffer[buffer_size - 1] == '\0');
1014 status = -1;
1015 while (buffer_pos < buffer_size)
1016 {
1017 /* Check for end-of-field or end-of-buffer */
1018 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1019 {
1020 field[field_size] = 0;
1021 field_size++;
1022 buffer_pos++;
1023 status = 0;
1024 break;
1025 }
1026 /* Handle escaped characters. */
1027 else if (buffer[buffer_pos] == '\\')
1028 {
1029 if (buffer_pos >= (buffer_size - 1))
1030 break;
1031 buffer_pos++;
1032 field[field_size] = buffer[buffer_pos];
1033 field_size++;
1034 buffer_pos++;
1035 }
1036 /* Normal operation */
1037 else
1038 {
1039 field[field_size] = buffer[buffer_pos];
1040 field_size++;
1041 buffer_pos++;
1042 }
1043 } /* while (buffer_pos < buffer_size) */
1045 if (status != 0)
1046 return (status);
1048 *buffer_ret = buffer + buffer_pos;
1049 *buffer_size_ret = buffer_size - buffer_pos;
1050 *field_ret = field;
1052 return (0);
1053 } /* }}} int buffer_get_field */
1055 /* if we're restricting writes to the base directory,
1056 * check whether the file falls within the dir
1057 * returns 1 if OK, otherwise 0
1058 */
1059 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1060 {
1061 assert(file != NULL);
1063 if (!config_write_base_only
1064 || JOURNAL_REPLAY(sock)
1065 || config_base_dir == NULL)
1066 return 1;
1068 if (strstr(file, "../") != NULL) goto err;
1070 /* relative paths without "../" are ok */
1071 if (*file != '/') return 1;
1073 /* file must be of the format base + "/" + <1+ char filename> */
1074 if (strlen(file) < _config_base_dir_len + 2) goto err;
1075 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1076 if (*(file + _config_base_dir_len) != '/') goto err;
1078 return 1;
1080 err:
1081 if (sock != NULL && sock->fd >= 0)
1082 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1084 return 0;
1085 } /* }}} static int check_file_access */
1087 /* when using a base dir, convert relative paths to absolute paths.
1088 * if necessary, modifies the "filename" pointer to point
1089 * to the new path created in "tmp". "tmp" is provided
1090 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1091 *
1092 * this allows us to optimize for the expected case (absolute path)
1093 * with a no-op.
1094 */
1095 static void get_abs_path(char **filename, char *tmp)
1096 {
1097 assert(tmp != NULL);
1098 assert(filename != NULL && *filename != NULL);
1100 if (config_base_dir == NULL || **filename == '/')
1101 return;
1103 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1104 *filename = tmp;
1105 } /* }}} static int get_abs_path */
1107 static int flush_file (const char *filename) /* {{{ */
1108 {
1109 cache_item_t *ci;
1111 pthread_mutex_lock (&cache_lock);
1113 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1114 if (ci == NULL)
1115 {
1116 pthread_mutex_unlock (&cache_lock);
1117 return (ENOENT);
1118 }
1120 if (ci->values_num > 0)
1121 {
1122 /* Enqueue at head */
1123 enqueue_cache_item (ci, HEAD);
1124 pthread_cond_wait(&ci->flushed, &cache_lock);
1125 }
1127 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1128 * may have been purged during our cond_wait() */
1130 pthread_mutex_unlock(&cache_lock);
1132 return (0);
1133 } /* }}} int flush_file */
1135 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1136 {
1137 char *err = "Syntax error.\n";
1139 if (cmd && cmd->syntax)
1140 err = cmd->syntax;
1142 return send_response(sock, RESP_ERR, "Usage: %s", err);
1143 } /* }}} static int syntax_error() */
1145 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1146 {
1147 uint64_t copy_queue_length;
1148 uint64_t copy_updates_received;
1149 uint64_t copy_flush_received;
1150 uint64_t copy_updates_written;
1151 uint64_t copy_data_sets_written;
1152 uint64_t copy_journal_bytes;
1153 uint64_t copy_journal_rotate;
1155 uint64_t tree_nodes_number;
1156 uint64_t tree_depth;
1158 pthread_mutex_lock (&stats_lock);
1159 copy_queue_length = stats_queue_length;
1160 copy_updates_received = stats_updates_received;
1161 copy_flush_received = stats_flush_received;
1162 copy_updates_written = stats_updates_written;
1163 copy_data_sets_written = stats_data_sets_written;
1164 copy_journal_bytes = stats_journal_bytes;
1165 copy_journal_rotate = stats_journal_rotate;
1166 pthread_mutex_unlock (&stats_lock);
1168 pthread_mutex_lock (&cache_lock);
1169 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1170 tree_depth = (uint64_t) g_tree_height (cache_tree);
1171 pthread_mutex_unlock (&cache_lock);
1173 add_response_info(sock,
1174 "QueueLength: %"PRIu64"\n", copy_queue_length);
1175 add_response_info(sock,
1176 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1177 add_response_info(sock,
1178 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1179 add_response_info(sock,
1180 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1181 add_response_info(sock,
1182 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1183 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1184 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1185 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1186 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1188 send_response(sock, RESP_OK, "Statistics follow\n");
1190 return (0);
1191 } /* }}} int handle_request_stats */
1193 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1194 {
1195 char *file, file_tmp[PATH_MAX];
1196 int status;
1198 status = buffer_get_field (&buffer, &buffer_size, &file);
1199 if (status != 0)
1200 {
1201 return syntax_error(sock,cmd);
1202 }
1203 else
1204 {
1205 pthread_mutex_lock(&stats_lock);
1206 stats_flush_received++;
1207 pthread_mutex_unlock(&stats_lock);
1209 get_abs_path(&file, file_tmp);
1210 if (!check_file_access(file, sock)) return 0;
1212 status = flush_file (file);
1213 if (status == 0)
1214 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1215 else if (status == ENOENT)
1216 {
1217 /* no file in our tree; see whether it exists at all */
1218 struct stat statbuf;
1220 memset(&statbuf, 0, sizeof(statbuf));
1221 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1222 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1223 else
1224 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1225 }
1226 else if (status < 0)
1227 return send_response(sock, RESP_ERR, "Internal error.\n");
1228 else
1229 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1230 }
1232 /* NOTREACHED */
1233 assert(1==0);
1234 } /* }}} int handle_request_flush */
1236 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1237 {
1238 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1240 pthread_mutex_lock(&cache_lock);
1241 flush_old_values(-1);
1242 pthread_mutex_unlock(&cache_lock);
1244 return send_response(sock, RESP_OK, "Started flush.\n");
1245 } /* }}} static int handle_request_flushall */
1247 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1248 {
1249 int status;
1250 char *file, file_tmp[PATH_MAX];
1251 cache_item_t *ci;
1253 status = buffer_get_field(&buffer, &buffer_size, &file);
1254 if (status != 0)
1255 return syntax_error(sock,cmd);
1257 get_abs_path(&file, file_tmp);
1259 pthread_mutex_lock(&cache_lock);
1260 ci = g_tree_lookup(cache_tree, file);
1261 if (ci == NULL)
1262 {
1263 pthread_mutex_unlock(&cache_lock);
1264 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265 }
1267 for (size_t i=0; i < ci->values_num; i++)
1268 add_response_info(sock, "%s\n", ci->values[i]);
1270 pthread_mutex_unlock(&cache_lock);
1271 return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1274 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1275 {
1276 int status;
1277 gboolean found;
1278 char *file, file_tmp[PATH_MAX];
1280 status = buffer_get_field(&buffer, &buffer_size, &file);
1281 if (status != 0)
1282 return syntax_error(sock,cmd);
1284 get_abs_path(&file, file_tmp);
1285 if (!check_file_access(file, sock)) return 0;
1287 pthread_mutex_lock(&cache_lock);
1288 found = g_tree_remove(cache_tree, file);
1289 pthread_mutex_unlock(&cache_lock);
1291 if (found == TRUE)
1292 {
1293 if (!JOURNAL_REPLAY(sock))
1294 journal_write("forget", file);
1296 return send_response(sock, RESP_OK, "Gone!\n");
1297 }
1298 else
1299 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1301 /* NOTREACHED */
1302 assert(1==0);
1303 } /* }}} static int handle_request_forget */
1305 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1306 {
1307 cache_item_t *ci;
1309 pthread_mutex_lock(&cache_lock);
1311 ci = cache_queue_head;
1312 while (ci != NULL)
1313 {
1314 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1315 ci = ci->next;
1316 }
1318 pthread_mutex_unlock(&cache_lock);
1320 return send_response(sock, RESP_OK, "in queue.\n");
1321 } /* }}} int handle_request_queue */
1323 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1324 {
1325 char *file, file_tmp[PATH_MAX];
1326 int values_num = 0;
1327 int status;
1328 char orig_buf[RRD_CMD_MAX];
1330 cache_item_t *ci;
1332 /* save it for the journal later */
1333 if (!JOURNAL_REPLAY(sock))
1334 strncpy(orig_buf, buffer, buffer_size);
1336 status = buffer_get_field (&buffer, &buffer_size, &file);
1337 if (status != 0)
1338 return syntax_error(sock,cmd);
1340 pthread_mutex_lock(&stats_lock);
1341 stats_updates_received++;
1342 pthread_mutex_unlock(&stats_lock);
1344 get_abs_path(&file, file_tmp);
1345 if (!check_file_access(file, sock)) return 0;
1347 pthread_mutex_lock (&cache_lock);
1348 ci = g_tree_lookup (cache_tree, file);
1350 if (ci == NULL) /* {{{ */
1351 {
1352 struct stat statbuf;
1353 cache_item_t *tmp;
1355 /* don't hold the lock while we setup; stat(2) might block */
1356 pthread_mutex_unlock(&cache_lock);
1358 memset (&statbuf, 0, sizeof (statbuf));
1359 status = stat (file, &statbuf);
1360 if (status != 0)
1361 {
1362 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1364 status = errno;
1365 if (status == ENOENT)
1366 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1367 else
1368 return send_response(sock, RESP_ERR,
1369 "stat failed with error %i.\n", status);
1370 }
1371 if (!S_ISREG (statbuf.st_mode))
1372 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1374 if (access(file, R_OK|W_OK) != 0)
1375 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1376 file, rrd_strerror(errno));
1378 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1379 if (ci == NULL)
1380 {
1381 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1383 return send_response(sock, RESP_ERR, "malloc failed.\n");
1384 }
1385 memset (ci, 0, sizeof (cache_item_t));
1387 ci->file = strdup (file);
1388 if (ci->file == NULL)
1389 {
1390 free (ci);
1391 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1393 return send_response(sock, RESP_ERR, "strdup failed.\n");
1394 }
1396 wipe_ci_values(ci, now);
1397 ci->flags = CI_FLAGS_IN_TREE;
1398 pthread_cond_init(&ci->flushed, NULL);
1400 pthread_mutex_lock(&cache_lock);
1402 /* another UPDATE might have added this entry in the meantime */
1403 tmp = g_tree_lookup (cache_tree, file);
1404 if (tmp == NULL)
1405 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1406 else
1407 {
1408 free_cache_item (ci);
1409 ci = tmp;
1410 }
1412 /* state may have changed while we were unlocked */
1413 if (state == SHUTDOWN)
1414 return -1;
1415 } /* }}} */
1416 assert (ci != NULL);
1418 /* don't re-write updates in replay mode */
1419 if (!JOURNAL_REPLAY(sock))
1420 journal_write("update", orig_buf);
1422 while (buffer_size > 0)
1423 {
1424 char *value;
1425 time_t stamp;
1426 char *eostamp;
1428 status = buffer_get_field (&buffer, &buffer_size, &value);
1429 if (status != 0)
1430 {
1431 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1432 break;
1433 }
1435 /* make sure update time is always moving forward */
1436 stamp = strtol(value, &eostamp, 10);
1437 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1438 {
1439 pthread_mutex_unlock(&cache_lock);
1440 return send_response(sock, RESP_ERR,
1441 "Cannot find timestamp in '%s'!\n", value);
1442 }
1443 else if (stamp <= ci->last_update_stamp)
1444 {
1445 pthread_mutex_unlock(&cache_lock);
1446 return send_response(sock, RESP_ERR,
1447 "illegal attempt to update using time %ld when last"
1448 " update time is %ld (minimum one second step)\n",
1449 stamp, ci->last_update_stamp);
1450 }
1451 else
1452 ci->last_update_stamp = stamp;
1454 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1455 &ci->values_alloc, config_alloc_chunk))
1456 {
1457 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1458 continue;
1459 }
1461 values_num++;
1462 }
1464 if (((now - ci->last_flush_time) >= config_write_interval)
1465 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1466 && (ci->values_num > 0))
1467 {
1468 enqueue_cache_item (ci, TAIL);
1469 }
1471 pthread_mutex_unlock (&cache_lock);
1473 if (values_num < 1)
1474 return send_response(sock, RESP_ERR, "No values updated.\n");
1475 else
1476 return send_response(sock, RESP_OK,
1477 "errors, enqueued %i value(s).\n", values_num);
1479 /* NOTREACHED */
1480 assert(1==0);
1482 } /* }}} int handle_request_update */
1484 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1485 {
1486 char *file, file_tmp[PATH_MAX];
1487 char *cf;
1489 char *start_str;
1490 char *end_str;
1491 time_t start_tm;
1492 time_t end_tm;
1494 unsigned long step;
1495 unsigned long ds_cnt;
1496 char **ds_namv;
1497 rrd_value_t *data;
1499 int status;
1500 unsigned long i;
1501 time_t t;
1502 rrd_value_t *data_ptr;
1504 file = NULL;
1505 cf = NULL;
1506 start_str = NULL;
1507 end_str = NULL;
1509 /* Read the arguments */
1510 do /* while (0) */
1511 {
1512 status = buffer_get_field (&buffer, &buffer_size, &file);
1513 if (status != 0)
1514 break;
1516 status = buffer_get_field (&buffer, &buffer_size, &cf);
1517 if (status != 0)
1518 break;
1520 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1521 if (status != 0)
1522 {
1523 start_str = NULL;
1524 status = 0;
1525 break;
1526 }
1528 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1529 if (status != 0)
1530 {
1531 end_str = NULL;
1532 status = 0;
1533 break;
1534 }
1535 } while (0);
1537 if (status != 0)
1538 return (syntax_error(sock,cmd));
1540 get_abs_path(&file, file_tmp);
1541 if (!check_file_access(file, sock)) return 0;
1543 status = flush_file (file);
1544 if ((status != 0) && (status != ENOENT))
1545 return (send_response (sock, RESP_ERR,
1546 "flush_file (%s) failed with status %i.\n", file, status));
1548 t = time (NULL); /* "now" */
1550 /* Parse start time */
1551 if (start_str != NULL)
1552 {
1553 char *endptr;
1554 long value;
1556 endptr = NULL;
1557 errno = 0;
1558 value = strtol (start_str, &endptr, /* base = */ 0);
1559 if ((endptr == start_str) || (errno != 0))
1560 return (send_response(sock, RESP_ERR,
1561 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1562 start_str));
1564 if (value > 0)
1565 start_tm = (time_t) value;
1566 else
1567 start_tm = (time_t) (t + value);
1568 }
1569 else
1570 {
1571 start_tm = t - 86400;
1572 }
1574 /* Parse end time */
1575 if (end_str != NULL)
1576 {
1577 char *endptr;
1578 long value;
1580 endptr = NULL;
1581 errno = 0;
1582 value = strtol (end_str, &endptr, /* base = */ 0);
1583 if ((endptr == end_str) || (errno != 0))
1584 return (send_response(sock, RESP_ERR,
1585 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1586 end_str));
1588 if (value > 0)
1589 end_tm = (time_t) value;
1590 else
1591 end_tm = (time_t) (t + value);
1592 }
1593 else
1594 {
1595 end_tm = t;
1596 }
1598 step = -1;
1599 ds_cnt = 0;
1600 ds_namv = NULL;
1601 data = NULL;
1603 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1604 &ds_cnt, &ds_namv, &data);
1605 if (status != 0)
1606 return (send_response(sock, RESP_ERR,
1607 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1609 add_response_info (sock, "FlushVersion: %lu\n", 1);
1610 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1611 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1612 add_response_info (sock, "Step: %lu\n", step);
1613 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1615 #define SSTRCAT(buffer,str,buffer_fill) do { \
1616 size_t str_len = strlen (str); \
1617 if ((buffer_fill + str_len) > sizeof (buffer)) \
1618 str_len = sizeof (buffer) - buffer_fill; \
1619 if (str_len > 0) { \
1620 strncpy (buffer + buffer_fill, str, str_len); \
1621 buffer_fill += str_len; \
1622 assert (buffer_fill <= sizeof (buffer)); \
1623 if (buffer_fill == sizeof (buffer)) \
1624 buffer[buffer_fill - 1] = 0; \
1625 else \
1626 buffer[buffer_fill] = 0; \
1627 } \
1628 } while (0)
1630 { /* Add list of DS names */
1631 char linebuf[1024];
1632 size_t linebuf_fill;
1634 memset (linebuf, 0, sizeof (linebuf));
1635 linebuf_fill = 0;
1636 for (i = 0; i < ds_cnt; i++)
1637 {
1638 if (i > 0)
1639 SSTRCAT (linebuf, " ", linebuf_fill);
1640 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1641 rrd_freemem(ds_namv[i]);
1642 }
1643 rrd_freemem(ds_namv);
1644 add_response_info (sock, "DSName: %s\n", linebuf);
1645 }
1647 /* Add the actual data */
1648 assert (step > 0);
1649 data_ptr = data;
1650 for (t = start_tm + step; t <= end_tm; t += step)
1651 {
1652 char linebuf[1024];
1653 size_t linebuf_fill;
1654 char tmp[128];
1656 memset (linebuf, 0, sizeof (linebuf));
1657 linebuf_fill = 0;
1658 for (i = 0; i < ds_cnt; i++)
1659 {
1660 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1661 tmp[sizeof (tmp) - 1] = 0;
1662 SSTRCAT (linebuf, tmp, linebuf_fill);
1664 data_ptr++;
1665 }
1667 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1668 } /* for (t) */
1669 rrd_freemem(data);
1671 return (send_response (sock, RESP_OK, "Success\n"));
1672 #undef SSTRCAT
1673 } /* }}} int handle_request_fetch */
1675 /* we came across a "WROTE" entry during journal replay.
1676 * throw away any values that we have accumulated for this file
1677 */
1678 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1679 {
1680 cache_item_t *ci;
1681 const char *file = buffer;
1683 pthread_mutex_lock(&cache_lock);
1685 ci = g_tree_lookup(cache_tree, file);
1686 if (ci == NULL)
1687 {
1688 pthread_mutex_unlock(&cache_lock);
1689 return (0);
1690 }
1692 if (ci->values)
1693 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1695 wipe_ci_values(ci, now);
1696 remove_from_queue(ci);
1698 pthread_mutex_unlock(&cache_lock);
1699 return (0);
1700 } /* }}} int handle_request_wrote */
1702 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1703 {
1704 char *file, file_tmp[PATH_MAX];
1705 int status;
1706 rrd_info_t *info;
1708 /* obtain filename */
1709 status = buffer_get_field(&buffer, &buffer_size, &file);
1710 if (status != 0)
1711 return syntax_error(sock,cmd);
1712 /* get full pathname */
1713 get_abs_path(&file, file_tmp);
1714 if (!check_file_access(file, sock)) {
1715 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1716 }
1717 /* get data */
1718 rrd_clear_error ();
1719 info = rrd_info_r(file);
1720 if(!info) {
1721 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1722 }
1723 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1724 switch (data->type) {
1725 case RD_I_VAL:
1726 if (isnan(data->value.u_val))
1727 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1728 else
1729 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1730 break;
1731 case RD_I_CNT:
1732 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1733 break;
1734 case RD_I_INT:
1735 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1736 break;
1737 case RD_I_STR:
1738 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1739 break;
1740 case RD_I_BLO:
1741 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1742 break;
1743 }
1744 }
1746 rrd_info_free(info);
1748 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1749 } /* }}} static int handle_request_info */
1751 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1752 {
1753 char *i, *file, file_tmp[PATH_MAX];
1754 int status;
1755 int idx;
1756 time_t t;
1758 /* obtain filename */
1759 status = buffer_get_field(&buffer, &buffer_size, &file);
1760 if (status != 0)
1761 return syntax_error(sock,cmd);
1762 /* get full pathname */
1763 get_abs_path(&file, file_tmp);
1764 if (!check_file_access(file, sock)) {
1765 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1766 }
1768 status = buffer_get_field(&buffer, &buffer_size, &i);
1769 if (status != 0)
1770 return syntax_error(sock,cmd);
1771 idx = atoi(i);
1772 if(idx<0) {
1773 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1774 }
1776 /* get data */
1777 rrd_clear_error ();
1778 t = rrd_first_r(file,idx);
1779 if(t<1) {
1780 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1781 }
1782 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1783 } /* }}} static int handle_request_first */
1786 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1787 {
1788 char *file, file_tmp[PATH_MAX];
1789 int status;
1790 time_t t, from_file, step;
1791 rrd_file_t * rrd_file;
1792 cache_item_t * ci;
1793 rrd_t rrd;
1795 /* obtain filename */
1796 status = buffer_get_field(&buffer, &buffer_size, &file);
1797 if (status != 0)
1798 return syntax_error(sock,cmd);
1799 /* get full pathname */
1800 get_abs_path(&file, file_tmp);
1801 if (!check_file_access(file, sock)) {
1802 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1803 }
1804 rrd_clear_error();
1805 rrd_init(&rrd);
1806 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1807 if(!rrd_file) {
1808 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1809 }
1810 from_file = rrd.live_head->last_up;
1811 step = rrd.stat_head->pdp_step;
1812 rrd_close(rrd_file);
1813 pthread_mutex_lock(&cache_lock);
1814 ci = g_tree_lookup(cache_tree, file);
1815 if (ci)
1816 t = ci->last_update_stamp;
1817 else
1818 t = from_file;
1819 pthread_mutex_unlock(&cache_lock);
1820 t -= t % step;
1821 rrd_free(&rrd);
1822 if(t<1) {
1823 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1824 }
1825 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1826 } /* }}} static int handle_request_last */
1828 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1829 {
1830 char *file, file_tmp[PATH_MAX];
1831 char *tok;
1832 int ac = 0;
1833 char *av[128];
1834 int status;
1835 unsigned long step = 300;
1836 time_t last_up = time(NULL)-10;
1837 int no_overwrite = opt_no_overwrite;
1840 /* obtain filename */
1841 status = buffer_get_field(&buffer, &buffer_size, &file);
1842 if (status != 0)
1843 return syntax_error(sock,cmd);
1844 /* get full pathname */
1845 get_abs_path(&file, file_tmp);
1846 if (!check_file_access(file, sock)) {
1847 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1848 }
1849 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1851 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1852 if( ! strncmp(tok,"-b",2) ) {
1853 status = buffer_get_field(&buffer, &buffer_size, &tok );
1854 if (status != 0) return syntax_error(sock,cmd);
1855 last_up = (time_t) atol(tok);
1856 continue;
1857 }
1858 if( ! strncmp(tok,"-s",2) ) {
1859 status = buffer_get_field(&buffer, &buffer_size, &tok );
1860 if (status != 0) return syntax_error(sock,cmd);
1861 step = atol(tok);
1862 continue;
1863 }
1864 if( ! strncmp(tok,"-O",2) ) {
1865 no_overwrite = 1;
1866 continue;
1867 }
1868 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1869 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1870 return syntax_error(sock,cmd);
1871 }
1872 if(step<1) {
1873 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1874 }
1875 if (last_up < 3600 * 24 * 365 * 10) {
1876 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1877 }
1879 rrd_clear_error ();
1880 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1882 if(!status) {
1883 return send_response(sock, RESP_OK, "RRD created OK\n");
1884 }
1885 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1886 } /* }}} static int handle_request_create */
1888 /* start "BATCH" processing */
1889 static int batch_start (HANDLER_PROTO) /* {{{ */
1890 {
1891 int status;
1892 if (sock->batch_start)
1893 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1895 status = send_response(sock, RESP_OK,
1896 "Go ahead. End with dot '.' on its own line.\n");
1897 sock->batch_start = time(NULL);
1898 sock->batch_cmd = 0;
1900 return status;
1901 } /* }}} static int batch_start */
1903 /* finish "BATCH" processing and return results to the client */
1904 static int batch_done (HANDLER_PROTO) /* {{{ */
1905 {
1906 assert(sock->batch_start);
1907 sock->batch_start = 0;
1908 sock->batch_cmd = 0;
1909 return send_response(sock, RESP_OK, "errors\n");
1910 } /* }}} static int batch_done */
1912 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1913 {
1914 return -1;
1915 } /* }}} static int handle_request_quit */
1917 static command_t list_of_commands[] = { /* {{{ */
1918 {
1919 "UPDATE",
1920 handle_request_update,
1921 CMD_CONTEXT_ANY,
1922 "UPDATE <filename> <values> [<values> ...]\n"
1923 ,
1924 "Adds the given file to the internal cache if it is not yet known and\n"
1925 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1926 "for details.\n"
1927 "\n"
1928 "Each <values> has the following form:\n"
1929 " <values> = <time>:<value>[:<value>[...]]\n"
1930 "See the rrdupdate(1) manpage for details.\n"
1931 },
1932 {
1933 "WROTE",
1934 handle_request_wrote,
1935 CMD_CONTEXT_JOURNAL,
1936 NULL,
1937 NULL
1938 },
1939 {
1940 "FLUSH",
1941 handle_request_flush,
1942 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1943 "FLUSH <filename>\n"
1944 ,
1945 "Adds the given filename to the head of the update queue and returns\n"
1946 "after it has been dequeued.\n"
1947 },
1948 {
1949 "FLUSHALL",
1950 handle_request_flushall,
1951 CMD_CONTEXT_CLIENT,
1952 "FLUSHALL\n"
1953 ,
1954 "Triggers writing of all pending updates. Returns immediately.\n"
1955 },
1956 {
1957 "PENDING",
1958 handle_request_pending,
1959 CMD_CONTEXT_CLIENT,
1960 "PENDING <filename>\n"
1961 ,
1962 "Shows any 'pending' updates for a file, in order.\n"
1963 "The updates shown have not yet been written to the underlying RRD file.\n"
1964 },
1965 {
1966 "FORGET",
1967 handle_request_forget,
1968 CMD_CONTEXT_ANY,
1969 "FORGET <filename>\n"
1970 ,
1971 "Removes the file completely from the cache.\n"
1972 "Any pending updates for the file will be lost.\n"
1973 },
1974 {
1975 "QUEUE",
1976 handle_request_queue,
1977 CMD_CONTEXT_CLIENT,
1978 "QUEUE\n"
1979 ,
1980 "Shows all files in the output queue.\n"
1981 "The output is zero or more lines in the following format:\n"
1982 "(where <num_vals> is the number of values to be written)\n"
1983 "\n"
1984 "<num_vals> <filename>\n"
1985 },
1986 {
1987 "STATS",
1988 handle_request_stats,
1989 CMD_CONTEXT_CLIENT,
1990 "STATS\n"
1991 ,
1992 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1993 "a description of the values.\n"
1994 },
1995 {
1996 "HELP",
1997 handle_request_help,
1998 CMD_CONTEXT_CLIENT,
1999 "HELP [<command>]\n",
2000 NULL, /* special! */
2001 },
2002 {
2003 "BATCH",
2004 batch_start,
2005 CMD_CONTEXT_CLIENT,
2006 "BATCH\n"
2007 ,
2008 "The 'BATCH' command permits the client to initiate a bulk load\n"
2009 " of commands to rrdcached.\n"
2010 "\n"
2011 "Usage:\n"
2012 "\n"
2013 " client: BATCH\n"
2014 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2015 " client: command #1\n"
2016 " client: command #2\n"
2017 " client: ... and so on\n"
2018 " client: .\n"
2019 " server: 2 errors\n"
2020 " server: 7 message for command #7\n"
2021 " server: 9 message for command #9\n"
2022 "\n"
2023 "For more information, consult the rrdcached(1) documentation.\n"
2024 },
2025 {
2026 ".", /* BATCH terminator */
2027 batch_done,
2028 CMD_CONTEXT_BATCH,
2029 NULL,
2030 NULL
2031 },
2032 {
2033 "FETCH",
2034 handle_request_fetch,
2035 CMD_CONTEXT_CLIENT,
2036 "FETCH <file> <CF> [<start> [<end>]]\n"
2037 ,
2038 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2039 },
2040 {
2041 "INFO",
2042 handle_request_info,
2043 CMD_CONTEXT_CLIENT,
2044 "INFO <filename>\n",
2045 "The INFO command retrieves information about a specified RRD file.\n"
2046 "This is returned in standard rrdinfo format, a sequence of lines\n"
2047 "with the format <keyname> = <value>\n"
2048 "Note that this is the data as of the last update of the RRD file itself,\n"
2049 "not the last time data was received via rrdcached, so there may be pending\n"
2050 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2051 },
2052 {
2053 "FIRST",
2054 handle_request_first,
2055 CMD_CONTEXT_CLIENT,
2056 "FIRST <filename> <rra index>\n",
2057 "The FIRST command retrieves the first data time for a specified RRA in\n"
2058 "an RRD file.\n"
2059 },
2060 {
2061 "LAST",
2062 handle_request_last,
2063 CMD_CONTEXT_CLIENT,
2064 "LAST <filename>\n",
2065 "The LAST command retrieves the last update time for a specified RRD file.\n"
2066 "Note that this is the time of the last update of the RRD file itself, not\n"
2067 "the last time data was received via rrdcached, so there may be pending\n"
2068 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2069 },
2070 {
2071 "CREATE",
2072 handle_request_create,
2073 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2074 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2075 "The CREATE command will create an RRD file, overwriting any existing file\n"
2076 "unless the -O option is given or rrdcached was started with the -O option.\n"
2077 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2078 "not acceptable) and the step is in seconds (default is 300).\n"
2079 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2080 },
2081 {
2082 "QUIT",
2083 handle_request_quit,
2084 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2085 "QUIT\n"
2086 ,
2087 "Disconnect from rrdcached.\n"
2088 }
2089 }; /* }}} command_t list_of_commands[] */
2090 static size_t list_of_commands_len = sizeof (list_of_commands)
2091 / sizeof (list_of_commands[0]);
2093 static command_t *find_command(char *cmd)
2094 {
2095 size_t i;
2097 for (i = 0; i < list_of_commands_len; i++)
2098 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2099 return (&list_of_commands[i]);
2100 return NULL;
2101 }
2103 /* We currently use the index in the `list_of_commands' array as a bit position
2104 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2105 * outside these functions so that switching to a more elegant storage method
2106 * is easily possible. */
2107 static ssize_t find_command_index (const char *cmd) /* {{{ */
2108 {
2109 size_t i;
2111 for (i = 0; i < list_of_commands_len; i++)
2112 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2113 return ((ssize_t) i);
2114 return (-1);
2115 } /* }}} ssize_t find_command_index */
2117 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2118 const char *cmd)
2119 {
2120 ssize_t i;
2122 if (JOURNAL_REPLAY(sock))
2123 return (1);
2125 if (cmd == NULL)
2126 return (-1);
2128 if ((strcasecmp ("QUIT", cmd) == 0)
2129 || (strcasecmp ("HELP", cmd) == 0))
2130 return (1);
2131 else if (strcmp (".", cmd) == 0)
2132 cmd = "BATCH";
2134 i = find_command_index (cmd);
2135 if (i < 0)
2136 return (-1);
2137 assert (i < 32);
2139 if ((sock->permissions & (1 << i)) != 0)
2140 return (1);
2141 return (0);
2142 } /* }}} int socket_permission_check */
2144 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2145 const char *cmd)
2146 {
2147 ssize_t i;
2149 i = find_command_index (cmd);
2150 if (i < 0)
2151 return (-1);
2152 assert (i < 32);
2154 sock->permissions |= (1 << i);
2155 return (0);
2156 } /* }}} int socket_permission_add */
2158 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2159 {
2160 sock->permissions = 0;
2161 } /* }}} socket_permission_clear */
2163 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2164 listen_socket_t *src)
2165 {
2166 dest->permissions = src->permissions;
2167 } /* }}} socket_permission_copy */
2169 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2170 {
2171 size_t i;
2173 sock->permissions = 0;
2174 for (i = 0; i < list_of_commands_len; i++)
2175 sock->permissions |= (1 << i);
2176 } /* }}} void socket_permission_set_all */
2178 /* check whether commands are received in the expected context */
2179 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2180 {
2181 if (JOURNAL_REPLAY(sock))
2182 return (cmd->context & CMD_CONTEXT_JOURNAL);
2183 else if (sock->batch_start)
2184 return (cmd->context & CMD_CONTEXT_BATCH);
2185 else
2186 return (cmd->context & CMD_CONTEXT_CLIENT);
2188 /* NOTREACHED */
2189 assert(1==0);
2190 }
2192 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2193 {
2194 int status;
2195 char *cmd_str;
2196 char *resp_txt;
2197 command_t *help = NULL;
2199 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2200 if (status == 0)
2201 help = find_command(cmd_str);
2203 if (help && (help->syntax || help->help))
2204 {
2205 char tmp[RRD_CMD_MAX];
2207 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2208 resp_txt = tmp;
2210 if (help->syntax)
2211 add_response_info(sock, "Usage: %s\n", help->syntax);
2213 if (help->help)
2214 add_response_info(sock, "%s\n", help->help);
2215 }
2216 else
2217 {
2218 size_t i;
2220 resp_txt = "Command overview\n";
2222 for (i = 0; i < list_of_commands_len; i++)
2223 {
2224 if (list_of_commands[i].syntax == NULL)
2225 continue;
2226 add_response_info (sock, "%s", list_of_commands[i].syntax);
2227 }
2228 }
2230 return send_response(sock, RESP_OK, resp_txt);
2231 } /* }}} int handle_request_help */
2233 static int handle_request (DISPATCH_PROTO) /* {{{ */
2234 {
2235 char *buffer_ptr = buffer;
2236 char *cmd_str = NULL;
2237 command_t *cmd = NULL;
2238 int status;
2240 assert (buffer[buffer_size - 1] == '\0');
2242 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2243 if (status != 0)
2244 {
2245 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2246 return (-1);
2247 }
2249 if (sock != NULL && sock->batch_start)
2250 sock->batch_cmd++;
2252 cmd = find_command(cmd_str);
2253 if (!cmd)
2254 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2256 if (!socket_permission_check (sock, cmd->cmd))
2257 return send_response(sock, RESP_ERR, "Permission denied.\n");
2259 if (!command_check_context(sock, cmd))
2260 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2262 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2263 } /* }}} int handle_request */
2265 static void journal_set_free (journal_set *js) /* {{{ */
2266 {
2267 if (js == NULL)
2268 return;
2270 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2272 free(js);
2273 } /* }}} journal_set_free */
2275 static void journal_set_remove (journal_set *js) /* {{{ */
2276 {
2277 if (js == NULL)
2278 return;
2280 for (uint i=0; i < js->files_num; i++)
2281 {
2282 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2283 unlink(js->files[i]);
2284 }
2285 } /* }}} journal_set_remove */
2287 /* close current journal file handle.
2288 * MUST hold journal_lock before calling */
2289 static void journal_close(void) /* {{{ */
2290 {
2291 if (journal_fh != NULL)
2292 {
2293 if (fclose(journal_fh) != 0)
2294 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2295 }
2297 journal_fh = NULL;
2298 journal_size = 0;
2299 } /* }}} journal_close */
2301 /* MUST hold journal_lock before calling */
2302 static void journal_new_file(void) /* {{{ */
2303 {
2304 struct timeval now;
2305 int new_fd;
2306 char new_file[PATH_MAX + 1];
2308 assert(journal_dir != NULL);
2309 assert(journal_cur != NULL);
2311 journal_close();
2313 gettimeofday(&now, NULL);
2314 /* this format assures that the files sort in strcmp() order */
2315 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2316 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2318 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2319 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2320 if (new_fd < 0)
2321 goto error;
2323 journal_fh = fdopen(new_fd, "a");
2324 if (journal_fh == NULL)
2325 goto error;
2327 journal_size = ftell(journal_fh);
2328 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2330 /* record the file in the journal set */
2331 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2333 return;
2335 error:
2336 RRDD_LOG(LOG_CRIT,
2337 "JOURNALING DISABLED: Error while trying to create %s : %s",
2338 new_file, rrd_strerror(errno));
2339 RRDD_LOG(LOG_CRIT,
2340 "JOURNALING DISABLED: All values will be flushed at shutdown");
2342 close(new_fd);
2343 config_flush_at_shutdown = 1;
2345 } /* }}} journal_new_file */
2347 /* MUST NOT hold journal_lock before calling this */
2348 static void journal_rotate(void) /* {{{ */
2349 {
2350 journal_set *old_js = NULL;
2352 if (journal_dir == NULL)
2353 return;
2355 RRDD_LOG(LOG_DEBUG, "rotating journals");
2357 pthread_mutex_lock(&stats_lock);
2358 ++stats_journal_rotate;
2359 pthread_mutex_unlock(&stats_lock);
2361 pthread_mutex_lock(&journal_lock);
2363 journal_close();
2365 /* rotate the journal sets */
2366 old_js = journal_old;
2367 journal_old = journal_cur;
2368 journal_cur = calloc(1, sizeof(journal_set));
2370 if (journal_cur != NULL)
2371 journal_new_file();
2372 else
2373 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2375 pthread_mutex_unlock(&journal_lock);
2377 journal_set_remove(old_js);
2378 journal_set_free (old_js);
2380 } /* }}} static void journal_rotate */
2382 /* MUST hold journal_lock when calling */
2383 static void journal_done(void) /* {{{ */
2384 {
2385 if (journal_cur == NULL)
2386 return;
2388 journal_close();
2390 if (config_flush_at_shutdown)
2391 {
2392 RRDD_LOG(LOG_INFO, "removing journals");
2393 journal_set_remove(journal_old);
2394 journal_set_remove(journal_cur);
2395 }
2396 else
2397 {
2398 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2399 "journals will be used at next startup");
2400 }
2402 journal_set_free(journal_cur);
2403 journal_set_free(journal_old);
2404 free(journal_dir);
2406 } /* }}} static void journal_done */
2408 static int journal_write(char *cmd, char *args) /* {{{ */
2409 {
2410 int chars;
2412 if (journal_fh == NULL)
2413 return 0;
2415 pthread_mutex_lock(&journal_lock);
2416 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2417 journal_size += chars;
2419 if (journal_size > JOURNAL_MAX)
2420 journal_new_file();
2422 pthread_mutex_unlock(&journal_lock);
2424 if (chars > 0)
2425 {
2426 pthread_mutex_lock(&stats_lock);
2427 stats_journal_bytes += chars;
2428 pthread_mutex_unlock(&stats_lock);
2429 }
2431 return chars;
2432 } /* }}} static int journal_write */
2434 static int journal_replay (const char *file) /* {{{ */
2435 {
2436 FILE *fh;
2437 int entry_cnt = 0;
2438 int fail_cnt = 0;
2439 uint64_t line = 0;
2440 char entry[RRD_CMD_MAX];
2441 time_t now;
2443 if (file == NULL) return 0;
2445 {
2446 char *reason = "unknown error";
2447 int status = 0;
2448 struct stat statbuf;
2450 memset(&statbuf, 0, sizeof(statbuf));
2451 if (stat(file, &statbuf) != 0)
2452 {
2453 reason = "stat error";
2454 status = errno;
2455 }
2456 else if (!S_ISREG(statbuf.st_mode))
2457 {
2458 reason = "not a regular file";
2459 status = EPERM;
2460 }
2461 if (statbuf.st_uid != daemon_uid)
2462 {
2463 reason = "not owned by daemon user";
2464 status = EACCES;
2465 }
2466 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2467 {
2468 reason = "must not be user/group writable";
2469 status = EACCES;
2470 }
2472 if (status != 0)
2473 {
2474 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2475 file, rrd_strerror(status), reason);
2476 return 0;
2477 }
2478 }
2480 fh = fopen(file, "r");
2481 if (fh == NULL)
2482 {
2483 if (errno != ENOENT)
2484 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2485 file, rrd_strerror(errno));
2486 return 0;
2487 }
2488 else
2489 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2491 now = time(NULL);
2493 while(!feof(fh))
2494 {
2495 size_t entry_len;
2497 ++line;
2498 if (fgets(entry, sizeof(entry), fh) == NULL)
2499 break;
2500 entry_len = strlen(entry);
2502 /* check \n termination in case journal writing crashed mid-line */
2503 if (entry_len == 0)
2504 continue;
2505 else if (entry[entry_len - 1] != '\n')
2506 {
2507 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2508 ++fail_cnt;
2509 continue;
2510 }
2512 entry[entry_len - 1] = '\0';
2514 if (handle_request(NULL, now, entry, entry_len) == 0)
2515 ++entry_cnt;
2516 else
2517 ++fail_cnt;
2518 }
2520 fclose(fh);
2522 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2523 entry_cnt, fail_cnt);
2525 return entry_cnt > 0 ? 1 : 0;
2526 } /* }}} static int journal_replay */
2528 static int journal_sort(const void *v1, const void *v2)
2529 {
2530 char **jn1 = (char **) v1;
2531 char **jn2 = (char **) v2;
2533 return strcmp(*jn1,*jn2);
2534 }
2536 static void journal_init(void) /* {{{ */
2537 {
2538 int had_journal = 0;
2539 DIR *dir;
2540 struct dirent *dent;
2541 char path[PATH_MAX+1];
2543 if (journal_dir == NULL) return;
2545 pthread_mutex_lock(&journal_lock);
2547 journal_cur = calloc(1, sizeof(journal_set));
2548 if (journal_cur == NULL)
2549 {
2550 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2551 return;
2552 }
2554 RRDD_LOG(LOG_INFO, "checking for journal files");
2556 /* Handle old journal files during transition. This gives them the
2557 * correct sort order. TODO: remove after first release
2558 */
2559 {
2560 char old_path[PATH_MAX+1];
2561 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2562 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2563 rename(old_path, path);
2565 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2566 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2567 rename(old_path, path);
2568 }
2570 dir = opendir(journal_dir);
2571 if (!dir) {
2572 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2573 return;
2574 }
2575 while ((dent = readdir(dir)) != NULL)
2576 {
2577 /* looks like a journal file? */
2578 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2579 continue;
2581 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2583 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2584 {
2585 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2586 dent->d_name);
2587 break;
2588 }
2589 }
2590 closedir(dir);
2592 qsort(journal_cur->files, journal_cur->files_num,
2593 sizeof(journal_cur->files[0]), journal_sort);
2595 for (uint i=0; i < journal_cur->files_num; i++)
2596 had_journal += journal_replay(journal_cur->files[i]);
2598 journal_new_file();
2600 /* it must have been a crash. start a flush */
2601 if (had_journal && config_flush_at_shutdown)
2602 flush_old_values(-1);
2604 pthread_mutex_unlock(&journal_lock);
2606 RRDD_LOG(LOG_INFO, "journal processing complete");
2608 } /* }}} static void journal_init */
2610 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2611 {
2612 assert(sock != NULL);
2614 free(sock->rbuf); sock->rbuf = NULL;
2615 free(sock->wbuf); sock->wbuf = NULL;
2616 free(sock);
2617 } /* }}} void free_listen_socket */
2619 static void close_connection(listen_socket_t *sock) /* {{{ */
2620 {
2621 if (sock->fd >= 0)
2622 {
2623 close(sock->fd);
2624 sock->fd = -1;
2625 }
2627 free_listen_socket(sock);
2629 } /* }}} void close_connection */
2631 static void *connection_thread_main (void *args) /* {{{ */
2632 {
2633 listen_socket_t *sock;
2634 int fd;
2636 sock = (listen_socket_t *) args;
2637 fd = sock->fd;
2639 /* init read buffers */
2640 sock->next_read = sock->next_cmd = 0;
2641 sock->rbuf = malloc(RBUF_SIZE);
2642 if (sock->rbuf == NULL)
2643 {
2644 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2645 close_connection(sock);
2646 return NULL;
2647 }
2649 pthread_mutex_lock (&connection_threads_lock);
2650 #ifdef HAVE_LIBWRAP
2651 /* LIBWRAP does not support multiple threads! By putting this code
2652 inside pthread_mutex_lock we do not have to worry about request_info
2653 getting overwritten by another thread.
2654 */
2655 struct request_info req;
2656 request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2657 fromhost(&req);
2658 if(!hosts_access(&req)) {
2659 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2660 pthread_mutex_unlock (&connection_threads_lock);
2661 close_connection(sock);
2662 return NULL;
2663 }
2664 #endif /* HAVE_LIBWRAP */
2665 connection_threads_num++;
2666 pthread_mutex_unlock (&connection_threads_lock);
2668 while (state == RUNNING)
2669 {
2670 char *cmd;
2671 ssize_t cmd_len;
2672 ssize_t rbytes;
2673 time_t now;
2675 struct pollfd pollfd;
2676 int status;
2678 pollfd.fd = fd;
2679 pollfd.events = POLLIN | POLLPRI;
2680 pollfd.revents = 0;
2682 status = poll (&pollfd, 1, /* timeout = */ 500);
2683 if (state != RUNNING)
2684 break;
2685 else if (status == 0) /* timeout */
2686 continue;
2687 else if (status < 0) /* error */
2688 {
2689 status = errno;
2690 if (status != EINTR)
2691 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2692 continue;
2693 }
2695 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2696 break;
2697 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2698 {
2699 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2700 "poll(2) returned something unexpected: %#04hx",
2701 pollfd.revents);
2702 break;
2703 }
2705 rbytes = read(fd, sock->rbuf + sock->next_read,
2706 RBUF_SIZE - sock->next_read);
2707 if (rbytes < 0)
2708 {
2709 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2710 break;
2711 }
2712 else if (rbytes == 0)
2713 break; /* eof */
2715 sock->next_read += rbytes;
2717 if (sock->batch_start)
2718 now = sock->batch_start;
2719 else
2720 now = time(NULL);
2722 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2723 {
2724 status = handle_request (sock, now, cmd, cmd_len+1);
2725 if (status != 0)
2726 goto out_close;
2727 }
2728 }
2730 out_close:
2731 close_connection(sock);
2733 /* Remove this thread from the connection threads list */
2734 pthread_mutex_lock (&connection_threads_lock);
2735 connection_threads_num--;
2736 if (connection_threads_num <= 0)
2737 pthread_cond_broadcast(&connection_threads_done);
2738 pthread_mutex_unlock (&connection_threads_lock);
2740 return (NULL);
2741 } /* }}} void *connection_thread_main */
2743 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2744 {
2745 int fd;
2746 struct sockaddr_un sa;
2747 listen_socket_t *temp;
2748 int status;
2749 const char *path;
2750 char *path_copy, *dir;
2752 path = sock->addr;
2753 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2754 path += strlen("unix:");
2756 /* dirname may modify its argument */
2757 path_copy = strdup(path);
2758 if (path_copy == NULL)
2759 {
2760 fprintf(stderr, "rrdcached: strdup(): %s\n",
2761 rrd_strerror(errno));
2762 return (-1);
2763 }
2765 dir = dirname(path_copy);
2766 if (rrd_mkdir_p(dir, 0777) != 0)
2767 {
2768 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2769 dir, rrd_strerror(errno));
2770 return (-1);
2771 }
2773 free(path_copy);
2775 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2776 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2777 if (temp == NULL)
2778 {
2779 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2780 return (-1);
2781 }
2782 listen_fds = temp;
2783 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2785 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2786 if (fd < 0)
2787 {
2788 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2789 rrd_strerror(errno));
2790 return (-1);
2791 }
2793 memset (&sa, 0, sizeof (sa));
2794 sa.sun_family = AF_UNIX;
2795 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2797 /* if we've gotten this far, we own the pid file. any daemon started
2798 * with the same args must not be alive. therefore, ensure that we can
2799 * create the socket...
2800 */
2801 unlink(path);
2803 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2804 if (status != 0)
2805 {
2806 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2807 path, rrd_strerror(errno));
2808 close (fd);
2809 return (-1);
2810 }
2812 /* tweak the sockets group ownership */
2813 if (sock->socket_group != (gid_t)-1)
2814 {
2815 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2816 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2817 {
2818 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2819 }
2820 }
2822 if (sock->socket_permissions != (mode_t)-1)
2823 {
2824 if (chmod(path, sock->socket_permissions) != 0)
2825 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2826 (unsigned int)sock->socket_permissions, strerror(errno));
2827 }
2829 status = listen (fd, /* backlog = */ 10);
2830 if (status != 0)
2831 {
2832 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2833 path, rrd_strerror(errno));
2834 close (fd);
2835 unlink (path);
2836 return (-1);
2837 }
2839 listen_fds[listen_fds_num].fd = fd;
2840 listen_fds[listen_fds_num].family = PF_UNIX;
2841 strncpy(listen_fds[listen_fds_num].addr, path,
2842 sizeof (listen_fds[listen_fds_num].addr) - 1);
2843 listen_fds_num++;
2845 return (0);
2846 } /* }}} int open_listen_socket_unix */
2848 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2849 {
2850 struct addrinfo ai_hints;
2851 struct addrinfo *ai_res;
2852 struct addrinfo *ai_ptr;
2853 char addr_copy[NI_MAXHOST];
2854 char *addr;
2855 char *port;
2856 int status;
2858 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2859 addr_copy[sizeof (addr_copy) - 1] = 0;
2860 addr = addr_copy;
2862 memset (&ai_hints, 0, sizeof (ai_hints));
2863 ai_hints.ai_flags = 0;
2864 #ifdef AI_ADDRCONFIG
2865 ai_hints.ai_flags |= AI_ADDRCONFIG;
2866 #endif
2867 ai_hints.ai_family = AF_UNSPEC;
2868 ai_hints.ai_socktype = SOCK_STREAM;
2870 port = NULL;
2871 if (*addr == '[') /* IPv6+port format */
2872 {
2873 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2874 addr++;
2876 port = strchr (addr, ']');
2877 if (port == NULL)
2878 {
2879 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2880 return (-1);
2881 }
2882 *port = 0;
2883 port++;
2885 if (*port == ':')
2886 port++;
2887 else if (*port == 0)
2888 port = NULL;
2889 else
2890 {
2891 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2892 return (-1);
2893 }
2894 } /* if (*addr == '[') */
2895 else
2896 {
2897 port = rindex(addr, ':');
2898 if (port != NULL)
2899 {
2900 *port = 0;
2901 port++;
2902 }
2903 }
2904 ai_res = NULL;
2905 status = getaddrinfo (addr,
2906 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2907 &ai_hints, &ai_res);
2908 if (status != 0)
2909 {
2910 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2911 addr, gai_strerror (status));
2912 return (-1);
2913 }
2915 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2916 {
2917 int fd;
2918 listen_socket_t *temp;
2919 int one = 1;
2921 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2922 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2923 if (temp == NULL)
2924 {
2925 fprintf (stderr,
2926 "rrdcached: open_listen_socket_network: realloc failed.\n");
2927 continue;
2928 }
2929 listen_fds = temp;
2930 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2932 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2933 if (fd < 0)
2934 {
2935 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2936 rrd_strerror(errno));
2937 continue;
2938 }
2940 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2942 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2943 if (status != 0)
2944 {
2945 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2946 sock->addr, rrd_strerror(errno));
2947 close (fd);
2948 continue;
2949 }
2951 status = listen (fd, /* backlog = */ 10);
2952 if (status != 0)
2953 {
2954 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2955 sock->addr, rrd_strerror(errno));
2956 close (fd);
2957 freeaddrinfo(ai_res);
2958 return (-1);
2959 }
2961 listen_fds[listen_fds_num].fd = fd;
2962 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2963 listen_fds_num++;
2964 } /* for (ai_ptr) */
2966 freeaddrinfo(ai_res);
2967 return (0);
2968 } /* }}} static int open_listen_socket_network */
2970 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2971 {
2972 assert(sock != NULL);
2973 assert(sock->addr != NULL);
2975 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2976 || sock->addr[0] == '/')
2977 return (open_listen_socket_unix(sock));
2978 else
2979 return (open_listen_socket_network(sock));
2980 } /* }}} int open_listen_socket */
2982 static int close_listen_sockets (void) /* {{{ */
2983 {
2984 size_t i;
2986 for (i = 0; i < listen_fds_num; i++)
2987 {
2988 close (listen_fds[i].fd);
2990 if (listen_fds[i].family == PF_UNIX)
2991 unlink(listen_fds[i].addr);
2992 }
2994 free (listen_fds);
2995 listen_fds = NULL;
2996 listen_fds_num = 0;
2998 return (0);
2999 } /* }}} int close_listen_sockets */
3001 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3002 {
3003 struct pollfd *pollfds;
3004 int pollfds_num;
3005 int status;
3006 int i;
3008 if (listen_fds_num < 1)
3009 {
3010 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3011 return (NULL);
3012 }
3014 pollfds_num = listen_fds_num;
3015 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3016 if (pollfds == NULL)
3017 {
3018 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3019 return (NULL);
3020 }
3021 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3023 RRDD_LOG(LOG_INFO, "listening for connections");
3025 while (state == RUNNING)
3026 {
3027 for (i = 0; i < pollfds_num; i++)
3028 {
3029 pollfds[i].fd = listen_fds[i].fd;
3030 pollfds[i].events = POLLIN | POLLPRI;
3031 pollfds[i].revents = 0;
3032 }
3034 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3035 if (state != RUNNING)
3036 break;
3037 else if (status == 0) /* timeout */
3038 continue;
3039 else if (status < 0) /* error */
3040 {
3041 status = errno;
3042 if (status != EINTR)
3043 {
3044 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3045 }
3046 continue;
3047 }
3049 for (i = 0; i < pollfds_num; i++)
3050 {
3051 listen_socket_t *client_sock;
3052 struct sockaddr_storage client_sa;
3053 socklen_t client_sa_size;
3054 pthread_t tid;
3055 pthread_attr_t attr;
3057 if (pollfds[i].revents == 0)
3058 continue;
3060 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3061 {
3062 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3063 "poll(2) returned something unexpected for listen FD #%i.",
3064 pollfds[i].fd);
3065 continue;
3066 }
3068 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3069 if (client_sock == NULL)
3070 {
3071 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3072 continue;
3073 }
3074 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3076 client_sa_size = sizeof (client_sa);
3077 client_sock->fd = accept (pollfds[i].fd,
3078 (struct sockaddr *) &client_sa, &client_sa_size);
3079 if (client_sock->fd < 0)
3080 {
3081 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3082 free(client_sock);
3083 continue;
3084 }
3086 pthread_attr_init (&attr);
3087 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3089 status = pthread_create (&tid, &attr, connection_thread_main,
3090 client_sock);
3091 if (status != 0)
3092 {
3093 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3094 close_connection(client_sock);
3095 continue;
3096 }
3097 } /* for (pollfds_num) */
3098 } /* while (state == RUNNING) */
3100 RRDD_LOG(LOG_INFO, "starting shutdown");
3102 close_listen_sockets ();
3104 pthread_mutex_lock (&connection_threads_lock);
3105 while (connection_threads_num > 0)
3106 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3107 pthread_mutex_unlock (&connection_threads_lock);
3109 free(pollfds);
3111 return (NULL);
3112 } /* }}} void *listen_thread_main */
3114 static int daemonize (void) /* {{{ */
3115 {
3116 int pid_fd;
3117 char *base_dir;
3119 daemon_uid = geteuid();
3121 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3122 if (pid_fd < 0)
3123 pid_fd = check_pidfile();
3124 if (pid_fd < 0)
3125 return pid_fd;
3127 /* open all the listen sockets */
3128 if (config_listen_address_list_len > 0)
3129 {
3130 for (size_t i = 0; i < config_listen_address_list_len; i++)
3131 open_listen_socket (config_listen_address_list[i]);
3133 rrd_free_ptrs((void ***) &config_listen_address_list,
3134 &config_listen_address_list_len);
3135 }
3136 else
3137 {
3138 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3139 sizeof(default_socket.addr) - 1);
3140 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3142 if (default_socket.permissions == 0)
3143 socket_permission_set_all (&default_socket);
3145 open_listen_socket (&default_socket);
3146 }
3148 if (listen_fds_num < 1)
3149 {
3150 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3151 goto error;
3152 }
3154 if (!stay_foreground)
3155 {
3156 pid_t child;
3158 child = fork ();
3159 if (child < 0)
3160 {
3161 fprintf (stderr, "daemonize: fork(2) failed.\n");
3162 goto error;
3163 }
3164 else if (child > 0)
3165 exit(0);
3167 /* Become session leader */
3168 setsid ();
3170 /* Open the first three file descriptors to /dev/null */
3171 close (2);
3172 close (1);
3173 close (0);
3175 open ("/dev/null", O_RDWR);
3176 if (dup(0) == -1 || dup(0) == -1){
3177 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3178 }
3179 } /* if (!stay_foreground) */
3181 /* Change into the /tmp directory. */
3182 base_dir = (config_base_dir != NULL)
3183 ? config_base_dir
3184 : "/tmp";
3186 if (chdir (base_dir) != 0)
3187 {
3188 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3189 goto error;
3190 }
3192 install_signal_handlers();
3194 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3195 RRDD_LOG(LOG_INFO, "starting up");
3197 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3198 (GDestroyNotify) free_cache_item);
3199 if (cache_tree == NULL)
3200 {
3201 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3202 goto error;
3203 }
3205 return write_pidfile (pid_fd);
3207 error:
3208 remove_pidfile();
3209 return -1;
3210 } /* }}} int daemonize */
3212 static int cleanup (void) /* {{{ */
3213 {
3214 pthread_cond_broadcast (&flush_cond);
3215 pthread_join (flush_thread, NULL);
3217 pthread_cond_broadcast (&queue_cond);
3218 for (int i = 0; i < config_queue_threads; i++)
3219 pthread_join (queue_threads[i], NULL);
3221 if (config_flush_at_shutdown)
3222 {
3223 assert(cache_queue_head == NULL);
3224 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3225 }
3227 free(queue_threads);
3228 free(config_base_dir);
3230 pthread_mutex_lock(&cache_lock);
3231 g_tree_destroy(cache_tree);
3233 pthread_mutex_lock(&journal_lock);
3234 journal_done();
3236 RRDD_LOG(LOG_INFO, "goodbye");
3237 closelog ();
3239 remove_pidfile ();
3240 free(config_pid_file);
3242 return (0);
3243 } /* }}} int cleanup */
3245 static int read_options (int argc, char **argv) /* {{{ */
3246 {
3247 int option;
3248 int status = 0;
3250 socket_permission_clear (&default_socket);
3252 default_socket.socket_group = (gid_t)-1;
3253 default_socket.socket_permissions = (mode_t)-1;
3255 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3256 {
3257 switch (option)
3258 {
3259 case 'O':
3260 opt_no_overwrite = 1;
3261 break;
3263 case 'g':
3264 stay_foreground=1;
3265 break;
3267 case 'l':
3268 {
3269 listen_socket_t *new;
3271 new = malloc(sizeof(listen_socket_t));
3272 if (new == NULL)
3273 {
3274 fprintf(stderr, "read_options: malloc failed.\n");
3275 return(2);
3276 }
3277 memset(new, 0, sizeof(listen_socket_t));
3279 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3281 /* Add permissions to the socket {{{ */
3282 if (default_socket.permissions != 0)
3283 {
3284 socket_permission_copy (new, &default_socket);
3285 }
3286 else /* if (default_socket.permissions == 0) */
3287 {
3288 /* Add permission for ALL commands to the socket. */
3289 socket_permission_set_all (new);
3290 }
3291 /* }}} Done adding permissions. */
3293 new->socket_group = default_socket.socket_group;
3294 new->socket_permissions = default_socket.socket_permissions;
3296 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3297 &config_listen_address_list_len, new))
3298 {
3299 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3300 return (2);
3301 }
3302 }
3303 break;
3305 /* set socket group permissions */
3306 case 's':
3307 {
3308 gid_t group_gid;
3309 struct group *grp;
3311 group_gid = strtoul(optarg, NULL, 10);
3312 if (errno != EINVAL && group_gid>0)
3313 {
3314 /* we were passed a number */
3315 grp = getgrgid(group_gid);
3316 }
3317 else
3318 {
3319 grp = getgrnam(optarg);
3320 }
3322 if (grp)
3323 {
3324 default_socket.socket_group = grp->gr_gid;
3325 }
3326 else
3327 {
3328 /* no idea what the user wanted... */
3329 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3330 return (5);
3331 }
3332 }
3333 break;
3335 /* set socket file permissions */
3336 case 'm':
3337 {
3338 long tmp;
3339 char *endptr = NULL;
3341 tmp = strtol (optarg, &endptr, 8);
3342 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3343 || (tmp > 07777) || (tmp < 0)) {
3344 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3345 optarg);
3346 return (5);
3347 }
3349 default_socket.socket_permissions = (mode_t)tmp;
3350 }
3351 break;
3353 case 'P':
3354 {
3355 char *optcopy;
3356 char *saveptr;
3357 char *dummy;
3358 char *ptr;
3360 socket_permission_clear (&default_socket);
3362 optcopy = strdup (optarg);
3363 dummy = optcopy;
3364 saveptr = NULL;
3365 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3366 {
3367 dummy = NULL;
3368 status = socket_permission_add (&default_socket, ptr);
3369 if (status != 0)
3370 {
3371 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3372 "socket failed. Most likely, this permission doesn't "
3373 "exist. Check your command line.\n", ptr);
3374 status = 4;
3375 }
3376 }
3378 free (optcopy);
3379 }
3380 break;
3382 case 'f':
3383 {
3384 int temp;
3386 temp = atoi (optarg);
3387 if (temp > 0)
3388 config_flush_interval = temp;
3389 else
3390 {
3391 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3392 status = 3;
3393 }
3394 }
3395 break;
3397 case 'w':
3398 {
3399 int temp;
3401 temp = atoi (optarg);
3402 if (temp > 0)
3403 config_write_interval = temp;
3404 else
3405 {
3406 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3407 status = 2;
3408 }
3409 }
3410 break;
3412 case 'z':
3413 {
3414 int temp;
3416 temp = atoi(optarg);
3417 if (temp > 0)
3418 config_write_jitter = temp;
3419 else
3420 {
3421 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3422 status = 2;
3423 }
3425 break;
3426 }
3428 case 't':
3429 {
3430 int threads;
3431 threads = atoi(optarg);
3432 if (threads >= 1)
3433 config_queue_threads = threads;
3434 else
3435 {
3436 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3437 return 1;
3438 }
3439 }
3440 break;
3442 case 'B':
3443 config_write_base_only = 1;
3444 break;
3446 case 'b':
3447 {
3448 size_t len;
3449 char base_realpath[PATH_MAX];
3451 if (config_base_dir != NULL)
3452 free (config_base_dir);
3453 config_base_dir = strdup (optarg);
3454 if (config_base_dir == NULL)
3455 {
3456 fprintf (stderr, "read_options: strdup failed.\n");
3457 return (3);
3458 }
3460 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3461 {
3462 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3463 config_base_dir, rrd_strerror (errno));
3464 return (3);
3465 }
3467 /* make sure that the base directory is not resolved via
3468 * symbolic links. this makes some performance-enhancing
3469 * assumptions possible (we don't have to resolve paths
3470 * that start with a "/")
3471 */
3472 if (realpath(config_base_dir, base_realpath) == NULL)
3473 {
3474 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3475 "%s\n", config_base_dir, rrd_strerror(errno));
3476 return 5;
3477 }
3479 len = strlen (config_base_dir);
3480 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3481 {
3482 config_base_dir[len - 1] = 0;
3483 len--;
3484 }
3486 if (len < 1)
3487 {
3488 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3489 return (4);
3490 }
3492 _config_base_dir_len = len;
3494 len = strlen (base_realpath);
3495 while ((len > 0) && (base_realpath[len - 1] == '/'))
3496 {
3497 base_realpath[len - 1] = '\0';
3498 len--;
3499 }
3501 if (strncmp(config_base_dir,
3502 base_realpath, sizeof(base_realpath)) != 0)
3503 {
3504 fprintf(stderr,
3505 "Base directory (-b) resolved via file system links!\n"
3506 "Please consult rrdcached '-b' documentation!\n"
3507 "Consider specifying the real directory (%s)\n",
3508 base_realpath);
3509 return 5;
3510 }
3511 }
3512 break;
3514 case 'p':
3515 {
3516 if (config_pid_file != NULL)
3517 free (config_pid_file);
3518 config_pid_file = strdup (optarg);
3519 if (config_pid_file == NULL)
3520 {
3521 fprintf (stderr, "read_options: strdup failed.\n");
3522 return (3);
3523 }
3524 }
3525 break;
3527 case 'F':
3528 config_flush_at_shutdown = 1;
3529 break;
3531 case 'j':
3532 {
3533 char journal_dir_actual[PATH_MAX];
3534 journal_dir = realpath((const char *)optarg, journal_dir_actual);
3535 if (journal_dir)
3536 {
3537 // if we were able to properly resolve the path, lets have a copy
3538 // for use outside this block.
3539 journal_dir = strdup(journal_dir);
3540 status = rrd_mkdir_p(journal_dir, 0777);
3541 if (status != 0)
3542 {
3543 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3544 journal_dir, rrd_strerror(errno));
3545 return 6;
3546 }
3547 if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3548 {
3549 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3550 errno ? rrd_strerror(errno) : "");
3551 return 6;
3552 }
3553 } else {
3554 fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3555 errno ? rrd_strerror(errno) : "");
3556 return 6;
3557 }
3558 }
3559 break;
3561 case 'a':
3562 {
3563 int temp = atoi(optarg);
3564 if (temp > 0)
3565 config_alloc_chunk = temp;
3566 else
3567 {
3568 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3569 return 10;
3570 }
3571 }
3572 break;
3574 case 'h':
3575 case '?':
3576 printf ("RRDCacheD %s\n"
3577 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3578 "\n"
3579 "Usage: rrdcached [options]\n"
3580 "\n"
3581 "Valid options are:\n"
3582 " -l <address> Socket address to listen to.\n"
3583 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3584 " -P <perms> Sets the permissions to assign to all following "
3585 "sockets\n"
3586 " -w <seconds> Interval in which to write data.\n"
3587 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3588 " -t <threads> Number of write threads.\n"
3589 " -f <seconds> Interval in which to flush dead data.\n"
3590 " -p <file> Location of the PID-file.\n"
3591 " -b <dir> Base directory to change to.\n"
3592 " -B Restrict file access to paths within -b <dir>\n"
3593 " -g Do not fork and run in the foreground.\n"
3594 " -j <dir> Directory in which to create the journal files.\n"
3595 " -F Always flush all updates at shutdown\n"
3596 " -s <id|name> Group owner of all following UNIX sockets\n"
3597 " (the socket will also have read/write permissions "
3598 "for that group)\n"
3599 " -m <mode> File permissions (octal) of all following UNIX "
3600 "sockets\n"
3601 " -a <size> Memory allocation chunk size. Default is 1.\n"
3602 " -O Do not allow CREATE commands to overwrite existing\n"
3603 " files, even if asked to.\n"
3604 "\n"
3605 "For more information and a detailed description of all options "
3606 "please refer\n"
3607 "to the rrdcached(1) manual page.\n",
3608 VERSION);
3609 if (option == 'h')
3610 status = -1;
3611 else
3612 status = 1;
3613 break;
3614 } /* switch (option) */
3615 } /* while (getopt) */
3617 /* advise the user when values are not sane */
3618 if (config_flush_interval < 2 * config_write_interval)
3619 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3620 " 2x write interval (-w) !\n");
3621 if (config_write_jitter > config_write_interval)
3622 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3623 " write interval (-w) !\n");
3625 if (config_write_base_only && config_base_dir == NULL)
3626 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3627 " Consult the rrdcached documentation\n");
3629 if (journal_dir == NULL)
3630 config_flush_at_shutdown = 1;
3632 return (status);
3633 } /* }}} int read_options */
3635 int main (int argc, char **argv)
3636 {
3637 int status;
3639 status = read_options (argc, argv);
3640 if (status != 0)
3641 {
3642 if (status < 0)
3643 status = 0;
3644 return (status);
3645 }
3647 status = daemonize ();
3648 if (status != 0)
3649 {
3650 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3651 return (1);
3652 }
3654 journal_init();
3656 /* start the queue threads */
3657 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3658 if (queue_threads == NULL)
3659 {
3660 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3661 cleanup();
3662 return (1);
3663 }
3664 for (int i = 0; i < config_queue_threads; i++)
3665 {
3666 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3667 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3668 if (status != 0)
3669 {
3670 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3671 cleanup();
3672 return (1);
3673 }
3674 }
3676 /* start the flush thread */
3677 memset(&flush_thread, 0, sizeof(flush_thread));
3678 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3679 if (status != 0)
3680 {
3681 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3682 cleanup();
3683 return (1);
3684 }
3686 listen_thread_main (NULL);
3687 cleanup ();
3689 return (0);
3690 } /* int main */
3692 /*
3693 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3694 */