1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
115 /*
116 * Types
117 */
/* Privilege level attached to a socket.  The enumerators are ordered so
 * that has_privilege() can compare them with `>='. */
typedef enum
{
  PRIV_LOW  = 0,
  PRIV_HIGH = 1
} socket_privilege;
/* Status handed to send_response(): RESP_OK for success, RESP_ERR for
 * failure. */
typedef enum
{
  RESP_OK  =  0,
  RESP_ERR = -1
} response_code;
126 struct listen_socket_s
127 {
128 int fd;
129 char addr[PATH_MAX + 1];
130 int family;
131 socket_privilege privilege;
133 /* state for BATCH processing */
134 time_t batch_start;
135 int batch_cmd;
137 /* buffered IO */
138 char *rbuf;
139 off_t next_cmd;
140 off_t next_read;
142 char *wbuf;
143 ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
150 time_t now __attribute__((unused)),\
151 char *buffer __attribute__((unused)),\
152 size_t buffer_size __attribute__((unused))
154 #define HANDLER_PROTO struct command *cmd __attribute__((unused)),\
155 DISPATCH_PROTO
157 struct command {
158 char *cmd;
159 int (*handler)(HANDLER_PROTO);
160 socket_privilege min_priv;
162 char context; /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT (1<<0)
164 #define CMD_CONTEXT_BATCH (1<<1)
165 #define CMD_CONTEXT_JOURNAL (1<<2)
166 #define CMD_CONTEXT_ANY (0x7f)
168 char *syntax;
169 char *help;
170 };
/* Bits for `cache_item_t.flags' */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)  /* set by enqueue_cache_item(), cleared by
                                   * remove_from_queue() */

/* One cached RRD file together with the update strings waiting to be
 * written for it.  Items are linked into the write queue through
 * `prev'/`next'. */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;                /* file name; also the key in cache_tree */
  char **values;             /* pending update strings */
  size_t values_num;         /* number of entries in `values' */
  time_t last_flush_time;    /* set by wipe_ci_values() */
  time_t last_update_stamp;
  int flags;                 /* CI_FLAGS_* bits */
  pthread_cond_t flushed;    /* broadcast by the queue thread after a write;
                              * flush_file() waits on it */
  cache_item_t *prev;        /* neighbour towards the queue head */
  cache_item_t *next;        /* neighbour towards the queue tail */
};
/* User data passed to tree_callback_flush() by flush_old_values(). */
typedef struct callback_flush_data_s
{
  time_t now;          /* time at which the tree walk was started */
  time_t abs_timeout;  /* items with pending values last flushed at or before
                        * this time get enqueued */
  char **keys;         /* keys of tree nodes to be removed after the walk */
  size_t keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should insert at. */
typedef enum queue_side_e
{
  HEAD = 0,
  TAIL = 1
} queue_side_t;
/* max length of socket command or response */
#define CMD_MAX   4096
/* size of the per-socket read buffer: room for two maximum-length commands */
#define RBUF_SIZE (2 * CMD_MAX)
209 /*
210 * Variables
211 */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
218 enum {
219 RUNNING, /* normal operation */
220 FLUSHING, /* flushing remaining values */
221 SHUTDOWN /* shutting down */
222 } state = RUNNING;
224 static pthread_t *queue_threads;
225 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
226 static int config_queue_threads = 4;
228 static pthread_t flush_thread;
229 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
231 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
232 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
233 static int connection_threads_num = 0;
235 /* Cache stuff */
236 static GTree *cache_tree = NULL;
237 static cache_item_t *cache_queue_head = NULL;
238 static cache_item_t *cache_queue_tail = NULL;
239 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
241 static int config_write_interval = 300;
242 static int config_write_jitter = 0;
243 static int config_flush_interval = 3600;
244 static int config_flush_at_shutdown = 0;
245 static char *config_pid_file = NULL;
246 static char *config_base_dir = NULL;
247 static size_t _config_base_dir_len = 0;
248 static int config_write_base_only = 0;
250 static listen_socket_t **config_listen_address_list = NULL;
251 static size_t config_listen_address_list_len = 0;
253 static uint64_t stats_queue_length = 0;
254 static uint64_t stats_updates_received = 0;
255 static uint64_t stats_flush_received = 0;
256 static uint64_t stats_updates_written = 0;
257 static uint64_t stats_data_sets_written = 0;
258 static uint64_t stats_journal_bytes = 0;
259 static uint64_t stats_journal_rotate = 0;
260 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
262 /* Journaled updates */
263 static char *journal_cur = NULL;
264 static char *journal_old = NULL;
265 static FILE *journal_fh = NULL;
266 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
267 static int journal_write(char *cmd, char *args);
268 static void journal_done(void);
269 static void journal_rotate(void);
271 /* prototypes for forward refernces */
272 static int handle_request_help (HANDLER_PROTO);
274 /*
275 * Functions
276 */
277 static void sig_common (const char *sig) /* {{{ */
278 {
279 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
280 state = FLUSHING;
281 pthread_cond_broadcast(&flush_cond);
282 pthread_cond_broadcast(&queue_cond);
283 } /* }}} void sig_common */
/* SIGINT handler: runs the common handling with the flush-at-shutdown
 * setting left as configured. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "INT";

  sig_common(name);
} /* }}} void sig_int_handler */
/* SIGTERM handler: runs the common handling with the flush-at-shutdown
 * setting left as configured. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "TERM";

  sig_common(name);
} /* }}} void sig_term_handler */
295 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
296 {
297 config_flush_at_shutdown = 1;
298 sig_common("USR1");
299 } /* }}} void sig_usr1_handler */
301 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
302 {
303 config_flush_at_shutdown = 0;
304 sig_common("USR2");
305 } /* }}} void sig_usr2_handler */
307 static void install_signal_handlers(void) /* {{{ */
308 {
309 /* These structures are static, because `sigaction' behaves weird if the are
310 * overwritten.. */
311 static struct sigaction sa_int;
312 static struct sigaction sa_term;
313 static struct sigaction sa_pipe;
314 static struct sigaction sa_usr1;
315 static struct sigaction sa_usr2;
317 /* Install signal handlers */
318 memset (&sa_int, 0, sizeof (sa_int));
319 sa_int.sa_handler = sig_int_handler;
320 sigaction (SIGINT, &sa_int, NULL);
322 memset (&sa_term, 0, sizeof (sa_term));
323 sa_term.sa_handler = sig_term_handler;
324 sigaction (SIGTERM, &sa_term, NULL);
326 memset (&sa_pipe, 0, sizeof (sa_pipe));
327 sa_pipe.sa_handler = SIG_IGN;
328 sigaction (SIGPIPE, &sa_pipe, NULL);
330 memset (&sa_pipe, 0, sizeof (sa_usr1));
331 sa_usr1.sa_handler = sig_usr1_handler;
332 sigaction (SIGUSR1, &sa_usr1, NULL);
334 memset (&sa_usr2, 0, sizeof (sa_usr2));
335 sa_usr2.sa_handler = sig_usr2_handler;
336 sigaction (SIGUSR2, &sa_usr2, NULL);
338 } /* }}} void install_signal_handlers */
340 static int open_pidfile(char *action, int oflag) /* {{{ */
341 {
342 int fd;
343 char *file;
345 file = (config_pid_file != NULL)
346 ? config_pid_file
347 : LOCALSTATEDIR "/run/rrdcached.pid";
349 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
350 if (fd < 0)
351 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
352 action, file, rrd_strerror(errno));
354 return(fd);
355 } /* }}} static int open_pidfile */
/*
 * check_pidfile:
 * check existing pid file to see whether a daemon is running.
 *
 * Returns an open, truncated descriptor for the PID file when the PID stored
 * in it does not belong to another signalable process.  Returns a negative
 * value when the file cannot be opened or read, does not hold a positive
 * PID, cannot be truncated, or when another process with that PID exists.
 * The descriptor is closed on every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t pid_len;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not NUL-terminate and
   * atoi() needs a proper string */
  pid_len = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (pid_len <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[pid_len] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile:
 * Writes the PID of the calling process, followed by a newline, to the
 * already opened descriptor `fd'.  The descriptor is closed in every case.
 * Returns 0 on success and -1 if fdopen() fails.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
422 static int remove_pidfile (void) /* {{{ */
423 {
424 char *file;
425 int status;
427 file = (config_pid_file != NULL)
428 ? config_pid_file
429 : LOCALSTATEDIR "/run/rrdcached.pid";
431 status = unlink (file);
432 if (status == 0)
433 return (0);
434 return (errno);
435 } /* }}} int remove_pidfile */
437 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
438 {
439 char *eol;
441 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
442 sock->next_read - sock->next_cmd);
444 if (eol == NULL)
445 {
446 /* no commands left, move remainder back to front of rbuf */
447 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
448 sock->next_read - sock->next_cmd);
449 sock->next_read -= sock->next_cmd;
450 sock->next_cmd = 0;
451 *len = 0;
452 return NULL;
453 }
454 else
455 {
456 char *cmd = sock->rbuf + sock->next_cmd;
457 *eol = '\0';
459 sock->next_cmd = eol - sock->rbuf + 1;
461 if (eol > sock->rbuf && *(eol-1) == '\r')
462 *(--eol) = '\0'; /* handle "\r\n" EOL */
464 *len = eol - cmd;
466 return cmd;
467 }
469 /* NOTREACHED */
470 assert(1==0);
471 }
473 /* add the characters directly to the write buffer */
474 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
475 {
476 char *new_buf;
478 assert(sock != NULL);
480 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
481 if (new_buf == NULL)
482 {
483 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
484 return -1;
485 }
487 strncpy(new_buf + sock->wbuf_len, str, len + 1);
489 sock->wbuf = new_buf;
490 sock->wbuf_len += len;
492 return 0;
493 } /* }}} static int add_to_wbuf */
495 /* add the text to the "extra" info that's sent after the status line */
496 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
497 {
498 va_list argp;
499 char buffer[CMD_MAX];
500 int len;
502 if (sock == NULL) return 0; /* journal replay mode */
503 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
505 va_start(argp, fmt);
506 #ifdef HAVE_VSNPRINTF
507 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
508 #else
509 len = vsprintf(buffer, fmt, argp);
510 #endif
511 va_end(argp);
512 if (len < 0)
513 {
514 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
515 return -1;
516 }
518 return add_to_wbuf(sock, buffer, len);
519 } /* }}} static int add_response_info */
/*
 * count_lines:
 * Returns the number of newline characters in `str'; 0 when `str' is NULL.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
537 /* send the response back to the user.
538 * returns 0 on success, -1 on error
539 * write buffer is always zeroed after this call */
540 static int send_response (listen_socket_t *sock, response_code rc,
541 char *fmt, ...) /* {{{ */
542 {
543 va_list argp;
544 char buffer[CMD_MAX];
545 int lines;
546 ssize_t wrote;
547 int rclen, len;
549 if (sock == NULL) return rc; /* journal replay mode */
551 if (sock->batch_start)
552 {
553 if (rc == RESP_OK)
554 return rc; /* no response on success during BATCH */
555 lines = sock->batch_cmd;
556 }
557 else if (rc == RESP_OK)
558 lines = count_lines(sock->wbuf);
559 else
560 lines = -1;
562 rclen = sprintf(buffer, "%d ", lines);
563 va_start(argp, fmt);
564 #ifdef HAVE_VSNPRINTF
565 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
566 #else
567 len = vsprintf(buffer+rclen, fmt, argp);
568 #endif
569 va_end(argp);
570 if (len < 0)
571 return -1;
573 len += rclen;
575 /* append the result to the wbuf, don't write to the user */
576 if (sock->batch_start)
577 return add_to_wbuf(sock, buffer, len);
579 /* first write must be complete */
580 if (len != write(sock->fd, buffer, len))
581 {
582 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
583 return -1;
584 }
586 if (sock->wbuf != NULL && rc == RESP_OK)
587 {
588 wrote = 0;
589 while (wrote < sock->wbuf_len)
590 {
591 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
592 if (wb <= 0)
593 {
594 RRDD_LOG(LOG_INFO, "send_response: could not write results");
595 return -1;
596 }
597 wrote += wb;
598 }
599 }
601 free(sock->wbuf); sock->wbuf = NULL;
602 sock->wbuf_len = 0;
604 return 0;
605 } /* }}} */
607 static void wipe_ci_values(cache_item_t *ci, time_t when)
608 {
609 ci->values = NULL;
610 ci->values_num = 0;
612 ci->last_flush_time = when;
613 if (config_write_jitter > 0)
614 ci->last_flush_time += (rrd_random() % config_write_jitter);
615 }
617 /* remove_from_queue
618 * remove a "cache_item_t" item from the queue.
619 * must hold 'cache_lock' when calling this
620 */
621 static void remove_from_queue(cache_item_t *ci) /* {{{ */
622 {
623 if (ci == NULL) return;
624 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
626 if (ci->prev == NULL)
627 cache_queue_head = ci->next; /* reset head */
628 else
629 ci->prev->next = ci->next;
631 if (ci->next == NULL)
632 cache_queue_tail = ci->prev; /* reset the tail */
633 else
634 ci->next->prev = ci->prev;
636 ci->next = ci->prev = NULL;
637 ci->flags &= ~CI_FLAGS_IN_QUEUE;
639 pthread_mutex_lock (&stats_lock);
640 assert (stats_queue_length > 0);
641 stats_queue_length--;
642 pthread_mutex_unlock (&stats_lock);
644 } /* }}} static void remove_from_queue */
646 /* free the resources associated with the cache_item_t
647 * must hold cache_lock when calling this function
648 */
649 static void *free_cache_item(cache_item_t *ci) /* {{{ */
650 {
651 if (ci == NULL) return NULL;
653 remove_from_queue(ci);
655 for (size_t i=0; i < ci->values_num; i++)
656 free(ci->values[i]);
658 free (ci->values);
659 free (ci->file);
661 /* in case anyone is waiting */
662 pthread_cond_broadcast(&ci->flushed);
663 pthread_cond_destroy(&ci->flushed);
665 free (ci);
667 return NULL;
668 } /* }}} static void *free_cache_item */
670 /*
671 * enqueue_cache_item:
672 * `cache_lock' must be acquired before calling this function!
673 */
674 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
675 queue_side_t side)
676 {
677 if (ci == NULL)
678 return (-1);
680 if (ci->values_num == 0)
681 return (0);
683 if (side == HEAD)
684 {
685 if (cache_queue_head == ci)
686 return 0;
688 /* remove if further down in queue */
689 remove_from_queue(ci);
691 ci->prev = NULL;
692 ci->next = cache_queue_head;
693 if (ci->next != NULL)
694 ci->next->prev = ci;
695 cache_queue_head = ci;
697 if (cache_queue_tail == NULL)
698 cache_queue_tail = cache_queue_head;
699 }
700 else /* (side == TAIL) */
701 {
702 /* We don't move values back in the list.. */
703 if (ci->flags & CI_FLAGS_IN_QUEUE)
704 return (0);
706 assert (ci->next == NULL);
707 assert (ci->prev == NULL);
709 ci->prev = cache_queue_tail;
711 if (cache_queue_tail == NULL)
712 cache_queue_head = ci;
713 else
714 cache_queue_tail->next = ci;
716 cache_queue_tail = ci;
717 }
719 ci->flags |= CI_FLAGS_IN_QUEUE;
721 pthread_cond_signal(&queue_cond);
722 pthread_mutex_lock (&stats_lock);
723 stats_queue_length++;
724 pthread_mutex_unlock (&stats_lock);
726 return (0);
727 } /* }}} int enqueue_cache_item */
729 /*
730 * tree_callback_flush:
731 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
732 * while this is in progress.
733 */
734 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
735 gpointer data)
736 {
737 cache_item_t *ci;
738 callback_flush_data_t *cfd;
740 ci = (cache_item_t *) value;
741 cfd = (callback_flush_data_t *) data;
743 if (ci->flags & CI_FLAGS_IN_QUEUE)
744 return FALSE;
746 if (ci->values_num > 0
747 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
748 {
749 enqueue_cache_item (ci, TAIL);
750 }
751 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
752 && (ci->values_num <= 0))
753 {
754 assert ((char *) key == ci->file);
755 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
756 {
757 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
758 return (FALSE);
759 }
760 }
762 return (FALSE);
763 } /* }}} gboolean tree_callback_flush */
765 static int flush_old_values (int max_age)
766 {
767 callback_flush_data_t cfd;
768 size_t k;
770 memset (&cfd, 0, sizeof (cfd));
771 /* Pass the current time as user data so that we don't need to call
772 * `time' for each node. */
773 cfd.now = time (NULL);
774 cfd.keys = NULL;
775 cfd.keys_num = 0;
777 if (max_age > 0)
778 cfd.abs_timeout = cfd.now - max_age;
779 else
780 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
782 /* `tree_callback_flush' will return the keys of all values that haven't
783 * been touched in the last `config_flush_interval' seconds in `cfd'.
784 * The char*'s in this array point to the same memory as ci->file, so we
785 * don't need to free them separately. */
786 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
788 for (k = 0; k < cfd.keys_num; k++)
789 {
790 /* should never fail, since we have held the cache_lock
791 * the entire time */
792 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
793 }
795 if (cfd.keys != NULL)
796 {
797 free (cfd.keys);
798 cfd.keys = NULL;
799 }
801 return (0);
802 } /* int flush_old_values */
804 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
805 {
806 struct timeval now;
807 struct timespec next_flush;
808 int status;
810 gettimeofday (&now, NULL);
811 next_flush.tv_sec = now.tv_sec + config_flush_interval;
812 next_flush.tv_nsec = 1000 * now.tv_usec;
814 pthread_mutex_lock(&cache_lock);
816 while (state == RUNNING)
817 {
818 gettimeofday (&now, NULL);
819 if ((now.tv_sec > next_flush.tv_sec)
820 || ((now.tv_sec == next_flush.tv_sec)
821 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
822 {
823 RRDD_LOG(LOG_DEBUG, "flushing old values");
825 /* Determine the time of the next cache flush. */
826 next_flush.tv_sec = now.tv_sec + config_flush_interval;
828 /* Flush all values that haven't been written in the last
829 * `config_write_interval' seconds. */
830 flush_old_values (config_write_interval);
832 /* unlock the cache while we rotate so we don't block incoming
833 * updates if the fsync() blocks on disk I/O */
834 pthread_mutex_unlock(&cache_lock);
835 journal_rotate();
836 pthread_mutex_lock(&cache_lock);
837 }
839 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
840 if (status != 0 && status != ETIMEDOUT)
841 {
842 RRDD_LOG (LOG_ERR, "flush_thread_main: "
843 "pthread_cond_timedwait returned %i.", status);
844 }
845 }
847 if (config_flush_at_shutdown)
848 flush_old_values (-1); /* flush everything */
850 state = SHUTDOWN;
852 pthread_mutex_unlock(&cache_lock);
854 return NULL;
855 } /* void *flush_thread_main */
857 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
858 {
859 pthread_mutex_lock (&cache_lock);
861 while (state != SHUTDOWN
862 || (cache_queue_head != NULL && config_flush_at_shutdown))
863 {
864 cache_item_t *ci;
865 char *file;
866 char **values;
867 size_t values_num;
868 int status;
870 /* Now, check if there's something to store away. If not, wait until
871 * something comes in. */
872 if (cache_queue_head == NULL)
873 {
874 status = pthread_cond_wait (&queue_cond, &cache_lock);
875 if ((status != 0) && (status != ETIMEDOUT))
876 {
877 RRDD_LOG (LOG_ERR, "queue_thread_main: "
878 "pthread_cond_wait returned %i.", status);
879 }
880 }
882 /* Check if a value has arrived. This may be NULL if we timed out or there
883 * was an interrupt such as a signal. */
884 if (cache_queue_head == NULL)
885 continue;
887 ci = cache_queue_head;
889 /* copy the relevant parts */
890 file = strdup (ci->file);
891 if (file == NULL)
892 {
893 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
894 continue;
895 }
897 assert(ci->values != NULL);
898 assert(ci->values_num > 0);
900 values = ci->values;
901 values_num = ci->values_num;
903 wipe_ci_values(ci, time(NULL));
904 remove_from_queue(ci);
906 pthread_mutex_unlock (&cache_lock);
908 rrd_clear_error ();
909 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
910 if (status != 0)
911 {
912 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
913 "rrd_update_r (%s) failed with status %i. (%s)",
914 file, status, rrd_get_error());
915 }
917 journal_write("wrote", file);
919 /* Search again in the tree. It's possible someone issued a "FORGET"
920 * while we were writing the update values. */
921 pthread_mutex_lock(&cache_lock);
922 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
923 if (ci)
924 pthread_cond_broadcast(&ci->flushed);
925 pthread_mutex_unlock(&cache_lock);
927 if (status == 0)
928 {
929 pthread_mutex_lock (&stats_lock);
930 stats_updates_written++;
931 stats_data_sets_written += values_num;
932 pthread_mutex_unlock (&stats_lock);
933 }
935 rrd_free_ptrs((void ***) &values, &values_num);
936 free(file);
938 pthread_mutex_lock (&cache_lock);
939 }
940 pthread_mutex_unlock (&cache_lock);
942 return (NULL);
943 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Splits the next field off the front of a NUL-terminated buffer.
 *
 * A field ends at the first unescaped space or at the terminating NUL.  A
 * backslash makes the following character part of the field; the backslash
 * itself is dropped.  The field is unescaped and terminated in place.
 *
 * On success 0 is returned, `*field_ret' points to the field and
 * `*buffer_ret' / `*buffer_size_ret' describe the rest of the buffer.
 * -1 is returned, and the three output arguments are left unchanged, when
 * the buffer is empty or no field terminator is found (e.g. the buffer ends
 * in a lone backslash).
 * `*buffer_size_ret' must count the terminating NUL byte.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *text = *buffer_ret;
  size_t size = *buffer_size_ret;
  size_t rd = 0; /* next character to look at */
  size_t wr = 0; /* next position of the unescaped field */

  if (size <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (text[size - 1] == '\0');

  while (rd < size)
  {
    char c = text[rd];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
    {
      text[wr] = 0;
      rd++;

      *buffer_ret = text + rd;
      *buffer_size_ret = size - rd;
      *field_ret = text;

      return (0);
    }

    /* Handle escaped characters: skip the backslash, then copy whatever
     * follows like a normal character. */
    if (c == '\\')
    {
      if (rd >= (size - 1))
        break;
      rd++;
    }

    text[wr++] = text[rd++];
  } /* while (rd < size) */

  return (-1);
} /* }}} int buffer_get_field */
1008 /* if we're restricting writes to the base directory,
1009 * check whether the file falls within the dir
1010 * returns 1 if OK, otherwise 0
1011 */
1012 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1013 {
1014 assert(file != NULL);
1016 if (!config_write_base_only
1017 || sock == NULL /* journal replay */
1018 || config_base_dir == NULL)
1019 return 1;
1021 if (strstr(file, "../") != NULL) goto err;
1023 /* relative paths without "../" are ok */
1024 if (*file != '/') return 1;
1026 /* file must be of the format base + "/" + <1+ char filename> */
1027 if (strlen(file) < _config_base_dir_len + 2) goto err;
1028 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1029 if (*(file + _config_base_dir_len) != '/') goto err;
1031 return 1;
1033 err:
1034 if (sock != NULL && sock->fd >= 0)
1035 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1037 return 0;
1038 } /* }}} static int check_file_access */
1040 /* when using a base dir, convert relative paths to absolute paths.
1041 * if necessary, modifies the "filename" pointer to point
1042 * to the new path created in "tmp". "tmp" is provided
1043 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1044 *
1045 * this allows us to optimize for the expected case (absolute path)
1046 * with a no-op.
1047 */
1048 static void get_abs_path(char **filename, char *tmp)
1049 {
1050 assert(tmp != NULL);
1051 assert(filename != NULL && *filename != NULL);
1053 if (config_base_dir == NULL || **filename == '/')
1054 return;
1056 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1057 *filename = tmp;
1058 } /* }}} static int get_abs_path */
1060 /* returns 1 if we have the required privilege level,
1061 * otherwise issue an error to the user on sock */
1062 static int has_privilege (listen_socket_t *sock, /* {{{ */
1063 socket_privilege priv)
1064 {
1065 if (sock == NULL) /* journal replay */
1066 return 1;
1068 if (sock->privilege >= priv)
1069 return 1;
1071 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1072 } /* }}} static int has_privilege */
1074 static int flush_file (const char *filename) /* {{{ */
1075 {
1076 cache_item_t *ci;
1078 pthread_mutex_lock (&cache_lock);
1080 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1081 if (ci == NULL)
1082 {
1083 pthread_mutex_unlock (&cache_lock);
1084 return (ENOENT);
1085 }
1087 if (ci->values_num > 0)
1088 {
1089 /* Enqueue at head */
1090 enqueue_cache_item (ci, HEAD);
1091 pthread_cond_wait(&ci->flushed, &cache_lock);
1092 }
1094 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1095 * may have been purged during our cond_wait() */
1097 pthread_mutex_unlock(&cache_lock);
1099 return (0);
1100 } /* }}} int flush_file */
1102 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1103 {
1104 char *err = "Syntax error.\n";
1106 if (cmd && cmd->syntax)
1107 err = cmd->syntax;
1109 return send_response(sock, RESP_ERR, "Usage: %s", err);
1110 } /* }}} static int syntax_error() */
1112 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1113 {
1114 uint64_t copy_queue_length;
1115 uint64_t copy_updates_received;
1116 uint64_t copy_flush_received;
1117 uint64_t copy_updates_written;
1118 uint64_t copy_data_sets_written;
1119 uint64_t copy_journal_bytes;
1120 uint64_t copy_journal_rotate;
1122 uint64_t tree_nodes_number;
1123 uint64_t tree_depth;
1125 pthread_mutex_lock (&stats_lock);
1126 copy_queue_length = stats_queue_length;
1127 copy_updates_received = stats_updates_received;
1128 copy_flush_received = stats_flush_received;
1129 copy_updates_written = stats_updates_written;
1130 copy_data_sets_written = stats_data_sets_written;
1131 copy_journal_bytes = stats_journal_bytes;
1132 copy_journal_rotate = stats_journal_rotate;
1133 pthread_mutex_unlock (&stats_lock);
1135 pthread_mutex_lock (&cache_lock);
1136 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1137 tree_depth = (uint64_t) g_tree_height (cache_tree);
1138 pthread_mutex_unlock (&cache_lock);
1140 add_response_info(sock,
1141 "QueueLength: %"PRIu64"\n", copy_queue_length);
1142 add_response_info(sock,
1143 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1144 add_response_info(sock,
1145 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1146 add_response_info(sock,
1147 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1148 add_response_info(sock,
1149 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1150 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1151 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1152 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1153 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1155 send_response(sock, RESP_OK, "Statistics follow\n");
1157 return (0);
1158 } /* }}} int handle_request_stats */
1160 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1161 {
1162 char *file, file_tmp[PATH_MAX];
1163 int status;
1165 status = buffer_get_field (&buffer, &buffer_size, &file);
1166 if (status != 0)
1167 {
1168 return syntax_error(sock,cmd);
1169 }
1170 else
1171 {
1172 pthread_mutex_lock(&stats_lock);
1173 stats_flush_received++;
1174 pthread_mutex_unlock(&stats_lock);
1176 get_abs_path(&file, file_tmp);
1177 if (!check_file_access(file, sock)) return 0;
1179 status = flush_file (file);
1180 if (status == 0)
1181 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1182 else if (status == ENOENT)
1183 {
1184 /* no file in our tree; see whether it exists at all */
1185 struct stat statbuf;
1187 memset(&statbuf, 0, sizeof(statbuf));
1188 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1189 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1190 else
1191 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1192 }
1193 else if (status < 0)
1194 return send_response(sock, RESP_ERR, "Internal error.\n");
1195 else
1196 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1197 }
1199 /* NOTREACHED */
1200 assert(1==0);
1201 } /* }}} int handle_request_flush */
1203 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1204 {
1205 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1207 pthread_mutex_lock(&cache_lock);
1208 flush_old_values(-1);
1209 pthread_mutex_unlock(&cache_lock);
1211 return send_response(sock, RESP_OK, "Started flush.\n");
1212 } /* }}} static int handle_request_flushall */
1214 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1215 {
1216 int status;
1217 char *file, file_tmp[PATH_MAX];
1218 cache_item_t *ci;
1220 status = buffer_get_field(&buffer, &buffer_size, &file);
1221 if (status != 0)
1222 return syntax_error(sock,cmd);
1224 get_abs_path(&file, file_tmp);
1226 pthread_mutex_lock(&cache_lock);
1227 ci = g_tree_lookup(cache_tree, file);
1228 if (ci == NULL)
1229 {
1230 pthread_mutex_unlock(&cache_lock);
1231 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1232 }
1234 for (size_t i=0; i < ci->values_num; i++)
1235 add_response_info(sock, "%s\n", ci->values[i]);
1237 pthread_mutex_unlock(&cache_lock);
1238 return send_response(sock, RESP_OK, "updates pending\n");
1239 } /* }}} static int handle_request_pending */
1241 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1242 {
1243 int status;
1244 gboolean found;
1245 char *file, file_tmp[PATH_MAX];
1247 status = buffer_get_field(&buffer, &buffer_size, &file);
1248 if (status != 0)
1249 return syntax_error(sock,cmd);
1251 get_abs_path(&file, file_tmp);
1252 if (!check_file_access(file, sock)) return 0;
1254 pthread_mutex_lock(&cache_lock);
1255 found = g_tree_remove(cache_tree, file);
1256 pthread_mutex_unlock(&cache_lock);
1258 if (found == TRUE)
1259 {
1260 if (sock != NULL)
1261 journal_write("forget", file);
1263 return send_response(sock, RESP_OK, "Gone!\n");
1264 }
1265 else
1266 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1268 /* NOTREACHED */
1269 assert(1==0);
1270 } /* }}} static int handle_request_forget */
1272 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1273 {
1274 cache_item_t *ci;
1276 pthread_mutex_lock(&cache_lock);
1278 ci = cache_queue_head;
1279 while (ci != NULL)
1280 {
1281 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1282 ci = ci->next;
1283 }
1285 pthread_mutex_unlock(&cache_lock);
1287 return send_response(sock, RESP_OK, "in queue.\n");
1288 } /* }}} int handle_request_queue */
1290 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1291 {
1292 char *file, file_tmp[PATH_MAX];
1293 int values_num = 0;
1294 int status;
1295 char orig_buf[CMD_MAX];
1297 cache_item_t *ci;
1299 /* save it for the journal later */
1300 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1302 status = buffer_get_field (&buffer, &buffer_size, &file);
1303 if (status != 0)
1304 return syntax_error(sock,cmd);
1306 pthread_mutex_lock(&stats_lock);
1307 stats_updates_received++;
1308 pthread_mutex_unlock(&stats_lock);
1310 get_abs_path(&file, file_tmp);
1311 if (!check_file_access(file, sock)) return 0;
1313 pthread_mutex_lock (&cache_lock);
1314 ci = g_tree_lookup (cache_tree, file);
1316 if (ci == NULL) /* {{{ */
1317 {
1318 struct stat statbuf;
1319 cache_item_t *tmp;
1321 /* don't hold the lock while we setup; stat(2) might block */
1322 pthread_mutex_unlock(&cache_lock);
1324 memset (&statbuf, 0, sizeof (statbuf));
1325 status = stat (file, &statbuf);
1326 if (status != 0)
1327 {
1328 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1330 status = errno;
1331 if (status == ENOENT)
1332 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1333 else
1334 return send_response(sock, RESP_ERR,
1335 "stat failed with error %i.\n", status);
1336 }
1337 if (!S_ISREG (statbuf.st_mode))
1338 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1340 if (access(file, R_OK|W_OK) != 0)
1341 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1342 file, rrd_strerror(errno));
1344 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1345 if (ci == NULL)
1346 {
1347 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1349 return send_response(sock, RESP_ERR, "malloc failed.\n");
1350 }
1351 memset (ci, 0, sizeof (cache_item_t));
1353 ci->file = strdup (file);
1354 if (ci->file == NULL)
1355 {
1356 free (ci);
1357 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1359 return send_response(sock, RESP_ERR, "strdup failed.\n");
1360 }
1362 wipe_ci_values(ci, now);
1363 ci->flags = CI_FLAGS_IN_TREE;
1364 pthread_cond_init(&ci->flushed, NULL);
1366 pthread_mutex_lock(&cache_lock);
1368 /* another UPDATE might have added this entry in the meantime */
1369 tmp = g_tree_lookup (cache_tree, file);
1370 if (tmp == NULL)
1371 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1372 else
1373 {
1374 free_cache_item (ci);
1375 ci = tmp;
1376 }
1378 /* state may have changed while we were unlocked */
1379 if (state == SHUTDOWN)
1380 return -1;
1381 } /* }}} */
1382 assert (ci != NULL);
1384 /* don't re-write updates in replay mode */
1385 if (sock != NULL)
1386 journal_write("update", orig_buf);
1388 while (buffer_size > 0)
1389 {
1390 char *value;
1391 time_t stamp;
1392 char *eostamp;
1394 status = buffer_get_field (&buffer, &buffer_size, &value);
1395 if (status != 0)
1396 {
1397 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1398 break;
1399 }
1401 /* make sure update time is always moving forward */
1402 stamp = strtol(value, &eostamp, 10);
1403 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1404 {
1405 pthread_mutex_unlock(&cache_lock);
1406 return send_response(sock, RESP_ERR,
1407 "Cannot find timestamp in '%s'!\n", value);
1408 }
1409 else if (stamp <= ci->last_update_stamp)
1410 {
1411 pthread_mutex_unlock(&cache_lock);
1412 return send_response(sock, RESP_ERR,
1413 "illegal attempt to update using time %ld when last"
1414 " update time is %ld (minimum one second step)\n",
1415 stamp, ci->last_update_stamp);
1416 }
1417 else
1418 ci->last_update_stamp = stamp;
1420 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1421 {
1422 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1423 continue;
1424 }
1426 values_num++;
1427 }
1429 if (((now - ci->last_flush_time) >= config_write_interval)
1430 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1431 && (ci->values_num > 0))
1432 {
1433 enqueue_cache_item (ci, TAIL);
1434 }
1436 pthread_mutex_unlock (&cache_lock);
1438 if (values_num < 1)
1439 return send_response(sock, RESP_ERR, "No values updated.\n");
1440 else
1441 return send_response(sock, RESP_OK,
1442 "errors, enqueued %i value(s).\n", values_num);
1444 /* NOTREACHED */
1445 assert(1==0);
1447 } /* }}} int handle_request_update */
1449 /* we came across a "WROTE" entry during journal replay.
1450 * throw away any values that we have accumulated for this file
1451 */
1452 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1453 {
1454 cache_item_t *ci;
1455 const char *file = buffer;
1457 pthread_mutex_lock(&cache_lock);
1459 ci = g_tree_lookup(cache_tree, file);
1460 if (ci == NULL)
1461 {
1462 pthread_mutex_unlock(&cache_lock);
1463 return (0);
1464 }
1466 if (ci->values)
1467 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1469 wipe_ci_values(ci, now);
1470 remove_from_queue(ci);
1472 pthread_mutex_unlock(&cache_lock);
1473 return (0);
1474 } /* }}} int handle_request_wrote */
1476 /* start "BATCH" processing */
1477 static int batch_start (HANDLER_PROTO) /* {{{ */
1478 {
1479 int status;
1480 if (sock->batch_start)
1481 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1483 status = send_response(sock, RESP_OK,
1484 "Go ahead. End with dot '.' on its own line.\n");
1485 sock->batch_start = time(NULL);
1486 sock->batch_cmd = 0;
1488 return status;
1489 } /* }}} static int batch_start */
1491 /* finish "BATCH" processing and return results to the client */
1492 static int batch_done (HANDLER_PROTO) /* {{{ */
1493 {
1494 assert(sock->batch_start);
1495 sock->batch_start = 0;
1496 sock->batch_cmd = 0;
1497 return send_response(sock, RESP_OK, "errors\n");
1498 } /* }}} static int batch_done */
/*
 * QUIT handler: does nothing but return -1.  connection_thread_main()
 * closes the connection when a handler returns a non-zero status.
 */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int close_requested = -1;

  return close_requested;
} /* }}} static int handle_request_quit */
1505 struct command COMMANDS[] = {
1506 {
1507 "UPDATE",
1508 handle_request_update,
1509 PRIV_HIGH,
1510 CMD_CONTEXT_ANY,
1511 "UPDATE <filename> <values> [<values> ...]\n"
1512 ,
1513 "Adds the given file to the internal cache if it is not yet known and\n"
1514 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1515 "for details.\n"
1516 "\n"
1517 "Each <values> has the following form:\n"
1518 " <values> = <time>:<value>[:<value>[...]]\n"
1519 "See the rrdupdate(1) manpage for details.\n"
1520 },
1521 {
1522 "WROTE",
1523 handle_request_wrote,
1524 PRIV_HIGH,
1525 CMD_CONTEXT_JOURNAL,
1526 NULL,
1527 NULL
1528 },
1529 {
1530 "FLUSH",
1531 handle_request_flush,
1532 PRIV_LOW,
1533 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1534 "FLUSH <filename>\n"
1535 ,
1536 "Adds the given filename to the head of the update queue and returns\n"
1537 "after it has been dequeued.\n"
1538 },
1539 {
1540 "FLUSHALL",
1541 handle_request_flushall,
1542 PRIV_HIGH,
1543 CMD_CONTEXT_CLIENT,
1544 "FLUSHALL\n"
1545 ,
1546 "Triggers writing of all pending updates. Returns immediately.\n"
1547 },
1548 {
1549 "PENDING",
1550 handle_request_pending,
1551 PRIV_HIGH,
1552 CMD_CONTEXT_CLIENT,
1553 "PENDING <filename>\n"
1554 ,
1555 "Shows any 'pending' updates for a file, in order.\n"
1556 "The updates shown have not yet been written to the underlying RRD file.\n"
1557 },
1558 {
1559 "FORGET",
1560 handle_request_forget,
1561 PRIV_HIGH,
1562 CMD_CONTEXT_ANY,
1563 "FORGET <filename>\n"
1564 ,
1565 "Removes the file completely from the cache.\n"
1566 "Any pending updates for the file will be lost.\n"
1567 },
1568 {
1569 "QUEUE",
1570 handle_request_queue,
1571 PRIV_LOW,
1572 CMD_CONTEXT_CLIENT,
1573 "QUEUE\n"
1574 ,
1575 "Shows all files in the output queue.\n"
1576 "The output is zero or more lines in the following format:\n"
1577 "(where <num_vals> is the number of values to be written)\n"
1578 "\n"
1579 "<num_vals> <filename>\n"
1580 },
1581 {
1582 "STATS",
1583 handle_request_stats,
1584 PRIV_LOW,
1585 CMD_CONTEXT_CLIENT,
1586 "STATS\n"
1587 ,
1588 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1589 "a description of the values.\n"
1590 },
1591 {
1592 "HELP",
1593 handle_request_help,
1594 PRIV_LOW,
1595 CMD_CONTEXT_CLIENT,
1596 "HELP [<command>]\n",
1597 NULL, /* special! */
1598 },
1599 {
1600 "BATCH",
1601 batch_start,
1602 PRIV_LOW,
1603 CMD_CONTEXT_CLIENT,
1604 "BATCH\n"
1605 ,
1606 "The 'BATCH' command permits the client to initiate a bulk load\n"
1607 " of commands to rrdcached.\n"
1608 "\n"
1609 "Usage:\n"
1610 "\n"
1611 " client: BATCH\n"
1612 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1613 " client: command #1\n"
1614 " client: command #2\n"
1615 " client: ... and so on\n"
1616 " client: .\n"
1617 " server: 2 errors\n"
1618 " server: 7 message for command #7\n"
1619 " server: 9 message for command #9\n"
1620 "\n"
1621 "For more information, consult the rrdcached(1) documentation.\n"
1622 },
1623 {
1624 ".", /* BATCH terminator */
1625 batch_done,
1626 PRIV_LOW,
1627 CMD_CONTEXT_BATCH,
1628 NULL,
1629 NULL
1630 },
1631 {
1632 "QUIT",
1633 handle_request_quit,
1634 PRIV_LOW,
1635 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1636 "QUIT\n"
1637 ,
1638 "Disconnect from rrdcached.\n"
1639 },
1640 {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */
1641 };
1643 static struct command *find_command(char *cmd)
1644 {
1645 struct command *c = COMMANDS;
1647 while (c->cmd != NULL)
1648 {
1649 if (strcasecmp(cmd, c->cmd) == 0)
1650 break;
1651 c++;
1652 }
1654 if (c->cmd == NULL)
1655 return NULL;
1656 else
1657 return c;
1658 }
1660 /* check whether commands are received in the expected context */
1661 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1662 {
1663 if (sock == NULL)
1664 return (cmd->context & CMD_CONTEXT_JOURNAL);
1665 else if (sock->batch_start)
1666 return (cmd->context & CMD_CONTEXT_BATCH);
1667 else
1668 return (cmd->context & CMD_CONTEXT_CLIENT);
1670 /* NOTREACHED */
1671 assert(1==0);
1672 }
1674 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1675 {
1676 int status;
1677 char *cmd_str;
1678 char *resp_txt;
1679 struct command *help = NULL;
1681 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1682 if (status == 0)
1683 help = find_command(cmd_str);
1685 if (help && (help->syntax || help->help))
1686 {
1687 char tmp[CMD_MAX];
1689 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1690 resp_txt = tmp;
1692 if (help->syntax)
1693 add_response_info(sock, "Usage: %s\n", help->syntax);
1695 if (help->help)
1696 add_response_info(sock, "%s\n", help->help);
1697 }
1698 else
1699 {
1700 help = COMMANDS;
1701 resp_txt = "Command overview\n";
1703 while (help->cmd)
1704 {
1705 if (help->syntax)
1706 add_response_info(sock, "%s", help->syntax);
1707 help++;
1708 }
1709 }
1711 return send_response(sock, RESP_OK, resp_txt);
1712 } /* }}} int handle_request_help */
1714 /* if sock==NULL, we are in journal replay mode */
1715 static int handle_request (DISPATCH_PROTO) /* {{{ */
1716 {
1717 char *buffer_ptr = buffer;
1718 char *cmd_str = NULL;
1719 struct command *cmd = NULL;
1720 int status;
1722 assert (buffer[buffer_size - 1] == '\0');
1724 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1725 if (status != 0)
1726 {
1727 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1728 return (-1);
1729 }
1731 if (sock != NULL && sock->batch_start)
1732 sock->batch_cmd++;
1734 cmd = find_command(cmd_str);
1735 if (!cmd)
1736 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1738 status = has_privilege(sock, cmd->min_priv);
1739 if (status <= 0)
1740 return status;
1742 if (!command_check_context(sock, cmd))
1743 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1745 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1746 } /* }}} int handle_request */
1748 /* MUST NOT hold journal_lock before calling this */
1749 static void journal_rotate(void) /* {{{ */
1750 {
1751 FILE *old_fh = NULL;
1752 int new_fd;
1754 if (journal_cur == NULL || journal_old == NULL)
1755 return;
1757 pthread_mutex_lock(&journal_lock);
1759 /* we rotate this way (rename before close) so that the we can release
1760 * the journal lock as fast as possible. Journal writes to the new
1761 * journal can proceed immediately after the new file is opened. The
1762 * fclose can then block without affecting new updates.
1763 */
1764 if (journal_fh != NULL)
1765 {
1766 old_fh = journal_fh;
1767 journal_fh = NULL;
1768 rename(journal_cur, journal_old);
1769 ++stats_journal_rotate;
1770 }
1772 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1773 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1774 if (new_fd >= 0)
1775 {
1776 journal_fh = fdopen(new_fd, "a");
1777 if (journal_fh == NULL)
1778 close(new_fd);
1779 }
1781 pthread_mutex_unlock(&journal_lock);
1783 if (old_fh != NULL)
1784 fclose(old_fh);
1786 if (journal_fh == NULL)
1787 {
1788 RRDD_LOG(LOG_CRIT,
1789 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1790 journal_cur, rrd_strerror(errno));
1792 RRDD_LOG(LOG_ERR,
1793 "JOURNALING DISABLED: All values will be flushed at shutdown");
1794 config_flush_at_shutdown = 1;
1795 }
1797 } /* }}} static void journal_rotate */
1799 static void journal_done(void) /* {{{ */
1800 {
1801 if (journal_cur == NULL)
1802 return;
1804 pthread_mutex_lock(&journal_lock);
1805 if (journal_fh != NULL)
1806 {
1807 fclose(journal_fh);
1808 journal_fh = NULL;
1809 }
1811 if (config_flush_at_shutdown)
1812 {
1813 RRDD_LOG(LOG_INFO, "removing journals");
1814 unlink(journal_old);
1815 unlink(journal_cur);
1816 }
1817 else
1818 {
1819 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1820 "journals will be used at next startup");
1821 }
1823 pthread_mutex_unlock(&journal_lock);
1825 } /* }}} static void journal_done */
1827 static int journal_write(char *cmd, char *args) /* {{{ */
1828 {
1829 int chars;
1831 if (journal_fh == NULL)
1832 return 0;
1834 pthread_mutex_lock(&journal_lock);
1835 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1836 pthread_mutex_unlock(&journal_lock);
1838 if (chars > 0)
1839 {
1840 pthread_mutex_lock(&stats_lock);
1841 stats_journal_bytes += chars;
1842 pthread_mutex_unlock(&stats_lock);
1843 }
1845 return chars;
1846 } /* }}} static int journal_write */
1848 static int journal_replay (const char *file) /* {{{ */
1849 {
1850 FILE *fh;
1851 int entry_cnt = 0;
1852 int fail_cnt = 0;
1853 uint64_t line = 0;
1854 char entry[CMD_MAX];
1855 time_t now;
1857 if (file == NULL) return 0;
1859 {
1860 char *reason = "unknown error";
1861 int status = 0;
1862 struct stat statbuf;
1864 memset(&statbuf, 0, sizeof(statbuf));
1865 if (stat(file, &statbuf) != 0)
1866 {
1867 if (errno == ENOENT)
1868 return 0;
1870 reason = "stat error";
1871 status = errno;
1872 }
1873 else if (!S_ISREG(statbuf.st_mode))
1874 {
1875 reason = "not a regular file";
1876 status = EPERM;
1877 }
1878 if (statbuf.st_uid != daemon_uid)
1879 {
1880 reason = "not owned by daemon user";
1881 status = EACCES;
1882 }
1883 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1884 {
1885 reason = "must not be user/group writable";
1886 status = EACCES;
1887 }
1889 if (status != 0)
1890 {
1891 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1892 file, rrd_strerror(status), reason);
1893 return 0;
1894 }
1895 }
1897 fh = fopen(file, "r");
1898 if (fh == NULL)
1899 {
1900 if (errno != ENOENT)
1901 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1902 file, rrd_strerror(errno));
1903 return 0;
1904 }
1905 else
1906 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1908 now = time(NULL);
1910 while(!feof(fh))
1911 {
1912 size_t entry_len;
1914 ++line;
1915 if (fgets(entry, sizeof(entry), fh) == NULL)
1916 break;
1917 entry_len = strlen(entry);
1919 /* check \n termination in case journal writing crashed mid-line */
1920 if (entry_len == 0)
1921 continue;
1922 else if (entry[entry_len - 1] != '\n')
1923 {
1924 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1925 ++fail_cnt;
1926 continue;
1927 }
1929 entry[entry_len - 1] = '\0';
1931 if (handle_request(NULL, now, entry, entry_len) == 0)
1932 ++entry_cnt;
1933 else
1934 ++fail_cnt;
1935 }
1937 fclose(fh);
1939 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1940 entry_cnt, fail_cnt);
1942 return entry_cnt > 0 ? 1 : 0;
1943 } /* }}} static int journal_replay */
1945 static void journal_init(void) /* {{{ */
1946 {
1947 int had_journal = 0;
1949 if (journal_cur == NULL) return;
1951 pthread_mutex_lock(&journal_lock);
1953 RRDD_LOG(LOG_INFO, "checking for journal files");
1955 had_journal += journal_replay(journal_old);
1956 had_journal += journal_replay(journal_cur);
1958 /* it must have been a crash. start a flush */
1959 if (had_journal && config_flush_at_shutdown)
1960 flush_old_values(-1);
1962 pthread_mutex_unlock(&journal_lock);
1963 journal_rotate();
1965 RRDD_LOG(LOG_INFO, "journal processing complete");
1967 } /* }}} static void journal_init */
1969 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1970 {
1971 assert(sock != NULL);
1973 free(sock->rbuf); sock->rbuf = NULL;
1974 free(sock->wbuf); sock->wbuf = NULL;
1975 free(sock);
1976 } /* }}} void free_listen_socket */
1978 static void close_connection(listen_socket_t *sock) /* {{{ */
1979 {
1980 if (sock->fd >= 0)
1981 {
1982 close(sock->fd);
1983 sock->fd = -1;
1984 }
1986 free_listen_socket(sock);
1988 } /* }}} void close_connection */
1990 static void *connection_thread_main (void *args) /* {{{ */
1991 {
1992 listen_socket_t *sock;
1993 int fd;
1995 sock = (listen_socket_t *) args;
1996 fd = sock->fd;
1998 /* init read buffers */
1999 sock->next_read = sock->next_cmd = 0;
2000 sock->rbuf = malloc(RBUF_SIZE);
2001 if (sock->rbuf == NULL)
2002 {
2003 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2004 close_connection(sock);
2005 return NULL;
2006 }
2008 pthread_mutex_lock (&connection_threads_lock);
2009 connection_threads_num++;
2010 pthread_mutex_unlock (&connection_threads_lock);
2012 while (state == RUNNING)
2013 {
2014 char *cmd;
2015 ssize_t cmd_len;
2016 ssize_t rbytes;
2017 time_t now;
2019 struct pollfd pollfd;
2020 int status;
2022 pollfd.fd = fd;
2023 pollfd.events = POLLIN | POLLPRI;
2024 pollfd.revents = 0;
2026 status = poll (&pollfd, 1, /* timeout = */ 500);
2027 if (state != RUNNING)
2028 break;
2029 else if (status == 0) /* timeout */
2030 continue;
2031 else if (status < 0) /* error */
2032 {
2033 status = errno;
2034 if (status != EINTR)
2035 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2036 continue;
2037 }
2039 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2040 break;
2041 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2042 {
2043 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2044 "poll(2) returned something unexpected: %#04hx",
2045 pollfd.revents);
2046 break;
2047 }
2049 rbytes = read(fd, sock->rbuf + sock->next_read,
2050 RBUF_SIZE - sock->next_read);
2051 if (rbytes < 0)
2052 {
2053 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2054 break;
2055 }
2056 else if (rbytes == 0)
2057 break; /* eof */
2059 sock->next_read += rbytes;
2061 if (sock->batch_start)
2062 now = sock->batch_start;
2063 else
2064 now = time(NULL);
2066 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2067 {
2068 status = handle_request (sock, now, cmd, cmd_len+1);
2069 if (status != 0)
2070 goto out_close;
2071 }
2072 }
2074 out_close:
2075 close_connection(sock);
2077 /* Remove this thread from the connection threads list */
2078 pthread_mutex_lock (&connection_threads_lock);
2079 connection_threads_num--;
2080 if (connection_threads_num <= 0)
2081 pthread_cond_broadcast(&connection_threads_done);
2082 pthread_mutex_unlock (&connection_threads_lock);
2084 return (NULL);
2085 } /* }}} void *connection_thread_main */
2087 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2088 {
2089 int fd;
2090 struct sockaddr_un sa;
2091 listen_socket_t *temp;
2092 int status;
2093 const char *path;
2095 path = sock->addr;
2096 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2097 path += strlen("unix:");
2099 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2100 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2101 if (temp == NULL)
2102 {
2103 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2104 return (-1);
2105 }
2106 listen_fds = temp;
2107 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2109 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2110 if (fd < 0)
2111 {
2112 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2113 rrd_strerror(errno));
2114 return (-1);
2115 }
2117 memset (&sa, 0, sizeof (sa));
2118 sa.sun_family = AF_UNIX;
2119 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2121 /* if we've gotten this far, we own the pid file. any daemon started
2122 * with the same args must not be alive. therefore, ensure that we can
2123 * create the socket...
2124 */
2125 unlink(path);
2127 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2128 if (status != 0)
2129 {
2130 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2131 path, rrd_strerror(errno));
2132 close (fd);
2133 return (-1);
2134 }
2136 status = listen (fd, /* backlog = */ 10);
2137 if (status != 0)
2138 {
2139 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2140 path, rrd_strerror(errno));
2141 close (fd);
2142 unlink (path);
2143 return (-1);
2144 }
2146 listen_fds[listen_fds_num].fd = fd;
2147 listen_fds[listen_fds_num].family = PF_UNIX;
2148 strncpy(listen_fds[listen_fds_num].addr, path,
2149 sizeof (listen_fds[listen_fds_num].addr) - 1);
2150 listen_fds_num++;
2152 return (0);
2153 } /* }}} int open_listen_socket_unix */
/*
 * Open listening TCP sockets for sock->addr.  The address may be written
 * as "[ipv6]", "[ipv6]:port", "host" or "host:port"; without a port,
 * RRDCACHED_DEFAULT_PORT is used.  One socket is added to listen_fds for
 * every getaddrinfo() result that can be bound and listened on.
 *
 * Returns -1 when the address is malformed, cannot be resolved, or when
 * listen(2) fails; otherwise 0, even when no socket could be opened.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* strrchr() is the standard replacement for the legacy rindex() */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    /* make room for one more entry and seed it from the template */
    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    /* unlike the failures above, a listen(2) failure aborts the function */
    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2277 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2278 {
2279 assert(sock != NULL);
2280 assert(sock->addr != NULL);
2282 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2283 || sock->addr[0] == '/')
2284 return (open_listen_socket_unix(sock));
2285 else
2286 return (open_listen_socket_network(sock));
2287 } /* }}} int open_listen_socket */
2289 static int close_listen_sockets (void) /* {{{ */
2290 {
2291 size_t i;
2293 for (i = 0; i < listen_fds_num; i++)
2294 {
2295 close (listen_fds[i].fd);
2297 if (listen_fds[i].family == PF_UNIX)
2298 unlink(listen_fds[i].addr);
2299 }
2301 free (listen_fds);
2302 listen_fds = NULL;
2303 listen_fds_num = 0;
2305 return (0);
2306 } /* }}} int close_listen_sockets */
2308 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2309 {
2310 struct pollfd *pollfds;
2311 int pollfds_num;
2312 int status;
2313 int i;
2315 if (listen_fds_num < 1)
2316 {
2317 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2318 return (NULL);
2319 }
2321 pollfds_num = listen_fds_num;
2322 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2323 if (pollfds == NULL)
2324 {
2325 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2326 return (NULL);
2327 }
2328 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2330 RRDD_LOG(LOG_INFO, "listening for connections");
2332 while (state == RUNNING)
2333 {
2334 for (i = 0; i < pollfds_num; i++)
2335 {
2336 pollfds[i].fd = listen_fds[i].fd;
2337 pollfds[i].events = POLLIN | POLLPRI;
2338 pollfds[i].revents = 0;
2339 }
2341 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2342 if (state != RUNNING)
2343 break;
2344 else if (status == 0) /* timeout */
2345 continue;
2346 else if (status < 0) /* error */
2347 {
2348 status = errno;
2349 if (status != EINTR)
2350 {
2351 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2352 }
2353 continue;
2354 }
2356 for (i = 0; i < pollfds_num; i++)
2357 {
2358 listen_socket_t *client_sock;
2359 struct sockaddr_storage client_sa;
2360 socklen_t client_sa_size;
2361 pthread_t tid;
2362 pthread_attr_t attr;
2364 if (pollfds[i].revents == 0)
2365 continue;
2367 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2368 {
2369 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2370 "poll(2) returned something unexpected for listen FD #%i.",
2371 pollfds[i].fd);
2372 continue;
2373 }
2375 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2376 if (client_sock == NULL)
2377 {
2378 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2379 continue;
2380 }
2381 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2383 client_sa_size = sizeof (client_sa);
2384 client_sock->fd = accept (pollfds[i].fd,
2385 (struct sockaddr *) &client_sa, &client_sa_size);
2386 if (client_sock->fd < 0)
2387 {
2388 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2389 free(client_sock);
2390 continue;
2391 }
2393 pthread_attr_init (&attr);
2394 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2396 status = pthread_create (&tid, &attr, connection_thread_main,
2397 client_sock);
2398 if (status != 0)
2399 {
2400 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2401 close_connection(client_sock);
2402 continue;
2403 }
2404 } /* for (pollfds_num) */
2405 } /* while (state == RUNNING) */
2407 RRDD_LOG(LOG_INFO, "starting shutdown");
2409 close_listen_sockets ();
2411 pthread_mutex_lock (&connection_threads_lock);
2412 while (connection_threads_num > 0)
2413 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2414 pthread_mutex_unlock (&connection_threads_lock);
2416 free(pollfds);
2418 return (NULL);
2419 } /* }}} void *listen_thread_main */
2421 static int daemonize (void) /* {{{ */
2422 {
2423 int pid_fd;
2424 char *base_dir;
2426 daemon_uid = geteuid();
2428 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2429 if (pid_fd < 0)
2430 pid_fd = check_pidfile();
2431 if (pid_fd < 0)
2432 return pid_fd;
2434 /* open all the listen sockets */
2435 if (config_listen_address_list_len > 0)
2436 {
2437 for (size_t i = 0; i < config_listen_address_list_len; i++)
2438 open_listen_socket (config_listen_address_list[i]);
2440 rrd_free_ptrs((void ***) &config_listen_address_list,
2441 &config_listen_address_list_len);
2442 }
2443 else
2444 {
2445 listen_socket_t sock;
2446 memset(&sock, 0, sizeof(sock));
2447 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2448 open_listen_socket (&sock);
2449 }
2451 if (listen_fds_num < 1)
2452 {
2453 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2454 goto error;
2455 }
2457 if (!stay_foreground)
2458 {
2459 pid_t child;
2461 child = fork ();
2462 if (child < 0)
2463 {
2464 fprintf (stderr, "daemonize: fork(2) failed.\n");
2465 goto error;
2466 }
2467 else if (child > 0)
2468 exit(0);
2470 /* Become session leader */
2471 setsid ();
2473 /* Open the first three file descriptors to /dev/null */
2474 close (2);
2475 close (1);
2476 close (0);
2478 open ("/dev/null", O_RDWR);
2479 if (dup(0) == -1 || dup(0) == -1){
2480 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2481 }
2482 } /* if (!stay_foreground) */
2484 /* Change into the /tmp directory. */
2485 base_dir = (config_base_dir != NULL)
2486 ? config_base_dir
2487 : "/tmp";
2489 if (chdir (base_dir) != 0)
2490 {
2491 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2492 goto error;
2493 }
2495 install_signal_handlers();
2497 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2498 RRDD_LOG(LOG_INFO, "starting up");
2500 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2501 (GDestroyNotify) free_cache_item);
2502 if (cache_tree == NULL)
2503 {
2504 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2505 goto error;
2506 }
2508 return write_pidfile (pid_fd);
2510 error:
2511 remove_pidfile();
2512 return -1;
2513 } /* }}} int daemonize */
2515 static int cleanup (void) /* {{{ */
2516 {
2517 pthread_cond_broadcast (&flush_cond);
2518 pthread_join (flush_thread, NULL);
2520 pthread_cond_broadcast (&queue_cond);
2521 for (int i = 0; i < config_queue_threads; i++)
2522 pthread_join (queue_threads[i], NULL);
2524 if (config_flush_at_shutdown)
2525 {
2526 assert(cache_queue_head == NULL);
2527 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2528 }
2530 journal_done();
2531 remove_pidfile ();
2533 free(queue_threads);
2534 free(config_base_dir);
2535 free(config_pid_file);
2536 free(journal_cur);
2537 free(journal_old);
2539 pthread_mutex_lock(&cache_lock);
2540 g_tree_destroy(cache_tree);
2542 RRDD_LOG(LOG_INFO, "goodbye");
2543 closelog ();
2545 return (0);
2546 } /* }}} int cleanup */
2548 static int read_options (int argc, char **argv) /* {{{ */
2549 {
2550 int option;
2551 int status = 0;
2553 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2554 {
2555 switch (option)
2556 {
2557 case 'g':
2558 stay_foreground=1;
2559 break;
2561 case 'L':
2562 case 'l':
2563 {
2564 listen_socket_t *new;
2566 new = malloc(sizeof(listen_socket_t));
2567 if (new == NULL)
2568 {
2569 fprintf(stderr, "read_options: malloc failed.\n");
2570 return(2);
2571 }
2572 memset(new, 0, sizeof(listen_socket_t));
2574 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2575 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2577 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2578 &config_listen_address_list_len, new))
2579 {
2580 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2581 return (2);
2582 }
2583 }
2584 break;
2586 case 'f':
2587 {
2588 int temp;
2590 temp = atoi (optarg);
2591 if (temp > 0)
2592 config_flush_interval = temp;
2593 else
2594 {
2595 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2596 status = 3;
2597 }
2598 }
2599 break;
2601 case 'w':
2602 {
2603 int temp;
2605 temp = atoi (optarg);
2606 if (temp > 0)
2607 config_write_interval = temp;
2608 else
2609 {
2610 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2611 status = 2;
2612 }
2613 }
2614 break;
2616 case 'z':
2617 {
2618 int temp;
2620 temp = atoi(optarg);
2621 if (temp > 0)
2622 config_write_jitter = temp;
2623 else
2624 {
2625 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2626 status = 2;
2627 }
2629 break;
2630 }
2632 case 't':
2633 {
2634 int threads;
2635 threads = atoi(optarg);
2636 if (threads >= 1)
2637 config_queue_threads = threads;
2638 else
2639 {
2640 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2641 return 1;
2642 }
2643 }
2644 break;
2646 case 'B':
2647 config_write_base_only = 1;
2648 break;
2650 case 'b':
2651 {
2652 size_t len;
2653 char base_realpath[PATH_MAX];
2655 if (config_base_dir != NULL)
2656 free (config_base_dir);
2657 config_base_dir = strdup (optarg);
2658 if (config_base_dir == NULL)
2659 {
2660 fprintf (stderr, "read_options: strdup failed.\n");
2661 return (3);
2662 }
2664 /* make sure that the base directory is not resolved via
2665 * symbolic links. this makes some performance-enhancing
2666 * assumptions possible (we don't have to resolve paths
2667 * that start with a "/")
2668 */
2669 if (realpath(config_base_dir, base_realpath) == NULL)
2670 {
2671 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2672 return 5;
2673 }
2674 else if (strncmp(config_base_dir,
2675 base_realpath, sizeof(base_realpath)) != 0)
2676 {
2677 fprintf(stderr,
2678 "Base directory (-b) resolved via file system links!\n"
2679 "Please consult rrdcached '-b' documentation!\n"
2680 "Consider specifying the real directory (%s)\n",
2681 base_realpath);
2682 return 5;
2683 }
2685 len = strlen (config_base_dir);
2686 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2687 {
2688 config_base_dir[len - 1] = 0;
2689 len--;
2690 }
2692 if (len < 1)
2693 {
2694 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2695 return (4);
2696 }
2698 _config_base_dir_len = len;
2699 }
2700 break;
2702 case 'p':
2703 {
2704 if (config_pid_file != NULL)
2705 free (config_pid_file);
2706 config_pid_file = strdup (optarg);
2707 if (config_pid_file == NULL)
2708 {
2709 fprintf (stderr, "read_options: strdup failed.\n");
2710 return (3);
2711 }
2712 }
2713 break;
2715 case 'F':
2716 config_flush_at_shutdown = 1;
2717 break;
2719 case 'j':
2720 {
2721 struct stat statbuf;
2722 const char *dir = optarg;
2724 status = stat(dir, &statbuf);
2725 if (status != 0)
2726 {
2727 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2728 return 6;
2729 }
2731 if (!S_ISDIR(statbuf.st_mode)
2732 || access(dir, R_OK|W_OK|X_OK) != 0)
2733 {
2734 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2735 errno ? rrd_strerror(errno) : "");
2736 return 6;
2737 }
2739 journal_cur = malloc(PATH_MAX + 1);
2740 journal_old = malloc(PATH_MAX + 1);
2741 if (journal_cur == NULL || journal_old == NULL)
2742 {
2743 fprintf(stderr, "malloc failure for journal files\n");
2744 return 6;
2745 }
2746 else
2747 {
2748 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2749 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2750 }
2751 }
2752 break;
2754 case 'h':
2755 case '?':
2756 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2757 "\n"
2758 "Usage: rrdcached [options]\n"
2759 "\n"
2760 "Valid options are:\n"
2761 " -l <address> Socket address to listen to.\n"
2762 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2763 " -w <seconds> Interval in which to write data.\n"
2764 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2765 " -t <threads> Number of write threads.\n"
2766 " -f <seconds> Interval in which to flush dead data.\n"
2767 " -p <file> Location of the PID-file.\n"
2768 " -b <dir> Base directory to change to.\n"
2769 " -B Restrict file access to paths within -b <dir>\n"
2770 " -g Do not fork and run in the foreground.\n"
2771 " -j <dir> Directory in which to create the journal files.\n"
2772 " -F Always flush all updates at shutdown\n"
2773 "\n"
2774 "For more information and a detailed description of all options "
2775 "please refer\n"
2776 "to the rrdcached(1) manual page.\n",
2777 VERSION);
2778 status = -1;
2779 break;
2780 } /* switch (option) */
2781 } /* while (getopt) */
2783 /* advise the user when values are not sane */
2784 if (config_flush_interval < 2 * config_write_interval)
2785 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2786 " 2x write interval (-w) !\n");
2787 if (config_write_jitter > config_write_interval)
2788 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2789 " write interval (-w) !\n");
2791 if (config_write_base_only && config_base_dir == NULL)
2792 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2793 " Consult the rrdcached documentation\n");
2795 if (journal_cur == NULL)
2796 config_flush_at_shutdown = 1;
2798 return (status);
2799 } /* }}} int read_options */
2801 int main (int argc, char **argv)
2802 {
2803 int status;
2805 status = read_options (argc, argv);
2806 if (status != 0)
2807 {
2808 if (status < 0)
2809 status = 0;
2810 return (status);
2811 }
2813 status = daemonize ();
2814 if (status != 0)
2815 {
2816 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2817 return (1);
2818 }
2820 journal_init();
2822 /* start the queue threads */
2823 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2824 if (queue_threads == NULL)
2825 {
2826 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2827 cleanup();
2828 return (1);
2829 }
2830 for (int i = 0; i < config_queue_threads; i++)
2831 {
2832 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2833 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2834 if (status != 0)
2835 {
2836 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2837 cleanup();
2838 return (1);
2839 }
2840 }
2842 /* start the flush thread */
2843 memset(&flush_thread, 0, sizeof(flush_thread));
2844 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2845 if (status != 0)
2846 {
2847 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2848 cleanup();
2849 return (1);
2850 }
2852 listen_thread_main (NULL);
2853 cleanup ();
2855 return (0);
2856 } /* int main */
2858 /*
2859 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2860 */