340ea1539db6271709497a263d0d9253cc7c50e5
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
67 #include "rrd_tool.h"
68 #include "rrd_client.h"
69 #include "unused.h"
71 #include <stdlib.h>
73 #ifndef WIN32
74 #ifdef HAVE_STDINT_H
75 # include <stdint.h>
76 #endif
77 #include <unistd.h>
78 #include <strings.h>
79 #include <inttypes.h>
80 #include <sys/socket.h>
82 #else
84 #endif
85 #include <stdio.h>
86 #include <string.h>
88 #include <sys/types.h>
89 #include <sys/stat.h>
90 #include <dirent.h>
91 #include <fcntl.h>
92 #include <signal.h>
93 #include <sys/un.h>
94 #include <netdb.h>
95 #include <poll.h>
96 #include <syslog.h>
97 #include <pthread.h>
98 #include <errno.h>
99 #include <assert.h>
100 #include <sys/time.h>
101 #include <time.h>
102 #include <libgen.h>
103 #include <grp.h>
105 #ifdef HAVE_LIBWRAP
106 #include <tcpd.h>
107 #endif /* HAVE_LIBWRAP */
109 #include <glib-2.0/glib.h>
110 /* }}} */
112 #define RRDD_LOG(severity, ...) \
113 do { \
114 if (stay_foreground) { \
115 fprintf(stderr, __VA_ARGS__); \
116 fprintf(stderr, "\n"); } \
117 syslog ((severity), __VA_ARGS__); \
118 } while (0)
120 /*
121 * Types
122 */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125 struct listen_socket_s
126 {
127 int fd;
128 char addr[PATH_MAX + 1];
129 int family;
131 /* state for BATCH processing */
132 time_t batch_start;
133 int batch_cmd;
135 /* buffered IO */
136 char *rbuf;
137 off_t next_cmd;
138 off_t next_read;
140 char *wbuf;
141 ssize_t wbuf_len;
143 uint32_t permissions;
145 gid_t socket_group;
146 mode_t socket_permissions;
147 };
148 typedef struct listen_socket_s listen_socket_t;
150 struct command_s;
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
154 time_t UNUSED(now),\
155 char UNUSED(*buffer),\
156 size_t UNUSED(buffer_size)
158 #define HANDLER_PROTO command_t UNUSED(*cmd),\
159 DISPATCH_PROTO
161 struct command_s {
162 char *cmd;
163 int (*handler)(HANDLER_PROTO);
165 char context; /* where we expect to see it */
166 #define CMD_CONTEXT_CLIENT (1<<0)
167 #define CMD_CONTEXT_BATCH (1<<1)
168 #define CMD_CONTEXT_JOURNAL (1<<2)
169 #define CMD_CONTEXT_ANY (0x7f)
171 char *syntax;
172 char *help;
173 };
175 struct cache_item_s;
176 typedef struct cache_item_s cache_item_t;
177 struct cache_item_s
178 {
179 char *file;
180 char **values;
181 size_t values_num; /* number of valid pointers */
182 size_t values_alloc; /* number of allocated pointers */
183 time_t last_flush_time;
184 double last_update_stamp;
185 #define CI_FLAGS_IN_TREE (1<<0)
186 #define CI_FLAGS_IN_QUEUE (1<<1)
187 int flags;
188 pthread_cond_t flushed;
189 cache_item_t *prev;
190 cache_item_t *next;
191 };
193 struct callback_flush_data_s
194 {
195 time_t now;
196 time_t abs_timeout;
197 char **keys;
198 size_t keys_num;
199 };
200 typedef struct callback_flush_data_s callback_flush_data_t;
202 enum queue_side_e
203 {
204 HEAD,
205 TAIL
206 };
207 typedef enum queue_side_e queue_side_t;
209 /* describe a set of journal files */
210 typedef struct {
211 char **files;
212 size_t files_num;
213 } journal_set;
215 #define RBUF_SIZE (RRD_CMD_MAX*2)
217 /*
218 * Variables
219 */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 static listen_socket_t default_socket;
228 enum {
229 RUNNING, /* normal operation */
230 FLUSHING, /* flushing remaining values */
231 SHUTDOWN /* shutting down */
232 } state = RUNNING;
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
245 /* Cache stuff */
246 static GTree *cache_tree = NULL;
247 static cache_item_t *cache_queue_head = NULL;
248 static cache_item_t *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
251 static int config_write_interval = 300;
252 static int config_write_jitter = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
259 static size_t config_alloc_chunk = 1;
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
273 static int opt_no_overwrite = 0; /* default for the daemon */
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL; /* current journal file handle */
282 static long journal_size = 0; /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
292 /*
293 * Functions
294 */
295 static void sig_common (const char *sig) /* {{{ */
296 {
297 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298 state = FLUSHING;
299 pthread_cond_broadcast(&flush_cond);
300 pthread_cond_broadcast(&queue_cond);
301 } /* }}} void sig_common */
303 static void sig_int_handler (int UNUSED(s)) /* {{{ */
304 {
305 sig_common("INT");
306 } /* }}} void sig_int_handler */
308 static void sig_term_handler (int UNUSED(s)) /* {{{ */
309 {
310 sig_common("TERM");
311 } /* }}} void sig_term_handler */
313 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
314 {
315 config_flush_at_shutdown = 1;
316 sig_common("USR1");
317 } /* }}} void sig_usr1_handler */
319 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
320 {
321 config_flush_at_shutdown = 0;
322 sig_common("USR2");
323 } /* }}} void sig_usr2_handler */
325 static void install_signal_handlers(void) /* {{{ */
326 {
327 /* These structures are static, because `sigaction' behaves weird if the are
328 * overwritten.. */
329 static struct sigaction sa_int;
330 static struct sigaction sa_term;
331 static struct sigaction sa_pipe;
332 static struct sigaction sa_usr1;
333 static struct sigaction sa_usr2;
335 /* Install signal handlers */
336 memset (&sa_int, 0, sizeof (sa_int));
337 sa_int.sa_handler = sig_int_handler;
338 sigaction (SIGINT, &sa_int, NULL);
340 memset (&sa_term, 0, sizeof (sa_term));
341 sa_term.sa_handler = sig_term_handler;
342 sigaction (SIGTERM, &sa_term, NULL);
344 memset (&sa_pipe, 0, sizeof (sa_pipe));
345 sa_pipe.sa_handler = SIG_IGN;
346 sigaction (SIGPIPE, &sa_pipe, NULL);
348 memset (&sa_pipe, 0, sizeof (sa_usr1));
349 sa_usr1.sa_handler = sig_usr1_handler;
350 sigaction (SIGUSR1, &sa_usr1, NULL);
352 memset (&sa_usr2, 0, sizeof (sa_usr2));
353 sa_usr2.sa_handler = sig_usr2_handler;
354 sigaction (SIGUSR2, &sa_usr2, NULL);
356 } /* }}} void install_signal_handlers */
358 static int open_pidfile(char *action, int oflag) /* {{{ */
359 {
360 int fd;
361 const char *file;
362 char *file_copy, *dir;
364 file = (config_pid_file != NULL)
365 ? config_pid_file
366 : LOCALSTATEDIR "/run/rrdcached.pid";
368 /* dirname may modify its argument */
369 file_copy = strdup(file);
370 if (file_copy == NULL)
371 {
372 fprintf(stderr, "rrdcached: strdup(): %s\n",
373 rrd_strerror(errno));
374 return -1;
375 }
377 dir = dirname(file_copy);
378 if (rrd_mkdir_p(dir, 0777) != 0)
379 {
380 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
381 dir, rrd_strerror(errno));
382 return -1;
383 }
385 free(file_copy);
387 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
388 if (fd < 0)
389 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
390 action, file, rrd_strerror(errno));
392 return(fd);
393 } /* }}} static int open_pidfile */
395 /* check existing pid file to see whether a daemon is running */
396 static int check_pidfile(void)
397 {
398 int pid_fd;
399 pid_t pid;
400 char pid_str[16];
402 pid_fd = open_pidfile("open", O_RDWR);
403 if (pid_fd < 0)
404 return pid_fd;
406 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
407 return -1;
409 pid = atoi(pid_str);
410 if (pid <= 0)
411 return -1;
413 /* another running process that we can signal COULD be
414 * a competing rrdcached */
415 if (pid != getpid() && kill(pid, 0) == 0)
416 {
417 fprintf(stderr,
418 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
419 close(pid_fd);
420 return -1;
421 }
423 lseek(pid_fd, 0, SEEK_SET);
424 if (ftruncate(pid_fd, 0) == -1)
425 {
426 fprintf(stderr,
427 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
428 close(pid_fd);
429 return -1;
430 }
432 fprintf(stderr,
433 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
434 "rrdcached: starting normally.\n", pid);
436 return pid_fd;
437 } /* }}} static int check_pidfile */
439 static int write_pidfile (int fd) /* {{{ */
440 {
441 pid_t pid;
442 FILE *fh;
444 pid = getpid ();
446 fh = fdopen (fd, "w");
447 if (fh == NULL)
448 {
449 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
450 close(fd);
451 return (-1);
452 }
454 fprintf (fh, "%i\n", (int) pid);
455 fclose (fh);
457 return (0);
458 } /* }}} int write_pidfile */
460 static int remove_pidfile (void) /* {{{ */
461 {
462 char *file;
463 int status;
465 file = (config_pid_file != NULL)
466 ? config_pid_file
467 : LOCALSTATEDIR "/run/rrdcached.pid";
469 status = unlink (file);
470 if (status == 0)
471 return (0);
472 return (errno);
473 } /* }}} int remove_pidfile */
475 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
476 {
477 char *eol;
479 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
480 sock->next_read - sock->next_cmd);
482 if (eol == NULL)
483 {
484 /* no commands left, move remainder back to front of rbuf */
485 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
486 sock->next_read - sock->next_cmd);
487 sock->next_read -= sock->next_cmd;
488 sock->next_cmd = 0;
489 *len = 0;
490 return NULL;
491 }
492 else
493 {
494 char *cmd = sock->rbuf + sock->next_cmd;
495 *eol = '\0';
497 sock->next_cmd = eol - sock->rbuf + 1;
499 if (eol > sock->rbuf && *(eol-1) == '\r')
500 *(--eol) = '\0'; /* handle "\r\n" EOL */
502 *len = eol - cmd;
504 return cmd;
505 }
507 /* NOTREACHED */
508 assert(1==0);
509 } /* }}} char *next_cmd */
511 /* add the characters directly to the write buffer */
512 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
513 {
514 char *new_buf;
516 assert(sock != NULL);
518 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
519 if (new_buf == NULL)
520 {
521 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
522 return -1;
523 }
525 strncpy(new_buf + sock->wbuf_len, str, len + 1);
527 sock->wbuf = new_buf;
528 sock->wbuf_len += len;
530 return 0;
531 } /* }}} static int add_to_wbuf */
533 /* add the text to the "extra" info that's sent after the status line */
534 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
535 {
536 va_list argp;
537 char buffer[RRD_CMD_MAX];
538 int len;
540 if (JOURNAL_REPLAY(sock)) return 0;
541 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
543 va_start(argp, fmt);
544 #ifdef HAVE_VSNPRINTF
545 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
546 #else
547 len = vsprintf(buffer, fmt, argp);
548 #endif
549 va_end(argp);
550 if (len < 0)
551 {
552 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
553 return -1;
554 }
556 return add_to_wbuf(sock, buffer, len);
557 } /* }}} static int add_response_info */
559 static int count_lines(char *str) /* {{{ */
560 {
561 int lines = 0;
563 if (str != NULL)
564 {
565 while ((str = strchr(str, '\n')) != NULL)
566 {
567 ++lines;
568 ++str;
569 }
570 }
572 return lines;
573 } /* }}} static int count_lines */
575 /* send the response back to the user.
576 * returns 0 on success, -1 on error
577 * write buffer is always zeroed after this call */
578 static int send_response (listen_socket_t *sock, response_code rc,
579 char *fmt, ...) /* {{{ */
580 {
581 va_list argp;
582 char buffer[RRD_CMD_MAX];
583 int lines;
584 ssize_t wrote;
585 int rclen, len;
587 if (JOURNAL_REPLAY(sock)) return rc;
589 if (sock->batch_start)
590 {
591 if (rc == RESP_OK)
592 return rc; /* no response on success during BATCH */
593 lines = sock->batch_cmd;
594 }
595 else if (rc == RESP_OK)
596 lines = count_lines(sock->wbuf);
597 else
598 lines = -1;
600 rclen = sprintf(buffer, "%d ", lines);
601 va_start(argp, fmt);
602 #ifdef HAVE_VSNPRINTF
603 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
604 #else
605 len = vsprintf(buffer+rclen, fmt, argp);
606 #endif
607 va_end(argp);
608 if (len < 0)
609 return -1;
611 len += rclen;
613 /* append the result to the wbuf, don't write to the user */
614 if (sock->batch_start)
615 return add_to_wbuf(sock, buffer, len);
617 /* first write must be complete */
618 if (len != write(sock->fd, buffer, len))
619 {
620 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
621 return -1;
622 }
624 if (sock->wbuf != NULL && rc == RESP_OK)
625 {
626 wrote = 0;
627 while (wrote < sock->wbuf_len)
628 {
629 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
630 if (wb <= 0)
631 {
632 RRDD_LOG(LOG_INFO, "send_response: could not write results");
633 return -1;
634 }
635 wrote += wb;
636 }
637 }
639 free(sock->wbuf); sock->wbuf = NULL;
640 sock->wbuf_len = 0;
642 return 0;
643 } /* }}} */
645 static void wipe_ci_values(cache_item_t *ci, time_t when)
646 {
647 ci->values = NULL;
648 ci->values_num = 0;
649 ci->values_alloc = 0;
651 ci->last_flush_time = when;
652 if (config_write_jitter > 0)
653 ci->last_flush_time += (rrd_random() % config_write_jitter);
654 }
656 /* remove_from_queue
657 * remove a "cache_item_t" item from the queue.
658 * must hold 'cache_lock' when calling this
659 */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
661 {
662 if (ci == NULL) return;
663 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
665 if (ci->prev == NULL)
666 cache_queue_head = ci->next; /* reset head */
667 else
668 ci->prev->next = ci->next;
670 if (ci->next == NULL)
671 cache_queue_tail = ci->prev; /* reset the tail */
672 else
673 ci->next->prev = ci->prev;
675 ci->next = ci->prev = NULL;
676 ci->flags &= ~CI_FLAGS_IN_QUEUE;
678 pthread_mutex_lock (&stats_lock);
679 assert (stats_queue_length > 0);
680 stats_queue_length--;
681 pthread_mutex_unlock (&stats_lock);
683 } /* }}} static void remove_from_queue */
685 /* free the resources associated with the cache_item_t
686 * must hold cache_lock when calling this function
687 */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
689 {
690 if (ci == NULL) return NULL;
692 remove_from_queue(ci);
694 for (size_t i=0; i < ci->values_num; i++)
695 free(ci->values[i]);
697 free (ci->values);
698 free (ci->file);
700 /* in case anyone is waiting */
701 pthread_cond_broadcast(&ci->flushed);
702 pthread_cond_destroy(&ci->flushed);
704 free (ci);
706 return NULL;
707 } /* }}} static void *free_cache_item */
709 /*
710 * enqueue_cache_item:
711 * `cache_lock' must be acquired before calling this function!
712 */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714 queue_side_t side)
715 {
716 if (ci == NULL)
717 return (-1);
719 if (ci->values_num == 0)
720 return (0);
722 if (side == HEAD)
723 {
724 if (cache_queue_head == ci)
725 return 0;
727 /* remove if further down in queue */
728 remove_from_queue(ci);
730 ci->prev = NULL;
731 ci->next = cache_queue_head;
732 if (ci->next != NULL)
733 ci->next->prev = ci;
734 cache_queue_head = ci;
736 if (cache_queue_tail == NULL)
737 cache_queue_tail = cache_queue_head;
738 }
739 else /* (side == TAIL) */
740 {
741 /* We don't move values back in the list.. */
742 if (ci->flags & CI_FLAGS_IN_QUEUE)
743 return (0);
745 assert (ci->next == NULL);
746 assert (ci->prev == NULL);
748 ci->prev = cache_queue_tail;
750 if (cache_queue_tail == NULL)
751 cache_queue_head = ci;
752 else
753 cache_queue_tail->next = ci;
755 cache_queue_tail = ci;
756 }
758 ci->flags |= CI_FLAGS_IN_QUEUE;
760 pthread_cond_signal(&queue_cond);
761 pthread_mutex_lock (&stats_lock);
762 stats_queue_length++;
763 pthread_mutex_unlock (&stats_lock);
765 return (0);
766 } /* }}} int enqueue_cache_item */
768 /*
769 * tree_callback_flush:
770 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771 * while this is in progress.
772 */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774 gpointer data)
775 {
776 cache_item_t *ci;
777 callback_flush_data_t *cfd;
779 ci = (cache_item_t *) value;
780 cfd = (callback_flush_data_t *) data;
782 if (ci->flags & CI_FLAGS_IN_QUEUE)
783 return FALSE;
785 if (ci->values_num > 0
786 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787 {
788 enqueue_cache_item (ci, TAIL);
789 }
790 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791 && (ci->values_num <= 0))
792 {
793 assert ((char *) key == ci->file);
794 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795 {
796 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797 return (FALSE);
798 }
799 }
801 return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
804 static int flush_old_values (int max_age)
805 {
806 callback_flush_data_t cfd;
807 size_t k;
809 memset (&cfd, 0, sizeof (cfd));
810 /* Pass the current time as user data so that we don't need to call
811 * `time' for each node. */
812 cfd.now = time (NULL);
813 cfd.keys = NULL;
814 cfd.keys_num = 0;
816 if (max_age > 0)
817 cfd.abs_timeout = cfd.now - max_age;
818 else
819 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
821 /* `tree_callback_flush' will return the keys of all values that haven't
822 * been touched in the last `config_flush_interval' seconds in `cfd'.
823 * The char*'s in this array point to the same memory as ci->file, so we
824 * don't need to free them separately. */
825 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
827 for (k = 0; k < cfd.keys_num; k++)
828 {
829 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830 /* should never fail, since we have held the cache_lock
831 * the entire time */
832 assert(status == TRUE);
833 }
835 if (cfd.keys != NULL)
836 {
837 free (cfd.keys);
838 cfd.keys = NULL;
839 }
841 return (0);
842 } /* int flush_old_values */
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
845 {
846 struct timeval now;
847 struct timespec next_flush;
848 int status;
850 gettimeofday (&now, NULL);
851 next_flush.tv_sec = now.tv_sec + config_flush_interval;
852 next_flush.tv_nsec = 1000 * now.tv_usec;
854 pthread_mutex_lock(&cache_lock);
856 while (state == RUNNING)
857 {
858 gettimeofday (&now, NULL);
859 if ((now.tv_sec > next_flush.tv_sec)
860 || ((now.tv_sec == next_flush.tv_sec)
861 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862 {
863 RRDD_LOG(LOG_DEBUG, "flushing old values");
865 /* Determine the time of the next cache flush. */
866 next_flush.tv_sec = now.tv_sec + config_flush_interval;
868 /* Flush all values that haven't been written in the last
869 * `config_write_interval' seconds. */
870 flush_old_values (config_write_interval);
872 /* unlock the cache while we rotate so we don't block incoming
873 * updates if the fsync() blocks on disk I/O */
874 pthread_mutex_unlock(&cache_lock);
875 journal_rotate();
876 pthread_mutex_lock(&cache_lock);
877 }
879 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880 if (status != 0 && status != ETIMEDOUT)
881 {
882 RRDD_LOG (LOG_ERR, "flush_thread_main: "
883 "pthread_cond_timedwait returned %i.", status);
884 }
885 }
887 if (config_flush_at_shutdown)
888 flush_old_values (-1); /* flush everything */
890 state = SHUTDOWN;
892 pthread_mutex_unlock(&cache_lock);
894 return NULL;
895 } /* void *flush_thread_main */
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
898 {
899 pthread_mutex_lock (&cache_lock);
901 while (state != SHUTDOWN
902 || (cache_queue_head != NULL && config_flush_at_shutdown))
903 {
904 cache_item_t *ci;
905 char *file;
906 char **values;
907 size_t values_num;
908 int status;
910 /* Now, check if there's something to store away. If not, wait until
911 * something comes in. */
912 if (cache_queue_head == NULL)
913 {
914 status = pthread_cond_wait (&queue_cond, &cache_lock);
915 if ((status != 0) && (status != ETIMEDOUT))
916 {
917 RRDD_LOG (LOG_ERR, "queue_thread_main: "
918 "pthread_cond_wait returned %i.", status);
919 }
920 }
922 /* Check if a value has arrived. This may be NULL if we timed out or there
923 * was an interrupt such as a signal. */
924 if (cache_queue_head == NULL)
925 continue;
927 ci = cache_queue_head;
929 /* copy the relevant parts */
930 file = strdup (ci->file);
931 if (file == NULL)
932 {
933 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934 continue;
935 }
937 assert(ci->values != NULL);
938 assert(ci->values_num > 0);
940 values = ci->values;
941 values_num = ci->values_num;
943 wipe_ci_values(ci, time(NULL));
944 remove_from_queue(ci);
946 pthread_mutex_unlock (&cache_lock);
948 rrd_clear_error ();
949 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950 if (status != 0)
951 {
952 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953 "rrd_update_r (%s) failed with status %i. (%s)",
954 file, status, rrd_get_error());
955 }
957 journal_write("wrote", file);
959 /* Search again in the tree. It's possible someone issued a "FORGET"
960 * while we were writing the update values. */
961 pthread_mutex_lock(&cache_lock);
962 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963 if (ci)
964 pthread_cond_broadcast(&ci->flushed);
965 pthread_mutex_unlock(&cache_lock);
967 if (status == 0)
968 {
969 pthread_mutex_lock (&stats_lock);
970 stats_updates_written++;
971 stats_data_sets_written += values_num;
972 pthread_mutex_unlock (&stats_lock);
973 }
975 rrd_free_ptrs((void ***) &values, &values_num);
976 free(file);
978 pthread_mutex_lock (&cache_lock);
979 }
980 pthread_mutex_unlock (&cache_lock);
982 return (NULL);
983 } /* }}} void *queue_thread_main */
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986 size_t *buffer_size_ret, char **field_ret)
987 {
988 char *buffer;
989 size_t buffer_pos;
990 size_t buffer_size;
991 char *field;
992 size_t field_size;
993 int status;
995 buffer = *buffer_ret;
996 buffer_pos = 0;
997 buffer_size = *buffer_size_ret;
998 field = *buffer_ret;
999 field_size = 0;
1001 if (buffer_size <= 0)
1002 return (-1);
1004 /* This is ensured by `handle_request'. */
1005 assert (buffer[buffer_size - 1] == '\0');
1007 status = -1;
1008 while (buffer_pos < buffer_size)
1009 {
1010 /* Check for end-of-field or end-of-buffer */
1011 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012 {
1013 field[field_size] = 0;
1014 field_size++;
1015 buffer_pos++;
1016 status = 0;
1017 break;
1018 }
1019 /* Handle escaped characters. */
1020 else if (buffer[buffer_pos] == '\\')
1021 {
1022 if (buffer_pos >= (buffer_size - 1))
1023 break;
1024 buffer_pos++;
1025 field[field_size] = buffer[buffer_pos];
1026 field_size++;
1027 buffer_pos++;
1028 }
1029 /* Normal operation */
1030 else
1031 {
1032 field[field_size] = buffer[buffer_pos];
1033 field_size++;
1034 buffer_pos++;
1035 }
1036 } /* while (buffer_pos < buffer_size) */
1038 if (status != 0)
1039 return (status);
1041 *buffer_ret = buffer + buffer_pos;
1042 *buffer_size_ret = buffer_size - buffer_pos;
1043 *field_ret = field;
1045 return (0);
1046 } /* }}} int buffer_get_field */
1048 /* if we're restricting writes to the base directory,
1049 * check whether the file falls within the dir
1050 * returns 1 if OK, otherwise 0
1051 */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1053 {
1054 assert(file != NULL);
1056 if (!config_write_base_only
1057 || JOURNAL_REPLAY(sock)
1058 || config_base_dir == NULL)
1059 return 1;
1061 if (strstr(file, "../") != NULL) goto err;
1063 /* relative paths without "../" are ok */
1064 if (*file != '/') return 1;
1066 /* file must be of the format base + "/" + <1+ char filename> */
1067 if (strlen(file) < _config_base_dir_len + 2) goto err;
1068 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069 if (*(file + _config_base_dir_len) != '/') goto err;
1071 return 1;
1073 err:
1074 if (sock != NULL && sock->fd >= 0)
1075 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1077 return 0;
1078 } /* }}} static int check_file_access */
1080 /* when using a base dir, convert relative paths to absolute paths.
1081 * if necessary, modifies the "filename" pointer to point
1082 * to the new path created in "tmp". "tmp" is provided
1083 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084 *
1085 * this allows us to optimize for the expected case (absolute path)
1086 * with a no-op.
1087 */
1088 static void get_abs_path(char **filename, char *tmp)
1089 {
1090 assert(tmp != NULL);
1091 assert(filename != NULL && *filename != NULL);
1093 if (config_base_dir == NULL || **filename == '/')
1094 return;
1096 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097 *filename = tmp;
1098 } /* }}} static int get_abs_path */
1100 static int flush_file (const char *filename) /* {{{ */
1101 {
1102 cache_item_t *ci;
1104 pthread_mutex_lock (&cache_lock);
1106 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107 if (ci == NULL)
1108 {
1109 pthread_mutex_unlock (&cache_lock);
1110 return (ENOENT);
1111 }
1113 if (ci->values_num > 0)
1114 {
1115 /* Enqueue at head */
1116 enqueue_cache_item (ci, HEAD);
1117 pthread_cond_wait(&ci->flushed, &cache_lock);
1118 }
1120 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1121 * may have been purged during our cond_wait() */
1123 pthread_mutex_unlock(&cache_lock);
1125 return (0);
1126 } /* }}} int flush_file */
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1129 {
1130 char *err = "Syntax error.\n";
1132 if (cmd && cmd->syntax)
1133 err = cmd->syntax;
1135 return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1139 {
1140 uint64_t copy_queue_length;
1141 uint64_t copy_updates_received;
1142 uint64_t copy_flush_received;
1143 uint64_t copy_updates_written;
1144 uint64_t copy_data_sets_written;
1145 uint64_t copy_journal_bytes;
1146 uint64_t copy_journal_rotate;
1148 uint64_t tree_nodes_number;
1149 uint64_t tree_depth;
1151 pthread_mutex_lock (&stats_lock);
1152 copy_queue_length = stats_queue_length;
1153 copy_updates_received = stats_updates_received;
1154 copy_flush_received = stats_flush_received;
1155 copy_updates_written = stats_updates_written;
1156 copy_data_sets_written = stats_data_sets_written;
1157 copy_journal_bytes = stats_journal_bytes;
1158 copy_journal_rotate = stats_journal_rotate;
1159 pthread_mutex_unlock (&stats_lock);
1161 pthread_mutex_lock (&cache_lock);
1162 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163 tree_depth = (uint64_t) g_tree_height (cache_tree);
1164 pthread_mutex_unlock (&cache_lock);
1166 add_response_info(sock,
1167 "QueueLength: %"PRIu64"\n", copy_queue_length);
1168 add_response_info(sock,
1169 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170 add_response_info(sock,
1171 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172 add_response_info(sock,
1173 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174 add_response_info(sock,
1175 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1181 send_response(sock, RESP_OK, "Statistics follow\n");
1183 return (0);
1184 } /* }}} int handle_request_stats */
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1187 {
1188 char *file, file_tmp[PATH_MAX];
1189 int status;
1191 status = buffer_get_field (&buffer, &buffer_size, &file);
1192 if (status != 0)
1193 {
1194 return syntax_error(sock,cmd);
1195 }
1196 else
1197 {
1198 pthread_mutex_lock(&stats_lock);
1199 stats_flush_received++;
1200 pthread_mutex_unlock(&stats_lock);
1202 get_abs_path(&file, file_tmp);
1203 if (!check_file_access(file, sock)) return 0;
1205 status = flush_file (file);
1206 if (status == 0)
1207 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208 else if (status == ENOENT)
1209 {
1210 /* no file in our tree; see whether it exists at all */
1211 struct stat statbuf;
1213 memset(&statbuf, 0, sizeof(statbuf));
1214 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216 else
1217 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218 }
1219 else if (status < 0)
1220 return send_response(sock, RESP_ERR, "Internal error.\n");
1221 else
1222 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223 }
1225 /* NOTREACHED */
1226 assert(1==0);
1227 } /* }}} int handle_request_flush */
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1230 {
1231 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1233 pthread_mutex_lock(&cache_lock);
1234 flush_old_values(-1);
1235 pthread_mutex_unlock(&cache_lock);
1237 return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1241 {
1242 int status;
1243 char *file, file_tmp[PATH_MAX];
1244 cache_item_t *ci;
1246 status = buffer_get_field(&buffer, &buffer_size, &file);
1247 if (status != 0)
1248 return syntax_error(sock,cmd);
1250 get_abs_path(&file, file_tmp);
1252 pthread_mutex_lock(&cache_lock);
1253 ci = g_tree_lookup(cache_tree, file);
1254 if (ci == NULL)
1255 {
1256 pthread_mutex_unlock(&cache_lock);
1257 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258 }
1260 for (size_t i=0; i < ci->values_num; i++)
1261 add_response_info(sock, "%s\n", ci->values[i]);
1263 pthread_mutex_unlock(&cache_lock);
1264 return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1268 {
1269 int status;
1270 gboolean found;
1271 char *file, file_tmp[PATH_MAX];
1273 status = buffer_get_field(&buffer, &buffer_size, &file);
1274 if (status != 0)
1275 return syntax_error(sock,cmd);
1277 get_abs_path(&file, file_tmp);
1278 if (!check_file_access(file, sock)) return 0;
1280 pthread_mutex_lock(&cache_lock);
1281 found = g_tree_remove(cache_tree, file);
1282 pthread_mutex_unlock(&cache_lock);
1284 if (found == TRUE)
1285 {
1286 if (!JOURNAL_REPLAY(sock))
1287 journal_write("forget", file);
1289 return send_response(sock, RESP_OK, "Gone!\n");
1290 }
1291 else
1292 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1294 /* NOTREACHED */
1295 assert(1==0);
1296 } /* }}} static int handle_request_forget */
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1299 {
1300 cache_item_t *ci;
1302 pthread_mutex_lock(&cache_lock);
1304 ci = cache_queue_head;
1305 while (ci != NULL)
1306 {
1307 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308 ci = ci->next;
1309 }
1311 pthread_mutex_unlock(&cache_lock);
1313 return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1317 {
1318 char *file, file_tmp[PATH_MAX];
1319 int values_num = 0;
1320 int status;
1321 char orig_buf[RRD_CMD_MAX];
1323 cache_item_t *ci;
1325 /* save it for the journal later */
1326 if (!JOURNAL_REPLAY(sock))
1327 strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1329 status = buffer_get_field (&buffer, &buffer_size, &file);
1330 if (status != 0)
1331 return syntax_error(sock,cmd);
1333 pthread_mutex_lock(&stats_lock);
1334 stats_updates_received++;
1335 pthread_mutex_unlock(&stats_lock);
1337 get_abs_path(&file, file_tmp);
1338 if (!check_file_access(file, sock)) return 0;
1340 pthread_mutex_lock (&cache_lock);
1341 ci = g_tree_lookup (cache_tree, file);
1343 if (ci == NULL) /* {{{ */
1344 {
1345 struct stat statbuf;
1346 cache_item_t *tmp;
1348 /* don't hold the lock while we setup; stat(2) might block */
1349 pthread_mutex_unlock(&cache_lock);
1351 memset (&statbuf, 0, sizeof (statbuf));
1352 status = stat (file, &statbuf);
1353 if (status != 0)
1354 {
1355 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357 status = errno;
1358 if (status == ENOENT)
1359 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360 else
1361 return send_response(sock, RESP_ERR,
1362 "stat failed with error %i.\n", status);
1363 }
1364 if (!S_ISREG (statbuf.st_mode))
1365 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367 if (access(file, R_OK|W_OK) != 0)
1368 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369 file, rrd_strerror(errno));
1371 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372 if (ci == NULL)
1373 {
1374 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376 return send_response(sock, RESP_ERR, "malloc failed.\n");
1377 }
1378 memset (ci, 0, sizeof (cache_item_t));
1380 ci->file = strdup (file);
1381 if (ci->file == NULL)
1382 {
1383 free (ci);
1384 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386 return send_response(sock, RESP_ERR, "strdup failed.\n");
1387 }
1389 wipe_ci_values(ci, now);
1390 ci->flags = CI_FLAGS_IN_TREE;
1391 pthread_cond_init(&ci->flushed, NULL);
1393 pthread_mutex_lock(&cache_lock);
1395 /* another UPDATE might have added this entry in the meantime */
1396 tmp = g_tree_lookup (cache_tree, file);
1397 if (tmp == NULL)
1398 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399 else
1400 {
1401 free_cache_item (ci);
1402 ci = tmp;
1403 }
1405 /* state may have changed while we were unlocked */
1406 if (state == SHUTDOWN)
1407 return -1;
1408 } /* }}} */
1409 assert (ci != NULL);
1411 /* don't re-write updates in replay mode */
1412 if (!JOURNAL_REPLAY(sock))
1413 journal_write("update", orig_buf);
1415 while (buffer_size > 0)
1416 {
1417 char *value;
1418 double stamp;
1419 char *eostamp;
1421 status = buffer_get_field (&buffer, &buffer_size, &value);
1422 if (status != 0)
1423 {
1424 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425 break;
1426 }
1428 /* make sure update time is always moving forward. We use double here since
1429 update does support subsecond precision for timestamps ... */
1430 stamp = strtod(value, &eostamp);
1431 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1432 {
1433 pthread_mutex_unlock(&cache_lock);
1434 return send_response(sock, RESP_ERR,
1435 "Cannot find timestamp in '%s'!\n", value);
1436 }
1437 else if (stamp <= ci->last_update_stamp)
1438 {
1439 pthread_mutex_unlock(&cache_lock);
1440 return send_response(sock, RESP_ERR,
1441 "illegal attempt to update using time %lf when last"
1442 " update time is %lf (minimum one second step)\n",
1443 stamp, ci->last_update_stamp);
1444 }
1445 else
1446 ci->last_update_stamp = stamp;
1448 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1449 &ci->values_alloc, config_alloc_chunk))
1450 {
1451 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1452 continue;
1453 }
1455 values_num++;
1456 }
1458 if (((now - ci->last_flush_time) >= config_write_interval)
1459 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1460 && (ci->values_num > 0))
1461 {
1462 enqueue_cache_item (ci, TAIL);
1463 }
1465 pthread_mutex_unlock (&cache_lock);
1467 if (values_num < 1)
1468 return send_response(sock, RESP_ERR, "No values updated.\n");
1469 else
1470 return send_response(sock, RESP_OK,
1471 "errors, enqueued %i value(s).\n", values_num);
1473 /* NOTREACHED */
1474 assert(1==0);
1476 } /* }}} int handle_request_update */
1478 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1479 {
1480 char *file, file_tmp[PATH_MAX];
1481 char *cf;
1483 char *start_str;
1484 char *end_str;
1485 time_t start_tm;
1486 time_t end_tm;
1488 unsigned long step;
1489 unsigned long ds_cnt;
1490 char **ds_namv;
1491 rrd_value_t *data;
1493 int status;
1494 unsigned long i;
1495 time_t t;
1496 rrd_value_t *data_ptr;
1498 file = NULL;
1499 cf = NULL;
1500 start_str = NULL;
1501 end_str = NULL;
1503 /* Read the arguments */
1504 do /* while (0) */
1505 {
1506 status = buffer_get_field (&buffer, &buffer_size, &file);
1507 if (status != 0)
1508 break;
1510 status = buffer_get_field (&buffer, &buffer_size, &cf);
1511 if (status != 0)
1512 break;
1514 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1515 if (status != 0)
1516 {
1517 start_str = NULL;
1518 status = 0;
1519 break;
1520 }
1522 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1523 if (status != 0)
1524 {
1525 end_str = NULL;
1526 status = 0;
1527 break;
1528 }
1529 } while (0);
1531 if (status != 0)
1532 return (syntax_error(sock,cmd));
1534 get_abs_path(&file, file_tmp);
1535 if (!check_file_access(file, sock)) return 0;
1537 status = flush_file (file);
1538 if ((status != 0) && (status != ENOENT))
1539 return (send_response (sock, RESP_ERR,
1540 "flush_file (%s) failed with status %i.\n", file, status));
1542 t = time (NULL); /* "now" */
1544 /* Parse start time */
1545 if (start_str != NULL)
1546 {
1547 char *endptr;
1548 long value;
1550 endptr = NULL;
1551 errno = 0;
1552 value = strtol (start_str, &endptr, /* base = */ 0);
1553 if ((endptr == start_str) || (errno != 0))
1554 return (send_response(sock, RESP_ERR,
1555 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1556 start_str));
1558 if (value > 0)
1559 start_tm = (time_t) value;
1560 else
1561 start_tm = (time_t) (t + value);
1562 }
1563 else
1564 {
1565 start_tm = t - 86400;
1566 }
1568 /* Parse end time */
1569 if (end_str != NULL)
1570 {
1571 char *endptr;
1572 long value;
1574 endptr = NULL;
1575 errno = 0;
1576 value = strtol (end_str, &endptr, /* base = */ 0);
1577 if ((endptr == end_str) || (errno != 0))
1578 return (send_response(sock, RESP_ERR,
1579 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1580 end_str));
1582 if (value > 0)
1583 end_tm = (time_t) value;
1584 else
1585 end_tm = (time_t) (t + value);
1586 }
1587 else
1588 {
1589 end_tm = t;
1590 }
1592 step = -1;
1593 ds_cnt = 0;
1594 ds_namv = NULL;
1595 data = NULL;
1597 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1598 &ds_cnt, &ds_namv, &data);
1599 if (status != 0)
1600 return (send_response(sock, RESP_ERR,
1601 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1603 add_response_info (sock, "FlushVersion: %lu\n", 1);
1604 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1605 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1606 add_response_info (sock, "Step: %lu\n", step);
1607 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1609 #define SSTRCAT(buffer,str,buffer_fill) do { \
1610 size_t str_len = strlen (str); \
1611 if ((buffer_fill + str_len) > sizeof (buffer)) \
1612 str_len = sizeof (buffer) - buffer_fill; \
1613 if (str_len > 0) { \
1614 strncpy (buffer + buffer_fill, str, str_len); \
1615 buffer_fill += str_len; \
1616 assert (buffer_fill <= sizeof (buffer)); \
1617 if (buffer_fill == sizeof (buffer)) \
1618 buffer[buffer_fill - 1] = 0; \
1619 else \
1620 buffer[buffer_fill] = 0; \
1621 } \
1622 } while (0)
1624 { /* Add list of DS names */
1625 char linebuf[1024];
1626 size_t linebuf_fill;
1628 memset (linebuf, 0, sizeof (linebuf));
1629 linebuf_fill = 0;
1630 for (i = 0; i < ds_cnt; i++)
1631 {
1632 if (i > 0)
1633 SSTRCAT (linebuf, " ", linebuf_fill);
1634 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1635 rrd_freemem(ds_namv[i]);
1636 }
1637 rrd_freemem(ds_namv);
1638 add_response_info (sock, "DSName: %s\n", linebuf);
1639 }
1641 /* Add the actual data */
1642 assert (step > 0);
1643 data_ptr = data;
1644 for (t = start_tm + step; t <= end_tm; t += step)
1645 {
1646 char linebuf[1024];
1647 size_t linebuf_fill;
1648 char tmp[128];
1650 memset (linebuf, 0, sizeof (linebuf));
1651 linebuf_fill = 0;
1652 for (i = 0; i < ds_cnt; i++)
1653 {
1654 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1655 tmp[sizeof (tmp) - 1] = 0;
1656 SSTRCAT (linebuf, tmp, linebuf_fill);
1658 data_ptr++;
1659 }
1661 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1662 } /* for (t) */
1663 rrd_freemem(data);
1665 return (send_response (sock, RESP_OK, "Success\n"));
1666 #undef SSTRCAT
1667 } /* }}} int handle_request_fetch */
1669 /* we came across a "WROTE" entry during journal replay.
1670 * throw away any values that we have accumulated for this file
1671 */
1672 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1673 {
1674 cache_item_t *ci;
1675 const char *file = buffer;
1677 pthread_mutex_lock(&cache_lock);
1679 ci = g_tree_lookup(cache_tree, file);
1680 if (ci == NULL)
1681 {
1682 pthread_mutex_unlock(&cache_lock);
1683 return (0);
1684 }
1686 if (ci->values)
1687 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1689 wipe_ci_values(ci, now);
1690 remove_from_queue(ci);
1692 pthread_mutex_unlock(&cache_lock);
1693 return (0);
1694 } /* }}} int handle_request_wrote */
1696 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1697 {
1698 char *file, file_tmp[PATH_MAX];
1699 int status;
1700 rrd_info_t *info;
1702 /* obtain filename */
1703 status = buffer_get_field(&buffer, &buffer_size, &file);
1704 if (status != 0)
1705 return syntax_error(sock,cmd);
1706 /* get full pathname */
1707 get_abs_path(&file, file_tmp);
1708 if (!check_file_access(file, sock)) {
1709 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1710 }
1711 /* get data */
1712 rrd_clear_error ();
1713 info = rrd_info_r(file);
1714 if(!info) {
1715 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1716 }
1717 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1718 switch (data->type) {
1719 case RD_I_VAL:
1720 if (isnan(data->value.u_val))
1721 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1722 else
1723 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1724 break;
1725 case RD_I_CNT:
1726 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1727 break;
1728 case RD_I_INT:
1729 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1730 break;
1731 case RD_I_STR:
1732 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1733 break;
1734 case RD_I_BLO:
1735 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1736 break;
1737 }
1738 }
1740 rrd_info_free(info);
1742 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1743 } /* }}} static int handle_request_info */
1745 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1746 {
1747 char *i, *file, file_tmp[PATH_MAX];
1748 int status;
1749 int idx;
1750 time_t t;
1752 /* obtain filename */
1753 status = buffer_get_field(&buffer, &buffer_size, &file);
1754 if (status != 0)
1755 return syntax_error(sock,cmd);
1756 /* get full pathname */
1757 get_abs_path(&file, file_tmp);
1758 if (!check_file_access(file, sock)) {
1759 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1760 }
1762 status = buffer_get_field(&buffer, &buffer_size, &i);
1763 if (status != 0)
1764 return syntax_error(sock,cmd);
1765 idx = atoi(i);
1766 if(idx<0) {
1767 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1768 }
1770 /* get data */
1771 rrd_clear_error ();
1772 t = rrd_first_r(file,idx);
1773 if(t<1) {
1774 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1775 }
1776 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1777 } /* }}} static int handle_request_first */
1780 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1781 {
1782 char *file, file_tmp[PATH_MAX];
1783 int status;
1784 time_t t, from_file, step;
1785 rrd_file_t * rrd_file;
1786 cache_item_t * ci;
1787 rrd_t rrd;
1789 /* obtain filename */
1790 status = buffer_get_field(&buffer, &buffer_size, &file);
1791 if (status != 0)
1792 return syntax_error(sock,cmd);
1793 /* get full pathname */
1794 get_abs_path(&file, file_tmp);
1795 if (!check_file_access(file, sock)) {
1796 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1797 }
1798 rrd_clear_error();
1799 rrd_init(&rrd);
1800 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1801 if(!rrd_file) {
1802 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1803 }
1804 from_file = rrd.live_head->last_up;
1805 step = rrd.stat_head->pdp_step;
1806 rrd_close(rrd_file);
1807 pthread_mutex_lock(&cache_lock);
1808 ci = g_tree_lookup(cache_tree, file);
1809 if (ci)
1810 t = ci->last_update_stamp;
1811 else
1812 t = from_file;
1813 pthread_mutex_unlock(&cache_lock);
1814 t -= t % step;
1815 rrd_free(&rrd);
1816 if(t<1) {
1817 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1818 }
1819 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1820 } /* }}} static int handle_request_last */
1822 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1823 {
1824 char *file, file_tmp[PATH_MAX];
1825 char *tok;
1826 int ac = 0;
1827 char *av[128];
1828 int status;
1829 unsigned long step = 300;
1830 time_t last_up = time(NULL)-10;
1831 int no_overwrite = opt_no_overwrite;
1834 /* obtain filename */
1835 status = buffer_get_field(&buffer, &buffer_size, &file);
1836 if (status != 0)
1837 return syntax_error(sock,cmd);
1838 /* get full pathname */
1839 get_abs_path(&file, file_tmp);
1840 if (!check_file_access(file, sock)) {
1841 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1842 }
1843 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1845 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1846 if( ! strncmp(tok,"-b",2) ) {
1847 status = buffer_get_field(&buffer, &buffer_size, &tok );
1848 if (status != 0) return syntax_error(sock,cmd);
1849 last_up = (time_t) atol(tok);
1850 continue;
1851 }
1852 if( ! strncmp(tok,"-s",2) ) {
1853 status = buffer_get_field(&buffer, &buffer_size, &tok );
1854 if (status != 0) return syntax_error(sock,cmd);
1855 step = atol(tok);
1856 continue;
1857 }
1858 if( ! strncmp(tok,"-O",2) ) {
1859 no_overwrite = 1;
1860 continue;
1861 }
1862 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1863 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1864 return syntax_error(sock,cmd);
1865 }
1866 if(step<1) {
1867 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1868 }
1869 if (last_up < 3600 * 24 * 365 * 10) {
1870 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1871 }
1873 rrd_clear_error ();
1874 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1876 if(!status) {
1877 return send_response(sock, RESP_OK, "RRD created OK\n");
1878 }
1879 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1880 } /* }}} static int handle_request_create */
1882 /* start "BATCH" processing */
1883 static int batch_start (HANDLER_PROTO) /* {{{ */
1884 {
1885 int status;
1886 if (sock->batch_start)
1887 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1889 status = send_response(sock, RESP_OK,
1890 "Go ahead. End with dot '.' on its own line.\n");
1891 sock->batch_start = time(NULL);
1892 sock->batch_cmd = 0;
1894 return status;
1895 } /* }}} static int batch_start */
1897 /* finish "BATCH" processing and return results to the client */
1898 static int batch_done (HANDLER_PROTO) /* {{{ */
1899 {
1900 assert(sock->batch_start);
1901 sock->batch_start = 0;
1902 sock->batch_cmd = 0;
1903 return send_response(sock, RESP_OK, "errors\n");
1904 } /* }}} static int batch_done */
1906 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1907 {
1908 return -1;
1909 } /* }}} static int handle_request_quit */
1911 static command_t list_of_commands[] = { /* {{{ */
1912 {
1913 "UPDATE",
1914 handle_request_update,
1915 CMD_CONTEXT_ANY,
1916 "UPDATE <filename> <values> [<values> ...]\n"
1917 ,
1918 "Adds the given file to the internal cache if it is not yet known and\n"
1919 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1920 "for details.\n"
1921 "\n"
1922 "Each <values> has the following form:\n"
1923 " <values> = <time>:<value>[:<value>[...]]\n"
1924 "See the rrdupdate(1) manpage for details.\n"
1925 },
1926 {
1927 "WROTE",
1928 handle_request_wrote,
1929 CMD_CONTEXT_JOURNAL,
1930 NULL,
1931 NULL
1932 },
1933 {
1934 "FLUSH",
1935 handle_request_flush,
1936 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1937 "FLUSH <filename>\n"
1938 ,
1939 "Adds the given filename to the head of the update queue and returns\n"
1940 "after it has been dequeued.\n"
1941 },
1942 {
1943 "FLUSHALL",
1944 handle_request_flushall,
1945 CMD_CONTEXT_CLIENT,
1946 "FLUSHALL\n"
1947 ,
1948 "Triggers writing of all pending updates. Returns immediately.\n"
1949 },
1950 {
1951 "PENDING",
1952 handle_request_pending,
1953 CMD_CONTEXT_CLIENT,
1954 "PENDING <filename>\n"
1955 ,
1956 "Shows any 'pending' updates for a file, in order.\n"
1957 "The updates shown have not yet been written to the underlying RRD file.\n"
1958 },
1959 {
1960 "FORGET",
1961 handle_request_forget,
1962 CMD_CONTEXT_ANY,
1963 "FORGET <filename>\n"
1964 ,
1965 "Removes the file completely from the cache.\n"
1966 "Any pending updates for the file will be lost.\n"
1967 },
1968 {
1969 "QUEUE",
1970 handle_request_queue,
1971 CMD_CONTEXT_CLIENT,
1972 "QUEUE\n"
1973 ,
1974 "Shows all files in the output queue.\n"
1975 "The output is zero or more lines in the following format:\n"
1976 "(where <num_vals> is the number of values to be written)\n"
1977 "\n"
1978 "<num_vals> <filename>\n"
1979 },
1980 {
1981 "STATS",
1982 handle_request_stats,
1983 CMD_CONTEXT_CLIENT,
1984 "STATS\n"
1985 ,
1986 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1987 "a description of the values.\n"
1988 },
1989 {
1990 "HELP",
1991 handle_request_help,
1992 CMD_CONTEXT_CLIENT,
1993 "HELP [<command>]\n",
1994 NULL, /* special! */
1995 },
1996 {
1997 "BATCH",
1998 batch_start,
1999 CMD_CONTEXT_CLIENT,
2000 "BATCH\n"
2001 ,
2002 "The 'BATCH' command permits the client to initiate a bulk load\n"
2003 " of commands to rrdcached.\n"
2004 "\n"
2005 "Usage:\n"
2006 "\n"
2007 " client: BATCH\n"
2008 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2009 " client: command #1\n"
2010 " client: command #2\n"
2011 " client: ... and so on\n"
2012 " client: .\n"
2013 " server: 2 errors\n"
2014 " server: 7 message for command #7\n"
2015 " server: 9 message for command #9\n"
2016 "\n"
2017 "For more information, consult the rrdcached(1) documentation.\n"
2018 },
2019 {
2020 ".", /* BATCH terminator */
2021 batch_done,
2022 CMD_CONTEXT_BATCH,
2023 NULL,
2024 NULL
2025 },
2026 {
2027 "FETCH",
2028 handle_request_fetch,
2029 CMD_CONTEXT_CLIENT,
2030 "FETCH <file> <CF> [<start> [<end>]]\n"
2031 ,
2032 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2033 },
2034 {
2035 "INFO",
2036 handle_request_info,
2037 CMD_CONTEXT_CLIENT,
2038 "INFO <filename>\n",
2039 "The INFO command retrieves information about a specified RRD file.\n"
2040 "This is returned in standard rrdinfo format, a sequence of lines\n"
2041 "with the format <keyname> = <value>\n"
2042 "Note that this is the data as of the last update of the RRD file itself,\n"
2043 "not the last time data was received via rrdcached, so there may be pending\n"
2044 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2045 },
2046 {
2047 "FIRST",
2048 handle_request_first,
2049 CMD_CONTEXT_CLIENT,
2050 "FIRST <filename> <rra index>\n",
2051 "The FIRST command retrieves the first data time for a specified RRA in\n"
2052 "an RRD file.\n"
2053 },
2054 {
2055 "LAST",
2056 handle_request_last,
2057 CMD_CONTEXT_CLIENT,
2058 "LAST <filename>\n",
2059 "The LAST command retrieves the last update time for a specified RRD file.\n"
2060 "Note that this is the time of the last update of the RRD file itself, not\n"
2061 "the last time data was received via rrdcached, so there may be pending\n"
2062 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2063 },
2064 {
2065 "CREATE",
2066 handle_request_create,
2067 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2068 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2069 "The CREATE command will create an RRD file, overwriting any existing file\n"
2070 "unless the -O option is given or rrdcached was started with the -O option.\n"
2071 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2072 "not acceptable) and the step is in seconds (default is 300).\n"
2073 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2074 },
2075 {
2076 "QUIT",
2077 handle_request_quit,
2078 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2079 "QUIT\n"
2080 ,
2081 "Disconnect from rrdcached.\n"
2082 }
2083 }; /* }}} command_t list_of_commands[] */
2084 static size_t list_of_commands_len = sizeof (list_of_commands)
2085 / sizeof (list_of_commands[0]);
2087 static command_t *find_command(char *cmd)
2088 {
2089 size_t i;
2091 for (i = 0; i < list_of_commands_len; i++)
2092 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2093 return (&list_of_commands[i]);
2094 return NULL;
2095 }
2097 /* We currently use the index in the `list_of_commands' array as a bit position
2098 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2099 * outside these functions so that switching to a more elegant storage method
2100 * is easily possible. */
2101 static ssize_t find_command_index (const char *cmd) /* {{{ */
2102 {
2103 size_t i;
2105 for (i = 0; i < list_of_commands_len; i++)
2106 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2107 return ((ssize_t) i);
2108 return (-1);
2109 } /* }}} ssize_t find_command_index */
2111 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2112 const char *cmd)
2113 {
2114 ssize_t i;
2116 if (JOURNAL_REPLAY(sock))
2117 return (1);
2119 if (cmd == NULL)
2120 return (-1);
2122 if ((strcasecmp ("QUIT", cmd) == 0)
2123 || (strcasecmp ("HELP", cmd) == 0))
2124 return (1);
2125 else if (strcmp (".", cmd) == 0)
2126 cmd = "BATCH";
2128 i = find_command_index (cmd);
2129 if (i < 0)
2130 return (-1);
2131 assert (i < 32);
2133 if ((sock->permissions & (1 << i)) != 0)
2134 return (1);
2135 return (0);
2136 } /* }}} int socket_permission_check */
2138 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2139 const char *cmd)
2140 {
2141 ssize_t i;
2143 i = find_command_index (cmd);
2144 if (i < 0)
2145 return (-1);
2146 assert (i < 32);
2148 sock->permissions |= (1 << i);
2149 return (0);
2150 } /* }}} int socket_permission_add */
2152 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2153 {
2154 sock->permissions = 0;
2155 } /* }}} socket_permission_clear */
2157 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2158 listen_socket_t *src)
2159 {
2160 dest->permissions = src->permissions;
2161 } /* }}} socket_permission_copy */
2163 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2164 {
2165 size_t i;
2167 sock->permissions = 0;
2168 for (i = 0; i < list_of_commands_len; i++)
2169 sock->permissions |= (1 << i);
2170 } /* }}} void socket_permission_set_all */
2172 /* check whether commands are received in the expected context */
2173 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2174 {
2175 if (JOURNAL_REPLAY(sock))
2176 return (cmd->context & CMD_CONTEXT_JOURNAL);
2177 else if (sock->batch_start)
2178 return (cmd->context & CMD_CONTEXT_BATCH);
2179 else
2180 return (cmd->context & CMD_CONTEXT_CLIENT);
2182 /* NOTREACHED */
2183 assert(1==0);
2184 }
2186 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2187 {
2188 int status;
2189 char *cmd_str;
2190 char *resp_txt;
2191 command_t *help = NULL;
2193 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2194 if (status == 0)
2195 help = find_command(cmd_str);
2197 if (help && (help->syntax || help->help))
2198 {
2199 char tmp[RRD_CMD_MAX];
2201 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2202 resp_txt = tmp;
2204 if (help->syntax)
2205 add_response_info(sock, "Usage: %s\n", help->syntax);
2207 if (help->help)
2208 add_response_info(sock, "%s\n", help->help);
2209 }
2210 else
2211 {
2212 size_t i;
2214 resp_txt = "Command overview\n";
2216 for (i = 0; i < list_of_commands_len; i++)
2217 {
2218 if (list_of_commands[i].syntax == NULL)
2219 continue;
2220 add_response_info (sock, "%s", list_of_commands[i].syntax);
2221 }
2222 }
2224 return send_response(sock, RESP_OK, resp_txt);
2225 } /* }}} int handle_request_help */
2227 static int handle_request (DISPATCH_PROTO) /* {{{ */
2228 {
2229 char *buffer_ptr = buffer;
2230 char *cmd_str = NULL;
2231 command_t *cmd = NULL;
2232 int status;
2234 assert (buffer[buffer_size - 1] == '\0');
2236 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2237 if (status != 0)
2238 {
2239 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2240 return (-1);
2241 }
2243 if (sock != NULL && sock->batch_start)
2244 sock->batch_cmd++;
2246 cmd = find_command(cmd_str);
2247 if (!cmd)
2248 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2250 if (!socket_permission_check (sock, cmd->cmd))
2251 return send_response(sock, RESP_ERR, "Permission denied.\n");
2253 if (!command_check_context(sock, cmd))
2254 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2256 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2257 } /* }}} int handle_request */
2259 static void journal_set_free (journal_set *js) /* {{{ */
2260 {
2261 if (js == NULL)
2262 return;
2264 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2266 free(js);
2267 } /* }}} journal_set_free */
2269 static void journal_set_remove (journal_set *js) /* {{{ */
2270 {
2271 if (js == NULL)
2272 return;
2274 for (uint i=0; i < js->files_num; i++)
2275 {
2276 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2277 unlink(js->files[i]);
2278 }
2279 } /* }}} journal_set_remove */
2281 /* close current journal file handle.
2282 * MUST hold journal_lock before calling */
2283 static void journal_close(void) /* {{{ */
2284 {
2285 if (journal_fh != NULL)
2286 {
2287 if (fclose(journal_fh) != 0)
2288 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2289 }
2291 journal_fh = NULL;
2292 journal_size = 0;
2293 } /* }}} journal_close */
2295 /* MUST hold journal_lock before calling */
2296 static void journal_new_file(void) /* {{{ */
2297 {
2298 struct timeval now;
2299 int new_fd;
2300 char new_file[PATH_MAX + 1];
2302 assert(journal_dir != NULL);
2303 assert(journal_cur != NULL);
2305 journal_close();
2307 gettimeofday(&now, NULL);
2308 /* this format assures that the files sort in strcmp() order */
2309 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2310 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2312 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2313 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2314 if (new_fd < 0)
2315 goto error;
2317 journal_fh = fdopen(new_fd, "a");
2318 if (journal_fh == NULL)
2319 goto error;
2321 journal_size = ftell(journal_fh);
2322 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2324 /* record the file in the journal set */
2325 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2327 return;
2329 error:
2330 RRDD_LOG(LOG_CRIT,
2331 "JOURNALING DISABLED: Error while trying to create %s : %s",
2332 new_file, rrd_strerror(errno));
2333 RRDD_LOG(LOG_CRIT,
2334 "JOURNALING DISABLED: All values will be flushed at shutdown");
2336 close(new_fd);
2337 config_flush_at_shutdown = 1;
2339 } /* }}} journal_new_file */
2341 /* MUST NOT hold journal_lock before calling this */
2342 static void journal_rotate(void) /* {{{ */
2343 {
2344 journal_set *old_js = NULL;
2346 if (journal_dir == NULL)
2347 return;
2349 RRDD_LOG(LOG_DEBUG, "rotating journals");
2351 pthread_mutex_lock(&stats_lock);
2352 ++stats_journal_rotate;
2353 pthread_mutex_unlock(&stats_lock);
2355 pthread_mutex_lock(&journal_lock);
2357 journal_close();
2359 /* rotate the journal sets */
2360 old_js = journal_old;
2361 journal_old = journal_cur;
2362 journal_cur = calloc(1, sizeof(journal_set));
2364 if (journal_cur != NULL)
2365 journal_new_file();
2366 else
2367 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2369 pthread_mutex_unlock(&journal_lock);
2371 journal_set_remove(old_js);
2372 journal_set_free (old_js);
2374 } /* }}} static void journal_rotate */
2376 /* MUST hold journal_lock when calling */
2377 static void journal_done(void) /* {{{ */
2378 {
2379 if (journal_cur == NULL)
2380 return;
2382 journal_close();
2384 if (config_flush_at_shutdown)
2385 {
2386 RRDD_LOG(LOG_INFO, "removing journals");
2387 journal_set_remove(journal_old);
2388 journal_set_remove(journal_cur);
2389 }
2390 else
2391 {
2392 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2393 "journals will be used at next startup");
2394 }
2396 journal_set_free(journal_cur);
2397 journal_set_free(journal_old);
2398 free(journal_dir);
2400 } /* }}} static void journal_done */
2402 static int journal_write(char *cmd, char *args) /* {{{ */
2403 {
2404 int chars;
2406 if (journal_fh == NULL)
2407 return 0;
2409 pthread_mutex_lock(&journal_lock);
2410 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2411 journal_size += chars;
2413 if (journal_size > JOURNAL_MAX)
2414 journal_new_file();
2416 pthread_mutex_unlock(&journal_lock);
2418 if (chars > 0)
2419 {
2420 pthread_mutex_lock(&stats_lock);
2421 stats_journal_bytes += chars;
2422 pthread_mutex_unlock(&stats_lock);
2423 }
2425 return chars;
2426 } /* }}} static int journal_write */
2428 static int journal_replay (const char *file) /* {{{ */
2429 {
2430 FILE *fh;
2431 int entry_cnt = 0;
2432 int fail_cnt = 0;
2433 uint64_t line = 0;
2434 char entry[RRD_CMD_MAX];
2435 time_t now;
2437 if (file == NULL) return 0;
2439 {
2440 char *reason = "unknown error";
2441 int status = 0;
2442 struct stat statbuf;
2444 memset(&statbuf, 0, sizeof(statbuf));
2445 if (stat(file, &statbuf) != 0)
2446 {
2447 reason = "stat error";
2448 status = errno;
2449 }
2450 else if (!S_ISREG(statbuf.st_mode))
2451 {
2452 reason = "not a regular file";
2453 status = EPERM;
2454 }
2455 if (statbuf.st_uid != daemon_uid)
2456 {
2457 reason = "not owned by daemon user";
2458 status = EACCES;
2459 }
2460 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2461 {
2462 reason = "must not be user/group writable";
2463 status = EACCES;
2464 }
2466 if (status != 0)
2467 {
2468 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2469 file, rrd_strerror(status), reason);
2470 return 0;
2471 }
2472 }
2474 fh = fopen(file, "r");
2475 if (fh == NULL)
2476 {
2477 if (errno != ENOENT)
2478 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2479 file, rrd_strerror(errno));
2480 return 0;
2481 }
2482 else
2483 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2485 now = time(NULL);
2487 while(!feof(fh))
2488 {
2489 size_t entry_len;
2491 ++line;
2492 if (fgets(entry, sizeof(entry), fh) == NULL)
2493 break;
2494 entry_len = strlen(entry);
2496 /* check \n termination in case journal writing crashed mid-line */
2497 if (entry_len == 0)
2498 continue;
2499 else if (entry[entry_len - 1] != '\n')
2500 {
2501 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2502 ++fail_cnt;
2503 continue;
2504 }
2506 entry[entry_len - 1] = '\0';
2508 if (handle_request(NULL, now, entry, entry_len) == 0)
2509 ++entry_cnt;
2510 else
2511 ++fail_cnt;
2512 }
2514 fclose(fh);
2516 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2517 entry_cnt, fail_cnt);
2519 return entry_cnt > 0 ? 1 : 0;
2520 } /* }}} static int journal_replay */
2522 static int journal_sort(const void *v1, const void *v2)
2523 {
2524 char **jn1 = (char **) v1;
2525 char **jn2 = (char **) v2;
2527 return strcmp(*jn1,*jn2);
2528 }
2530 static void journal_init(void) /* {{{ */
2531 {
2532 int had_journal = 0;
2533 DIR *dir;
2534 struct dirent *dent;
2535 char path[PATH_MAX+1];
2537 if (journal_dir == NULL) return;
2539 pthread_mutex_lock(&journal_lock);
2541 journal_cur = calloc(1, sizeof(journal_set));
2542 if (journal_cur == NULL)
2543 {
2544 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2545 return;
2546 }
2548 RRDD_LOG(LOG_INFO, "checking for journal files");
2550 /* Handle old journal files during transition. This gives them the
2551 * correct sort order. TODO: remove after first release
2552 */
2553 {
2554 char old_path[PATH_MAX+1];
2555 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2556 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2557 rename(old_path, path);
2559 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2560 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2561 rename(old_path, path);
2562 }
2564 dir = opendir(journal_dir);
2565 if (!dir) {
2566 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2567 return;
2568 }
2569 while ((dent = readdir(dir)) != NULL)
2570 {
2571 /* looks like a journal file? */
2572 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2573 continue;
2575 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2577 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2578 {
2579 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2580 dent->d_name);
2581 break;
2582 }
2583 }
2584 closedir(dir);
2586 qsort(journal_cur->files, journal_cur->files_num,
2587 sizeof(journal_cur->files[0]), journal_sort);
2589 for (uint i=0; i < journal_cur->files_num; i++)
2590 had_journal += journal_replay(journal_cur->files[i]);
2592 journal_new_file();
2594 /* it must have been a crash. start a flush */
2595 if (had_journal && config_flush_at_shutdown)
2596 flush_old_values(-1);
2598 pthread_mutex_unlock(&journal_lock);
2600 RRDD_LOG(LOG_INFO, "journal processing complete");
2602 } /* }}} static void journal_init */
2604 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2605 {
2606 assert(sock != NULL);
2608 free(sock->rbuf); sock->rbuf = NULL;
2609 free(sock->wbuf); sock->wbuf = NULL;
2610 free(sock);
2611 } /* }}} void free_listen_socket */
2613 static void close_connection(listen_socket_t *sock) /* {{{ */
2614 {
2615 if (sock->fd >= 0)
2616 {
2617 close(sock->fd);
2618 sock->fd = -1;
2619 }
2621 free_listen_socket(sock);
2623 } /* }}} void close_connection */
2625 static void *connection_thread_main (void *args) /* {{{ */
2626 {
2627 listen_socket_t *sock;
2628 int fd;
2630 sock = (listen_socket_t *) args;
2631 fd = sock->fd;
2633 /* init read buffers */
2634 sock->next_read = sock->next_cmd = 0;
2635 sock->rbuf = malloc(RBUF_SIZE);
2636 if (sock->rbuf == NULL)
2637 {
2638 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2639 close_connection(sock);
2640 return NULL;
2641 }
2643 pthread_mutex_lock (&connection_threads_lock);
2644 #ifdef HAVE_LIBWRAP
2645 /* LIBWRAP does not support multiple threads! By putting this code
2646 inside pthread_mutex_lock we do not have to worry about request_info
2647 getting overwritten by another thread.
2648 */
2649 struct request_info req;
2650 request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2651 fromhost(&req);
2652 if(!hosts_access(&req)) {
2653 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2654 pthread_mutex_unlock (&connection_threads_lock);
2655 close_connection(sock);
2656 return NULL;
2657 }
2658 #endif /* HAVE_LIBWRAP */
2659 connection_threads_num++;
2660 pthread_mutex_unlock (&connection_threads_lock);
2662 while (state == RUNNING)
2663 {
2664 char *cmd;
2665 ssize_t cmd_len;
2666 ssize_t rbytes;
2667 time_t now;
2669 struct pollfd pollfd;
2670 int status;
2672 pollfd.fd = fd;
2673 pollfd.events = POLLIN | POLLPRI;
2674 pollfd.revents = 0;
2676 status = poll (&pollfd, 1, /* timeout = */ 500);
2677 if (state != RUNNING)
2678 break;
2679 else if (status == 0) /* timeout */
2680 continue;
2681 else if (status < 0) /* error */
2682 {
2683 status = errno;
2684 if (status != EINTR)
2685 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2686 continue;
2687 }
2689 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2690 break;
2691 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2692 {
2693 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2694 "poll(2) returned something unexpected: %#04hx",
2695 pollfd.revents);
2696 break;
2697 }
2699 rbytes = read(fd, sock->rbuf + sock->next_read,
2700 RBUF_SIZE - sock->next_read);
2701 if (rbytes < 0)
2702 {
2703 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2704 break;
2705 }
2706 else if (rbytes == 0)
2707 break; /* eof */
2709 sock->next_read += rbytes;
2711 if (sock->batch_start)
2712 now = sock->batch_start;
2713 else
2714 now = time(NULL);
2716 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2717 {
2718 status = handle_request (sock, now, cmd, cmd_len+1);
2719 if (status != 0)
2720 goto out_close;
2721 }
2722 }
2724 out_close:
2725 close_connection(sock);
2727 /* Remove this thread from the connection threads list */
2728 pthread_mutex_lock (&connection_threads_lock);
2729 connection_threads_num--;
2730 if (connection_threads_num <= 0)
2731 pthread_cond_broadcast(&connection_threads_done);
2732 pthread_mutex_unlock (&connection_threads_lock);
2734 return (NULL);
2735 } /* }}} void *connection_thread_main */
2737 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2738 {
2739 int fd;
2740 struct sockaddr_un sa;
2741 listen_socket_t *temp;
2742 int status;
2743 const char *path;
2744 char *path_copy, *dir;
2746 path = sock->addr;
2747 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2748 path += strlen("unix:");
2750 /* dirname may modify its argument */
2751 path_copy = strdup(path);
2752 if (path_copy == NULL)
2753 {
2754 fprintf(stderr, "rrdcached: strdup(): %s\n",
2755 rrd_strerror(errno));
2756 return (-1);
2757 }
2759 dir = dirname(path_copy);
2760 if (rrd_mkdir_p(dir, 0777) != 0)
2761 {
2762 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2763 dir, rrd_strerror(errno));
2764 return (-1);
2765 }
2767 free(path_copy);
2769 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2770 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2771 if (temp == NULL)
2772 {
2773 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2774 return (-1);
2775 }
2776 listen_fds = temp;
2777 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2779 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2780 if (fd < 0)
2781 {
2782 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2783 rrd_strerror(errno));
2784 return (-1);
2785 }
2787 memset (&sa, 0, sizeof (sa));
2788 sa.sun_family = AF_UNIX;
2789 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2791 /* if we've gotten this far, we own the pid file. any daemon started
2792 * with the same args must not be alive. therefore, ensure that we can
2793 * create the socket...
2794 */
2795 unlink(path);
2797 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2798 if (status != 0)
2799 {
2800 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2801 path, rrd_strerror(errno));
2802 close (fd);
2803 return (-1);
2804 }
2806 /* tweak the sockets group ownership */
2807 if (sock->socket_group != (gid_t)-1)
2808 {
2809 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2810 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2811 {
2812 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2813 }
2814 }
2816 if (sock->socket_permissions != (mode_t)-1)
2817 {
2818 if (chmod(path, sock->socket_permissions) != 0)
2819 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2820 (unsigned int)sock->socket_permissions, strerror(errno));
2821 }
2823 status = listen (fd, /* backlog = */ 10);
2824 if (status != 0)
2825 {
2826 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2827 path, rrd_strerror(errno));
2828 close (fd);
2829 unlink (path);
2830 return (-1);
2831 }
2833 listen_fds[listen_fds_num].fd = fd;
2834 listen_fds[listen_fds_num].family = PF_UNIX;
2835 strncpy(listen_fds[listen_fds_num].addr, path,
2836 sizeof (listen_fds[listen_fds_num].addr) - 1);
2837 listen_fds_num++;
2839 return (0);
2840 } /* }}} int open_listen_socket_unix */
2842 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2843 {
2844 struct addrinfo ai_hints;
2845 struct addrinfo *ai_res;
2846 struct addrinfo *ai_ptr;
2847 char addr_copy[NI_MAXHOST];
2848 char *addr;
2849 char *port;
2850 int status;
2852 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2853 addr_copy[sizeof (addr_copy) - 1] = 0;
2854 addr = addr_copy;
2856 memset (&ai_hints, 0, sizeof (ai_hints));
2857 ai_hints.ai_flags = 0;
2858 #ifdef AI_ADDRCONFIG
2859 ai_hints.ai_flags |= AI_ADDRCONFIG;
2860 #endif
2861 ai_hints.ai_family = AF_UNSPEC;
2862 ai_hints.ai_socktype = SOCK_STREAM;
2864 port = NULL;
2865 if (*addr == '[') /* IPv6+port format */
2866 {
2867 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2868 addr++;
2870 port = strchr (addr, ']');
2871 if (port == NULL)
2872 {
2873 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2874 return (-1);
2875 }
2876 *port = 0;
2877 port++;
2879 if (*port == ':')
2880 port++;
2881 else if (*port == 0)
2882 port = NULL;
2883 else
2884 {
2885 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2886 return (-1);
2887 }
2888 } /* if (*addr == '[') */
2889 else
2890 {
2891 port = rindex(addr, ':');
2892 if (port != NULL)
2893 {
2894 *port = 0;
2895 port++;
2896 }
2897 }
2898 ai_res = NULL;
2899 status = getaddrinfo (addr,
2900 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2901 &ai_hints, &ai_res);
2902 if (status != 0)
2903 {
2904 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2905 addr, gai_strerror (status));
2906 return (-1);
2907 }
2909 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2910 {
2911 int fd;
2912 listen_socket_t *temp;
2913 int one = 1;
2915 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2916 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2917 if (temp == NULL)
2918 {
2919 fprintf (stderr,
2920 "rrdcached: open_listen_socket_network: realloc failed.\n");
2921 continue;
2922 }
2923 listen_fds = temp;
2924 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2926 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2927 if (fd < 0)
2928 {
2929 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2930 rrd_strerror(errno));
2931 continue;
2932 }
2934 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2936 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2937 if (status != 0)
2938 {
2939 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2940 sock->addr, rrd_strerror(errno));
2941 close (fd);
2942 continue;
2943 }
2945 status = listen (fd, /* backlog = */ 10);
2946 if (status != 0)
2947 {
2948 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2949 sock->addr, rrd_strerror(errno));
2950 close (fd);
2951 freeaddrinfo(ai_res);
2952 return (-1);
2953 }
2955 listen_fds[listen_fds_num].fd = fd;
2956 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2957 listen_fds_num++;
2958 } /* for (ai_ptr) */
2960 freeaddrinfo(ai_res);
2961 return (0);
2962 } /* }}} static int open_listen_socket_network */
2964 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2965 {
2966 assert(sock != NULL);
2967 assert(sock->addr != NULL);
2969 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2970 || sock->addr[0] == '/')
2971 return (open_listen_socket_unix(sock));
2972 else
2973 return (open_listen_socket_network(sock));
2974 } /* }}} int open_listen_socket */
2976 static int close_listen_sockets (void) /* {{{ */
2977 {
2978 size_t i;
2980 for (i = 0; i < listen_fds_num; i++)
2981 {
2982 close (listen_fds[i].fd);
2984 if (listen_fds[i].family == PF_UNIX)
2985 unlink(listen_fds[i].addr);
2986 }
2988 free (listen_fds);
2989 listen_fds = NULL;
2990 listen_fds_num = 0;
2992 return (0);
2993 } /* }}} int close_listen_sockets */
2995 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2996 {
2997 struct pollfd *pollfds;
2998 int pollfds_num;
2999 int status;
3000 int i;
3002 if (listen_fds_num < 1)
3003 {
3004 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3005 return (NULL);
3006 }
3008 pollfds_num = listen_fds_num;
3009 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3010 if (pollfds == NULL)
3011 {
3012 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3013 return (NULL);
3014 }
3015 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3017 RRDD_LOG(LOG_INFO, "listening for connections");
3019 while (state == RUNNING)
3020 {
3021 for (i = 0; i < pollfds_num; i++)
3022 {
3023 pollfds[i].fd = listen_fds[i].fd;
3024 pollfds[i].events = POLLIN | POLLPRI;
3025 pollfds[i].revents = 0;
3026 }
3028 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3029 if (state != RUNNING)
3030 break;
3031 else if (status == 0) /* timeout */
3032 continue;
3033 else if (status < 0) /* error */
3034 {
3035 status = errno;
3036 if (status != EINTR)
3037 {
3038 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3039 }
3040 continue;
3041 }
3043 for (i = 0; i < pollfds_num; i++)
3044 {
3045 listen_socket_t *client_sock;
3046 struct sockaddr_storage client_sa;
3047 socklen_t client_sa_size;
3048 pthread_t tid;
3049 pthread_attr_t attr;
3051 if (pollfds[i].revents == 0)
3052 continue;
3054 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3055 {
3056 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3057 "poll(2) returned something unexpected for listen FD #%i.",
3058 pollfds[i].fd);
3059 continue;
3060 }
3062 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3063 if (client_sock == NULL)
3064 {
3065 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3066 continue;
3067 }
3068 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3070 client_sa_size = sizeof (client_sa);
3071 client_sock->fd = accept (pollfds[i].fd,
3072 (struct sockaddr *) &client_sa, &client_sa_size);
3073 if (client_sock->fd < 0)
3074 {
3075 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3076 free(client_sock);
3077 continue;
3078 }
3080 pthread_attr_init (&attr);
3081 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3083 status = pthread_create (&tid, &attr, connection_thread_main,
3084 client_sock);
3085 if (status != 0)
3086 {
3087 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3088 close_connection(client_sock);
3089 continue;
3090 }
3091 } /* for (pollfds_num) */
3092 } /* while (state == RUNNING) */
3094 RRDD_LOG(LOG_INFO, "starting shutdown");
3096 close_listen_sockets ();
3098 pthread_mutex_lock (&connection_threads_lock);
3099 while (connection_threads_num > 0)
3100 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3101 pthread_mutex_unlock (&connection_threads_lock);
3103 free(pollfds);
3105 return (NULL);
3106 } /* }}} void *listen_thread_main */
3108 static int daemonize (void) /* {{{ */
3109 {
3110 int pid_fd;
3111 char *base_dir;
3113 daemon_uid = geteuid();
3115 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3116 if (pid_fd < 0)
3117 pid_fd = check_pidfile();
3118 if (pid_fd < 0)
3119 return pid_fd;
3121 /* open all the listen sockets */
3122 if (config_listen_address_list_len > 0)
3123 {
3124 for (size_t i = 0; i < config_listen_address_list_len; i++)
3125 open_listen_socket (config_listen_address_list[i]);
3127 rrd_free_ptrs((void ***) &config_listen_address_list,
3128 &config_listen_address_list_len);
3129 }
3130 else
3131 {
3132 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3133 sizeof(default_socket.addr) - 1);
3134 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3136 if (default_socket.permissions == 0)
3137 socket_permission_set_all (&default_socket);
3139 open_listen_socket (&default_socket);
3140 }
3142 if (listen_fds_num < 1)
3143 {
3144 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3145 goto error;
3146 }
3148 if (!stay_foreground)
3149 {
3150 pid_t child;
3152 child = fork ();
3153 if (child < 0)
3154 {
3155 fprintf (stderr, "daemonize: fork(2) failed.\n");
3156 goto error;
3157 }
3158 else if (child > 0)
3159 exit(0);
3161 /* Become session leader */
3162 setsid ();
3164 /* Open the first three file descriptors to /dev/null */
3165 close (2);
3166 close (1);
3167 close (0);
3169 open ("/dev/null", O_RDWR);
3170 if (dup(0) == -1 || dup(0) == -1){
3171 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3172 }
3173 } /* if (!stay_foreground) */
3175 /* Change into the /tmp directory. */
3176 base_dir = (config_base_dir != NULL)
3177 ? config_base_dir
3178 : "/tmp";
3180 if (chdir (base_dir) != 0)
3181 {
3182 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3183 goto error;
3184 }
3186 install_signal_handlers();
3188 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3189 RRDD_LOG(LOG_INFO, "starting up");
3191 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3192 (GDestroyNotify) free_cache_item);
3193 if (cache_tree == NULL)
3194 {
3195 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3196 goto error;
3197 }
3199 return write_pidfile (pid_fd);
3201 error:
3202 remove_pidfile();
3203 return -1;
3204 } /* }}} int daemonize */
3206 static int cleanup (void) /* {{{ */
3207 {
3208 pthread_cond_broadcast (&flush_cond);
3209 pthread_join (flush_thread, NULL);
3211 pthread_cond_broadcast (&queue_cond);
3212 for (int i = 0; i < config_queue_threads; i++)
3213 pthread_join (queue_threads[i], NULL);
3215 if (config_flush_at_shutdown)
3216 {
3217 assert(cache_queue_head == NULL);
3218 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3219 }
3221 free(queue_threads);
3222 free(config_base_dir);
3224 pthread_mutex_lock(&cache_lock);
3225 g_tree_destroy(cache_tree);
3227 pthread_mutex_lock(&journal_lock);
3228 journal_done();
3230 RRDD_LOG(LOG_INFO, "goodbye");
3231 closelog ();
3233 remove_pidfile ();
3234 free(config_pid_file);
3236 return (0);
3237 } /* }}} int cleanup */
3239 static int read_options (int argc, char **argv) /* {{{ */
3240 {
3241 int option;
3242 int status = 0;
3244 socket_permission_clear (&default_socket);
3246 default_socket.socket_group = (gid_t)-1;
3247 default_socket.socket_permissions = (mode_t)-1;
3249 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3250 {
3251 switch (option)
3252 {
3253 case 'O':
3254 opt_no_overwrite = 1;
3255 break;
3257 case 'g':
3258 stay_foreground=1;
3259 break;
3261 case 'l':
3262 {
3263 listen_socket_t *new;
3265 new = malloc(sizeof(listen_socket_t));
3266 if (new == NULL)
3267 {
3268 fprintf(stderr, "read_options: malloc failed.\n");
3269 return(2);
3270 }
3271 memset(new, 0, sizeof(listen_socket_t));
3273 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3275 /* Add permissions to the socket {{{ */
3276 if (default_socket.permissions != 0)
3277 {
3278 socket_permission_copy (new, &default_socket);
3279 }
3280 else /* if (default_socket.permissions == 0) */
3281 {
3282 /* Add permission for ALL commands to the socket. */
3283 socket_permission_set_all (new);
3284 }
3285 /* }}} Done adding permissions. */
3287 new->socket_group = default_socket.socket_group;
3288 new->socket_permissions = default_socket.socket_permissions;
3290 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3291 &config_listen_address_list_len, new))
3292 {
3293 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3294 return (2);
3295 }
3296 }
3297 break;
3299 /* set socket group permissions */
3300 case 's':
3301 {
3302 gid_t group_gid;
3303 struct group *grp;
3305 group_gid = strtoul(optarg, NULL, 10);
3306 if (errno != EINVAL && group_gid>0)
3307 {
3308 /* we were passed a number */
3309 grp = getgrgid(group_gid);
3310 }
3311 else
3312 {
3313 grp = getgrnam(optarg);
3314 }
3316 if (grp)
3317 {
3318 default_socket.socket_group = grp->gr_gid;
3319 }
3320 else
3321 {
3322 /* no idea what the user wanted... */
3323 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3324 return (5);
3325 }
3326 }
3327 break;
3329 /* set socket file permissions */
3330 case 'm':
3331 {
3332 long tmp;
3333 char *endptr = NULL;
3335 tmp = strtol (optarg, &endptr, 8);
3336 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3337 || (tmp > 07777) || (tmp < 0)) {
3338 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3339 optarg);
3340 return (5);
3341 }
3343 default_socket.socket_permissions = (mode_t)tmp;
3344 }
3345 break;
3347 case 'P':
3348 {
3349 char *optcopy;
3350 char *saveptr;
3351 char *dummy;
3352 char *ptr;
3354 socket_permission_clear (&default_socket);
3356 optcopy = strdup (optarg);
3357 dummy = optcopy;
3358 saveptr = NULL;
3359 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3360 {
3361 dummy = NULL;
3362 status = socket_permission_add (&default_socket, ptr);
3363 if (status != 0)
3364 {
3365 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3366 "socket failed. Most likely, this permission doesn't "
3367 "exist. Check your command line.\n", ptr);
3368 status = 4;
3369 }
3370 }
3372 free (optcopy);
3373 }
3374 break;
3376 case 'f':
3377 {
3378 int temp;
3380 temp = atoi (optarg);
3381 if (temp > 0)
3382 config_flush_interval = temp;
3383 else
3384 {
3385 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3386 status = 3;
3387 }
3388 }
3389 break;
3391 case 'w':
3392 {
3393 int temp;
3395 temp = atoi (optarg);
3396 if (temp > 0)
3397 config_write_interval = temp;
3398 else
3399 {
3400 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3401 status = 2;
3402 }
3403 }
3404 break;
3406 case 'z':
3407 {
3408 int temp;
3410 temp = atoi(optarg);
3411 if (temp > 0)
3412 config_write_jitter = temp;
3413 else
3414 {
3415 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3416 status = 2;
3417 }
3419 break;
3420 }
3422 case 't':
3423 {
3424 int threads;
3425 threads = atoi(optarg);
3426 if (threads >= 1)
3427 config_queue_threads = threads;
3428 else
3429 {
3430 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3431 return 1;
3432 }
3433 }
3434 break;
3436 case 'B':
3437 config_write_base_only = 1;
3438 break;
3440 case 'b':
3441 {
3442 size_t len;
3443 char base_realpath[PATH_MAX];
3445 if (config_base_dir != NULL)
3446 free (config_base_dir);
3447 config_base_dir = strdup (optarg);
3448 if (config_base_dir == NULL)
3449 {
3450 fprintf (stderr, "read_options: strdup failed.\n");
3451 return (3);
3452 }
3454 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3455 {
3456 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3457 config_base_dir, rrd_strerror (errno));
3458 return (3);
3459 }
3461 /* make sure that the base directory is not resolved via
3462 * symbolic links. this makes some performance-enhancing
3463 * assumptions possible (we don't have to resolve paths
3464 * that start with a "/")
3465 */
3466 if (realpath(config_base_dir, base_realpath) == NULL)
3467 {
3468 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3469 "%s\n", config_base_dir, rrd_strerror(errno));
3470 return 5;
3471 }
3473 len = strlen (config_base_dir);
3474 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3475 {
3476 config_base_dir[len - 1] = 0;
3477 len--;
3478 }
3480 if (len < 1)
3481 {
3482 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3483 return (4);
3484 }
3486 _config_base_dir_len = len;
3488 len = strlen (base_realpath);
3489 while ((len > 0) && (base_realpath[len - 1] == '/'))
3490 {
3491 base_realpath[len - 1] = '\0';
3492 len--;
3493 }
3495 if (strncmp(config_base_dir,
3496 base_realpath, sizeof(base_realpath)) != 0)
3497 {
3498 fprintf(stderr,
3499 "Base directory (-b) resolved via file system links!\n"
3500 "Please consult rrdcached '-b' documentation!\n"
3501 "Consider specifying the real directory (%s)\n",
3502 base_realpath);
3503 return 5;
3504 }
3505 }
3506 break;
3508 case 'p':
3509 {
3510 if (config_pid_file != NULL)
3511 free (config_pid_file);
3512 config_pid_file = strdup (optarg);
3513 if (config_pid_file == NULL)
3514 {
3515 fprintf (stderr, "read_options: strdup failed.\n");
3516 return (3);
3517 }
3518 }
3519 break;
3521 case 'F':
3522 config_flush_at_shutdown = 1;
3523 break;
3525 case 'j':
3526 {
3527 char journal_dir_actual[PATH_MAX];
3528 journal_dir = realpath((const char *)optarg, journal_dir_actual);
3529 if (journal_dir)
3530 {
3531 // if we were able to properly resolve the path, lets have a copy
3532 // for use outside this block.
3533 journal_dir = strdup(journal_dir);
3534 status = rrd_mkdir_p(journal_dir, 0777);
3535 if (status != 0)
3536 {
3537 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3538 journal_dir, rrd_strerror(errno));
3539 return 6;
3540 }
3541 if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3542 {
3543 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3544 errno ? rrd_strerror(errno) : "");
3545 return 6;
3546 }
3547 } else {
3548 fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3549 errno ? rrd_strerror(errno) : "");
3550 return 6;
3551 }
3552 }
3553 break;
3555 case 'a':
3556 {
3557 int temp = atoi(optarg);
3558 if (temp > 0)
3559 config_alloc_chunk = temp;
3560 else
3561 {
3562 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3563 return 10;
3564 }
3565 }
3566 break;
3568 case 'h':
3569 case '?':
3570 printf ("RRDCacheD %s\n"
3571 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3572 "\n"
3573 "Usage: rrdcached [options]\n"
3574 "\n"
3575 "Valid options are:\n"
3576 " -l <address> Socket address to listen to.\n"
3577 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3578 " -P <perms> Sets the permissions to assign to all following "
3579 "sockets\n"
3580 " -w <seconds> Interval in which to write data.\n"
3581 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3582 " -t <threads> Number of write threads.\n"
3583 " -f <seconds> Interval in which to flush dead data.\n"
3584 " -p <file> Location of the PID-file.\n"
3585 " -b <dir> Base directory to change to.\n"
3586 " -B Restrict file access to paths within -b <dir>\n"
3587 " -g Do not fork and run in the foreground.\n"
3588 " -j <dir> Directory in which to create the journal files.\n"
3589 " -F Always flush all updates at shutdown\n"
3590 " -s <id|name> Group owner of all following UNIX sockets\n"
3591 " (the socket will also have read/write permissions "
3592 "for that group)\n"
3593 " -m <mode> File permissions (octal) of all following UNIX "
3594 "sockets\n"
3595 " -a <size> Memory allocation chunk size. Default is 1.\n"
3596 " -O Do not allow CREATE commands to overwrite existing\n"
3597 " files, even if asked to.\n"
3598 "\n"
3599 "For more information and a detailed description of all options "
3600 "please refer\n"
3601 "to the rrdcached(1) manual page.\n",
3602 VERSION);
3603 if (option == 'h')
3604 status = -1;
3605 else
3606 status = 1;
3607 break;
3608 } /* switch (option) */
3609 } /* while (getopt) */
3611 /* advise the user when values are not sane */
3612 if (config_flush_interval < 2 * config_write_interval)
3613 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3614 " 2x write interval (-w) !\n");
3615 if (config_write_jitter > config_write_interval)
3616 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3617 " write interval (-w) !\n");
3619 if (config_write_base_only && config_base_dir == NULL)
3620 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3621 " Consult the rrdcached documentation\n");
3623 if (journal_dir == NULL)
3624 config_flush_at_shutdown = 1;
3626 return (status);
3627 } /* }}} int read_options */
3629 int main (int argc, char **argv)
3630 {
3631 int status;
3633 status = read_options (argc, argv);
3634 if (status != 0)
3635 {
3636 if (status < 0)
3637 status = 0;
3638 return (status);
3639 }
3641 status = daemonize ();
3642 if (status != 0)
3643 {
3644 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3645 return (1);
3646 }
3648 journal_init();
3650 /* start the queue threads */
3651 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3652 if (queue_threads == NULL)
3653 {
3654 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3655 cleanup();
3656 return (1);
3657 }
3658 for (int i = 0; i < config_queue_threads; i++)
3659 {
3660 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3661 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3662 if (status != 0)
3663 {
3664 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3665 cleanup();
3666 return (1);
3667 }
3668 }
3670 /* start the flush thread */
3671 memset(&flush_thread, 0, sizeof(flush_thread));
3672 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3673 if (status != 0)
3674 {
3675 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3676 cleanup();
3677 return (1);
3678 }
3680 listen_thread_main (NULL);
3681 cleanup ();
3683 return (0);
3684 } /* int main */
3686 /*
3687 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3688 */