1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
110 #include <glib-2.0/glib.h>
111 /* }}} */
/* Log a printf-style message through syslog(3) at the given severity
 * (one of the LOG_* constants from <syslog.h>). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers that do not define __GNUC__ may not understand __attribute__;
 * expand it to nothing so the annotations used throughout this file
 * (e.g. __attribute__((unused))) still compile there. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
/*
 * Types
 */
/* Outcome of a request, passed to send_response(). */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;

/* A socket plus its per-connection protocol state. Used for the listening
 * sockets (listen_fds) and, through the buffered-IO fields, for client
 * connections served by next_cmd() / send_response(). */
struct listen_socket_s
{
  int fd;
  /* NOTE(review): presumably the address / socket path this was bound to. */
  char addr[PATH_MAX + 1];
  /* NOTE(review): presumably the socket address family (AF_*) — confirm. */
  int family;

  /* state for BATCH processing */
  time_t batch_start;   /* non-zero while a BATCH is in progress */
  int batch_cmd;        /* number reported as <lines> in BATCH error replies */

  /* buffered IO */
  char *rbuf;           /* bytes received from the client */
  off_t next_cmd;       /* offset in rbuf of the next unparsed command */
  off_t next_read;      /* offset in rbuf just past the last byte received */

  char *wbuf;           /* reply text queued by add_to_wbuf() */
  ssize_t wbuf_len;     /* bytes in wbuf, not counting its trailing NUL */

  /* NOTE(review): presumably a bitmask of commands allowed on this
   * socket; its use is not visible here — confirm. */
  uint32_t permissions;
};
typedef struct listen_socket_s listen_socket_t;

struct command_s;
typedef struct command_s command_t;

/* Parameter list shared by all request handlers.
 * note: guard against "unused" warnings in the handlers */
#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
                       time_t now __attribute__((unused)),\
                       char *buffer __attribute__((unused)),\
                       size_t buffer_size __attribute__((unused))

/* Handler parameter list: the command table entry followed by DISPATCH_PROTO. */
#define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
                      DISPATCH_PROTO

/* One entry of the protocol command table. */
struct command_s {
  char *cmd;                          /* command name */
  int (*handler)(HANDLER_PROTO);      /* function that executes it */

  char context; /* where we expect to see it */
/* bit flags for `context' */
#define CMD_CONTEXT_CLIENT (1<<0)
#define CMD_CONTEXT_BATCH (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY (0x7f)

  char *syntax;   /* usage text, sent by syntax_error() */
  /* NOTE(review): presumably the text shown by handle_request_help. */
  char *help;
};

struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/* Cached, not yet written updates for one RRD file. */
struct cache_item_s
{
  char *file;              /* file name; also the key in cache_tree */
  char **values;           /* pending update strings for rrd_update_r() */
  size_t values_num;
  time_t last_flush_time;  /* set by wipe_ci_values() (plus jitter) */
  /* NOTE(review): presumably the timestamp of the newest queued update. */
  time_t last_update_stamp;
/* bit flags for `flags' */
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;
  pthread_cond_t flushed;  /* broadcast after the values were written */
  cache_item_t *prev;      /* write-queue links, valid while IN_QUEUE */
  cache_item_t *next;
};

/* State shared with tree_callback_flush() during one flush_old_values(). */
struct callback_flush_data_s
{
  time_t now;           /* time of the flush pass */
  time_t abs_timeout;   /* items flushed at or before this are due */
  char **keys;          /* idle items to remove from cache_tree */
  size_t keys_num;
};
typedef struct callback_flush_data_s callback_flush_data_t;

/* Which end of the write queue enqueue_cache_item() inserts at. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;

/* describe a set of journal files */
typedef struct {
  char **files;
  size_t files_num;
} journal_set;

/* max length of socket command or response */
#define CMD_MAX 4096
/* NOTE(review): presumably the size of listen_socket_t.rbuf. */
#define RBUF_SIZE (CMD_MAX*2)
/*
 * Variables
 */
static int stay_foreground = 0;
static uid_t daemon_uid;

/* sockets the daemon listens on */
static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Daemon life cycle: the signal handlers move RUNNING -> FLUSHING,
 * flush_thread_main moves FLUSHING -> SHUTDOWN when it stops.
 * NOTE(review): this is written from a signal handler but is a plain
 * (non-static, non-volatile) enum rather than `volatile sig_atomic_t'. */
enum {
  RUNNING, /* normal operation */
  FLUSHING, /* flushing remaining values */
  SHUTDOWN /* shutting down */
} state = RUNNING;

/* writer threads (queue_thread_main) and their wake-up condition */
static pthread_t *queue_threads;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

/* periodic flusher (flush_thread_main) */
static pthread_t flush_thread;
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

/* NOTE(review): presumably tracks live client connection threads. */
static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff: cache_tree maps file name (ci->file) -> cache_item_t; the
 * write queue is a doubly linked list from head to tail. Both are guarded
 * by cache_lock. */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

/* Timing configuration, in seconds:
 *  write_interval - age after which the periodic flush writes values
 *  write_jitter   - random extra delay added to last_flush_time
 *  flush_interval - period of the flush pass; also how long an item without
 *                   values may stay before it is dropped from cache_tree */
static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
/* write all cached values before exiting (set by SIGUSR1, cleared by SIGUSR2) */
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
/* base directory for relative paths, and whether writes are confined to it */
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* counters reported by STATS; all guarded by stats_lock */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL; /* current journal file handle */
static long journal_size = 0; /* current journal size */
/* NOTE(review): presumably the journal size that triggers a rotation. */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
/*
 * Functions
 */
/* Shared body of the termination signal handlers: log the signal, leave the
 * RUNNING state and wake the flush and queue threads so they notice.
 * NOTE(review): syslog() and pthread_cond_broadcast() are not on POSIX's
 * async-signal-safe list, so calling them from a handler is formally
 * unsafe. */
static void sig_common (const char *sig) /* {{{ */
{
  RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
  state = FLUSHING;
  pthread_cond_broadcast(&flush_cond);
  pthread_cond_broadcast(&queue_cond);
} /* }}} void sig_common */

/* SIGINT: shut down, honouring the configured flush-at-shutdown setting. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */

/* SIGTERM: same as SIGINT. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */

/* SIGUSR1: shut down and write every cached value first. The flag is set
 * before sig_common() wakes flush_thread_main, which reads it on exit. */
static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 1;
  sig_common("USR1");
} /* }}} void sig_usr1_handler */

/* SIGUSR2: shut down without writing the cached values. */
static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 0;
  sig_common("USR2");
} /* }}} void sig_usr2_handler */
316 static void install_signal_handlers(void) /* {{{ */
317 {
318 /* These structures are static, because `sigaction' behaves weird if the are
319 * overwritten.. */
320 static struct sigaction sa_int;
321 static struct sigaction sa_term;
322 static struct sigaction sa_pipe;
323 static struct sigaction sa_usr1;
324 static struct sigaction sa_usr2;
326 /* Install signal handlers */
327 memset (&sa_int, 0, sizeof (sa_int));
328 sa_int.sa_handler = sig_int_handler;
329 sigaction (SIGINT, &sa_int, NULL);
331 memset (&sa_term, 0, sizeof (sa_term));
332 sa_term.sa_handler = sig_term_handler;
333 sigaction (SIGTERM, &sa_term, NULL);
335 memset (&sa_pipe, 0, sizeof (sa_pipe));
336 sa_pipe.sa_handler = SIG_IGN;
337 sigaction (SIGPIPE, &sa_pipe, NULL);
339 memset (&sa_pipe, 0, sizeof (sa_usr1));
340 sa_usr1.sa_handler = sig_usr1_handler;
341 sigaction (SIGUSR1, &sa_usr1, NULL);
343 memset (&sa_usr2, 0, sizeof (sa_usr2));
344 sa_usr2.sa_handler = sig_usr2_handler;
345 sigaction (SIGUSR2, &sa_usr2, NULL);
347 } /* }}} void install_signal_handlers */
349 static int open_pidfile(char *action, int oflag) /* {{{ */
350 {
351 int fd;
352 const char *file;
353 char *file_copy, *dir;
355 file = (config_pid_file != NULL)
356 ? config_pid_file
357 : LOCALSTATEDIR "/run/rrdcached.pid";
359 /* dirname may modify its argument */
360 file_copy = strdup(file);
361 if (file_copy == NULL)
362 {
363 fprintf(stderr, "rrdcached: strdup(): %s\n",
364 rrd_strerror(errno));
365 return -1;
366 }
368 dir = dirname(file_copy);
369 if (rrd_mkdir_p(dir, 0777) != 0)
370 {
371 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
372 dir, rrd_strerror(errno));
373 return -1;
374 }
376 free(file_copy);
378 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
379 if (fd < 0)
380 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
381 action, file, rrd_strerror(errno));
383 return(fd);
384 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Opens the pid file read/write and parses the pid stored in it. If that
 * process exists and is not ourselves, another rrdcached is assumed to be
 * running and -1 is returned. Otherwise the stale file is truncated and its
 * descriptor is returned, positioned at offset 0, ready for
 * write_pidfile().
 *
 * Returns the open descriptor on success, a negative value on failure. The
 * descriptor is closed on every failure path after the open.
 */
static int check_pidfile(void)
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for a terminator: atoi() needs a NUL-terminated string */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write our pid, followed by a newline, to the pid file open on `fd'.
 *
 * Takes ownership of `fd': it is closed on every path (via fclose() or
 * close()). Returns 0 on success, -1 if the stream could not be created or
 * the pid could not be written or flushed.
 */
static int write_pidfile (int fd) /* {{{ */
{
  pid_t pid;
  FILE *fh;
  int status = 0;

  pid = getpid ();

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) pid) < 0)
    status = -1;

  /* fclose() flushes the buffered pid; if it fails the pid never reached
   * the file */
  if (fclose (fh) != 0)
    status = -1;

  if (status != 0)
    RRDD_LOG (LOG_ERR, "write_pidfile: writing the pid file failed.");

  return (status);
} /* }}} int write_pidfile */
451 static int remove_pidfile (void) /* {{{ */
452 {
453 char *file;
454 int status;
456 file = (config_pid_file != NULL)
457 ? config_pid_file
458 : LOCALSTATEDIR "/run/rrdcached.pid";
460 status = unlink (file);
461 if (status == 0)
462 return (0);
463 return (errno);
464 } /* }}} int remove_pidfile */
466 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
467 {
468 char *eol;
470 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
471 sock->next_read - sock->next_cmd);
473 if (eol == NULL)
474 {
475 /* no commands left, move remainder back to front of rbuf */
476 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
477 sock->next_read - sock->next_cmd);
478 sock->next_read -= sock->next_cmd;
479 sock->next_cmd = 0;
480 *len = 0;
481 return NULL;
482 }
483 else
484 {
485 char *cmd = sock->rbuf + sock->next_cmd;
486 *eol = '\0';
488 sock->next_cmd = eol - sock->rbuf + 1;
490 if (eol > sock->rbuf && *(eol-1) == '\r')
491 *(--eol) = '\0'; /* handle "\r\n" EOL */
493 *len = eol - cmd;
495 return cmd;
496 }
498 /* NOTREACHED */
499 assert(1==0);
500 } /* }}} char *next_cmd */
502 /* add the characters directly to the write buffer */
503 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
504 {
505 char *new_buf;
507 assert(sock != NULL);
509 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
510 if (new_buf == NULL)
511 {
512 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
513 return -1;
514 }
516 strncpy(new_buf + sock->wbuf_len, str, len + 1);
518 sock->wbuf = new_buf;
519 sock->wbuf_len += len;
521 return 0;
522 } /* }}} static int add_to_wbuf */
524 /* add the text to the "extra" info that's sent after the status line */
525 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
526 {
527 va_list argp;
528 char buffer[CMD_MAX];
529 int len;
531 if (sock == NULL) return 0; /* journal replay mode */
532 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
534 va_start(argp, fmt);
535 #ifdef HAVE_VSNPRINTF
536 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
537 #else
538 len = vsprintf(buffer, fmt, argp);
539 #endif
540 va_end(argp);
541 if (len < 0)
542 {
543 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
544 return -1;
545 }
547 return add_to_wbuf(sock, buffer, len);
548 } /* }}} static int add_response_info */
/* Count the newline characters in `str'; a NULL string has zero lines. */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  const char *p;

  if (str == NULL)
    return 0;

  for (p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      newlines++;
  }

  return newlines;
} /* }}} static int count_lines */
566 /* send the response back to the user.
567 * returns 0 on success, -1 on error
568 * write buffer is always zeroed after this call */
569 static int send_response (listen_socket_t *sock, response_code rc,
570 char *fmt, ...) /* {{{ */
571 {
572 va_list argp;
573 char buffer[CMD_MAX];
574 int lines;
575 ssize_t wrote;
576 int rclen, len;
578 if (sock == NULL) return rc; /* journal replay mode */
580 if (sock->batch_start)
581 {
582 if (rc == RESP_OK)
583 return rc; /* no response on success during BATCH */
584 lines = sock->batch_cmd;
585 }
586 else if (rc == RESP_OK)
587 lines = count_lines(sock->wbuf);
588 else
589 lines = -1;
591 rclen = sprintf(buffer, "%d ", lines);
592 va_start(argp, fmt);
593 #ifdef HAVE_VSNPRINTF
594 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
595 #else
596 len = vsprintf(buffer+rclen, fmt, argp);
597 #endif
598 va_end(argp);
599 if (len < 0)
600 return -1;
602 len += rclen;
604 /* append the result to the wbuf, don't write to the user */
605 if (sock->batch_start)
606 return add_to_wbuf(sock, buffer, len);
608 /* first write must be complete */
609 if (len != write(sock->fd, buffer, len))
610 {
611 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
612 return -1;
613 }
615 if (sock->wbuf != NULL && rc == RESP_OK)
616 {
617 wrote = 0;
618 while (wrote < sock->wbuf_len)
619 {
620 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
621 if (wb <= 0)
622 {
623 RRDD_LOG(LOG_INFO, "send_response: could not write results");
624 return -1;
625 }
626 wrote += wb;
627 }
628 }
630 free(sock->wbuf); sock->wbuf = NULL;
631 sock->wbuf_len = 0;
633 return 0;
634 } /* }}} */
636 static void wipe_ci_values(cache_item_t *ci, time_t when)
637 {
638 ci->values = NULL;
639 ci->values_num = 0;
641 ci->last_flush_time = when;
642 if (config_write_jitter > 0)
643 ci->last_flush_time += (rrd_random() % config_write_jitter);
644 }
646 /* remove_from_queue
647 * remove a "cache_item_t" item from the queue.
648 * must hold 'cache_lock' when calling this
649 */
650 static void remove_from_queue(cache_item_t *ci) /* {{{ */
651 {
652 if (ci == NULL) return;
653 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
655 if (ci->prev == NULL)
656 cache_queue_head = ci->next; /* reset head */
657 else
658 ci->prev->next = ci->next;
660 if (ci->next == NULL)
661 cache_queue_tail = ci->prev; /* reset the tail */
662 else
663 ci->next->prev = ci->prev;
665 ci->next = ci->prev = NULL;
666 ci->flags &= ~CI_FLAGS_IN_QUEUE;
668 pthread_mutex_lock (&stats_lock);
669 assert (stats_queue_length > 0);
670 stats_queue_length--;
671 pthread_mutex_unlock (&stats_lock);
673 } /* }}} static void remove_from_queue */
675 /* free the resources associated with the cache_item_t
676 * must hold cache_lock when calling this function
677 */
678 static void *free_cache_item(cache_item_t *ci) /* {{{ */
679 {
680 if (ci == NULL) return NULL;
682 remove_from_queue(ci);
684 for (size_t i=0; i < ci->values_num; i++)
685 free(ci->values[i]);
687 free (ci->values);
688 free (ci->file);
690 /* in case anyone is waiting */
691 pthread_cond_broadcast(&ci->flushed);
692 pthread_cond_destroy(&ci->flushed);
694 free (ci);
696 return NULL;
697 } /* }}} static void *free_cache_item */
/*
 * enqueue_cache_item:
 * Put `ci' on the write queue; queue_thread_main takes items from
 * cache_queue_head.
 *   HEAD - move `ci' to the front, first unlinking it if it is queued
 *          further back (flush_file uses this for explicit flushes).
 *   TAIL - append `ci' unless it is already queued; queued items are
 *          never moved backwards.
 * Items without pending values are not queued. On insertion the IN_QUEUE
 * flag is set, one waiting queue thread is signalled and
 * stats_queue_length is incremented.
 * Returns -1 if `ci' is NULL, 0 otherwise.
 * `cache_lock' must be acquired before calling this function!
 */
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
                               queue_side_t side)
{
  if (ci == NULL)
    return (-1);

  /* nothing to write */
  if (ci->values_num == 0)
    return (0);

  if (side == HEAD)
  {
    /* already first in line */
    if (cache_queue_head == ci)
      return 0;

    /* remove if further down in queue */
    remove_from_queue(ci);

    ci->prev = NULL;
    ci->next = cache_queue_head;
    if (ci->next != NULL)
      ci->next->prev = ci;
    cache_queue_head = ci;

    /* queue was empty: ci is also the tail */
    if (cache_queue_tail == NULL)
      cache_queue_tail = cache_queue_head;
  }
  else /* (side == TAIL) */
  {
    /* We don't move values back in the list.. */
    if (ci->flags & CI_FLAGS_IN_QUEUE)
      return (0);

    /* an unqueued item must not carry stale links */
    assert (ci->next == NULL);
    assert (ci->prev == NULL);

    ci->prev = cache_queue_tail;

    if (cache_queue_tail == NULL)
      cache_queue_head = ci;
    else
      cache_queue_tail->next = ci;

    cache_queue_tail = ci;
  }

  ci->flags |= CI_FLAGS_IN_QUEUE;

  /* wake one writer; remove_from_queue() performs the matching decrement */
  pthread_cond_signal(&queue_cond);
  pthread_mutex_lock (&stats_lock);
  stats_queue_length++;
  pthread_mutex_unlock (&stats_lock);

  return (0);
} /* }}} int enqueue_cache_item */
758 /*
759 * tree_callback_flush:
760 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
761 * while this is in progress.
762 */
763 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
764 gpointer data)
765 {
766 cache_item_t *ci;
767 callback_flush_data_t *cfd;
769 ci = (cache_item_t *) value;
770 cfd = (callback_flush_data_t *) data;
772 if (ci->flags & CI_FLAGS_IN_QUEUE)
773 return FALSE;
775 if (ci->values_num > 0
776 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
777 {
778 enqueue_cache_item (ci, TAIL);
779 }
780 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
781 && (ci->values_num <= 0))
782 {
783 assert ((char *) key == ci->file);
784 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
785 {
786 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
787 return (FALSE);
788 }
789 }
791 return (FALSE);
792 } /* }}} gboolean tree_callback_flush */
794 static int flush_old_values (int max_age)
795 {
796 callback_flush_data_t cfd;
797 size_t k;
799 memset (&cfd, 0, sizeof (cfd));
800 /* Pass the current time as user data so that we don't need to call
801 * `time' for each node. */
802 cfd.now = time (NULL);
803 cfd.keys = NULL;
804 cfd.keys_num = 0;
806 if (max_age > 0)
807 cfd.abs_timeout = cfd.now - max_age;
808 else
809 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
811 /* `tree_callback_flush' will return the keys of all values that haven't
812 * been touched in the last `config_flush_interval' seconds in `cfd'.
813 * The char*'s in this array point to the same memory as ci->file, so we
814 * don't need to free them separately. */
815 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
817 for (k = 0; k < cfd.keys_num; k++)
818 {
819 /* should never fail, since we have held the cache_lock
820 * the entire time */
821 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
822 }
824 if (cfd.keys != NULL)
825 {
826 free (cfd.keys);
827 cfd.keys = NULL;
828 }
830 return (0);
831 } /* int flush_old_values */
833 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
834 {
835 struct timeval now;
836 struct timespec next_flush;
837 int status;
839 gettimeofday (&now, NULL);
840 next_flush.tv_sec = now.tv_sec + config_flush_interval;
841 next_flush.tv_nsec = 1000 * now.tv_usec;
843 pthread_mutex_lock(&cache_lock);
845 while (state == RUNNING)
846 {
847 gettimeofday (&now, NULL);
848 if ((now.tv_sec > next_flush.tv_sec)
849 || ((now.tv_sec == next_flush.tv_sec)
850 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
851 {
852 RRDD_LOG(LOG_DEBUG, "flushing old values");
854 /* Determine the time of the next cache flush. */
855 next_flush.tv_sec = now.tv_sec + config_flush_interval;
857 /* Flush all values that haven't been written in the last
858 * `config_write_interval' seconds. */
859 flush_old_values (config_write_interval);
861 /* unlock the cache while we rotate so we don't block incoming
862 * updates if the fsync() blocks on disk I/O */
863 pthread_mutex_unlock(&cache_lock);
864 journal_rotate();
865 pthread_mutex_lock(&cache_lock);
866 }
868 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
869 if (status != 0 && status != ETIMEDOUT)
870 {
871 RRDD_LOG (LOG_ERR, "flush_thread_main: "
872 "pthread_cond_timedwait returned %i.", status);
873 }
874 }
876 if (config_flush_at_shutdown)
877 flush_old_values (-1); /* flush everything */
879 state = SHUTDOWN;
881 pthread_mutex_unlock(&cache_lock);
883 return NULL;
884 } /* void *flush_thread_main */
/* Writer thread: takes items from the head of the write queue and writes
 * their pending values with rrd_update_r().
 *
 * cache_lock is held except while the RRD file is updated. Before
 * unlocking, the values are detached from the item (wipe_ci_values) and
 * the item is dequeued, so new updates can collect in the cache meanwhile.
 * Waiters on ci->flushed are woken after each write attempt.
 * Runs until state == SHUTDOWN, and keeps draining the queue after that
 * while config_flush_at_shutdown is set.
 */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  pthread_mutex_lock (&cache_lock);

  while (state != SHUTDOWN
      || (cache_queue_head != NULL && config_flush_at_shutdown))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    size_t values_num;
    int status;

    /* Now, check if there's something to store away. If not, wait until
     * something comes in. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_wait (&queue_cond, &cache_lock);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_wait returned %i.", status);
      }
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      /* NOTE(review): retries the same head item immediately with
       * cache_lock still held, which can spin while memory is short. */
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* take ownership of the values array; freed below */
    values = ci->values;
    values_num = ci->values_num;

    wipe_ci_values(ci, time(NULL));
    remove_from_queue(ci);

    /* `ci' may be changed or freed by other threads from here on */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    journal_write("wrote", file);

    /* Search again in the tree. It's possible someone issued a "FORGET"
     * while we were writing the update values. */
    pthread_mutex_lock(&cache_lock);
    ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
    if (ci)
      pthread_cond_broadcast(&ci->flushed);
    pthread_mutex_unlock(&cache_lock);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    rrd_free_ptrs((void ***) &values, &values_num);
    free(file);

    /* the loop condition is evaluated with cache_lock held */
    pthread_mutex_lock (&cache_lock);
  }
  pthread_mutex_unlock (&cache_lock);

  return (NULL);
} /* }}} void *queue_thread_main */
/* Split the next field off a command buffer, in place.
 *
 * A field ends at a single ' ' or at the terminating NUL. A backslash makes
 * the following character literal, so "a\ b" yields "a b". The unescaped
 * field is written back over the start of *buffer_ret and NUL-terminated.
 * On success *field_ret points to it, and *buffer_ret / *buffer_size_ret
 * are advanced past the consumed input.
 *
 * The buffer must end with a NUL within *buffer_size_ret bytes (asserted).
 * Returns 0 on success. Returns -1 for an empty buffer or a malformed field
 * (e.g. an escape that swallows the terminator); the out-parameters are
 * then unchanged, although the buffer bytes may have been rewritten.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
                             size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t size = *buffer_size_ret;
  char *dst = src;   /* unescaped output overwrites the input */
  size_t in = 0;
  size_t out = 0;

  if (size == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[size - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= size)
      return (-1);   /* ran off the end without a separator */

    c = src[in++];
    if (c == ' ' || c == '\0')
    {
      dst[out] = '\0';
      break;
    }

    if (c == '\\')
    {
      if (in >= size)
        return (-1); /* dangling escape */
      c = src[in++];
    }

    dst[out++] = c;
  }

  *buffer_ret = src + in;
  *buffer_size_ret = size - in;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
1037 /* if we're restricting writes to the base directory,
1038 * check whether the file falls within the dir
1039 * returns 1 if OK, otherwise 0
1040 */
1041 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1042 {
1043 assert(file != NULL);
1045 if (!config_write_base_only
1046 || sock == NULL /* journal replay */
1047 || config_base_dir == NULL)
1048 return 1;
1050 if (strstr(file, "../") != NULL) goto err;
1052 /* relative paths without "../" are ok */
1053 if (*file != '/') return 1;
1055 /* file must be of the format base + "/" + <1+ char filename> */
1056 if (strlen(file) < _config_base_dir_len + 2) goto err;
1057 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1058 if (*(file + _config_base_dir_len) != '/') goto err;
1060 return 1;
1062 err:
1063 if (sock != NULL && sock->fd >= 0)
1064 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1066 return 0;
1067 } /* }}} static int check_file_access */
1069 /* when using a base dir, convert relative paths to absolute paths.
1070 * if necessary, modifies the "filename" pointer to point
1071 * to the new path created in "tmp". "tmp" is provided
1072 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1073 *
1074 * this allows us to optimize for the expected case (absolute path)
1075 * with a no-op.
1076 */
1077 static void get_abs_path(char **filename, char *tmp)
1078 {
1079 assert(tmp != NULL);
1080 assert(filename != NULL && *filename != NULL);
1082 if (config_base_dir == NULL || **filename == '/')
1083 return;
1085 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1086 *filename = tmp;
1087 } /* }}} static int get_abs_path */
/* Ask for `filename''s pending values to be written now, and wait.
 *
 * Moves the cache item to the head of the write queue and blocks on
 * ci->flushed. queue_thread_main broadcasts it after writing the file, and
 * free_cache_item broadcasts it if the item is freed meanwhile.
 * Returns ENOENT if the file is not in the cache, 0 otherwise.
 * NOTE(review): a single pthread_cond_wait() without a predicate loop may
 * return on a spurious wakeup before the write happened, so the wait is
 * best effort.
 */
static int flush_file (const char *filename) /* {{{ */
{
  cache_item_t *ci;

  pthread_mutex_lock (&cache_lock);

  ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
  if (ci == NULL)
  {
    pthread_mutex_unlock (&cache_lock);
    return (ENOENT);
  }

  if (ci->values_num > 0)
  {
    /* Enqueue at head */
    enqueue_cache_item (ci, HEAD);
    pthread_cond_wait(&ci->flushed, &cache_lock);
  }

  /* DO NOT DO ANYTHING WITH ci HERE!! The entry
   * may have been purged during our cond_wait() */

  pthread_mutex_unlock(&cache_lock);

  return (0);
} /* }}} int flush_file */
1117 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1118 {
1119 char *err = "Syntax error.\n";
1121 if (cmd && cmd->syntax)
1122 err = cmd->syntax;
1124 return send_response(sock, RESP_ERR, "Usage: %s", err);
1125 } /* }}} static int syntax_error() */
1127 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1128 {
1129 uint64_t copy_queue_length;
1130 uint64_t copy_updates_received;
1131 uint64_t copy_flush_received;
1132 uint64_t copy_updates_written;
1133 uint64_t copy_data_sets_written;
1134 uint64_t copy_journal_bytes;
1135 uint64_t copy_journal_rotate;
1137 uint64_t tree_nodes_number;
1138 uint64_t tree_depth;
1140 pthread_mutex_lock (&stats_lock);
1141 copy_queue_length = stats_queue_length;
1142 copy_updates_received = stats_updates_received;
1143 copy_flush_received = stats_flush_received;
1144 copy_updates_written = stats_updates_written;
1145 copy_data_sets_written = stats_data_sets_written;
1146 copy_journal_bytes = stats_journal_bytes;
1147 copy_journal_rotate = stats_journal_rotate;
1148 pthread_mutex_unlock (&stats_lock);
1150 pthread_mutex_lock (&cache_lock);
1151 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1152 tree_depth = (uint64_t) g_tree_height (cache_tree);
1153 pthread_mutex_unlock (&cache_lock);
1155 add_response_info(sock,
1156 "QueueLength: %"PRIu64"\n", copy_queue_length);
1157 add_response_info(sock,
1158 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1159 add_response_info(sock,
1160 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1161 add_response_info(sock,
1162 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1163 add_response_info(sock,
1164 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1165 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1166 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1167 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1168 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1170 send_response(sock, RESP_OK, "Statistics follow\n");
1172 return (0);
1173 } /* }}} int handle_request_stats */
1175 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1176 {
1177 char *file, file_tmp[PATH_MAX];
1178 int status;
1180 status = buffer_get_field (&buffer, &buffer_size, &file);
1181 if (status != 0)
1182 {
1183 return syntax_error(sock,cmd);
1184 }
1185 else
1186 {
1187 pthread_mutex_lock(&stats_lock);
1188 stats_flush_received++;
1189 pthread_mutex_unlock(&stats_lock);
1191 get_abs_path(&file, file_tmp);
1192 if (!check_file_access(file, sock)) return 0;
1194 status = flush_file (file);
1195 if (status == 0)
1196 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1197 else if (status == ENOENT)
1198 {
1199 /* no file in our tree; see whether it exists at all */
1200 struct stat statbuf;
1202 memset(&statbuf, 0, sizeof(statbuf));
1203 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1204 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1205 else
1206 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1207 }
1208 else if (status < 0)
1209 return send_response(sock, RESP_ERR, "Internal error.\n");
1210 else
1211 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1212 }
1214 /* NOTREACHED */
1215 assert(1==0);
1216 } /* }}} int handle_request_flush */
1218 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1219 {
1220 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1222 pthread_mutex_lock(&cache_lock);
1223 flush_old_values(-1);
1224 pthread_mutex_unlock(&cache_lock);
1226 return send_response(sock, RESP_OK, "Started flush.\n");
1227 } /* }}} static int handle_request_flushall */
1229 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1230 {
1231 int status;
1232 char *file, file_tmp[PATH_MAX];
1233 cache_item_t *ci;
1235 status = buffer_get_field(&buffer, &buffer_size, &file);
1236 if (status != 0)
1237 return syntax_error(sock,cmd);
1239 get_abs_path(&file, file_tmp);
1241 pthread_mutex_lock(&cache_lock);
1242 ci = g_tree_lookup(cache_tree, file);
1243 if (ci == NULL)
1244 {
1245 pthread_mutex_unlock(&cache_lock);
1246 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1247 }
1249 for (size_t i=0; i < ci->values_num; i++)
1250 add_response_info(sock, "%s\n", ci->values[i]);
1252 pthread_mutex_unlock(&cache_lock);
1253 return send_response(sock, RESP_OK, "updates pending\n");
1254 } /* }}} static int handle_request_pending */
1256 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1257 {
1258 int status;
1259 gboolean found;
1260 char *file, file_tmp[PATH_MAX];
1262 status = buffer_get_field(&buffer, &buffer_size, &file);
1263 if (status != 0)
1264 return syntax_error(sock,cmd);
1266 get_abs_path(&file, file_tmp);
1267 if (!check_file_access(file, sock)) return 0;
1269 pthread_mutex_lock(&cache_lock);
1270 found = g_tree_remove(cache_tree, file);
1271 pthread_mutex_unlock(&cache_lock);
1273 if (found == TRUE)
1274 {
1275 if (sock != NULL)
1276 journal_write("forget", file);
1278 return send_response(sock, RESP_OK, "Gone!\n");
1279 }
1280 else
1281 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1283 /* NOTREACHED */
1284 assert(1==0);
1285 } /* }}} static int handle_request_forget */
1287 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1288 {
1289 cache_item_t *ci;
1291 pthread_mutex_lock(&cache_lock);
1293 ci = cache_queue_head;
1294 while (ci != NULL)
1295 {
1296 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1297 ci = ci->next;
1298 }
1300 pthread_mutex_unlock(&cache_lock);
1302 return send_response(sock, RESP_OK, "in queue.\n");
1303 } /* }}} int handle_request_queue */
1305 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1306 {
1307 char *file, file_tmp[PATH_MAX];
1308 int values_num = 0;
1309 int status;
1310 char orig_buf[CMD_MAX];
1312 cache_item_t *ci;
1314 /* save it for the journal later */
1315 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1317 status = buffer_get_field (&buffer, &buffer_size, &file);
1318 if (status != 0)
1319 return syntax_error(sock,cmd);
1321 pthread_mutex_lock(&stats_lock);
1322 stats_updates_received++;
1323 pthread_mutex_unlock(&stats_lock);
1325 get_abs_path(&file, file_tmp);
1326 if (!check_file_access(file, sock)) return 0;
1328 pthread_mutex_lock (&cache_lock);
1329 ci = g_tree_lookup (cache_tree, file);
1331 if (ci == NULL) /* {{{ */
1332 {
1333 struct stat statbuf;
1334 cache_item_t *tmp;
1336 /* don't hold the lock while we setup; stat(2) might block */
1337 pthread_mutex_unlock(&cache_lock);
1339 memset (&statbuf, 0, sizeof (statbuf));
1340 status = stat (file, &statbuf);
1341 if (status != 0)
1342 {
1343 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1345 status = errno;
1346 if (status == ENOENT)
1347 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1348 else
1349 return send_response(sock, RESP_ERR,
1350 "stat failed with error %i.\n", status);
1351 }
1352 if (!S_ISREG (statbuf.st_mode))
1353 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1355 if (access(file, R_OK|W_OK) != 0)
1356 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1357 file, rrd_strerror(errno));
1359 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1360 if (ci == NULL)
1361 {
1362 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1364 return send_response(sock, RESP_ERR, "malloc failed.\n");
1365 }
1366 memset (ci, 0, sizeof (cache_item_t));
1368 ci->file = strdup (file);
1369 if (ci->file == NULL)
1370 {
1371 free (ci);
1372 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1374 return send_response(sock, RESP_ERR, "strdup failed.\n");
1375 }
1377 wipe_ci_values(ci, now);
1378 ci->flags = CI_FLAGS_IN_TREE;
1379 pthread_cond_init(&ci->flushed, NULL);
1381 pthread_mutex_lock(&cache_lock);
1383 /* another UPDATE might have added this entry in the meantime */
1384 tmp = g_tree_lookup (cache_tree, file);
1385 if (tmp == NULL)
1386 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1387 else
1388 {
1389 free_cache_item (ci);
1390 ci = tmp;
1391 }
1393 /* state may have changed while we were unlocked */
1394 if (state == SHUTDOWN)
1395 return -1;
1396 } /* }}} */
1397 assert (ci != NULL);
1399 /* don't re-write updates in replay mode */
1400 if (sock != NULL)
1401 journal_write("update", orig_buf);
1403 while (buffer_size > 0)
1404 {
1405 char *value;
1406 time_t stamp;
1407 char *eostamp;
1409 status = buffer_get_field (&buffer, &buffer_size, &value);
1410 if (status != 0)
1411 {
1412 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1413 break;
1414 }
1416 /* make sure update time is always moving forward */
1417 stamp = strtol(value, &eostamp, 10);
1418 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1419 {
1420 pthread_mutex_unlock(&cache_lock);
1421 return send_response(sock, RESP_ERR,
1422 "Cannot find timestamp in '%s'!\n", value);
1423 }
1424 else if (stamp <= ci->last_update_stamp)
1425 {
1426 pthread_mutex_unlock(&cache_lock);
1427 return send_response(sock, RESP_ERR,
1428 "illegal attempt to update using time %ld when last"
1429 " update time is %ld (minimum one second step)\n",
1430 stamp, ci->last_update_stamp);
1431 }
1432 else
1433 ci->last_update_stamp = stamp;
1435 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1436 {
1437 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1438 continue;
1439 }
1441 values_num++;
1442 }
1444 if (((now - ci->last_flush_time) >= config_write_interval)
1445 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1446 && (ci->values_num > 0))
1447 {
1448 enqueue_cache_item (ci, TAIL);
1449 }
1451 pthread_mutex_unlock (&cache_lock);
1453 if (values_num < 1)
1454 return send_response(sock, RESP_ERR, "No values updated.\n");
1455 else
1456 return send_response(sock, RESP_OK,
1457 "errors, enqueued %i value(s).\n", values_num);
1459 /* NOTREACHED */
1460 assert(1==0);
1462 } /* }}} int handle_request_update */
1464 /* we came across a "WROTE" entry during journal replay.
1465 * throw away any values that we have accumulated for this file
1466 */
1467 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1468 {
1469 cache_item_t *ci;
1470 const char *file = buffer;
1472 pthread_mutex_lock(&cache_lock);
1474 ci = g_tree_lookup(cache_tree, file);
1475 if (ci == NULL)
1476 {
1477 pthread_mutex_unlock(&cache_lock);
1478 return (0);
1479 }
1481 if (ci->values)
1482 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1484 wipe_ci_values(ci, now);
1485 remove_from_queue(ci);
1487 pthread_mutex_unlock(&cache_lock);
1488 return (0);
1489 } /* }}} int handle_request_wrote */
1491 /* start "BATCH" processing */
1492 static int batch_start (HANDLER_PROTO) /* {{{ */
1493 {
1494 int status;
1495 if (sock->batch_start)
1496 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1498 status = send_response(sock, RESP_OK,
1499 "Go ahead. End with dot '.' on its own line.\n");
1500 sock->batch_start = time(NULL);
1501 sock->batch_cmd = 0;
1503 return status;
1504 } /* }}} static int batch_start */
1506 /* finish "BATCH" processing and return results to the client */
1507 static int batch_done (HANDLER_PROTO) /* {{{ */
1508 {
1509 assert(sock->batch_start);
1510 sock->batch_start = 0;
1511 sock->batch_cmd = 0;
1512 return send_response(sock, RESP_OK, "errors\n");
1513 } /* }}} static int batch_done */
/* QUIT: a non-zero handler result makes connection_thread_main() close
 * the client connection. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int close_connection_now = -1;

  return close_connection_now;
} /* }}} static int handle_request_quit */
/* Table of every protocol command. find_command() and find_command_index()
 * search it by name, ignoring case.
 *
 * Each entry holds, in order:
 *   - the command name;
 *   - its handler;
 *   - the CMD_CONTEXT_* bits saying where the command may be used (client
 *     request, batch, journal replay), checked by command_check_context();
 *   - the one-line syntax that HELP prints (entries with a NULL syntax are
 *     left out of the HELP overview);
 *   - the long help text.
 *
 * NOTE: find_command_index() uses an entry's position as a bit index into
 * listen_socket_t.permissions. socket_permission_check() and
 * socket_permission_add() assert that index < 32, so the table must not
 * grow beyond 32 entries. */
static command_t list_of_commands[] = { /* {{{ */
  {
    "UPDATE",
    handle_request_update,
    CMD_CONTEXT_ANY,
    "UPDATE <filename> <values> [<values> ...]\n"
    ,
    "Adds the given file to the internal cache if it is not yet known and\n"
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
    "for details.\n"
    "\n"
    "Each <values> has the following form:\n"
    " <values> = <time>:<value>[:<value>[...]]\n"
    "See the rrdupdate(1) manpage for details.\n"
  },
  {
    /* journal replay only; no syntax, so hidden from HELP */
    "WROTE",
    handle_request_wrote,
    CMD_CONTEXT_JOURNAL,
    NULL,
    NULL
  },
  {
    "FLUSH",
    handle_request_flush,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "FLUSH <filename>\n"
    ,
    "Adds the given filename to the head of the update queue and returns\n"
    "after it has been dequeued.\n"
  },
  {
    "FLUSHALL",
    handle_request_flushall,
    CMD_CONTEXT_CLIENT,
    "FLUSHALL\n"
    ,
    "Triggers writing of all pending updates. Returns immediately.\n"
  },
  {
    "PENDING",
    handle_request_pending,
    CMD_CONTEXT_CLIENT,
    "PENDING <filename>\n"
    ,
    "Shows any 'pending' updates for a file, in order.\n"
    "The updates shown have not yet been written to the underlying RRD file.\n"
  },
  {
    "FORGET",
    handle_request_forget,
    CMD_CONTEXT_ANY,
    "FORGET <filename>\n"
    ,
    "Removes the file completely from the cache.\n"
    "Any pending updates for the file will be lost.\n"
  },
  {
    "QUEUE",
    handle_request_queue,
    CMD_CONTEXT_CLIENT,
    "QUEUE\n"
    ,
    "Shows all files in the output queue.\n"
    "The output is zero or more lines in the following format:\n"
    "(where <num_vals> is the number of values to be written)\n"
    "\n"
    "<num_vals> <filename>\n"
  },
  {
    "STATS",
    handle_request_stats,
    CMD_CONTEXT_CLIENT,
    "STATS\n"
    ,
    "Returns some performance counters, see the rrdcached(1) manpage for\n"
    "a description of the values.\n"
  },
  {
    "HELP",
    handle_request_help,
    CMD_CONTEXT_CLIENT,
    "HELP [<command>]\n",
    NULL, /* special! */ /* HELP without an argument prints the overview */
  },
  {
    "BATCH",
    batch_start,
    CMD_CONTEXT_CLIENT,
    "BATCH\n"
    ,
    "The 'BATCH' command permits the client to initiate a bulk load\n"
    " of commands to rrdcached.\n"
    "\n"
    "Usage:\n"
    "\n"
    " client: BATCH\n"
    " server: 0 Go ahead. End with dot '.' on its own line.\n"
    " client: command #1\n"
    " client: command #2\n"
    " client: ... and so on\n"
    " client: .\n"
    " server: 2 errors\n"
    " server: 7 message for command #7\n"
    " server: 9 message for command #9\n"
    "\n"
    "For more information, consult the rrdcached(1) documentation.\n"
  },
  {
    ".", /* BATCH terminator */
    /* permission for "." is checked as "BATCH" in socket_permission_check() */
    batch_done,
    CMD_CONTEXT_BATCH,
    NULL,
    NULL
  },
  {
    "QUIT",
    handle_request_quit,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "QUIT\n"
    ,
    "Disconnect from rrdcached.\n"
  }
}; /* }}} command_t list_of_commands[] */
/* number of entries in list_of_commands */
static size_t list_of_commands_len = sizeof (list_of_commands)
  / sizeof (list_of_commands[0]);
1647 static command_t *find_command(char *cmd)
1648 {
1649 size_t i;
1651 for (i = 0; i < list_of_commands_len; i++)
1652 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1653 return (&list_of_commands[i]);
1654 return NULL;
1655 }
1657 /* We currently use the index in the `list_of_commands' array as a bit position
1658 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1659 * outside these functions so that switching to a more elegant storage method
1660 * is easily possible. */
1661 static ssize_t find_command_index (const char *cmd) /* {{{ */
1662 {
1663 size_t i;
1665 for (i = 0; i < list_of_commands_len; i++)
1666 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1667 return ((ssize_t) i);
1668 return (-1);
1669 } /* }}} ssize_t find_command_index */
1671 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1672 const char *cmd)
1673 {
1674 ssize_t i;
1676 if (cmd == NULL)
1677 return (-1);
1679 if ((strcasecmp ("QUIT", cmd) == 0)
1680 || (strcasecmp ("HELP", cmd) == 0))
1681 return (1);
1682 else if (strcmp (".", cmd) == 0)
1683 cmd = "BATCH";
1685 i = find_command_index (cmd);
1686 if (i < 0)
1687 return (-1);
1688 assert (i < 32);
1690 if ((sock->permissions & (1 << i)) != 0)
1691 return (1);
1692 return (0);
1693 } /* }}} int socket_permission_check */
1695 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1696 const char *cmd)
1697 {
1698 ssize_t i;
1700 i = find_command_index (cmd);
1701 if (i < 0)
1702 return (-1);
1703 assert (i < 32);
1705 sock->permissions |= (1 << i);
1706 return (0);
1707 } /* }}} int socket_permission_add */
1709 /* check whether commands are received in the expected context */
1710 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1711 {
1712 if (sock == NULL)
1713 return (cmd->context & CMD_CONTEXT_JOURNAL);
1714 else if (sock->batch_start)
1715 return (cmd->context & CMD_CONTEXT_BATCH);
1716 else
1717 return (cmd->context & CMD_CONTEXT_CLIENT);
1719 /* NOTREACHED */
1720 assert(1==0);
1721 }
1723 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1724 {
1725 int status;
1726 char *cmd_str;
1727 char *resp_txt;
1728 command_t *help = NULL;
1730 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1731 if (status == 0)
1732 help = find_command(cmd_str);
1734 if (help && (help->syntax || help->help))
1735 {
1736 char tmp[CMD_MAX];
1738 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1739 resp_txt = tmp;
1741 if (help->syntax)
1742 add_response_info(sock, "Usage: %s\n", help->syntax);
1744 if (help->help)
1745 add_response_info(sock, "%s\n", help->help);
1746 }
1747 else
1748 {
1749 size_t i;
1751 resp_txt = "Command overview\n";
1753 for (i = 0; i < list_of_commands_len; i++)
1754 {
1755 if (list_of_commands[i].syntax == NULL)
1756 continue;
1757 add_response_info (sock, "%s", list_of_commands[i].syntax);
1758 }
1759 }
1761 return send_response(sock, RESP_OK, resp_txt);
1762 } /* }}} int handle_request_help */
1764 /* if sock==NULL, we are in journal replay mode */
1765 static int handle_request (DISPATCH_PROTO) /* {{{ */
1766 {
1767 char *buffer_ptr = buffer;
1768 char *cmd_str = NULL;
1769 command_t *cmd = NULL;
1770 int status;
1772 assert (buffer[buffer_size - 1] == '\0');
1774 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1775 if (status != 0)
1776 {
1777 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1778 return (-1);
1779 }
1781 if (sock != NULL && sock->batch_start)
1782 sock->batch_cmd++;
1784 cmd = find_command(cmd_str);
1785 if (!cmd)
1786 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1788 if (!socket_permission_check (sock, cmd->cmd))
1789 return send_response(sock, RESP_ERR, "Permission denied.\n");
1791 if (!command_check_context(sock, cmd))
1792 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1794 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1795 } /* }}} int handle_request */
1797 static void journal_set_free (journal_set *js) /* {{{ */
1798 {
1799 if (js == NULL)
1800 return;
1802 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1804 free(js);
1805 } /* }}} journal_set_free */
1807 static void journal_set_remove (journal_set *js) /* {{{ */
1808 {
1809 if (js == NULL)
1810 return;
1812 for (uint i=0; i < js->files_num; i++)
1813 {
1814 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1815 unlink(js->files[i]);
1816 }
1817 } /* }}} journal_set_remove */
1819 /* close current journal file handle.
1820 * MUST hold journal_lock before calling */
1821 static void journal_close(void) /* {{{ */
1822 {
1823 if (journal_fh != NULL)
1824 {
1825 if (fclose(journal_fh) != 0)
1826 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1827 }
1829 journal_fh = NULL;
1830 journal_size = 0;
1831 } /* }}} journal_close */
1833 /* MUST hold journal_lock before calling */
1834 static void journal_new_file(void) /* {{{ */
1835 {
1836 struct timeval now;
1837 int new_fd;
1838 char new_file[PATH_MAX + 1];
1840 assert(journal_dir != NULL);
1841 assert(journal_cur != NULL);
1843 journal_close();
1845 gettimeofday(&now, NULL);
1846 /* this format assures that the files sort in strcmp() order */
1847 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1848 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1850 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1851 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1852 if (new_fd < 0)
1853 goto error;
1855 journal_fh = fdopen(new_fd, "a");
1856 if (journal_fh == NULL)
1857 goto error;
1859 journal_size = ftell(journal_fh);
1860 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1862 /* record the file in the journal set */
1863 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1865 return;
1867 error:
1868 RRDD_LOG(LOG_CRIT,
1869 "JOURNALING DISABLED: Error while trying to create %s : %s",
1870 new_file, rrd_strerror(errno));
1871 RRDD_LOG(LOG_CRIT,
1872 "JOURNALING DISABLED: All values will be flushed at shutdown");
1874 close(new_fd);
1875 config_flush_at_shutdown = 1;
1877 } /* }}} journal_new_file */
1879 /* MUST NOT hold journal_lock before calling this */
1880 static void journal_rotate(void) /* {{{ */
1881 {
1882 journal_set *old_js = NULL;
1884 if (journal_dir == NULL)
1885 return;
1887 RRDD_LOG(LOG_DEBUG, "rotating journals");
1889 pthread_mutex_lock(&stats_lock);
1890 ++stats_journal_rotate;
1891 pthread_mutex_unlock(&stats_lock);
1893 pthread_mutex_lock(&journal_lock);
1895 journal_close();
1897 /* rotate the journal sets */
1898 old_js = journal_old;
1899 journal_old = journal_cur;
1900 journal_cur = calloc(1, sizeof(journal_set));
1902 if (journal_cur != NULL)
1903 journal_new_file();
1904 else
1905 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1907 pthread_mutex_unlock(&journal_lock);
1909 journal_set_remove(old_js);
1910 journal_set_free (old_js);
1912 } /* }}} static void journal_rotate */
1914 /* MUST hold journal_lock when calling */
1915 static void journal_done(void) /* {{{ */
1916 {
1917 if (journal_cur == NULL)
1918 return;
1920 journal_close();
1922 if (config_flush_at_shutdown)
1923 {
1924 RRDD_LOG(LOG_INFO, "removing journals");
1925 journal_set_remove(journal_old);
1926 journal_set_remove(journal_cur);
1927 }
1928 else
1929 {
1930 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1931 "journals will be used at next startup");
1932 }
1934 journal_set_free(journal_cur);
1935 journal_set_free(journal_old);
1936 free(journal_dir);
1938 } /* }}} static void journal_done */
1940 static int journal_write(char *cmd, char *args) /* {{{ */
1941 {
1942 int chars;
1944 if (journal_fh == NULL)
1945 return 0;
1947 pthread_mutex_lock(&journal_lock);
1948 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1949 journal_size += chars;
1951 if (journal_size > JOURNAL_MAX)
1952 journal_new_file();
1954 pthread_mutex_unlock(&journal_lock);
1956 if (chars > 0)
1957 {
1958 pthread_mutex_lock(&stats_lock);
1959 stats_journal_bytes += chars;
1960 pthread_mutex_unlock(&stats_lock);
1961 }
1963 return chars;
1964 } /* }}} static int journal_write */
1966 static int journal_replay (const char *file) /* {{{ */
1967 {
1968 FILE *fh;
1969 int entry_cnt = 0;
1970 int fail_cnt = 0;
1971 uint64_t line = 0;
1972 char entry[CMD_MAX];
1973 time_t now;
1975 if (file == NULL) return 0;
1977 {
1978 char *reason = "unknown error";
1979 int status = 0;
1980 struct stat statbuf;
1982 memset(&statbuf, 0, sizeof(statbuf));
1983 if (stat(file, &statbuf) != 0)
1984 {
1985 reason = "stat error";
1986 status = errno;
1987 }
1988 else if (!S_ISREG(statbuf.st_mode))
1989 {
1990 reason = "not a regular file";
1991 status = EPERM;
1992 }
1993 if (statbuf.st_uid != daemon_uid)
1994 {
1995 reason = "not owned by daemon user";
1996 status = EACCES;
1997 }
1998 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1999 {
2000 reason = "must not be user/group writable";
2001 status = EACCES;
2002 }
2004 if (status != 0)
2005 {
2006 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2007 file, rrd_strerror(status), reason);
2008 return 0;
2009 }
2010 }
2012 fh = fopen(file, "r");
2013 if (fh == NULL)
2014 {
2015 if (errno != ENOENT)
2016 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2017 file, rrd_strerror(errno));
2018 return 0;
2019 }
2020 else
2021 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2023 now = time(NULL);
2025 while(!feof(fh))
2026 {
2027 size_t entry_len;
2029 ++line;
2030 if (fgets(entry, sizeof(entry), fh) == NULL)
2031 break;
2032 entry_len = strlen(entry);
2034 /* check \n termination in case journal writing crashed mid-line */
2035 if (entry_len == 0)
2036 continue;
2037 else if (entry[entry_len - 1] != '\n')
2038 {
2039 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2040 ++fail_cnt;
2041 continue;
2042 }
2044 entry[entry_len - 1] = '\0';
2046 if (handle_request(NULL, now, entry, entry_len) == 0)
2047 ++entry_cnt;
2048 else
2049 ++fail_cnt;
2050 }
2052 fclose(fh);
2054 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2055 entry_cnt, fail_cnt);
2057 return entry_cnt > 0 ? 1 : 0;
2058 } /* }}} static int journal_replay */
/* qsort() comparator for an array of journal file names (char *).
 * Names are ordered by strcmp(), which matches the creation order
 * produced by the zero-padded timestamp suffix. */
static int journal_sort(const void *v1, const void *v2)
{
  const char *lhs = *(char * const *) v1;
  const char *rhs = *(char * const *) v2;

  return strcmp(lhs, rhs);
}
2068 static void journal_init(void) /* {{{ */
2069 {
2070 int had_journal = 0;
2071 DIR *dir;
2072 struct dirent *dent;
2073 char path[PATH_MAX+1];
2075 if (journal_dir == NULL) return;
2077 pthread_mutex_lock(&journal_lock);
2079 journal_cur = calloc(1, sizeof(journal_set));
2080 if (journal_cur == NULL)
2081 {
2082 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2083 return;
2084 }
2086 RRDD_LOG(LOG_INFO, "checking for journal files");
2088 /* Handle old journal files during transition. This gives them the
2089 * correct sort order. TODO: remove after first release
2090 */
2091 {
2092 char old_path[PATH_MAX+1];
2093 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2094 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2095 rename(old_path, path);
2097 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2098 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2099 rename(old_path, path);
2100 }
2102 dir = opendir(journal_dir);
2103 while ((dent = readdir(dir)) != NULL)
2104 {
2105 /* looks like a journal file? */
2106 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2107 continue;
2109 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2111 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2112 {
2113 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2114 dent->d_name);
2115 break;
2116 }
2117 }
2118 closedir(dir);
2120 qsort(journal_cur->files, journal_cur->files_num,
2121 sizeof(journal_cur->files[0]), journal_sort);
2123 for (uint i=0; i < journal_cur->files_num; i++)
2124 had_journal += journal_replay(journal_cur->files[i]);
2126 journal_new_file();
2128 /* it must have been a crash. start a flush */
2129 if (had_journal && config_flush_at_shutdown)
2130 flush_old_values(-1);
2132 pthread_mutex_unlock(&journal_lock);
2134 RRDD_LOG(LOG_INFO, "journal processing complete");
2136 } /* }}} static void journal_init */
2138 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2139 {
2140 assert(sock != NULL);
2142 free(sock->rbuf); sock->rbuf = NULL;
2143 free(sock->wbuf); sock->wbuf = NULL;
2144 free(sock);
2145 } /* }}} void free_listen_socket */
2147 static void close_connection(listen_socket_t *sock) /* {{{ */
2148 {
2149 if (sock->fd >= 0)
2150 {
2151 close(sock->fd);
2152 sock->fd = -1;
2153 }
2155 free_listen_socket(sock);
2157 } /* }}} void close_connection */
/* Per-client thread body.
 *
 * Reads request data from the client and dispatches every complete command
 * through handle_request(). The loop ends when any of these happens:
 *  - the client hangs up or reaches EOF;
 *  - read() fails;
 *  - poll() reports an unexpected event;
 *  - a handler returns non-zero (e.g. QUIT);
 *  - the daemon leaves the RUNNING state.
 *
 * `args' is the client's listen_socket_t. This thread frees it via
 * close_connection() before returning. While the loop runs, the thread is
 * counted in connection_threads_num; when that count drops to zero,
 * connection_threads_done is broadcast. */
static void *connection_thread_main (void *args) /* {{{ */
{
  listen_socket_t *sock;
  int fd;

  sock = (listen_socket_t *) args;
  fd = sock->fd;

  /* init read buffers */
  sock->next_read = sock->next_cmd = 0;
  sock->rbuf = malloc(RBUF_SIZE);
  if (sock->rbuf == NULL)
  {
    RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
    /* not counted in connection_threads_num yet, so no bookkeeping here */
    close_connection(sock);
    return NULL;
  }

  pthread_mutex_lock (&connection_threads_lock);
  connection_threads_num++;
  pthread_mutex_unlock (&connection_threads_lock);

  while (state == RUNNING)
  {
    char *cmd;
    ssize_t cmd_len;
    ssize_t rbytes;
    time_t now;

    struct pollfd pollfd;
    int status;

    pollfd.fd = fd;
    pollfd.events = POLLIN | POLLPRI;
    pollfd.revents = 0;

    /* bounded wait so that a change of `state' is noticed within 500 ms */
    status = poll (&pollfd, 1, /* timeout = */ 500);
    if (state != RUNNING)
      break;
    else if (status == 0) /* timeout */
      continue;
    else if (status < 0) /* error */
    {
      status = errno;
      if (status != EINTR)
        RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
      continue;
    }

    if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
      break;
    else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
    {
      RRDD_LOG (LOG_WARNING, "connection_thread_main: "
          "poll(2) returned something unexpected: %#04hx",
          pollfd.revents);
      break;
    }

    /* append new data after what is already buffered (from next_read on) */
    rbytes = read(fd, sock->rbuf + sock->next_read,
                  RBUF_SIZE - sock->next_read);
    if (rbytes < 0)
    {
      RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
      break;
    }
    else if (rbytes == 0)
      break; /* eof */

    sock->next_read += rbytes;

    /* inside a BATCH all commands use the batch start time as "now" */
    if (sock->batch_start)
      now = sock->batch_start;
    else
      now = time(NULL);

    /* Dispatch every complete command currently buffered. cmd_len+1 is
     * passed because handle_request() asserts a NUL at
     * buffer[buffer_size - 1]; presumably next_cmd() places it there
     * (NOTE(review): confirm). A non-zero result closes the connection. */
    while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
    {
      status = handle_request (sock, now, cmd, cmd_len+1);
      if (status != 0)
        goto out_close;
    }
  }

out_close:
  close_connection(sock);

  /* Remove this thread from the connection threads list */
  pthread_mutex_lock (&connection_threads_lock);
  connection_threads_num--;
  if (connection_threads_num <= 0)
    pthread_cond_broadcast(&connection_threads_done);
  pthread_mutex_unlock (&connection_threads_lock);

  return (NULL);
} /* }}} void *connection_thread_main */
2256 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2257 {
2258 int fd;
2259 struct sockaddr_un sa;
2260 listen_socket_t *temp;
2261 int status;
2262 const char *path;
2263 char *path_copy, *dir;
2265 path = sock->addr;
2266 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2267 path += strlen("unix:");
2269 /* dirname may modify its argument */
2270 path_copy = strdup(path);
2271 if (path_copy == NULL)
2272 {
2273 fprintf(stderr, "rrdcached: strdup(): %s\n",
2274 rrd_strerror(errno));
2275 return (-1);
2276 }
2278 dir = dirname(path_copy);
2279 if (rrd_mkdir_p(dir, 0777) != 0)
2280 {
2281 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2282 dir, rrd_strerror(errno));
2283 return (-1);
2284 }
2286 free(path_copy);
2288 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2289 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2290 if (temp == NULL)
2291 {
2292 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2293 return (-1);
2294 }
2295 listen_fds = temp;
2296 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2298 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2299 if (fd < 0)
2300 {
2301 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2302 rrd_strerror(errno));
2303 return (-1);
2304 }
2306 memset (&sa, 0, sizeof (sa));
2307 sa.sun_family = AF_UNIX;
2308 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2310 /* if we've gotten this far, we own the pid file. any daemon started
2311 * with the same args must not be alive. therefore, ensure that we can
2312 * create the socket...
2313 */
2314 unlink(path);
2316 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2317 if (status != 0)
2318 {
2319 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2320 path, rrd_strerror(errno));
2321 close (fd);
2322 return (-1);
2323 }
2325 status = listen (fd, /* backlog = */ 10);
2326 if (status != 0)
2327 {
2328 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2329 path, rrd_strerror(errno));
2330 close (fd);
2331 unlink (path);
2332 return (-1);
2333 }
2335 listen_fds[listen_fds_num].fd = fd;
2336 listen_fds[listen_fds_num].family = PF_UNIX;
2337 strncpy(listen_fds[listen_fds_num].addr, path,
2338 sizeof (listen_fds[listen_fds_num].addr) - 1);
2339 listen_fds_num++;
2341 return (0);
2342 } /* }}} int open_listen_socket_unix */
/* Resolve sock->addr and listen on every resulting stream address.
 *
 * Accepted address forms:
 *   "[<ipv6>]" or "[<ipv6>]:<port>"
 *   "<host-or-ipv4>" or "<host-or-ipv4>:<port>"
 * Anything else (e.g. a bare IPv6 address) is passed to getaddrinfo()
 * unchanged. RRDCACHED_DEFAULT_PORT is used when no port is given.
 *
 * Each socket that binds and listens successfully is appended to
 * listen_fds. A candidate that fails to bind is skipped, but a listen()
 * failure aborts. Returns 0 if at least one socket was opened, -1
 * otherwise (previously, failing to bind every candidate was reported as
 * success).
 *
 * Other fixes: strrchr() replaces the legacy rindex(); a name without a dot
 * but with a single colon (e.g. "localhost:42217") is now split into host
 * and port. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;
  size_t opened = 0;

  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Hostname or IPv4, optionally followed by ":<port>". A name that
     * contains a dot is split at its last colon. Without a dot, it is
     * split only when there is exactly one colon: bare IPv6 addresses
     * always contain at least two. */
    char *last_colon = strrchr (addr, ':');

    if ((last_colon != NULL)
        && ((strchr (addr, '.') != NULL)
            || (last_colon == strchr (addr, ':'))))
    {
      port = last_colon;
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
    opened++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);

  if (opened == 0)
  {
    fprintf (stderr, "rrdcached: could not listen on any address for %s\n",
             sock->addr);
    return (-1);
  }

  return (0);
} /* }}} static int open_listen_socket_network */
2466 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2467 {
2468 assert(sock != NULL);
2469 assert(sock->addr != NULL);
2471 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2472 || sock->addr[0] == '/')
2473 return (open_listen_socket_unix(sock));
2474 else
2475 return (open_listen_socket_network(sock));
2476 } /* }}} int open_listen_socket */
2478 static int close_listen_sockets (void) /* {{{ */
2479 {
2480 size_t i;
2482 for (i = 0; i < listen_fds_num; i++)
2483 {
2484 close (listen_fds[i].fd);
2486 if (listen_fds[i].family == PF_UNIX)
2487 unlink(listen_fds[i].addr);
2488 }
2490 free (listen_fds);
2491 listen_fds = NULL;
2492 listen_fds_num = 0;
2494 return (0);
2495 } /* }}} int close_listen_sockets */
2497 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2498 {
2499 struct pollfd *pollfds;
2500 int pollfds_num;
2501 int status;
2502 int i;
2504 if (listen_fds_num < 1)
2505 {
2506 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2507 return (NULL);
2508 }
2510 pollfds_num = listen_fds_num;
2511 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2512 if (pollfds == NULL)
2513 {
2514 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2515 return (NULL);
2516 }
2517 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2519 RRDD_LOG(LOG_INFO, "listening for connections");
2521 while (state == RUNNING)
2522 {
2523 for (i = 0; i < pollfds_num; i++)
2524 {
2525 pollfds[i].fd = listen_fds[i].fd;
2526 pollfds[i].events = POLLIN | POLLPRI;
2527 pollfds[i].revents = 0;
2528 }
2530 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2531 if (state != RUNNING)
2532 break;
2533 else if (status == 0) /* timeout */
2534 continue;
2535 else if (status < 0) /* error */
2536 {
2537 status = errno;
2538 if (status != EINTR)
2539 {
2540 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2541 }
2542 continue;
2543 }
2545 for (i = 0; i < pollfds_num; i++)
2546 {
2547 listen_socket_t *client_sock;
2548 struct sockaddr_storage client_sa;
2549 socklen_t client_sa_size;
2550 pthread_t tid;
2551 pthread_attr_t attr;
2553 if (pollfds[i].revents == 0)
2554 continue;
2556 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2557 {
2558 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2559 "poll(2) returned something unexpected for listen FD #%i.",
2560 pollfds[i].fd);
2561 continue;
2562 }
2564 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2565 if (client_sock == NULL)
2566 {
2567 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2568 continue;
2569 }
2570 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2572 client_sa_size = sizeof (client_sa);
2573 client_sock->fd = accept (pollfds[i].fd,
2574 (struct sockaddr *) &client_sa, &client_sa_size);
2575 if (client_sock->fd < 0)
2576 {
2577 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2578 free(client_sock);
2579 continue;
2580 }
2582 pthread_attr_init (&attr);
2583 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2585 status = pthread_create (&tid, &attr, connection_thread_main,
2586 client_sock);
2587 if (status != 0)
2588 {
2589 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2590 close_connection(client_sock);
2591 continue;
2592 }
2593 } /* for (pollfds_num) */
2594 } /* while (state == RUNNING) */
2596 RRDD_LOG(LOG_INFO, "starting shutdown");
2598 close_listen_sockets ();
2600 pthread_mutex_lock (&connection_threads_lock);
2601 while (connection_threads_num > 0)
2602 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2603 pthread_mutex_unlock (&connection_threads_lock);
2605 free(pollfds);
2607 return (NULL);
2608 } /* }}} void *listen_thread_main */
2610 static int daemonize (void) /* {{{ */
2611 {
2612 int pid_fd;
2613 char *base_dir;
2615 daemon_uid = geteuid();
2617 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2618 if (pid_fd < 0)
2619 pid_fd = check_pidfile();
2620 if (pid_fd < 0)
2621 return pid_fd;
2623 /* open all the listen sockets */
2624 if (config_listen_address_list_len > 0)
2625 {
2626 for (size_t i = 0; i < config_listen_address_list_len; i++)
2627 open_listen_socket (config_listen_address_list[i]);
2629 rrd_free_ptrs((void ***) &config_listen_address_list,
2630 &config_listen_address_list_len);
2631 }
2632 else
2633 {
2634 listen_socket_t sock;
2635 memset(&sock, 0, sizeof(sock));
2636 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2637 open_listen_socket (&sock);
2638 }
2640 if (listen_fds_num < 1)
2641 {
2642 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2643 goto error;
2644 }
2646 if (!stay_foreground)
2647 {
2648 pid_t child;
2650 child = fork ();
2651 if (child < 0)
2652 {
2653 fprintf (stderr, "daemonize: fork(2) failed.\n");
2654 goto error;
2655 }
2656 else if (child > 0)
2657 exit(0);
2659 /* Become session leader */
2660 setsid ();
2662 /* Open the first three file descriptors to /dev/null */
2663 close (2);
2664 close (1);
2665 close (0);
2667 open ("/dev/null", O_RDWR);
2668 if (dup(0) == -1 || dup(0) == -1){
2669 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2670 }
2671 } /* if (!stay_foreground) */
2673 /* Change into the /tmp directory. */
2674 base_dir = (config_base_dir != NULL)
2675 ? config_base_dir
2676 : "/tmp";
2678 if (chdir (base_dir) != 0)
2679 {
2680 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2681 goto error;
2682 }
2684 install_signal_handlers();
2686 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2687 RRDD_LOG(LOG_INFO, "starting up");
2689 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2690 (GDestroyNotify) free_cache_item);
2691 if (cache_tree == NULL)
2692 {
2693 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2694 goto error;
2695 }
2697 return write_pidfile (pid_fd);
2699 error:
2700 remove_pidfile();
2701 return -1;
2702 } /* }}} int daemonize */
/* Stop the worker threads and release the daemon's global resources.
 *
 * NOTE(review): flush_thread and queue_threads[0 .. config_queue_threads-1]
 * are joined unconditionally. This assumes every one of those threads was
 * successfully created and that queue_threads is non-NULL; joining a thread
 * that was never started is undefined behaviour. The joins also rely on the
 * threads exiting after the broadcasts below, presumably because they check
 * the global shutdown state. Confirm this against the thread functions.
 *
 * Returns 0. */
static int cleanup (void) /* {{{ */
{
  /* Wake the flush thread and wait for it to terminate. */
  pthread_cond_broadcast (&flush_cond);
  pthread_join (flush_thread, NULL);

  /* Wake all queue threads and wait for each of them to terminate. */
  pthread_cond_broadcast (&queue_cond);
  for (int i = 0; i < config_queue_threads; i++)
    pthread_join (queue_threads[i], NULL);

  /* When flushing at shutdown is enabled (-F, or forced because no journal
   * directory was configured) the write queue must be empty by now. */
  if (config_flush_at_shutdown)
  {
    assert(cache_queue_head == NULL);
    RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
  }

  free(queue_threads);
  free(config_base_dir);

  /* NOTE(review): cache_lock and journal_lock are acquired here and never
   * released. This is presumably intentional, so that any thread still
   * running blocks instead of touching the destroyed tree or journal.
   * Confirm. */
  pthread_mutex_lock(&cache_lock);
  g_tree_destroy(cache_tree);

  pthread_mutex_lock(&journal_lock);
  journal_done();

  RRDD_LOG(LOG_INFO, "goodbye");
  closelog ();

  remove_pidfile ();
  free(config_pid_file);

  return (0);
} /* }}} int cleanup */
2737 static int read_options (int argc, char **argv) /* {{{ */
2738 {
2739 int option;
2740 int status = 0;
2742 char **permissions = NULL;
2743 size_t permissions_len = 0;
2745 while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2746 {
2747 switch (option)
2748 {
2749 case 'g':
2750 stay_foreground=1;
2751 break;
2753 case 'l':
2754 {
2755 listen_socket_t *new;
2757 new = malloc(sizeof(listen_socket_t));
2758 if (new == NULL)
2759 {
2760 fprintf(stderr, "read_options: malloc failed.\n");
2761 return(2);
2762 }
2763 memset(new, 0, sizeof(listen_socket_t));
2765 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2767 /* Add permissions to the socket {{{ */
2768 if (permissions_len != 0)
2769 {
2770 size_t i;
2771 for (i = 0; i < permissions_len; i++)
2772 {
2773 status = socket_permission_add (new, permissions[i]);
2774 if (status != 0)
2775 {
2776 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2777 "socket failed. Most likely, this permission doesn't "
2778 "exist. Check your command line.\n", permissions[i]);
2779 status = 4;
2780 }
2781 }
2782 }
2783 else /* if (permissions_len == 0) */
2784 {
2785 /* Add permission for ALL commands to the socket. */
2786 size_t i;
2787 for (i = 0; i < list_of_commands_len; i++)
2788 {
2789 status = socket_permission_add (new, list_of_commands[i].cmd);
2790 if (status != 0)
2791 {
2792 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2793 "socket failed. This should never happen, ever! Sorry.\n",
2794 permissions[i]);
2795 status = 4;
2796 }
2797 }
2798 }
2799 /* }}} Done adding permissions. */
2801 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2802 &config_listen_address_list_len, new))
2803 {
2804 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2805 return (2);
2806 }
2807 }
2808 break;
2810 case 'P':
2811 {
2812 char *optcopy;
2813 char *saveptr;
2814 char *dummy;
2815 char *ptr;
2817 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2819 optcopy = strdup (optarg);
2820 dummy = optcopy;
2821 saveptr = NULL;
2822 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2823 {
2824 dummy = NULL;
2825 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2826 }
2828 free (optcopy);
2829 }
2830 break;
2832 case 'f':
2833 {
2834 int temp;
2836 temp = atoi (optarg);
2837 if (temp > 0)
2838 config_flush_interval = temp;
2839 else
2840 {
2841 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2842 status = 3;
2843 }
2844 }
2845 break;
2847 case 'w':
2848 {
2849 int temp;
2851 temp = atoi (optarg);
2852 if (temp > 0)
2853 config_write_interval = temp;
2854 else
2855 {
2856 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2857 status = 2;
2858 }
2859 }
2860 break;
2862 case 'z':
2863 {
2864 int temp;
2866 temp = atoi(optarg);
2867 if (temp > 0)
2868 config_write_jitter = temp;
2869 else
2870 {
2871 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2872 status = 2;
2873 }
2875 break;
2876 }
2878 case 't':
2879 {
2880 int threads;
2881 threads = atoi(optarg);
2882 if (threads >= 1)
2883 config_queue_threads = threads;
2884 else
2885 {
2886 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2887 return 1;
2888 }
2889 }
2890 break;
2892 case 'B':
2893 config_write_base_only = 1;
2894 break;
2896 case 'b':
2897 {
2898 size_t len;
2899 char base_realpath[PATH_MAX];
2901 if (config_base_dir != NULL)
2902 free (config_base_dir);
2903 config_base_dir = strdup (optarg);
2904 if (config_base_dir == NULL)
2905 {
2906 fprintf (stderr, "read_options: strdup failed.\n");
2907 return (3);
2908 }
2910 /* make sure that the base directory is not resolved via
2911 * symbolic links. this makes some performance-enhancing
2912 * assumptions possible (we don't have to resolve paths
2913 * that start with a "/")
2914 */
2915 if (realpath(config_base_dir, base_realpath) == NULL)
2916 {
2917 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2918 "%s\n", config_base_dir, rrd_strerror(errno));
2919 return 5;
2920 }
2921 else if (strncmp(config_base_dir,
2922 base_realpath, sizeof(base_realpath)) != 0)
2923 {
2924 fprintf(stderr,
2925 "Base directory (-b) resolved via file system links!\n"
2926 "Please consult rrdcached '-b' documentation!\n"
2927 "Consider specifying the real directory (%s)\n",
2928 base_realpath);
2929 return 5;
2930 }
2932 len = strlen (config_base_dir);
2933 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2934 {
2935 config_base_dir[len - 1] = 0;
2936 len--;
2937 }
2939 if (len < 1)
2940 {
2941 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2942 return (4);
2943 }
2945 _config_base_dir_len = len;
2946 }
2947 break;
2949 case 'p':
2950 {
2951 if (config_pid_file != NULL)
2952 free (config_pid_file);
2953 config_pid_file = strdup (optarg);
2954 if (config_pid_file == NULL)
2955 {
2956 fprintf (stderr, "read_options: strdup failed.\n");
2957 return (3);
2958 }
2959 }
2960 break;
2962 case 'F':
2963 config_flush_at_shutdown = 1;
2964 break;
2966 case 'j':
2967 {
2968 const char *dir = journal_dir = strdup(optarg);
2970 status = rrd_mkdir_p(dir, 0777);
2971 if (status != 0)
2972 {
2973 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
2974 dir, rrd_strerror(errno));
2975 return 6;
2976 }
2978 if (access(dir, R_OK|W_OK|X_OK) != 0)
2979 {
2980 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2981 errno ? rrd_strerror(errno) : "");
2982 return 6;
2983 }
2984 }
2985 break;
2987 case 'h':
2988 case '?':
2989 printf ("RRDCacheD %s\n"
2990 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
2991 "\n"
2992 "Usage: rrdcached [options]\n"
2993 "\n"
2994 "Valid options are:\n"
2995 " -l <address> Socket address to listen to.\n"
2996 " -P <perms> Sets the permissions to assign to all following "
2997 "sockets\n"
2998 " -w <seconds> Interval in which to write data.\n"
2999 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3000 " -t <threads> Number of write threads.\n"
3001 " -f <seconds> Interval in which to flush dead data.\n"
3002 " -p <file> Location of the PID-file.\n"
3003 " -b <dir> Base directory to change to.\n"
3004 " -B Restrict file access to paths within -b <dir>\n"
3005 " -g Do not fork and run in the foreground.\n"
3006 " -j <dir> Directory in which to create the journal files.\n"
3007 " -F Always flush all updates at shutdown\n"
3008 "\n"
3009 "For more information and a detailed description of all options "
3010 "please refer\n"
3011 "to the rrdcached(1) manual page.\n",
3012 VERSION);
3013 status = -1;
3014 break;
3015 } /* switch (option) */
3016 } /* while (getopt) */
3018 /* advise the user when values are not sane */
3019 if (config_flush_interval < 2 * config_write_interval)
3020 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3021 " 2x write interval (-w) !\n");
3022 if (config_write_jitter > config_write_interval)
3023 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3024 " write interval (-w) !\n");
3026 if (config_write_base_only && config_base_dir == NULL)
3027 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3028 " Consult the rrdcached documentation\n");
3030 if (journal_dir == NULL)
3031 config_flush_at_shutdown = 1;
3033 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3035 return (status);
3036 } /* }}} int read_options */
3038 int main (int argc, char **argv)
3039 {
3040 int status;
3042 status = read_options (argc, argv);
3043 if (status != 0)
3044 {
3045 if (status < 0)
3046 status = 0;
3047 return (status);
3048 }
3050 status = daemonize ();
3051 if (status != 0)
3052 {
3053 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3054 return (1);
3055 }
3057 journal_init();
3059 /* start the queue threads */
3060 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3061 if (queue_threads == NULL)
3062 {
3063 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3064 cleanup();
3065 return (1);
3066 }
3067 for (int i = 0; i < config_queue_threads; i++)
3068 {
3069 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3070 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3071 if (status != 0)
3072 {
3073 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3074 cleanup();
3075 return (1);
3076 }
3077 }
3079 /* start the flush thread */
3080 memset(&flush_thread, 0, sizeof(flush_thread));
3081 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3082 if (status != 0)
3083 {
3084 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3085 cleanup();
3086 return (1);
3087 }
3089 listen_thread_main (NULL);
3090 cleanup ();
3092 return (0);
3093 } /* int main */
3095 /*
3096 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3097 */