1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
/* Log a printf-style message through syslog(3); `severity' is passed
 * through unchanged as the syslog priority (e.g. LOG_ERR, LOG_NOTICE). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers that do not define __GNUC__ get __attribute__(...) expanded
 * to nothing, so the "unused" annotations below still compile. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
115 /*
116 * Types
117 */
/* Privilege level attached to a listening socket.  has_privilege()
 * compares levels with `>=', so PRIV_LOW must remain the smaller value. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;
/* Status handed to send_response(): RESP_OK (0) or RESP_ERR (-1). */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/* Per-socket state: descriptor, address, privilege, BATCH bookkeeping and
 * the read/write buffers used by next_cmd(), add_to_wbuf() and
 * send_response(). */
struct listen_socket_s
{
  int fd;
  char addr[PATH_MAX + 1];
  int family;
  socket_privilege privilege;

  /* state for BATCH processing */
  time_t batch_start; /* non-zero while in BATCH; checked by send_response() */
  int batch_cmd;      /* reported as the status number for errors in BATCH */

  /* buffered IO */
  char *rbuf;         /* read buffer scanned by next_cmd() */
  off_t next_cmd;     /* offset in rbuf of the next unparsed command */
  off_t next_read;    /* offset in rbuf one past the last byte of data */

  char *wbuf;         /* pending response text, grown by add_to_wbuf() */
  ssize_t wbuf_len;   /* number of bytes in wbuf, excluding the final NUL */
};
typedef struct listen_socket_s listen_socket_t;
struct command;
/* Parameter lists shared by all command handlers.  DISPATCH_PROTO is the
 * socket, the current time and the remaining command buffer; HANDLER_PROTO
 * prepends the matched command entry. */
/* note: guard against "unused" warnings in the handlers */
#define DISPATCH_PROTO	listen_socket_t *sock __attribute__((unused)),\
			time_t now __attribute__((unused)),\
			char *buffer __attribute__((unused)),\
			size_t buffer_size __attribute__((unused))

#define HANDLER_PROTO	struct command *cmd __attribute__((unused)),\
			DISPATCH_PROTO
/* Description of one protocol command. */
struct command {
  char   *cmd;                    /* command name */
  int (*handler)(HANDLER_PROTO);  /* function implementing the command */
  socket_privilege min_priv;      /* presumably the minimum socket privilege
                                   * required -- confirm against dispatcher */

  char  context;		/* where we expect to see it */
  /* bit flags for `context' */
#define CMD_CONTEXT_CLIENT	(1<<0)
#define CMD_CONTEXT_BATCH	(1<<1)
#define CMD_CONTEXT_JOURNAL	(1<<2)
#define CMD_CONTEXT_ANY		(0x7f)

  char *syntax;   /* usage text; syntax_error() sends it after "Usage: " */
  char *help;     /* NOTE(review): presumably the HELP text -- not used in
                   * the visible part of the file */
};
/* One cached RRD file: its pending update strings plus the links that
 * place it in the write queue. */
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;               /* file name; also the key in cache_tree
                             * (see the assert in tree_callback_flush) */
  char **values;            /* pending update strings */
  size_t values_num;        /* number of entries in `values' */
  time_t last_flush_time;   /* set by wipe_ci_values(), may include jitter */
  time_t last_update_stamp;
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;                /* CI_FLAGS_* bits */
  pthread_cond_t  flushed;  /* broadcast once the values have been written;
                             * flush_file() waits on it */
  cache_item_t *prev;       /* queue links; both NULL when not queued */
  cache_item_t *next;
};
/* User data passed from flush_old_values() to tree_callback_flush(). */
struct callback_flush_data_s
{
  time_t now;           /* time at which the tree walk started */
  time_t abs_timeout;   /* items last flushed at or before this get queued */
  char **keys;          /* keys of idle, empty entries to remove afterwards */
  size_t keys_num;      /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should insert at. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;
/* max length of socket command or response */
#define CMD_MAX 4096
/* read buffer size: room for two maximum-length commands */
#define RBUF_SIZE (CMD_MAX*2)
/*
 * Variables
 */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* incremented by the signal handlers; non-zero asks all threads to stop */
static int do_shutdown = 0;

static pthread_t *queue_threads;
/* signalled when an item is enqueued, broadcast on shutdown */
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

static pthread_t flush_thread;
/* broadcast on shutdown to wake flush_thread_main() */
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
/* cache_tree and the queue below are protected by cache_lock */
static GTree          *cache_tree = NULL;
static cache_item_t   *cache_queue_head = NULL;
static cache_item_t   *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

static int config_write_interval = 300;
static int config_write_jitter   = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* counters reported by the STATS handler; protected by stats_lock */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
270 /*
271 * Functions
272 */
/* Shared body of the shutdown signal handlers: log which signal arrived,
 * raise do_shutdown and wake the flush and queue threads.
 * NOTE(review): syslog() and pthread_cond_broadcast() are not listed as
 * async-signal-safe by POSIX -- confirm this is acceptable here. */
static void sig_common (const char *sig) /* {{{ */
{
  RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
  do_shutdown++;
  pthread_cond_broadcast(&flush_cond);
  pthread_cond_broadcast(&queue_cond);
} /* }}} void sig_common */
/* SIGINT: request shutdown, leaving config_flush_at_shutdown untouched. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: request shutdown, leaving config_flush_at_shutdown untouched. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
/* SIGUSR1: request shutdown and force flushing of cached values. */
static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 1;
  sig_common("USR1");
} /* }}} void sig_usr1_handler */
/* SIGUSR2: request shutdown and disable flushing of cached values. */
static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 0;
  sig_common("USR2");
} /* }}} void sig_usr2_handler */
303 static void install_signal_handlers(void) /* {{{ */
304 {
305 /* These structures are static, because `sigaction' behaves weird if the are
306 * overwritten.. */
307 static struct sigaction sa_int;
308 static struct sigaction sa_term;
309 static struct sigaction sa_pipe;
310 static struct sigaction sa_usr1;
311 static struct sigaction sa_usr2;
313 /* Install signal handlers */
314 memset (&sa_int, 0, sizeof (sa_int));
315 sa_int.sa_handler = sig_int_handler;
316 sigaction (SIGINT, &sa_int, NULL);
318 memset (&sa_term, 0, sizeof (sa_term));
319 sa_term.sa_handler = sig_term_handler;
320 sigaction (SIGTERM, &sa_term, NULL);
322 memset (&sa_pipe, 0, sizeof (sa_pipe));
323 sa_pipe.sa_handler = SIG_IGN;
324 sigaction (SIGPIPE, &sa_pipe, NULL);
326 memset (&sa_pipe, 0, sizeof (sa_usr1));
327 sa_usr1.sa_handler = sig_usr1_handler;
328 sigaction (SIGUSR1, &sa_usr1, NULL);
330 memset (&sa_usr2, 0, sizeof (sa_usr2));
331 sa_usr2.sa_handler = sig_usr2_handler;
332 sigaction (SIGUSR2, &sa_usr2, NULL);
334 } /* }}} void install_signal_handlers */
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338 int fd;
339 char *file;
341 file = (config_pid_file != NULL)
342 ? config_pid_file
343 : LOCALSTATEDIR "/run/rrdcached.pid";
345 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346 if (fd < 0)
347 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348 action, file, rrd_strerror(errno));
350 return(fd);
351 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns an open, truncated descriptor for the pid file when the recorded
 * pid is stale (or is our own), and a negative value otherwise.  The
 * descriptor is closed on every failure path after a successful open. */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not NUL-terminate */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write our pid, followed by a newline, to the already-open descriptor
 * `fd'.  The descriptor is closed in every case.  Returns 0 on success
 * and -1 if fdopen() fails. */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
418 static int remove_pidfile (void) /* {{{ */
419 {
420 char *file;
421 int status;
423 file = (config_pid_file != NULL)
424 ? config_pid_file
425 : LOCALSTATEDIR "/run/rrdcached.pid";
427 status = unlink (file);
428 if (status == 0)
429 return (0);
430 return (errno);
431 } /* }}} int remove_pidfile */
433 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
434 {
435 char *eol;
437 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
438 sock->next_read - sock->next_cmd);
440 if (eol == NULL)
441 {
442 /* no commands left, move remainder back to front of rbuf */
443 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
444 sock->next_read - sock->next_cmd);
445 sock->next_read -= sock->next_cmd;
446 sock->next_cmd = 0;
447 *len = 0;
448 return NULL;
449 }
450 else
451 {
452 char *cmd = sock->rbuf + sock->next_cmd;
453 *eol = '\0';
455 sock->next_cmd = eol - sock->rbuf + 1;
457 if (eol > sock->rbuf && *(eol-1) == '\r')
458 *(--eol) = '\0'; /* handle "\r\n" EOL */
460 *len = eol - cmd;
462 return cmd;
463 }
465 /* NOTREACHED */
466 assert(1==0);
467 }
469 /* add the characters directly to the write buffer */
470 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
471 {
472 char *new_buf;
474 assert(sock != NULL);
476 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
477 if (new_buf == NULL)
478 {
479 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
480 return -1;
481 }
483 strncpy(new_buf + sock->wbuf_len, str, len + 1);
485 sock->wbuf = new_buf;
486 sock->wbuf_len += len;
488 return 0;
489 } /* }}} static int add_to_wbuf */
491 /* add the text to the "extra" info that's sent after the status line */
492 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
493 {
494 va_list argp;
495 char buffer[CMD_MAX];
496 int len;
498 if (sock == NULL) return 0; /* journal replay mode */
499 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
501 va_start(argp, fmt);
502 #ifdef HAVE_VSNPRINTF
503 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
504 #else
505 len = vsprintf(buffer, fmt, argp);
506 #endif
507 va_end(argp);
508 if (len < 0)
509 {
510 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
511 return -1;
512 }
514 return add_to_wbuf(sock, buffer, len);
515 } /* }}} static int add_response_info */
/* Count the newline characters in `str'; a NULL string counts as 0. */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
533 /* send the response back to the user.
534 * returns 0 on success, -1 on error
535 * write buffer is always zeroed after this call */
536 static int send_response (listen_socket_t *sock, response_code rc,
537 char *fmt, ...) /* {{{ */
538 {
539 va_list argp;
540 char buffer[CMD_MAX];
541 int lines;
542 ssize_t wrote;
543 int rclen, len;
545 if (sock == NULL) return rc; /* journal replay mode */
547 if (sock->batch_start)
548 {
549 if (rc == RESP_OK)
550 return rc; /* no response on success during BATCH */
551 lines = sock->batch_cmd;
552 }
553 else if (rc == RESP_OK)
554 lines = count_lines(sock->wbuf);
555 else
556 lines = -1;
558 rclen = sprintf(buffer, "%d ", lines);
559 va_start(argp, fmt);
560 #ifdef HAVE_VSNPRINTF
561 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
562 #else
563 len = vsprintf(buffer+rclen, fmt, argp);
564 #endif
565 va_end(argp);
566 if (len < 0)
567 return -1;
569 len += rclen;
571 /* append the result to the wbuf, don't write to the user */
572 if (sock->batch_start)
573 return add_to_wbuf(sock, buffer, len);
575 /* first write must be complete */
576 if (len != write(sock->fd, buffer, len))
577 {
578 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
579 return -1;
580 }
582 if (sock->wbuf != NULL && rc == RESP_OK)
583 {
584 wrote = 0;
585 while (wrote < sock->wbuf_len)
586 {
587 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
588 if (wb <= 0)
589 {
590 RRDD_LOG(LOG_INFO, "send_response: could not write results");
591 return -1;
592 }
593 wrote += wb;
594 }
595 }
597 free(sock->wbuf); sock->wbuf = NULL;
598 sock->wbuf_len = 0;
600 return 0;
601 } /* }}} */
603 static void wipe_ci_values(cache_item_t *ci, time_t when)
604 {
605 ci->values = NULL;
606 ci->values_num = 0;
608 ci->last_flush_time = when;
609 if (config_write_jitter > 0)
610 ci->last_flush_time += (rrd_random() % config_write_jitter);
611 }
613 /* remove_from_queue
614 * remove a "cache_item_t" item from the queue.
615 * must hold 'cache_lock' when calling this
616 */
617 static void remove_from_queue(cache_item_t *ci) /* {{{ */
618 {
619 if (ci == NULL) return;
620 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
622 if (ci->prev == NULL)
623 cache_queue_head = ci->next; /* reset head */
624 else
625 ci->prev->next = ci->next;
627 if (ci->next == NULL)
628 cache_queue_tail = ci->prev; /* reset the tail */
629 else
630 ci->next->prev = ci->prev;
632 ci->next = ci->prev = NULL;
633 ci->flags &= ~CI_FLAGS_IN_QUEUE;
635 pthread_mutex_lock (&stats_lock);
636 assert (stats_queue_length > 0);
637 stats_queue_length--;
638 pthread_mutex_unlock (&stats_lock);
640 } /* }}} static void remove_from_queue */
642 /* free the resources associated with the cache_item_t
643 * must hold cache_lock when calling this function
644 */
645 static void *free_cache_item(cache_item_t *ci) /* {{{ */
646 {
647 if (ci == NULL) return NULL;
649 remove_from_queue(ci);
651 for (size_t i=0; i < ci->values_num; i++)
652 free(ci->values[i]);
654 free (ci->values);
655 free (ci->file);
657 /* in case anyone is waiting */
658 pthread_cond_broadcast(&ci->flushed);
659 pthread_cond_destroy(&ci->flushed);
661 free (ci);
663 return NULL;
664 } /* }}} static void *free_cache_item */
666 /*
667 * enqueue_cache_item:
668 * `cache_lock' must be acquired before calling this function!
669 */
670 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
671 queue_side_t side)
672 {
673 if (ci == NULL)
674 return (-1);
676 if (ci->values_num == 0)
677 return (0);
679 if (side == HEAD)
680 {
681 if (cache_queue_head == ci)
682 return 0;
684 /* remove if further down in queue */
685 remove_from_queue(ci);
687 ci->prev = NULL;
688 ci->next = cache_queue_head;
689 if (ci->next != NULL)
690 ci->next->prev = ci;
691 cache_queue_head = ci;
693 if (cache_queue_tail == NULL)
694 cache_queue_tail = cache_queue_head;
695 }
696 else /* (side == TAIL) */
697 {
698 /* We don't move values back in the list.. */
699 if (ci->flags & CI_FLAGS_IN_QUEUE)
700 return (0);
702 assert (ci->next == NULL);
703 assert (ci->prev == NULL);
705 ci->prev = cache_queue_tail;
707 if (cache_queue_tail == NULL)
708 cache_queue_head = ci;
709 else
710 cache_queue_tail->next = ci;
712 cache_queue_tail = ci;
713 }
715 ci->flags |= CI_FLAGS_IN_QUEUE;
717 pthread_cond_signal(&queue_cond);
718 pthread_mutex_lock (&stats_lock);
719 stats_queue_length++;
720 pthread_mutex_unlock (&stats_lock);
722 return (0);
723 } /* }}} int enqueue_cache_item */
725 /*
726 * tree_callback_flush:
727 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
728 * while this is in progress.
729 */
730 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
731 gpointer data)
732 {
733 cache_item_t *ci;
734 callback_flush_data_t *cfd;
736 ci = (cache_item_t *) value;
737 cfd = (callback_flush_data_t *) data;
739 if (ci->flags & CI_FLAGS_IN_QUEUE)
740 return FALSE;
742 if ((ci->last_flush_time <= cfd->abs_timeout)
743 && (ci->values_num > 0))
744 {
745 enqueue_cache_item (ci, TAIL);
746 }
747 else if ((do_shutdown != 0)
748 && (ci->values_num > 0))
749 {
750 enqueue_cache_item (ci, TAIL);
751 }
752 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
753 && (ci->values_num <= 0))
754 {
755 assert ((char *) key == ci->file);
756 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
757 {
758 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
759 return (FALSE);
760 }
761 }
763 return (FALSE);
764 } /* }}} gboolean tree_callback_flush */
766 static int flush_old_values (int max_age)
767 {
768 callback_flush_data_t cfd;
769 size_t k;
771 memset (&cfd, 0, sizeof (cfd));
772 /* Pass the current time as user data so that we don't need to call
773 * `time' for each node. */
774 cfd.now = time (NULL);
775 cfd.keys = NULL;
776 cfd.keys_num = 0;
778 if (max_age > 0)
779 cfd.abs_timeout = cfd.now - max_age;
780 else
781 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
783 /* `tree_callback_flush' will return the keys of all values that haven't
784 * been touched in the last `config_flush_interval' seconds in `cfd'.
785 * The char*'s in this array point to the same memory as ci->file, so we
786 * don't need to free them separately. */
787 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
789 for (k = 0; k < cfd.keys_num; k++)
790 {
791 /* should never fail, since we have held the cache_lock
792 * the entire time */
793 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
794 }
796 if (cfd.keys != NULL)
797 {
798 free (cfd.keys);
799 cfd.keys = NULL;
800 }
802 return (0);
803 } /* int flush_old_values */
805 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
806 {
807 struct timeval now;
808 struct timespec next_flush;
809 int status;
811 gettimeofday (&now, NULL);
812 next_flush.tv_sec = now.tv_sec + config_flush_interval;
813 next_flush.tv_nsec = 1000 * now.tv_usec;
815 pthread_mutex_lock(&cache_lock);
817 while (!do_shutdown)
818 {
819 gettimeofday (&now, NULL);
820 if ((now.tv_sec > next_flush.tv_sec)
821 || ((now.tv_sec == next_flush.tv_sec)
822 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
823 {
824 /* Flush all values that haven't been written in the last
825 * `config_write_interval' seconds. */
826 flush_old_values (config_write_interval);
828 /* Determine the time of the next cache flush. */
829 next_flush.tv_sec =
830 now.tv_sec + next_flush.tv_sec % config_flush_interval;
832 /* unlock the cache while we rotate so we don't block incoming
833 * updates if the fsync() blocks on disk I/O */
834 pthread_mutex_unlock(&cache_lock);
835 journal_rotate();
836 pthread_mutex_lock(&cache_lock);
837 }
839 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
840 if (status != 0 && status != ETIMEDOUT)
841 {
842 RRDD_LOG (LOG_ERR, "flush_thread_main: "
843 "pthread_cond_timedwait returned %i.", status);
844 }
845 }
847 if (config_flush_at_shutdown)
848 flush_old_values (-1); /* flush everything */
850 pthread_mutex_unlock(&cache_lock);
852 return NULL;
853 } /* void *flush_thread_main */
855 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
856 {
857 pthread_mutex_lock (&cache_lock);
859 while (!do_shutdown
860 || (cache_queue_head != NULL && config_flush_at_shutdown))
861 {
862 cache_item_t *ci;
863 char *file;
864 char **values;
865 size_t values_num;
866 int status;
868 /* Now, check if there's something to store away. If not, wait until
869 * something comes in. if we are shutting down, do not wait around. */
870 if (cache_queue_head == NULL && !do_shutdown)
871 {
872 status = pthread_cond_wait (&queue_cond, &cache_lock);
873 if ((status != 0) && (status != ETIMEDOUT))
874 {
875 RRDD_LOG (LOG_ERR, "queue_thread_main: "
876 "pthread_cond_wait returned %i.", status);
877 }
878 }
880 /* Check if a value has arrived. This may be NULL if we timed out or there
881 * was an interrupt such as a signal. */
882 if (cache_queue_head == NULL)
883 continue;
885 ci = cache_queue_head;
887 /* copy the relevant parts */
888 file = strdup (ci->file);
889 if (file == NULL)
890 {
891 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
892 continue;
893 }
895 assert(ci->values != NULL);
896 assert(ci->values_num > 0);
898 values = ci->values;
899 values_num = ci->values_num;
901 wipe_ci_values(ci, time(NULL));
902 remove_from_queue(ci);
904 pthread_mutex_unlock (&cache_lock);
906 rrd_clear_error ();
907 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
908 if (status != 0)
909 {
910 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
911 "rrd_update_r (%s) failed with status %i. (%s)",
912 file, status, rrd_get_error());
913 }
915 journal_write("wrote", file);
916 pthread_cond_broadcast(&ci->flushed);
918 rrd_free_ptrs((void ***) &values, &values_num);
919 free(file);
921 if (status == 0)
922 {
923 pthread_mutex_lock (&stats_lock);
924 stats_updates_written++;
925 stats_data_sets_written += values_num;
926 pthread_mutex_unlock (&stats_lock);
927 }
929 pthread_mutex_lock (&cache_lock);
930 }
931 pthread_mutex_unlock (&cache_lock);
933 return (NULL);
934 } /* }}} void *queue_thread_main */
/* Split the next space-separated field off the front of a NUL-terminated
 * buffer, in place.  A backslash makes the following character literal
 * (so "\ " is a space inside a field).
 *
 * On success returns 0, stores the start of the unescaped field in
 * *field_ret and advances *buffer_ret / *buffer_size_ret past the field
 * and its delimiter.  Returns -1 when the buffer is empty or the data
 * runs out before a delimiter is seen; the outputs are then left
 * untouched.  *buffer_size_ret must include the terminating NUL. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t src_size = *buffer_size_ret;
  char *dst = src;      /* the field is rewritten over the input */
  size_t rd = 0;        /* read position in src */
  size_t wr = 0;        /* write position in dst */

  if (src_size <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[src_size - 1] == '\0');

  for (;;)
  {
    char c;

    /* ran off the end without finding a delimiter */
    if (rd >= src_size)
      return (-1);

    c = src[rd];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
    {
      dst[wr] = 0;
      rd++;
      break;
    }

    /* Handle escaped characters. */
    if (c == '\\')
    {
      if (rd >= (src_size - 1))
        return (-1);
      rd++;
      c = src[rd];
    }

    dst[wr] = c;
    wr++;
    rd++;
  }

  *buffer_ret = src + rd;
  *buffer_size_ret = src_size - rd;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
999 /* if we're restricting writes to the base directory,
1000 * check whether the file falls within the dir
1001 * returns 1 if OK, otherwise 0
1002 */
1003 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1004 {
1005 assert(file != NULL);
1007 if (!config_write_base_only
1008 || sock == NULL /* journal replay */
1009 || config_base_dir == NULL)
1010 return 1;
1012 if (strstr(file, "../") != NULL) goto err;
1014 /* relative paths without "../" are ok */
1015 if (*file != '/') return 1;
1017 /* file must be of the format base + "/" + <1+ char filename> */
1018 if (strlen(file) < _config_base_dir_len + 2) goto err;
1019 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1020 if (*(file + _config_base_dir_len) != '/') goto err;
1022 return 1;
1024 err:
1025 if (sock != NULL && sock->fd >= 0)
1026 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1028 return 0;
1029 } /* }}} static int check_file_access */
1031 /* when using a base dir, convert relative paths to absolute paths.
1032 * if necessary, modifies the "filename" pointer to point
1033 * to the new path created in "tmp". "tmp" is provided
1034 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1035 *
1036 * this allows us to optimize for the expected case (absolute path)
1037 * with a no-op.
1038 */
1039 static void get_abs_path(char **filename, char *tmp)
1040 {
1041 assert(tmp != NULL);
1042 assert(filename != NULL && *filename != NULL);
1044 if (config_base_dir == NULL || **filename == '/')
1045 return;
1047 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1048 *filename = tmp;
1049 } /* }}} static int get_abs_path */
1051 /* returns 1 if we have the required privilege level,
1052 * otherwise issue an error to the user on sock */
1053 static int has_privilege (listen_socket_t *sock, /* {{{ */
1054 socket_privilege priv)
1055 {
1056 if (sock == NULL) /* journal replay */
1057 return 1;
1059 if (sock->privilege >= priv)
1060 return 1;
1062 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1063 } /* }}} static int has_privilege */
1065 static int flush_file (const char *filename) /* {{{ */
1066 {
1067 cache_item_t *ci;
1069 pthread_mutex_lock (&cache_lock);
1071 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1072 if (ci == NULL)
1073 {
1074 pthread_mutex_unlock (&cache_lock);
1075 return (ENOENT);
1076 }
1078 if (ci->values_num > 0)
1079 {
1080 /* Enqueue at head */
1081 enqueue_cache_item (ci, HEAD);
1082 pthread_cond_wait(&ci->flushed, &cache_lock);
1083 }
1085 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1086 * may have been purged during our cond_wait() */
1088 pthread_mutex_unlock(&cache_lock);
1090 return (0);
1091 } /* }}} int flush_file */
1093 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1094 {
1095 char *err = "Syntax error.\n";
1097 if (cmd && cmd->syntax)
1098 err = cmd->syntax;
1100 return send_response(sock, RESP_ERR, "Usage: %s", err);
1101 } /* }}} static int syntax_error() */
1103 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1104 {
1105 uint64_t copy_queue_length;
1106 uint64_t copy_updates_received;
1107 uint64_t copy_flush_received;
1108 uint64_t copy_updates_written;
1109 uint64_t copy_data_sets_written;
1110 uint64_t copy_journal_bytes;
1111 uint64_t copy_journal_rotate;
1113 uint64_t tree_nodes_number;
1114 uint64_t tree_depth;
1116 pthread_mutex_lock (&stats_lock);
1117 copy_queue_length = stats_queue_length;
1118 copy_updates_received = stats_updates_received;
1119 copy_flush_received = stats_flush_received;
1120 copy_updates_written = stats_updates_written;
1121 copy_data_sets_written = stats_data_sets_written;
1122 copy_journal_bytes = stats_journal_bytes;
1123 copy_journal_rotate = stats_journal_rotate;
1124 pthread_mutex_unlock (&stats_lock);
1126 pthread_mutex_lock (&cache_lock);
1127 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1128 tree_depth = (uint64_t) g_tree_height (cache_tree);
1129 pthread_mutex_unlock (&cache_lock);
1131 add_response_info(sock,
1132 "QueueLength: %"PRIu64"\n", copy_queue_length);
1133 add_response_info(sock,
1134 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1135 add_response_info(sock,
1136 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1137 add_response_info(sock,
1138 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1139 add_response_info(sock,
1140 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1141 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1142 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1143 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1144 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1146 send_response(sock, RESP_OK, "Statistics follow\n");
1148 return (0);
1149 } /* }}} int handle_request_stats */
1151 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1152 {
1153 char *file, file_tmp[PATH_MAX];
1154 int status;
1156 status = buffer_get_field (&buffer, &buffer_size, &file);
1157 if (status != 0)
1158 {
1159 return syntax_error(sock,cmd);
1160 }
1161 else
1162 {
1163 pthread_mutex_lock(&stats_lock);
1164 stats_flush_received++;
1165 pthread_mutex_unlock(&stats_lock);
1167 get_abs_path(&file, file_tmp);
1168 if (!check_file_access(file, sock)) return 0;
1170 status = flush_file (file);
1171 if (status == 0)
1172 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1173 else if (status == ENOENT)
1174 {
1175 /* no file in our tree; see whether it exists at all */
1176 struct stat statbuf;
1178 memset(&statbuf, 0, sizeof(statbuf));
1179 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1180 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1181 else
1182 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1183 }
1184 else if (status < 0)
1185 return send_response(sock, RESP_ERR, "Internal error.\n");
1186 else
1187 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1188 }
1190 /* NOTREACHED */
1191 assert(1==0);
1192 } /* }}} int handle_request_flush */
1194 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1195 {
1196 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1198 pthread_mutex_lock(&cache_lock);
1199 flush_old_values(-1);
1200 pthread_mutex_unlock(&cache_lock);
1202 return send_response(sock, RESP_OK, "Started flush.\n");
1203 } /* }}} static int handle_request_flushall */
1205 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1206 {
1207 int status;
1208 char *file, file_tmp[PATH_MAX];
1209 cache_item_t *ci;
1211 status = buffer_get_field(&buffer, &buffer_size, &file);
1212 if (status != 0)
1213 return syntax_error(sock,cmd);
1215 get_abs_path(&file, file_tmp);
1217 pthread_mutex_lock(&cache_lock);
1218 ci = g_tree_lookup(cache_tree, file);
1219 if (ci == NULL)
1220 {
1221 pthread_mutex_unlock(&cache_lock);
1222 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1223 }
1225 for (size_t i=0; i < ci->values_num; i++)
1226 add_response_info(sock, "%s\n", ci->values[i]);
1228 pthread_mutex_unlock(&cache_lock);
1229 return send_response(sock, RESP_OK, "updates pending\n");
1230 } /* }}} static int handle_request_pending */
1232 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1233 {
1234 int status;
1235 gboolean found;
1236 char *file, file_tmp[PATH_MAX];
1238 status = buffer_get_field(&buffer, &buffer_size, &file);
1239 if (status != 0)
1240 return syntax_error(sock,cmd);
1242 get_abs_path(&file, file_tmp);
1243 if (!check_file_access(file, sock)) return 0;
1245 pthread_mutex_lock(&cache_lock);
1246 found = g_tree_remove(cache_tree, file);
1247 pthread_mutex_unlock(&cache_lock);
1249 if (found == TRUE)
1250 {
1251 if (sock != NULL)
1252 journal_write("forget", file);
1254 return send_response(sock, RESP_OK, "Gone!\n");
1255 }
1256 else
1257 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1259 /* NOTREACHED */
1260 assert(1==0);
1261 } /* }}} static int handle_request_forget */
1263 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1264 {
1265 cache_item_t *ci;
1267 pthread_mutex_lock(&cache_lock);
1269 ci = cache_queue_head;
1270 while (ci != NULL)
1271 {
1272 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1273 ci = ci->next;
1274 }
1276 pthread_mutex_unlock(&cache_lock);
1278 return send_response(sock, RESP_OK, "in queue.\n");
1279 } /* }}} int handle_request_queue */
1281 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1282 {
1283 char *file, file_tmp[PATH_MAX];
1284 int values_num = 0;
1285 int status;
1286 char orig_buf[CMD_MAX];
1288 cache_item_t *ci;
1290 /* save it for the journal later */
1291 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1293 status = buffer_get_field (&buffer, &buffer_size, &file);
1294 if (status != 0)
1295 return syntax_error(sock,cmd);
1297 pthread_mutex_lock(&stats_lock);
1298 stats_updates_received++;
1299 pthread_mutex_unlock(&stats_lock);
1301 get_abs_path(&file, file_tmp);
1302 if (!check_file_access(file, sock)) return 0;
1304 pthread_mutex_lock (&cache_lock);
1305 ci = g_tree_lookup (cache_tree, file);
1307 if (ci == NULL) /* {{{ */
1308 {
1309 struct stat statbuf;
1311 /* don't hold the lock while we setup; stat(2) might block */
1312 pthread_mutex_unlock(&cache_lock);
1314 memset (&statbuf, 0, sizeof (statbuf));
1315 status = stat (file, &statbuf);
1316 if (status != 0)
1317 {
1318 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1320 status = errno;
1321 if (status == ENOENT)
1322 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1323 else
1324 return send_response(sock, RESP_ERR,
1325 "stat failed with error %i.\n", status);
1326 }
1327 if (!S_ISREG (statbuf.st_mode))
1328 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1330 if (access(file, R_OK|W_OK) != 0)
1331 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1332 file, rrd_strerror(errno));
1334 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1335 if (ci == NULL)
1336 {
1337 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1339 return send_response(sock, RESP_ERR, "malloc failed.\n");
1340 }
1341 memset (ci, 0, sizeof (cache_item_t));
1343 ci->file = strdup (file);
1344 if (ci->file == NULL)
1345 {
1346 free (ci);
1347 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1349 return send_response(sock, RESP_ERR, "strdup failed.\n");
1350 }
1352 wipe_ci_values(ci, now);
1353 ci->flags = CI_FLAGS_IN_TREE;
1354 pthread_cond_init(&ci->flushed, NULL);
1356 pthread_mutex_lock(&cache_lock);
1357 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1358 } /* }}} */
1359 assert (ci != NULL);
1361 /* don't re-write updates in replay mode */
1362 if (sock != NULL)
1363 journal_write("update", orig_buf);
1365 while (buffer_size > 0)
1366 {
1367 char *value;
1368 time_t stamp;
1369 char *eostamp;
1371 status = buffer_get_field (&buffer, &buffer_size, &value);
1372 if (status != 0)
1373 {
1374 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1375 break;
1376 }
1378 /* make sure update time is always moving forward */
1379 stamp = strtol(value, &eostamp, 10);
1380 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1381 {
1382 pthread_mutex_unlock(&cache_lock);
1383 return send_response(sock, RESP_ERR,
1384 "Cannot find timestamp in '%s'!\n", value);
1385 }
1386 else if (stamp <= ci->last_update_stamp)
1387 {
1388 pthread_mutex_unlock(&cache_lock);
1389 return send_response(sock, RESP_ERR,
1390 "illegal attempt to update using time %ld when last"
1391 " update time is %ld (minimum one second step)\n",
1392 stamp, ci->last_update_stamp);
1393 }
1394 else
1395 ci->last_update_stamp = stamp;
1397 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1398 {
1399 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1400 continue;
1401 }
1403 values_num++;
1404 }
1406 if (((now - ci->last_flush_time) >= config_write_interval)
1407 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1408 && (ci->values_num > 0))
1409 {
1410 enqueue_cache_item (ci, TAIL);
1411 }
1413 pthread_mutex_unlock (&cache_lock);
1415 if (values_num < 1)
1416 return send_response(sock, RESP_ERR, "No values updated.\n");
1417 else
1418 return send_response(sock, RESP_OK,
1419 "errors, enqueued %i value(s).\n", values_num);
1421 /* NOTREACHED */
1422 assert(1==0);
1424 } /* }}} int handle_request_update */
1426 /* we came across a "WROTE" entry during journal replay.
1427 * throw away any values that we have accumulated for this file
1428 */
1429 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1430 {
1431 cache_item_t *ci;
1432 const char *file = buffer;
1434 pthread_mutex_lock(&cache_lock);
1436 ci = g_tree_lookup(cache_tree, file);
1437 if (ci == NULL)
1438 {
1439 pthread_mutex_unlock(&cache_lock);
1440 return (0);
1441 }
1443 if (ci->values)
1444 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1446 wipe_ci_values(ci, now);
1447 remove_from_queue(ci);
1449 pthread_mutex_unlock(&cache_lock);
1450 return (0);
1451 } /* }}} int handle_request_wrote */
1453 /* start "BATCH" processing */
1454 static int batch_start (HANDLER_PROTO) /* {{{ */
1455 {
1456 int status;
1457 if (sock->batch_start)
1458 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1460 status = send_response(sock, RESP_OK,
1461 "Go ahead. End with dot '.' on its own line.\n");
1462 sock->batch_start = time(NULL);
1463 sock->batch_cmd = 0;
1465 return status;
1466 } /* }}} static int batch_start */
1468 /* finish "BATCH" processing and return results to the client */
1469 static int batch_done (HANDLER_PROTO) /* {{{ */
1470 {
1471 assert(sock->batch_start);
1472 sock->batch_start = 0;
1473 sock->batch_cmd = 0;
1474 return send_response(sock, RESP_OK, "errors\n");
1475 } /* }}} static int batch_done */
/* QUIT handler: does nothing but report a non-zero status. The connection
 * thread leaves its loop and closes the connection whenever
 * handle_request() returns non-zero. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int disconnect = -1;

  return disconnect;
} /* }}} static int handle_request_quit */
1482 struct command COMMANDS[] = {
1483 {
1484 "UPDATE",
1485 handle_request_update,
1486 PRIV_HIGH,
1487 CMD_CONTEXT_ANY,
1488 "UPDATE <filename> <values> [<values> ...]\n"
1489 ,
1490 "Adds the given file to the internal cache if it is not yet known and\n"
1491 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1492 "for details.\n"
1493 "\n"
1494 "Each <values> has the following form:\n"
1495 " <values> = <time>:<value>[:<value>[...]]\n"
1496 "See the rrdupdate(1) manpage for details.\n"
1497 },
1498 {
1499 "WROTE",
1500 handle_request_wrote,
1501 PRIV_HIGH,
1502 CMD_CONTEXT_JOURNAL,
1503 NULL,
1504 NULL
1505 },
1506 {
1507 "FLUSH",
1508 handle_request_flush,
1509 PRIV_LOW,
1510 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1511 "FLUSH <filename>\n"
1512 ,
1513 "Adds the given filename to the head of the update queue and returns\n"
1514 "after it has been dequeued.\n"
1515 },
1516 {
1517 "FLUSHALL",
1518 handle_request_flushall,
1519 PRIV_HIGH,
1520 CMD_CONTEXT_CLIENT,
1521 "FLUSHALL\n"
1522 ,
1523 "Triggers writing of all pending updates. Returns immediately.\n"
1524 },
1525 {
1526 "PENDING",
1527 handle_request_pending,
1528 PRIV_HIGH,
1529 CMD_CONTEXT_CLIENT,
1530 "PENDING <filename>\n"
1531 ,
1532 "Shows any 'pending' updates for a file, in order.\n"
1533 "The updates shown have not yet been written to the underlying RRD file.\n"
1534 },
1535 {
1536 "FORGET",
1537 handle_request_forget,
1538 PRIV_HIGH,
1539 CMD_CONTEXT_ANY,
1540 "FORGET <filename>\n"
1541 ,
1542 "Removes the file completely from the cache.\n"
1543 "Any pending updates for the file will be lost.\n"
1544 },
1545 {
1546 "QUEUE",
1547 handle_request_queue,
1548 PRIV_LOW,
1549 CMD_CONTEXT_CLIENT,
1550 "QUEUE\n"
1551 ,
1552 "Shows all files in the output queue.\n"
1553 "The output is zero or more lines in the following format:\n"
1554 "(where <num_vals> is the number of values to be written)\n"
1555 "\n"
1556 "<num_vals> <filename>\n"
1557 },
1558 {
1559 "STATS",
1560 handle_request_stats,
1561 PRIV_LOW,
1562 CMD_CONTEXT_CLIENT,
1563 "STATS\n"
1564 ,
1565 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1566 "a description of the values.\n"
1567 },
1568 {
1569 "HELP",
1570 handle_request_help,
1571 PRIV_LOW,
1572 CMD_CONTEXT_CLIENT,
1573 "HELP [<command>]\n",
1574 NULL, /* special! */
1575 },
1576 {
1577 "BATCH",
1578 batch_start,
1579 PRIV_LOW,
1580 CMD_CONTEXT_CLIENT,
1581 "BATCH\n"
1582 ,
1583 "The 'BATCH' command permits the client to initiate a bulk load\n"
1584 " of commands to rrdcached.\n"
1585 "\n"
1586 "Usage:\n"
1587 "\n"
1588 " client: BATCH\n"
1589 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1590 " client: command #1\n"
1591 " client: command #2\n"
1592 " client: ... and so on\n"
1593 " client: .\n"
1594 " server: 2 errors\n"
1595 " server: 7 message for command #7\n"
1596 " server: 9 message for command #9\n"
1597 "\n"
1598 "For more information, consult the rrdcached(1) documentation.\n"
1599 },
1600 {
1601 ".", /* BATCH terminator */
1602 batch_done,
1603 PRIV_LOW,
1604 CMD_CONTEXT_BATCH,
1605 NULL,
1606 NULL
1607 },
1608 {
1609 "QUIT",
1610 handle_request_quit,
1611 PRIV_LOW,
1612 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1613 "QUIT\n"
1614 ,
1615 "Disconnect from rrdcached.\n"
1616 },
1617 {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */
1618 };
1620 static struct command *find_command(char *cmd)
1621 {
1622 struct command *c = COMMANDS;
1624 while (c->cmd != NULL)
1625 {
1626 if (strcasecmp(cmd, c->cmd) == 0)
1627 break;
1628 c++;
1629 }
1631 if (c->cmd == NULL)
1632 return NULL;
1633 else
1634 return c;
1635 }
1637 /* check whether commands are received in the expected context */
1638 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1639 {
1640 if (sock == NULL)
1641 return (cmd->context & CMD_CONTEXT_JOURNAL);
1642 else if (sock->batch_start)
1643 return (cmd->context & CMD_CONTEXT_BATCH);
1644 else
1645 return (cmd->context & CMD_CONTEXT_CLIENT);
1647 /* NOTREACHED */
1648 assert(1==0);
1649 }
1651 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1652 {
1653 int status;
1654 char *cmd_str;
1655 char *resp_txt;
1656 struct command *help = NULL;
1658 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1659 if (status == 0)
1660 help = find_command(cmd_str);
1662 if (help && (help->syntax || help->help))
1663 {
1664 char tmp[CMD_MAX];
1666 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1667 resp_txt = tmp;
1669 if (help->syntax)
1670 add_response_info(sock, "Usage: %s\n", help->syntax);
1672 if (help->help)
1673 add_response_info(sock, "%s\n", help->help);
1674 }
1675 else
1676 {
1677 help = COMMANDS;
1678 resp_txt = "Command overview\n";
1680 while (help->cmd)
1681 {
1682 if (help->syntax)
1683 add_response_info(sock, "%s", help->syntax);
1684 help++;
1685 }
1686 }
1688 return send_response(sock, RESP_OK, resp_txt);
1689 } /* }}} int handle_request_help */
1691 /* if sock==NULL, we are in journal replay mode */
1692 static int handle_request (DISPATCH_PROTO) /* {{{ */
1693 {
1694 char *buffer_ptr = buffer;
1695 char *cmd_str = NULL;
1696 struct command *cmd = NULL;
1697 int status;
1699 assert (buffer[buffer_size - 1] == '\0');
1701 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1702 if (status != 0)
1703 {
1704 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1705 return (-1);
1706 }
1708 if (sock != NULL && sock->batch_start)
1709 sock->batch_cmd++;
1711 cmd = find_command(cmd_str);
1712 if (!cmd)
1713 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1715 status = has_privilege(sock, cmd->min_priv);
1716 if (status <= 0)
1717 return status;
1719 if (!command_check_context(sock, cmd))
1720 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1722 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1723 } /* }}} int handle_request */
1725 /* MUST NOT hold journal_lock before calling this */
1726 static void journal_rotate(void) /* {{{ */
1727 {
1728 FILE *old_fh = NULL;
1729 int new_fd;
1731 if (journal_cur == NULL || journal_old == NULL)
1732 return;
1734 pthread_mutex_lock(&journal_lock);
1736 /* we rotate this way (rename before close) so that the we can release
1737 * the journal lock as fast as possible. Journal writes to the new
1738 * journal can proceed immediately after the new file is opened. The
1739 * fclose can then block without affecting new updates.
1740 */
1741 if (journal_fh != NULL)
1742 {
1743 old_fh = journal_fh;
1744 journal_fh = NULL;
1745 rename(journal_cur, journal_old);
1746 ++stats_journal_rotate;
1747 }
1749 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1750 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1751 if (new_fd >= 0)
1752 {
1753 journal_fh = fdopen(new_fd, "a");
1754 if (journal_fh == NULL)
1755 close(new_fd);
1756 }
1758 pthread_mutex_unlock(&journal_lock);
1760 if (old_fh != NULL)
1761 fclose(old_fh);
1763 if (journal_fh == NULL)
1764 {
1765 RRDD_LOG(LOG_CRIT,
1766 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1767 journal_cur, rrd_strerror(errno));
1769 RRDD_LOG(LOG_ERR,
1770 "JOURNALING DISABLED: All values will be flushed at shutdown");
1771 config_flush_at_shutdown = 1;
1772 }
1774 } /* }}} static void journal_rotate */
1776 static void journal_done(void) /* {{{ */
1777 {
1778 if (journal_cur == NULL)
1779 return;
1781 pthread_mutex_lock(&journal_lock);
1782 if (journal_fh != NULL)
1783 {
1784 fclose(journal_fh);
1785 journal_fh = NULL;
1786 }
1788 if (config_flush_at_shutdown)
1789 {
1790 RRDD_LOG(LOG_INFO, "removing journals");
1791 unlink(journal_old);
1792 unlink(journal_cur);
1793 }
1794 else
1795 {
1796 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1797 "journals will be used at next startup");
1798 }
1800 pthread_mutex_unlock(&journal_lock);
1802 } /* }}} static void journal_done */
1804 static int journal_write(char *cmd, char *args) /* {{{ */
1805 {
1806 int chars;
1808 if (journal_fh == NULL)
1809 return 0;
1811 pthread_mutex_lock(&journal_lock);
1812 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1813 pthread_mutex_unlock(&journal_lock);
1815 if (chars > 0)
1816 {
1817 pthread_mutex_lock(&stats_lock);
1818 stats_journal_bytes += chars;
1819 pthread_mutex_unlock(&stats_lock);
1820 }
1822 return chars;
1823 } /* }}} static int journal_write */
1825 static int journal_replay (const char *file) /* {{{ */
1826 {
1827 FILE *fh;
1828 int entry_cnt = 0;
1829 int fail_cnt = 0;
1830 uint64_t line = 0;
1831 char entry[CMD_MAX];
1832 time_t now;
1834 if (file == NULL) return 0;
1836 {
1837 char *reason = "unknown error";
1838 int status = 0;
1839 struct stat statbuf;
1841 memset(&statbuf, 0, sizeof(statbuf));
1842 if (stat(file, &statbuf) != 0)
1843 {
1844 if (errno == ENOENT)
1845 return 0;
1847 reason = "stat error";
1848 status = errno;
1849 }
1850 else if (!S_ISREG(statbuf.st_mode))
1851 {
1852 reason = "not a regular file";
1853 status = EPERM;
1854 }
1855 if (statbuf.st_uid != daemon_uid)
1856 {
1857 reason = "not owned by daemon user";
1858 status = EACCES;
1859 }
1860 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1861 {
1862 reason = "must not be user/group writable";
1863 status = EACCES;
1864 }
1866 if (status != 0)
1867 {
1868 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1869 file, rrd_strerror(status), reason);
1870 return 0;
1871 }
1872 }
1874 fh = fopen(file, "r");
1875 if (fh == NULL)
1876 {
1877 if (errno != ENOENT)
1878 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1879 file, rrd_strerror(errno));
1880 return 0;
1881 }
1882 else
1883 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1885 now = time(NULL);
1887 while(!feof(fh))
1888 {
1889 size_t entry_len;
1891 ++line;
1892 if (fgets(entry, sizeof(entry), fh) == NULL)
1893 break;
1894 entry_len = strlen(entry);
1896 /* check \n termination in case journal writing crashed mid-line */
1897 if (entry_len == 0)
1898 continue;
1899 else if (entry[entry_len - 1] != '\n')
1900 {
1901 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1902 ++fail_cnt;
1903 continue;
1904 }
1906 entry[entry_len - 1] = '\0';
1908 if (handle_request(NULL, now, entry, entry_len) == 0)
1909 ++entry_cnt;
1910 else
1911 ++fail_cnt;
1912 }
1914 fclose(fh);
1916 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1917 entry_cnt, fail_cnt);
1919 return entry_cnt > 0 ? 1 : 0;
1920 } /* }}} static int journal_replay */
1922 static void journal_init(void) /* {{{ */
1923 {
1924 int had_journal = 0;
1926 if (journal_cur == NULL) return;
1928 pthread_mutex_lock(&journal_lock);
1930 RRDD_LOG(LOG_INFO, "checking for journal files");
1932 had_journal += journal_replay(journal_old);
1933 had_journal += journal_replay(journal_cur);
1935 /* it must have been a crash. start a flush */
1936 if (had_journal && config_flush_at_shutdown)
1937 flush_old_values(-1);
1939 pthread_mutex_unlock(&journal_lock);
1940 journal_rotate();
1942 RRDD_LOG(LOG_INFO, "journal processing complete");
1944 } /* }}} static void journal_init */
1946 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1947 {
1948 assert(sock != NULL);
1950 free(sock->rbuf); sock->rbuf = NULL;
1951 free(sock->wbuf); sock->wbuf = NULL;
1952 free(sock);
1953 } /* }}} void free_listen_socket */
1955 static void close_connection(listen_socket_t *sock) /* {{{ */
1956 {
1957 if (sock->fd >= 0)
1958 {
1959 close(sock->fd);
1960 sock->fd = -1;
1961 }
1963 free_listen_socket(sock);
1965 } /* }}} void close_connection */
1967 static void *connection_thread_main (void *args) /* {{{ */
1968 {
1969 listen_socket_t *sock;
1970 int fd;
1972 sock = (listen_socket_t *) args;
1973 fd = sock->fd;
1975 /* init read buffers */
1976 sock->next_read = sock->next_cmd = 0;
1977 sock->rbuf = malloc(RBUF_SIZE);
1978 if (sock->rbuf == NULL)
1979 {
1980 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1981 close_connection(sock);
1982 return NULL;
1983 }
1985 pthread_mutex_lock (&connection_threads_lock);
1986 connection_threads_num++;
1987 pthread_mutex_unlock (&connection_threads_lock);
1989 while (do_shutdown == 0)
1990 {
1991 char *cmd;
1992 ssize_t cmd_len;
1993 ssize_t rbytes;
1994 time_t now;
1996 struct pollfd pollfd;
1997 int status;
1999 pollfd.fd = fd;
2000 pollfd.events = POLLIN | POLLPRI;
2001 pollfd.revents = 0;
2003 status = poll (&pollfd, 1, /* timeout = */ 500);
2004 if (do_shutdown)
2005 break;
2006 else if (status == 0) /* timeout */
2007 continue;
2008 else if (status < 0) /* error */
2009 {
2010 status = errno;
2011 if (status != EINTR)
2012 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2013 continue;
2014 }
2016 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2017 break;
2018 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2019 {
2020 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2021 "poll(2) returned something unexpected: %#04hx",
2022 pollfd.revents);
2023 break;
2024 }
2026 rbytes = read(fd, sock->rbuf + sock->next_read,
2027 RBUF_SIZE - sock->next_read);
2028 if (rbytes < 0)
2029 {
2030 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2031 break;
2032 }
2033 else if (rbytes == 0)
2034 break; /* eof */
2036 sock->next_read += rbytes;
2038 if (sock->batch_start)
2039 now = sock->batch_start;
2040 else
2041 now = time(NULL);
2043 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2044 {
2045 status = handle_request (sock, now, cmd, cmd_len+1);
2046 if (status != 0)
2047 goto out_close;
2048 }
2049 }
2051 out_close:
2052 close_connection(sock);
2054 /* Remove this thread from the connection threads list */
2055 pthread_mutex_lock (&connection_threads_lock);
2056 connection_threads_num--;
2057 if (connection_threads_num <= 0)
2058 pthread_cond_broadcast(&connection_threads_done);
2059 pthread_mutex_unlock (&connection_threads_lock);
2061 return (NULL);
2062 } /* }}} void *connection_thread_main */
2064 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2065 {
2066 int fd;
2067 struct sockaddr_un sa;
2068 listen_socket_t *temp;
2069 int status;
2070 const char *path;
2072 path = sock->addr;
2073 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2074 path += strlen("unix:");
2076 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2077 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2078 if (temp == NULL)
2079 {
2080 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2081 return (-1);
2082 }
2083 listen_fds = temp;
2084 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2086 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2087 if (fd < 0)
2088 {
2089 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2090 rrd_strerror(errno));
2091 return (-1);
2092 }
2094 memset (&sa, 0, sizeof (sa));
2095 sa.sun_family = AF_UNIX;
2096 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2098 /* if we've gotten this far, we own the pid file. any daemon started
2099 * with the same args must not be alive. therefore, ensure that we can
2100 * create the socket...
2101 */
2102 unlink(path);
2104 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2105 if (status != 0)
2106 {
2107 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2108 path, rrd_strerror(errno));
2109 close (fd);
2110 return (-1);
2111 }
2113 status = listen (fd, /* backlog = */ 10);
2114 if (status != 0)
2115 {
2116 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2117 path, rrd_strerror(errno));
2118 close (fd);
2119 unlink (path);
2120 return (-1);
2121 }
2123 listen_fds[listen_fds_num].fd = fd;
2124 listen_fds[listen_fds_num].family = PF_UNIX;
2125 strncpy(listen_fds[listen_fds_num].addr, path,
2126 sizeof (listen_fds[listen_fds_num].addr) - 1);
2127 listen_fds_num++;
2129 return (0);
2130 } /* }}} int open_listen_socket_unix */
/*
 * Create listening TCP sockets for sock->addr and append them to
 * listen_fds, one per address returned by getaddrinfo().
 *
 * Accepted address forms: "[ipv6]" or "[ipv6]:port", and a name or IPv4
 * address containing a '.' with an optional ":port" suffix; anything else
 * is passed to getaddrinfo() unchanged. RRDCACHED_DEFAULT_PORT is used
 * when no port is given.
 *
 * Addresses for which realloc, socket() or bind() fails are skipped.
 * Returns -1 on a malformed address, a getaddrinfo() failure or a
 * listen() failure, and 0 otherwise.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr = ']') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* strrchr() is the standard C equivalent of the legacy rindex() */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* seed the new entry with the caller's settings */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      /* message ends in ".\n" like the other diagnostics in this file */
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    /* the entry only becomes visible once the counter is bumped */
    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2254 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2255 {
2256 assert(sock != NULL);
2257 assert(sock->addr != NULL);
2259 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2260 || sock->addr[0] == '/')
2261 return (open_listen_socket_unix(sock));
2262 else
2263 return (open_listen_socket_network(sock));
2264 } /* }}} int open_listen_socket */
2266 static int close_listen_sockets (void) /* {{{ */
2267 {
2268 size_t i;
2270 for (i = 0; i < listen_fds_num; i++)
2271 {
2272 close (listen_fds[i].fd);
2274 if (listen_fds[i].family == PF_UNIX)
2275 unlink(listen_fds[i].addr);
2276 }
2278 free (listen_fds);
2279 listen_fds = NULL;
2280 listen_fds_num = 0;
2282 return (0);
2283 } /* }}} int close_listen_sockets */
2285 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2286 {
2287 struct pollfd *pollfds;
2288 int pollfds_num;
2289 int status;
2290 int i;
2292 if (listen_fds_num < 1)
2293 {
2294 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2295 return (NULL);
2296 }
2298 pollfds_num = listen_fds_num;
2299 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2300 if (pollfds == NULL)
2301 {
2302 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2303 return (NULL);
2304 }
2305 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2307 RRDD_LOG(LOG_INFO, "listening for connections");
2309 while (do_shutdown == 0)
2310 {
2311 for (i = 0; i < pollfds_num; i++)
2312 {
2313 pollfds[i].fd = listen_fds[i].fd;
2314 pollfds[i].events = POLLIN | POLLPRI;
2315 pollfds[i].revents = 0;
2316 }
2318 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2319 if (do_shutdown)
2320 break;
2321 else if (status == 0) /* timeout */
2322 continue;
2323 else if (status < 0) /* error */
2324 {
2325 status = errno;
2326 if (status != EINTR)
2327 {
2328 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2329 }
2330 continue;
2331 }
2333 for (i = 0; i < pollfds_num; i++)
2334 {
2335 listen_socket_t *client_sock;
2336 struct sockaddr_storage client_sa;
2337 socklen_t client_sa_size;
2338 pthread_t tid;
2339 pthread_attr_t attr;
2341 if (pollfds[i].revents == 0)
2342 continue;
2344 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2345 {
2346 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2347 "poll(2) returned something unexpected for listen FD #%i.",
2348 pollfds[i].fd);
2349 continue;
2350 }
2352 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2353 if (client_sock == NULL)
2354 {
2355 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2356 continue;
2357 }
2358 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2360 client_sa_size = sizeof (client_sa);
2361 client_sock->fd = accept (pollfds[i].fd,
2362 (struct sockaddr *) &client_sa, &client_sa_size);
2363 if (client_sock->fd < 0)
2364 {
2365 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2366 free(client_sock);
2367 continue;
2368 }
2370 pthread_attr_init (&attr);
2371 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2373 status = pthread_create (&tid, &attr, connection_thread_main,
2374 client_sock);
2375 if (status != 0)
2376 {
2377 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2378 close_connection(client_sock);
2379 continue;
2380 }
2381 } /* for (pollfds_num) */
2382 } /* while (do_shutdown == 0) */
2384 RRDD_LOG(LOG_INFO, "starting shutdown");
2386 close_listen_sockets ();
2388 pthread_mutex_lock (&connection_threads_lock);
2389 while (connection_threads_num > 0)
2390 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2391 pthread_mutex_unlock (&connection_threads_lock);
2393 free(pollfds);
2395 return (NULL);
2396 } /* }}} void *listen_thread_main */
2398 static int daemonize (void) /* {{{ */
2399 {
2400 int pid_fd;
2401 char *base_dir;
2403 daemon_uid = geteuid();
2405 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2406 if (pid_fd < 0)
2407 pid_fd = check_pidfile();
2408 if (pid_fd < 0)
2409 return pid_fd;
2411 /* open all the listen sockets */
2412 if (config_listen_address_list_len > 0)
2413 {
2414 for (size_t i = 0; i < config_listen_address_list_len; i++)
2415 open_listen_socket (config_listen_address_list[i]);
2417 rrd_free_ptrs((void ***) &config_listen_address_list,
2418 &config_listen_address_list_len);
2419 }
2420 else
2421 {
2422 listen_socket_t sock;
2423 memset(&sock, 0, sizeof(sock));
2424 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2425 open_listen_socket (&sock);
2426 }
2428 if (listen_fds_num < 1)
2429 {
2430 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2431 goto error;
2432 }
2434 if (!stay_foreground)
2435 {
2436 pid_t child;
2438 child = fork ();
2439 if (child < 0)
2440 {
2441 fprintf (stderr, "daemonize: fork(2) failed.\n");
2442 goto error;
2443 }
2444 else if (child > 0)
2445 exit(0);
2447 /* Become session leader */
2448 setsid ();
2450 /* Open the first three file descriptors to /dev/null */
2451 close (2);
2452 close (1);
2453 close (0);
2455 open ("/dev/null", O_RDWR);
2456 if (dup(0) == -1 || dup(0) == -1){
2457 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2458 }
2459 } /* if (!stay_foreground) */
2461 /* Change into the /tmp directory. */
2462 base_dir = (config_base_dir != NULL)
2463 ? config_base_dir
2464 : "/tmp";
2466 if (chdir (base_dir) != 0)
2467 {
2468 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2469 goto error;
2470 }
2472 install_signal_handlers();
2474 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2475 RRDD_LOG(LOG_INFO, "starting up");
2477 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2478 (GDestroyNotify) free_cache_item);
2479 if (cache_tree == NULL)
2480 {
2481 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2482 goto error;
2483 }
2485 return write_pidfile (pid_fd);
2487 error:
2488 remove_pidfile();
2489 return -1;
2490 } /* }}} int daemonize */
2492 static int cleanup (void) /* {{{ */
2493 {
2494 do_shutdown++;
2496 pthread_cond_broadcast (&flush_cond);
2497 pthread_join (flush_thread, NULL);
2499 pthread_cond_broadcast (&queue_cond);
2500 for (int i = 0; i < config_queue_threads; i++)
2501 pthread_join (queue_threads[i], NULL);
2503 if (config_flush_at_shutdown)
2504 {
2505 assert(cache_queue_head == NULL);
2506 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2507 }
2509 journal_done();
2510 remove_pidfile ();
2512 free(queue_threads);
2513 free(config_base_dir);
2514 free(config_pid_file);
2515 free(journal_cur);
2516 free(journal_old);
2518 pthread_mutex_lock(&cache_lock);
2519 g_tree_destroy(cache_tree);
2521 RRDD_LOG(LOG_INFO, "goodbye");
2522 closelog ();
2524 return (0);
2525 } /* }}} int cleanup */
2527 static int read_options (int argc, char **argv) /* {{{ */
2528 {
2529 int option;
2530 int status = 0;
2532 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2533 {
2534 switch (option)
2535 {
2536 case 'g':
2537 stay_foreground=1;
2538 break;
2540 case 'L':
2541 case 'l':
2542 {
2543 listen_socket_t *new;
2545 new = malloc(sizeof(listen_socket_t));
2546 if (new == NULL)
2547 {
2548 fprintf(stderr, "read_options: malloc failed.\n");
2549 return(2);
2550 }
2551 memset(new, 0, sizeof(listen_socket_t));
2553 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2554 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2556 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2557 &config_listen_address_list_len, new))
2558 {
2559 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2560 return (2);
2561 }
2562 }
2563 break;
2565 case 'f':
2566 {
2567 int temp;
2569 temp = atoi (optarg);
2570 if (temp > 0)
2571 config_flush_interval = temp;
2572 else
2573 {
2574 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2575 status = 3;
2576 }
2577 }
2578 break;
2580 case 'w':
2581 {
2582 int temp;
2584 temp = atoi (optarg);
2585 if (temp > 0)
2586 config_write_interval = temp;
2587 else
2588 {
2589 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2590 status = 2;
2591 }
2592 }
2593 break;
2595 case 'z':
2596 {
2597 int temp;
2599 temp = atoi(optarg);
2600 if (temp > 0)
2601 config_write_jitter = temp;
2602 else
2603 {
2604 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2605 status = 2;
2606 }
2608 break;
2609 }
2611 case 't':
2612 {
2613 int threads;
2614 threads = atoi(optarg);
2615 if (threads >= 1)
2616 config_queue_threads = threads;
2617 else
2618 {
2619 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2620 return 1;
2621 }
2622 }
2623 break;
2625 case 'B':
2626 config_write_base_only = 1;
2627 break;
2629 case 'b':
2630 {
2631 size_t len;
2632 char base_realpath[PATH_MAX];
2634 if (config_base_dir != NULL)
2635 free (config_base_dir);
2636 config_base_dir = strdup (optarg);
2637 if (config_base_dir == NULL)
2638 {
2639 fprintf (stderr, "read_options: strdup failed.\n");
2640 return (3);
2641 }
2643 /* make sure that the base directory is not resolved via
2644 * symbolic links. this makes some performance-enhancing
2645 * assumptions possible (we don't have to resolve paths
2646 * that start with a "/")
2647 */
2648 if (realpath(config_base_dir, base_realpath) == NULL)
2649 {
2650 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2651 return 5;
2652 }
2653 else if (strncmp(config_base_dir,
2654 base_realpath, sizeof(base_realpath)) != 0)
2655 {
2656 fprintf(stderr,
2657 "Base directory (-b) resolved via file system links!\n"
2658 "Please consult rrdcached '-b' documentation!\n"
2659 "Consider specifying the real directory (%s)\n",
2660 base_realpath);
2661 return 5;
2662 }
2664 len = strlen (config_base_dir);
2665 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2666 {
2667 config_base_dir[len - 1] = 0;
2668 len--;
2669 }
2671 if (len < 1)
2672 {
2673 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2674 return (4);
2675 }
2677 _config_base_dir_len = len;
2678 }
2679 break;
2681 case 'p':
2682 {
2683 if (config_pid_file != NULL)
2684 free (config_pid_file);
2685 config_pid_file = strdup (optarg);
2686 if (config_pid_file == NULL)
2687 {
2688 fprintf (stderr, "read_options: strdup failed.\n");
2689 return (3);
2690 }
2691 }
2692 break;
2694 case 'F':
2695 config_flush_at_shutdown = 1;
2696 break;
2698 case 'j':
2699 {
2700 struct stat statbuf;
2701 const char *dir = optarg;
2703 status = stat(dir, &statbuf);
2704 if (status != 0)
2705 {
2706 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2707 return 6;
2708 }
2710 if (!S_ISDIR(statbuf.st_mode)
2711 || access(dir, R_OK|W_OK|X_OK) != 0)
2712 {
2713 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2714 errno ? rrd_strerror(errno) : "");
2715 return 6;
2716 }
2718 journal_cur = malloc(PATH_MAX + 1);
2719 journal_old = malloc(PATH_MAX + 1);
2720 if (journal_cur == NULL || journal_old == NULL)
2721 {
2722 fprintf(stderr, "malloc failure for journal files\n");
2723 return 6;
2724 }
2725 else
2726 {
2727 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2728 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2729 }
2730 }
2731 break;
2733 case 'h':
2734 case '?':
2735 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2736 "\n"
2737 "Usage: rrdcached [options]\n"
2738 "\n"
2739 "Valid options are:\n"
2740 " -l <address> Socket address to listen to.\n"
2741 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2742 " -w <seconds> Interval in which to write data.\n"
2743 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2744 " -t <threads> Number of write threads.\n"
2745 " -f <seconds> Interval in which to flush dead data.\n"
2746 " -p <file> Location of the PID-file.\n"
2747 " -b <dir> Base directory to change to.\n"
2748 " -B Restrict file access to paths within -b <dir>\n"
2749 " -g Do not fork and run in the foreground.\n"
2750 " -j <dir> Directory in which to create the journal files.\n"
2751 " -F Always flush all updates at shutdown\n"
2752 "\n"
2753 "For more information and a detailed description of all options "
2754 "please refer\n"
2755 "to the rrdcached(1) manual page.\n",
2756 VERSION);
2757 status = -1;
2758 break;
2759 } /* switch (option) */
2760 } /* while (getopt) */
2762 /* advise the user when values are not sane */
2763 if (config_flush_interval < 2 * config_write_interval)
2764 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2765 " 2x write interval (-w) !\n");
2766 if (config_write_jitter > config_write_interval)
2767 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2768 " write interval (-w) !\n");
2770 if (config_write_base_only && config_base_dir == NULL)
2771 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2772 " Consult the rrdcached documentation\n");
2774 if (journal_cur == NULL)
2775 config_flush_at_shutdown = 1;
2777 return (status);
2778 } /* }}} int read_options */
2780 int main (int argc, char **argv)
2781 {
2782 int status;
2784 status = read_options (argc, argv);
2785 if (status != 0)
2786 {
2787 if (status < 0)
2788 status = 0;
2789 return (status);
2790 }
2792 status = daemonize ();
2793 if (status != 0)
2794 {
2795 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2796 return (1);
2797 }
2799 journal_init();
2801 /* start the queue threads */
2802 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2803 if (queue_threads == NULL)
2804 {
2805 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2806 cleanup();
2807 return (1);
2808 }
2809 for (int i = 0; i < config_queue_threads; i++)
2810 {
2811 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2812 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2813 if (status != 0)
2814 {
2815 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2816 cleanup();
2817 return (1);
2818 }
2819 }
2821 /* start the flush thread */
2822 memset(&flush_thread, 0, sizeof(flush_thread));
2823 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2824 if (status != 0)
2825 {
2826 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2827 cleanup();
2828 return (1);
2829 }
2831 listen_thread_main (NULL);
2832 cleanup ();
2834 return (0);
2835 } /* int main */
2837 /*
2838 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2839 */