1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
/* Daemon-wide logging front end: forwards the severity and the printf-style
 * arguments to syslog(). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers without GNU extensions: make __attribute__((...)) expand to
 * nothing so the annotations used in this file still compile. */
#if !defined(__GNUC__)
# define __attribute__(x) /**/
#endif
115 /*
116 * Types
117 */
/* Privilege level attached to a client socket.  The enumerators are ordered
 * so that levels can be compared numerically. */
typedef enum { PRIV_LOW, PRIV_HIGH } socket_privilege;

/* Status code handed to send_response(). */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK  =  0
} response_code;
126 struct listen_socket_s
127 {
128 int fd;
129 char addr[PATH_MAX + 1];
130 int family;
131 socket_privilege privilege;
133 /* state for BATCH processing */
134 time_t batch_start;
135 int batch_cmd;
137 /* buffered IO */
138 char *rbuf;
139 off_t next_cmd;
140 off_t next_read;
142 char *wbuf;
143 ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
150 time_t now __attribute__((unused)),\
151 char *buffer __attribute__((unused)),\
152 size_t buffer_size __attribute__((unused))
154 #define HANDLER_PROTO struct command *cmd __attribute__((unused)),\
155 DISPATCH_PROTO
157 struct command {
158 char *cmd;
159 int (*handler)(HANDLER_PROTO);
160 socket_privilege min_priv;
162 char context; /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT (1<<0)
164 #define CMD_CONTEXT_BATCH (1<<1)
165 #define CMD_CONTEXT_JOURNAL (1<<2)
166 #define CMD_CONTEXT_ANY (0x7f)
168 char *syntax;
169 char *help;
170 };
/* Values for `cache_item_s.flags'. */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

typedef struct cache_item_s cache_item_t;

/* Cached, not yet written updates for one RRD file.  Items on the write
 * queue are chained through prev/next. */
struct cache_item_s
{
  char *file;               /* file name */
  char **values;            /* pending update strings */
  size_t values_num;        /* number of entries in `values' */
  time_t last_flush_time;
  time_t last_update_stamp;
  int flags;                /* CI_FLAGS_* bits */
  pthread_cond_t flushed;   /* broadcast after the item has been written */
  cache_item_t *prev;
  cache_item_t *next;
};
/* User data handed to tree_callback_flush() while walking the cache tree. */
typedef struct callback_flush_data_s
{
  time_t now;          /* time at which the walk started */
  time_t abs_timeout;  /* items last flushed at or before this are enqueued */
  char **keys;         /* keys of the entries collected for removal */
  size_t keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* End of the write queue at which enqueue_cache_item() inserts an item. */
typedef enum queue_side_e { HEAD, TAIL } queue_side_t;
/* Upper bound, in bytes, for a single socket command or response. */
#define CMD_MAX 4096
/* Size of a read buffer: twice the longest command. */
#define RBUF_SIZE (2 * CMD_MAX)
209 /*
210 * Variables
211 */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Incremented by the signal handlers; the flush and queue threads leave
 * their main loops once it is non-zero. */
static int do_shutdown = 0;

static pthread_t *queue_threads;
/* Signalled when an item is put on the write queue, broadcast on shutdown. */
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

static pthread_t flush_thread;
/* Broadcast on shutdown to wake the flush thread from its timed wait. */
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff: the tree of cache items, the write queue (a doubly linked
 * list) and the lock that must be held while touching either. */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

static int config_write_interval = 300;
/* When > 0, a random offset below this value is added to an item's
 * last_flush_time each time its values are wiped. */
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
/* Set by SIGUSR1, cleared by SIGUSR2; when set, pending values are flushed
 * during shutdown. */
static int config_flush_at_shutdown = 0;
/* NULL selects the default LOCALSTATEDIR "/run/rrdcached.pid". */
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
/* When set, check_file_access() rejects paths outside config_base_dir. */
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Statistics counters; stats_lock is held while reading or updating them. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
270 /*
271 * Functions
272 */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276 do_shutdown++;
277 pthread_cond_broadcast(&flush_cond);
278 pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
/* SIGINT: request shutdown. */
static void sig_int_handler (int s) /* {{{ */
{
  (void) s; /* signal number is not needed */

  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: request shutdown. */
static void sig_term_handler (int s) /* {{{ */
{
  (void) s; /* signal number is not needed */

  sig_common("TERM");
} /* }}} void sig_term_handler */
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293 config_flush_at_shutdown = 1;
294 sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299 config_flush_at_shutdown = 0;
300 sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
303 static void install_signal_handlers(void) /* {{{ */
304 {
305 /* These structures are static, because `sigaction' behaves weird if the are
306 * overwritten.. */
307 static struct sigaction sa_int;
308 static struct sigaction sa_term;
309 static struct sigaction sa_pipe;
310 static struct sigaction sa_usr1;
311 static struct sigaction sa_usr2;
313 /* Install signal handlers */
314 memset (&sa_int, 0, sizeof (sa_int));
315 sa_int.sa_handler = sig_int_handler;
316 sigaction (SIGINT, &sa_int, NULL);
318 memset (&sa_term, 0, sizeof (sa_term));
319 sa_term.sa_handler = sig_term_handler;
320 sigaction (SIGTERM, &sa_term, NULL);
322 memset (&sa_pipe, 0, sizeof (sa_pipe));
323 sa_pipe.sa_handler = SIG_IGN;
324 sigaction (SIGPIPE, &sa_pipe, NULL);
326 memset (&sa_pipe, 0, sizeof (sa_usr1));
327 sa_usr1.sa_handler = sig_usr1_handler;
328 sigaction (SIGUSR1, &sa_usr1, NULL);
330 memset (&sa_usr2, 0, sizeof (sa_usr2));
331 sa_usr2.sa_handler = sig_usr2_handler;
332 sigaction (SIGUSR2, &sa_usr2, NULL);
334 } /* }}} void install_signal_handlers */
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338 int fd;
339 char *file;
341 file = (config_pid_file != NULL)
342 ? config_pid_file
343 : LOCALSTATEDIR "/run/rrdcached.pid";
345 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346 if (fd < 0)
347 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348 action, file, rrd_strerror(errno));
350 return(fd);
351 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns an open, truncated descriptor for the pid file when the pid stored
 * in it is stale, or a negative value when the file cannot be opened or
 * read, holds no valid pid, cannot be truncated, or names a process that is
 * still alive.  The descriptor is closed on every failure path. */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not add one and atoi()
   * needs a proper C string */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write our own pid, followed by a newline, to the already opened pid file
 * `fd'.  The descriptor is consumed in every case: closed directly when it
 * cannot be turned into a stream, closed by fclose() otherwise.
 * Returns 0 on success, -1 if fdopen() failed. */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream != NULL)
  {
    fprintf (stream, "%i\n", (int) getpid ());
    fclose (stream);
    return (0);
  }

  RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
  close(fd);
  return (-1);
} /* }}} int write_pidfile */
418 static int remove_pidfile (void) /* {{{ */
419 {
420 char *file;
421 int status;
423 file = (config_pid_file != NULL)
424 ? config_pid_file
425 : LOCALSTATEDIR "/run/rrdcached.pid";
427 status = unlink (file);
428 if (status == 0)
429 return (0);
430 return (errno);
431 } /* }}} int remove_pidfile */
433 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
434 {
435 char *eol;
437 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
438 sock->next_read - sock->next_cmd);
440 if (eol == NULL)
441 {
442 /* no commands left, move remainder back to front of rbuf */
443 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
444 sock->next_read - sock->next_cmd);
445 sock->next_read -= sock->next_cmd;
446 sock->next_cmd = 0;
447 *len = 0;
448 return NULL;
449 }
450 else
451 {
452 char *cmd = sock->rbuf + sock->next_cmd;
453 *eol = '\0';
455 sock->next_cmd = eol - sock->rbuf + 1;
457 if (eol > sock->rbuf && *(eol-1) == '\r')
458 *(--eol) = '\0'; /* handle "\r\n" EOL */
460 *len = eol - cmd;
462 return cmd;
463 }
465 /* NOTREACHED */
466 assert(1==0);
467 }
469 /* add the characters directly to the write buffer */
470 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
471 {
472 char *new_buf;
474 assert(sock != NULL);
476 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
477 if (new_buf == NULL)
478 {
479 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
480 return -1;
481 }
483 strncpy(new_buf + sock->wbuf_len, str, len + 1);
485 sock->wbuf = new_buf;
486 sock->wbuf_len += len;
488 return 0;
489 } /* }}} static int add_to_wbuf */
491 /* add the text to the "extra" info that's sent after the status line */
492 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
493 {
494 va_list argp;
495 char buffer[CMD_MAX];
496 int len;
498 if (sock == NULL) return 0; /* journal replay mode */
499 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
501 va_start(argp, fmt);
502 #ifdef HAVE_VSNPRINTF
503 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
504 #else
505 len = vsprintf(buffer, fmt, argp);
506 #endif
507 va_end(argp);
508 if (len < 0)
509 {
510 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
511 return -1;
512 }
514 return add_to_wbuf(sock, buffer, len);
515 } /* }}} static int add_response_info */
/* Count the newline characters in `str'.  A NULL pointer counts as an empty
 * string. */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
533 /* send the response back to the user.
534 * returns 0 on success, -1 on error
535 * write buffer is always zeroed after this call */
536 static int send_response (listen_socket_t *sock, response_code rc,
537 char *fmt, ...) /* {{{ */
538 {
539 va_list argp;
540 char buffer[CMD_MAX];
541 int lines;
542 ssize_t wrote;
543 int rclen, len;
545 if (sock == NULL) return rc; /* journal replay mode */
547 if (sock->batch_start)
548 {
549 if (rc == RESP_OK)
550 return rc; /* no response on success during BATCH */
551 lines = sock->batch_cmd;
552 }
553 else if (rc == RESP_OK)
554 lines = count_lines(sock->wbuf);
555 else
556 lines = -1;
558 rclen = sprintf(buffer, "%d ", lines);
559 va_start(argp, fmt);
560 #ifdef HAVE_VSNPRINTF
561 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
562 #else
563 len = vsprintf(buffer+rclen, fmt, argp);
564 #endif
565 va_end(argp);
566 if (len < 0)
567 return -1;
569 len += rclen;
571 /* append the result to the wbuf, don't write to the user */
572 if (sock->batch_start)
573 return add_to_wbuf(sock, buffer, len);
575 /* first write must be complete */
576 if (len != write(sock->fd, buffer, len))
577 {
578 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
579 return -1;
580 }
582 if (sock->wbuf != NULL && rc == RESP_OK)
583 {
584 wrote = 0;
585 while (wrote < sock->wbuf_len)
586 {
587 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
588 if (wb <= 0)
589 {
590 RRDD_LOG(LOG_INFO, "send_response: could not write results");
591 return -1;
592 }
593 wrote += wb;
594 }
595 }
597 free(sock->wbuf); sock->wbuf = NULL;
598 sock->wbuf_len = 0;
600 return 0;
601 } /* }}} */
603 static void wipe_ci_values(cache_item_t *ci, time_t when)
604 {
605 ci->values = NULL;
606 ci->values_num = 0;
608 ci->last_flush_time = when;
609 if (config_write_jitter > 0)
610 ci->last_flush_time += (rrd_random() % config_write_jitter);
611 }
613 /* remove_from_queue
614 * remove a "cache_item_t" item from the queue.
615 * must hold 'cache_lock' when calling this
616 */
617 static void remove_from_queue(cache_item_t *ci) /* {{{ */
618 {
619 if (ci == NULL) return;
620 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
622 if (ci->prev == NULL)
623 cache_queue_head = ci->next; /* reset head */
624 else
625 ci->prev->next = ci->next;
627 if (ci->next == NULL)
628 cache_queue_tail = ci->prev; /* reset the tail */
629 else
630 ci->next->prev = ci->prev;
632 ci->next = ci->prev = NULL;
633 ci->flags &= ~CI_FLAGS_IN_QUEUE;
635 pthread_mutex_lock (&stats_lock);
636 assert (stats_queue_length > 0);
637 stats_queue_length--;
638 pthread_mutex_unlock (&stats_lock);
640 } /* }}} static void remove_from_queue */
642 /* free the resources associated with the cache_item_t
643 * must hold cache_lock when calling this function
644 */
645 static void *free_cache_item(cache_item_t *ci) /* {{{ */
646 {
647 if (ci == NULL) return NULL;
649 remove_from_queue(ci);
651 for (size_t i=0; i < ci->values_num; i++)
652 free(ci->values[i]);
654 free (ci->values);
655 free (ci->file);
657 /* in case anyone is waiting */
658 pthread_cond_broadcast(&ci->flushed);
659 pthread_cond_destroy(&ci->flushed);
661 free (ci);
663 return NULL;
664 } /* }}} static void *free_cache_item */
666 /*
667 * enqueue_cache_item:
668 * `cache_lock' must be acquired before calling this function!
669 */
670 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
671 queue_side_t side)
672 {
673 if (ci == NULL)
674 return (-1);
676 if (ci->values_num == 0)
677 return (0);
679 if (side == HEAD)
680 {
681 if (cache_queue_head == ci)
682 return 0;
684 /* remove if further down in queue */
685 remove_from_queue(ci);
687 ci->prev = NULL;
688 ci->next = cache_queue_head;
689 if (ci->next != NULL)
690 ci->next->prev = ci;
691 cache_queue_head = ci;
693 if (cache_queue_tail == NULL)
694 cache_queue_tail = cache_queue_head;
695 }
696 else /* (side == TAIL) */
697 {
698 /* We don't move values back in the list.. */
699 if (ci->flags & CI_FLAGS_IN_QUEUE)
700 return (0);
702 assert (ci->next == NULL);
703 assert (ci->prev == NULL);
705 ci->prev = cache_queue_tail;
707 if (cache_queue_tail == NULL)
708 cache_queue_head = ci;
709 else
710 cache_queue_tail->next = ci;
712 cache_queue_tail = ci;
713 }
715 ci->flags |= CI_FLAGS_IN_QUEUE;
717 pthread_cond_signal(&queue_cond);
718 pthread_mutex_lock (&stats_lock);
719 stats_queue_length++;
720 pthread_mutex_unlock (&stats_lock);
722 return (0);
723 } /* }}} int enqueue_cache_item */
725 /*
726 * tree_callback_flush:
727 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
728 * while this is in progress.
729 */
730 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
731 gpointer data)
732 {
733 cache_item_t *ci;
734 callback_flush_data_t *cfd;
736 ci = (cache_item_t *) value;
737 cfd = (callback_flush_data_t *) data;
739 if (ci->flags & CI_FLAGS_IN_QUEUE)
740 return FALSE;
742 if ((ci->last_flush_time <= cfd->abs_timeout)
743 && (ci->values_num > 0))
744 {
745 enqueue_cache_item (ci, TAIL);
746 }
747 else if ((do_shutdown != 0)
748 && (ci->values_num > 0))
749 {
750 enqueue_cache_item (ci, TAIL);
751 }
752 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
753 && (ci->values_num <= 0))
754 {
755 assert ((char *) key == ci->file);
756 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
757 {
758 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
759 return (FALSE);
760 }
761 }
763 return (FALSE);
764 } /* }}} gboolean tree_callback_flush */
766 static int flush_old_values (int max_age)
767 {
768 callback_flush_data_t cfd;
769 size_t k;
771 memset (&cfd, 0, sizeof (cfd));
772 /* Pass the current time as user data so that we don't need to call
773 * `time' for each node. */
774 cfd.now = time (NULL);
775 cfd.keys = NULL;
776 cfd.keys_num = 0;
778 if (max_age > 0)
779 cfd.abs_timeout = cfd.now - max_age;
780 else
781 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
783 /* `tree_callback_flush' will return the keys of all values that haven't
784 * been touched in the last `config_flush_interval' seconds in `cfd'.
785 * The char*'s in this array point to the same memory as ci->file, so we
786 * don't need to free them separately. */
787 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
789 for (k = 0; k < cfd.keys_num; k++)
790 {
791 /* should never fail, since we have held the cache_lock
792 * the entire time */
793 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
794 }
796 if (cfd.keys != NULL)
797 {
798 free (cfd.keys);
799 cfd.keys = NULL;
800 }
802 return (0);
803 } /* int flush_old_values */
805 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
806 {
807 struct timeval now;
808 struct timespec next_flush;
809 int status;
811 gettimeofday (&now, NULL);
812 next_flush.tv_sec = now.tv_sec + config_flush_interval;
813 next_flush.tv_nsec = 1000 * now.tv_usec;
815 pthread_mutex_lock(&cache_lock);
817 while (!do_shutdown)
818 {
819 gettimeofday (&now, NULL);
820 if ((now.tv_sec > next_flush.tv_sec)
821 || ((now.tv_sec == next_flush.tv_sec)
822 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
823 {
824 /* Flush all values that haven't been written in the last
825 * `config_write_interval' seconds. */
826 flush_old_values (config_write_interval);
828 /* Determine the time of the next cache flush. */
829 next_flush.tv_sec =
830 now.tv_sec + next_flush.tv_sec % config_flush_interval;
832 /* unlock the cache while we rotate so we don't block incoming
833 * updates if the fsync() blocks on disk I/O */
834 pthread_mutex_unlock(&cache_lock);
835 journal_rotate();
836 pthread_mutex_lock(&cache_lock);
837 }
839 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
840 if (status != 0 && status != ETIMEDOUT)
841 {
842 RRDD_LOG (LOG_ERR, "flush_thread_main: "
843 "pthread_cond_timedwait returned %i.", status);
844 }
845 }
847 if (config_flush_at_shutdown)
848 flush_old_values (-1); /* flush everything */
850 pthread_mutex_unlock(&cache_lock);
852 return NULL;
853 } /* void *flush_thread_main */
855 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
856 {
857 pthread_mutex_lock (&cache_lock);
859 while (!do_shutdown
860 || (cache_queue_head != NULL && config_flush_at_shutdown))
861 {
862 cache_item_t *ci;
863 char *file;
864 char **values;
865 size_t values_num;
866 int status;
868 /* Now, check if there's something to store away. If not, wait until
869 * something comes in. if we are shutting down, do not wait around. */
870 if (cache_queue_head == NULL && !do_shutdown)
871 {
872 status = pthread_cond_wait (&queue_cond, &cache_lock);
873 if ((status != 0) && (status != ETIMEDOUT))
874 {
875 RRDD_LOG (LOG_ERR, "queue_thread_main: "
876 "pthread_cond_wait returned %i.", status);
877 }
878 }
880 /* Check if a value has arrived. This may be NULL if we timed out or there
881 * was an interrupt such as a signal. */
882 if (cache_queue_head == NULL)
883 continue;
885 ci = cache_queue_head;
887 /* copy the relevant parts */
888 file = strdup (ci->file);
889 if (file == NULL)
890 {
891 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
892 continue;
893 }
895 assert(ci->values != NULL);
896 assert(ci->values_num > 0);
898 values = ci->values;
899 values_num = ci->values_num;
901 wipe_ci_values(ci, time(NULL));
902 remove_from_queue(ci);
904 pthread_mutex_unlock (&cache_lock);
906 rrd_clear_error ();
907 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
908 if (status != 0)
909 {
910 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
911 "rrd_update_r (%s) failed with status %i. (%s)",
912 file, status, rrd_get_error());
913 }
915 journal_write("wrote", file);
917 /* Search again in the tree. It's possible someone issued a "FORGET"
918 * while we were writing the update values. */
919 pthread_mutex_lock(&cache_lock);
920 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
921 if (ci)
922 pthread_cond_broadcast(&ci->flushed);
923 pthread_mutex_unlock(&cache_lock);
925 rrd_free_ptrs((void ***) &values, &values_num);
926 free(file);
928 if (status == 0)
929 {
930 pthread_mutex_lock (&stats_lock);
931 stats_updates_written++;
932 stats_data_sets_written += values_num;
933 pthread_mutex_unlock (&stats_lock);
934 }
936 pthread_mutex_lock (&cache_lock);
937 }
938 pthread_mutex_unlock (&cache_lock);
940 return (NULL);
941 } /* }}} void *queue_thread_main */
/* Split the next space-delimited field off a command buffer, in place.
 *
 *   buffer_ret       in: start of the unparsed text; out: first character
 *                    after the field and its delimiter
 *   buffer_size_ret  in: number of bytes at *buffer_ret, including the final
 *                    NUL; out: number of bytes remaining
 *   field_ret        out: the NUL terminated field
 *
 * A backslash makes the following character part of the field, which is how
 * a field can contain a space.  The field is unescaped over the original
 * bytes, so the buffer is modified.
 * Returns 0 on success.  Returns -1, leaving the three output arguments
 * untouched, when the buffer is empty or ends before a delimiter is found. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  char *dst = *buffer_ret;
  size_t avail = *buffer_size_ret;
  size_t rd = 0;
  size_t wr = 0;

  if (avail <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[avail - 1] == '\0');

  while (rd < avail)
  {
    char c = src[rd];

    /* end-of-field or end-of-buffer: terminate and report */
    if (c == ' ' || c == '\0')
    {
      dst[wr] = 0;
      rd++;

      *buffer_ret = src + rd;
      *buffer_size_ret = avail - rd;
      *field_ret = dst;
      return (0);
    }

    /* escaped character: take the one after the backslash verbatim */
    if (c == '\\')
    {
      if (rd >= (avail - 1))
        break;
      rd++;
      c = src[rd];
    }

    dst[wr] = c;
    wr++;
    rd++;
  }

  return (-1);
} /* }}} int buffer_get_field */
1006 /* if we're restricting writes to the base directory,
1007 * check whether the file falls within the dir
1008 * returns 1 if OK, otherwise 0
1009 */
1010 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1011 {
1012 assert(file != NULL);
1014 if (!config_write_base_only
1015 || sock == NULL /* journal replay */
1016 || config_base_dir == NULL)
1017 return 1;
1019 if (strstr(file, "../") != NULL) goto err;
1021 /* relative paths without "../" are ok */
1022 if (*file != '/') return 1;
1024 /* file must be of the format base + "/" + <1+ char filename> */
1025 if (strlen(file) < _config_base_dir_len + 2) goto err;
1026 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1027 if (*(file + _config_base_dir_len) != '/') goto err;
1029 return 1;
1031 err:
1032 if (sock != NULL && sock->fd >= 0)
1033 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1035 return 0;
1036 } /* }}} static int check_file_access */
1038 /* when using a base dir, convert relative paths to absolute paths.
1039 * if necessary, modifies the "filename" pointer to point
1040 * to the new path created in "tmp". "tmp" is provided
1041 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1042 *
1043 * this allows us to optimize for the expected case (absolute path)
1044 * with a no-op.
1045 */
1046 static void get_abs_path(char **filename, char *tmp)
1047 {
1048 assert(tmp != NULL);
1049 assert(filename != NULL && *filename != NULL);
1051 if (config_base_dir == NULL || **filename == '/')
1052 return;
1054 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1055 *filename = tmp;
1056 } /* }}} static int get_abs_path */
1058 /* returns 1 if we have the required privilege level,
1059 * otherwise issue an error to the user on sock */
1060 static int has_privilege (listen_socket_t *sock, /* {{{ */
1061 socket_privilege priv)
1062 {
1063 if (sock == NULL) /* journal replay */
1064 return 1;
1066 if (sock->privilege >= priv)
1067 return 1;
1069 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1070 } /* }}} static int has_privilege */
1072 static int flush_file (const char *filename) /* {{{ */
1073 {
1074 cache_item_t *ci;
1076 pthread_mutex_lock (&cache_lock);
1078 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1079 if (ci == NULL)
1080 {
1081 pthread_mutex_unlock (&cache_lock);
1082 return (ENOENT);
1083 }
1085 if (ci->values_num > 0)
1086 {
1087 /* Enqueue at head */
1088 enqueue_cache_item (ci, HEAD);
1089 pthread_cond_wait(&ci->flushed, &cache_lock);
1090 }
1092 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1093 * may have been purged during our cond_wait() */
1095 pthread_mutex_unlock(&cache_lock);
1097 return (0);
1098 } /* }}} int flush_file */
1100 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1101 {
1102 char *err = "Syntax error.\n";
1104 if (cmd && cmd->syntax)
1105 err = cmd->syntax;
1107 return send_response(sock, RESP_ERR, "Usage: %s", err);
1108 } /* }}} static int syntax_error() */
1110 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1111 {
1112 uint64_t copy_queue_length;
1113 uint64_t copy_updates_received;
1114 uint64_t copy_flush_received;
1115 uint64_t copy_updates_written;
1116 uint64_t copy_data_sets_written;
1117 uint64_t copy_journal_bytes;
1118 uint64_t copy_journal_rotate;
1120 uint64_t tree_nodes_number;
1121 uint64_t tree_depth;
1123 pthread_mutex_lock (&stats_lock);
1124 copy_queue_length = stats_queue_length;
1125 copy_updates_received = stats_updates_received;
1126 copy_flush_received = stats_flush_received;
1127 copy_updates_written = stats_updates_written;
1128 copy_data_sets_written = stats_data_sets_written;
1129 copy_journal_bytes = stats_journal_bytes;
1130 copy_journal_rotate = stats_journal_rotate;
1131 pthread_mutex_unlock (&stats_lock);
1133 pthread_mutex_lock (&cache_lock);
1134 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1135 tree_depth = (uint64_t) g_tree_height (cache_tree);
1136 pthread_mutex_unlock (&cache_lock);
1138 add_response_info(sock,
1139 "QueueLength: %"PRIu64"\n", copy_queue_length);
1140 add_response_info(sock,
1141 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1142 add_response_info(sock,
1143 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1144 add_response_info(sock,
1145 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1146 add_response_info(sock,
1147 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1148 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1149 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1150 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1151 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1153 send_response(sock, RESP_OK, "Statistics follow\n");
1155 return (0);
1156 } /* }}} int handle_request_stats */
1158 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1159 {
1160 char *file, file_tmp[PATH_MAX];
1161 int status;
1163 status = buffer_get_field (&buffer, &buffer_size, &file);
1164 if (status != 0)
1165 {
1166 return syntax_error(sock,cmd);
1167 }
1168 else
1169 {
1170 pthread_mutex_lock(&stats_lock);
1171 stats_flush_received++;
1172 pthread_mutex_unlock(&stats_lock);
1174 get_abs_path(&file, file_tmp);
1175 if (!check_file_access(file, sock)) return 0;
1177 status = flush_file (file);
1178 if (status == 0)
1179 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1180 else if (status == ENOENT)
1181 {
1182 /* no file in our tree; see whether it exists at all */
1183 struct stat statbuf;
1185 memset(&statbuf, 0, sizeof(statbuf));
1186 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1187 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1188 else
1189 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1190 }
1191 else if (status < 0)
1192 return send_response(sock, RESP_ERR, "Internal error.\n");
1193 else
1194 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1195 }
1197 /* NOTREACHED */
1198 assert(1==0);
1199 } /* }}} int handle_request_flush */
1201 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1202 {
1203 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1205 pthread_mutex_lock(&cache_lock);
1206 flush_old_values(-1);
1207 pthread_mutex_unlock(&cache_lock);
1209 return send_response(sock, RESP_OK, "Started flush.\n");
1210 } /* }}} static int handle_request_flushall */
1212 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1213 {
1214 int status;
1215 char *file, file_tmp[PATH_MAX];
1216 cache_item_t *ci;
1218 status = buffer_get_field(&buffer, &buffer_size, &file);
1219 if (status != 0)
1220 return syntax_error(sock,cmd);
1222 get_abs_path(&file, file_tmp);
1224 pthread_mutex_lock(&cache_lock);
1225 ci = g_tree_lookup(cache_tree, file);
1226 if (ci == NULL)
1227 {
1228 pthread_mutex_unlock(&cache_lock);
1229 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1230 }
1232 for (size_t i=0; i < ci->values_num; i++)
1233 add_response_info(sock, "%s\n", ci->values[i]);
1235 pthread_mutex_unlock(&cache_lock);
1236 return send_response(sock, RESP_OK, "updates pending\n");
1237 } /* }}} static int handle_request_pending */
1239 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1240 {
1241 int status;
1242 gboolean found;
1243 char *file, file_tmp[PATH_MAX];
1245 status = buffer_get_field(&buffer, &buffer_size, &file);
1246 if (status != 0)
1247 return syntax_error(sock,cmd);
1249 get_abs_path(&file, file_tmp);
1250 if (!check_file_access(file, sock)) return 0;
1252 pthread_mutex_lock(&cache_lock);
1253 found = g_tree_remove(cache_tree, file);
1254 pthread_mutex_unlock(&cache_lock);
1256 if (found == TRUE)
1257 {
1258 if (sock != NULL)
1259 journal_write("forget", file);
1261 return send_response(sock, RESP_OK, "Gone!\n");
1262 }
1263 else
1264 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1266 /* NOTREACHED */
1267 assert(1==0);
1268 } /* }}} static int handle_request_forget */
1270 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1271 {
1272 cache_item_t *ci;
1274 pthread_mutex_lock(&cache_lock);
1276 ci = cache_queue_head;
1277 while (ci != NULL)
1278 {
1279 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1280 ci = ci->next;
1281 }
1283 pthread_mutex_unlock(&cache_lock);
1285 return send_response(sock, RESP_OK, "in queue.\n");
1286 } /* }}} int handle_request_queue */
1288 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1289 {
1290 char *file, file_tmp[PATH_MAX];
1291 int values_num = 0;
1292 int status;
1293 char orig_buf[CMD_MAX];
1295 cache_item_t *ci;
1297 /* save it for the journal later */
1298 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1300 status = buffer_get_field (&buffer, &buffer_size, &file);
1301 if (status != 0)
1302 return syntax_error(sock,cmd);
1304 pthread_mutex_lock(&stats_lock);
1305 stats_updates_received++;
1306 pthread_mutex_unlock(&stats_lock);
1308 get_abs_path(&file, file_tmp);
1309 if (!check_file_access(file, sock)) return 0;
1311 pthread_mutex_lock (&cache_lock);
1312 ci = g_tree_lookup (cache_tree, file);
1314 if (ci == NULL) /* {{{ */
1315 {
1316 struct stat statbuf;
1317 cache_item_t *tmp;
1319 /* don't hold the lock while we setup; stat(2) might block */
1320 pthread_mutex_unlock(&cache_lock);
1322 memset (&statbuf, 0, sizeof (statbuf));
1323 status = stat (file, &statbuf);
1324 if (status != 0)
1325 {
1326 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1328 status = errno;
1329 if (status == ENOENT)
1330 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1331 else
1332 return send_response(sock, RESP_ERR,
1333 "stat failed with error %i.\n", status);
1334 }
1335 if (!S_ISREG (statbuf.st_mode))
1336 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1338 if (access(file, R_OK|W_OK) != 0)
1339 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1340 file, rrd_strerror(errno));
1342 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1343 if (ci == NULL)
1344 {
1345 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1347 return send_response(sock, RESP_ERR, "malloc failed.\n");
1348 }
1349 memset (ci, 0, sizeof (cache_item_t));
1351 ci->file = strdup (file);
1352 if (ci->file == NULL)
1353 {
1354 free (ci);
1355 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1357 return send_response(sock, RESP_ERR, "strdup failed.\n");
1358 }
1360 wipe_ci_values(ci, now);
1361 ci->flags = CI_FLAGS_IN_TREE;
1362 pthread_cond_init(&ci->flushed, NULL);
1364 pthread_mutex_lock(&cache_lock);
1366 /* another UPDATE might have added this entry in the meantime */
1367 tmp = g_tree_lookup (cache_tree, file);
1368 if (tmp == NULL)
1369 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1370 else
1371 {
1372 free_cache_item (ci);
1373 ci = tmp;
1374 }
1375 } /* }}} */
1376 assert (ci != NULL);
1378 /* don't re-write updates in replay mode */
1379 if (sock != NULL)
1380 journal_write("update", orig_buf);
1382 while (buffer_size > 0)
1383 {
1384 char *value;
1385 time_t stamp;
1386 char *eostamp;
1388 status = buffer_get_field (&buffer, &buffer_size, &value);
1389 if (status != 0)
1390 {
1391 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1392 break;
1393 }
1395 /* make sure update time is always moving forward */
1396 stamp = strtol(value, &eostamp, 10);
1397 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1398 {
1399 pthread_mutex_unlock(&cache_lock);
1400 return send_response(sock, RESP_ERR,
1401 "Cannot find timestamp in '%s'!\n", value);
1402 }
1403 else if (stamp <= ci->last_update_stamp)
1404 {
1405 pthread_mutex_unlock(&cache_lock);
1406 return send_response(sock, RESP_ERR,
1407 "illegal attempt to update using time %ld when last"
1408 " update time is %ld (minimum one second step)\n",
1409 stamp, ci->last_update_stamp);
1410 }
1411 else
1412 ci->last_update_stamp = stamp;
1414 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1415 {
1416 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1417 continue;
1418 }
1420 values_num++;
1421 }
1423 if (((now - ci->last_flush_time) >= config_write_interval)
1424 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1425 && (ci->values_num > 0))
1426 {
1427 enqueue_cache_item (ci, TAIL);
1428 }
1430 pthread_mutex_unlock (&cache_lock);
1432 if (values_num < 1)
1433 return send_response(sock, RESP_ERR, "No values updated.\n");
1434 else
1435 return send_response(sock, RESP_OK,
1436 "errors, enqueued %i value(s).\n", values_num);
1438 /* NOTREACHED */
1439 assert(1==0);
1441 } /* }}} int handle_request_update */
1443 /* we came across a "WROTE" entry during journal replay.
1444 * throw away any values that we have accumulated for this file
1445 */
1446 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1447 {
1448 cache_item_t *ci;
1449 const char *file = buffer;
1451 pthread_mutex_lock(&cache_lock);
1453 ci = g_tree_lookup(cache_tree, file);
1454 if (ci == NULL)
1455 {
1456 pthread_mutex_unlock(&cache_lock);
1457 return (0);
1458 }
1460 if (ci->values)
1461 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1463 wipe_ci_values(ci, now);
1464 remove_from_queue(ci);
1466 pthread_mutex_unlock(&cache_lock);
1467 return (0);
1468 } /* }}} int handle_request_wrote */
1470 /* start "BATCH" processing */
1471 static int batch_start (HANDLER_PROTO) /* {{{ */
1472 {
1473 int status;
1474 if (sock->batch_start)
1475 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1477 status = send_response(sock, RESP_OK,
1478 "Go ahead. End with dot '.' on its own line.\n");
1479 sock->batch_start = time(NULL);
1480 sock->batch_cmd = 0;
1482 return status;
1483 } /* }}} static int batch_start */
1485 /* finish "BATCH" processing and return results to the client */
1486 static int batch_done (HANDLER_PROTO) /* {{{ */
1487 {
1488 assert(sock->batch_start);
1489 sock->batch_start = 0;
1490 sock->batch_cmd = 0;
1491 return send_response(sock, RESP_OK, "errors\n");
1492 } /* }}} static int batch_done */
/* QUIT: no reply is sent.  The non-zero return value makes the connection
 * thread stop processing and close the connection. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
1499 struct command COMMANDS[] = {
1500 {
1501 "UPDATE",
1502 handle_request_update,
1503 PRIV_HIGH,
1504 CMD_CONTEXT_ANY,
1505 "UPDATE <filename> <values> [<values> ...]\n"
1506 ,
1507 "Adds the given file to the internal cache if it is not yet known and\n"
1508 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1509 "for details.\n"
1510 "\n"
1511 "Each <values> has the following form:\n"
1512 " <values> = <time>:<value>[:<value>[...]]\n"
1513 "See the rrdupdate(1) manpage for details.\n"
1514 },
1515 {
1516 "WROTE",
1517 handle_request_wrote,
1518 PRIV_HIGH,
1519 CMD_CONTEXT_JOURNAL,
1520 NULL,
1521 NULL
1522 },
1523 {
1524 "FLUSH",
1525 handle_request_flush,
1526 PRIV_LOW,
1527 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1528 "FLUSH <filename>\n"
1529 ,
1530 "Adds the given filename to the head of the update queue and returns\n"
1531 "after it has been dequeued.\n"
1532 },
1533 {
1534 "FLUSHALL",
1535 handle_request_flushall,
1536 PRIV_HIGH,
1537 CMD_CONTEXT_CLIENT,
1538 "FLUSHALL\n"
1539 ,
1540 "Triggers writing of all pending updates. Returns immediately.\n"
1541 },
1542 {
1543 "PENDING",
1544 handle_request_pending,
1545 PRIV_HIGH,
1546 CMD_CONTEXT_CLIENT,
1547 "PENDING <filename>\n"
1548 ,
1549 "Shows any 'pending' updates for a file, in order.\n"
1550 "The updates shown have not yet been written to the underlying RRD file.\n"
1551 },
1552 {
1553 "FORGET",
1554 handle_request_forget,
1555 PRIV_HIGH,
1556 CMD_CONTEXT_ANY,
1557 "FORGET <filename>\n"
1558 ,
1559 "Removes the file completely from the cache.\n"
1560 "Any pending updates for the file will be lost.\n"
1561 },
1562 {
1563 "QUEUE",
1564 handle_request_queue,
1565 PRIV_LOW,
1566 CMD_CONTEXT_CLIENT,
1567 "QUEUE\n"
1568 ,
1569 "Shows all files in the output queue.\n"
1570 "The output is zero or more lines in the following format:\n"
1571 "(where <num_vals> is the number of values to be written)\n"
1572 "\n"
1573 "<num_vals> <filename>\n"
1574 },
1575 {
1576 "STATS",
1577 handle_request_stats,
1578 PRIV_LOW,
1579 CMD_CONTEXT_CLIENT,
1580 "STATS\n"
1581 ,
1582 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1583 "a description of the values.\n"
1584 },
1585 {
1586 "HELP",
1587 handle_request_help,
1588 PRIV_LOW,
1589 CMD_CONTEXT_CLIENT,
1590 "HELP [<command>]\n",
1591 NULL, /* special! */
1592 },
1593 {
1594 "BATCH",
1595 batch_start,
1596 PRIV_LOW,
1597 CMD_CONTEXT_CLIENT,
1598 "BATCH\n"
1599 ,
1600 "The 'BATCH' command permits the client to initiate a bulk load\n"
1601 " of commands to rrdcached.\n"
1602 "\n"
1603 "Usage:\n"
1604 "\n"
1605 " client: BATCH\n"
1606 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1607 " client: command #1\n"
1608 " client: command #2\n"
1609 " client: ... and so on\n"
1610 " client: .\n"
1611 " server: 2 errors\n"
1612 " server: 7 message for command #7\n"
1613 " server: 9 message for command #9\n"
1614 "\n"
1615 "For more information, consult the rrdcached(1) documentation.\n"
1616 },
1617 {
1618 ".", /* BATCH terminator */
1619 batch_done,
1620 PRIV_LOW,
1621 CMD_CONTEXT_BATCH,
1622 NULL,
1623 NULL
1624 },
1625 {
1626 "QUIT",
1627 handle_request_quit,
1628 PRIV_LOW,
1629 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1630 "QUIT\n"
1631 ,
1632 "Disconnect from rrdcached.\n"
1633 },
1634 {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */
1635 };
1637 static struct command *find_command(char *cmd)
1638 {
1639 struct command *c = COMMANDS;
1641 while (c->cmd != NULL)
1642 {
1643 if (strcasecmp(cmd, c->cmd) == 0)
1644 break;
1645 c++;
1646 }
1648 if (c->cmd == NULL)
1649 return NULL;
1650 else
1651 return c;
1652 }
1654 /* check whether commands are received in the expected context */
1655 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1656 {
1657 if (sock == NULL)
1658 return (cmd->context & CMD_CONTEXT_JOURNAL);
1659 else if (sock->batch_start)
1660 return (cmd->context & CMD_CONTEXT_BATCH);
1661 else
1662 return (cmd->context & CMD_CONTEXT_CLIENT);
1664 /* NOTREACHED */
1665 assert(1==0);
1666 }
1668 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1669 {
1670 int status;
1671 char *cmd_str;
1672 char *resp_txt;
1673 struct command *help = NULL;
1675 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1676 if (status == 0)
1677 help = find_command(cmd_str);
1679 if (help && (help->syntax || help->help))
1680 {
1681 char tmp[CMD_MAX];
1683 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1684 resp_txt = tmp;
1686 if (help->syntax)
1687 add_response_info(sock, "Usage: %s\n", help->syntax);
1689 if (help->help)
1690 add_response_info(sock, "%s\n", help->help);
1691 }
1692 else
1693 {
1694 help = COMMANDS;
1695 resp_txt = "Command overview\n";
1697 while (help->cmd)
1698 {
1699 if (help->syntax)
1700 add_response_info(sock, "%s", help->syntax);
1701 help++;
1702 }
1703 }
1705 return send_response(sock, RESP_OK, resp_txt);
1706 } /* }}} int handle_request_help */
1708 /* if sock==NULL, we are in journal replay mode */
1709 static int handle_request (DISPATCH_PROTO) /* {{{ */
1710 {
1711 char *buffer_ptr = buffer;
1712 char *cmd_str = NULL;
1713 struct command *cmd = NULL;
1714 int status;
1716 assert (buffer[buffer_size - 1] == '\0');
1718 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1719 if (status != 0)
1720 {
1721 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1722 return (-1);
1723 }
1725 if (sock != NULL && sock->batch_start)
1726 sock->batch_cmd++;
1728 cmd = find_command(cmd_str);
1729 if (!cmd)
1730 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1732 status = has_privilege(sock, cmd->min_priv);
1733 if (status <= 0)
1734 return status;
1736 if (!command_check_context(sock, cmd))
1737 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1739 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1740 } /* }}} int handle_request */
1742 /* MUST NOT hold journal_lock before calling this */
1743 static void journal_rotate(void) /* {{{ */
1744 {
1745 FILE *old_fh = NULL;
1746 int new_fd;
1748 if (journal_cur == NULL || journal_old == NULL)
1749 return;
1751 pthread_mutex_lock(&journal_lock);
1753 /* we rotate this way (rename before close) so that the we can release
1754 * the journal lock as fast as possible. Journal writes to the new
1755 * journal can proceed immediately after the new file is opened. The
1756 * fclose can then block without affecting new updates.
1757 */
1758 if (journal_fh != NULL)
1759 {
1760 old_fh = journal_fh;
1761 journal_fh = NULL;
1762 rename(journal_cur, journal_old);
1763 ++stats_journal_rotate;
1764 }
1766 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1767 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1768 if (new_fd >= 0)
1769 {
1770 journal_fh = fdopen(new_fd, "a");
1771 if (journal_fh == NULL)
1772 close(new_fd);
1773 }
1775 pthread_mutex_unlock(&journal_lock);
1777 if (old_fh != NULL)
1778 fclose(old_fh);
1780 if (journal_fh == NULL)
1781 {
1782 RRDD_LOG(LOG_CRIT,
1783 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1784 journal_cur, rrd_strerror(errno));
1786 RRDD_LOG(LOG_ERR,
1787 "JOURNALING DISABLED: All values will be flushed at shutdown");
1788 config_flush_at_shutdown = 1;
1789 }
1791 } /* }}} static void journal_rotate */
1793 static void journal_done(void) /* {{{ */
1794 {
1795 if (journal_cur == NULL)
1796 return;
1798 pthread_mutex_lock(&journal_lock);
1799 if (journal_fh != NULL)
1800 {
1801 fclose(journal_fh);
1802 journal_fh = NULL;
1803 }
1805 if (config_flush_at_shutdown)
1806 {
1807 RRDD_LOG(LOG_INFO, "removing journals");
1808 unlink(journal_old);
1809 unlink(journal_cur);
1810 }
1811 else
1812 {
1813 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1814 "journals will be used at next startup");
1815 }
1817 pthread_mutex_unlock(&journal_lock);
1819 } /* }}} static void journal_done */
1821 static int journal_write(char *cmd, char *args) /* {{{ */
1822 {
1823 int chars;
1825 if (journal_fh == NULL)
1826 return 0;
1828 pthread_mutex_lock(&journal_lock);
1829 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1830 pthread_mutex_unlock(&journal_lock);
1832 if (chars > 0)
1833 {
1834 pthread_mutex_lock(&stats_lock);
1835 stats_journal_bytes += chars;
1836 pthread_mutex_unlock(&stats_lock);
1837 }
1839 return chars;
1840 } /* }}} static int journal_write */
1842 static int journal_replay (const char *file) /* {{{ */
1843 {
1844 FILE *fh;
1845 int entry_cnt = 0;
1846 int fail_cnt = 0;
1847 uint64_t line = 0;
1848 char entry[CMD_MAX];
1849 time_t now;
1851 if (file == NULL) return 0;
1853 {
1854 char *reason = "unknown error";
1855 int status = 0;
1856 struct stat statbuf;
1858 memset(&statbuf, 0, sizeof(statbuf));
1859 if (stat(file, &statbuf) != 0)
1860 {
1861 if (errno == ENOENT)
1862 return 0;
1864 reason = "stat error";
1865 status = errno;
1866 }
1867 else if (!S_ISREG(statbuf.st_mode))
1868 {
1869 reason = "not a regular file";
1870 status = EPERM;
1871 }
1872 if (statbuf.st_uid != daemon_uid)
1873 {
1874 reason = "not owned by daemon user";
1875 status = EACCES;
1876 }
1877 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1878 {
1879 reason = "must not be user/group writable";
1880 status = EACCES;
1881 }
1883 if (status != 0)
1884 {
1885 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1886 file, rrd_strerror(status), reason);
1887 return 0;
1888 }
1889 }
1891 fh = fopen(file, "r");
1892 if (fh == NULL)
1893 {
1894 if (errno != ENOENT)
1895 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1896 file, rrd_strerror(errno));
1897 return 0;
1898 }
1899 else
1900 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1902 now = time(NULL);
1904 while(!feof(fh))
1905 {
1906 size_t entry_len;
1908 ++line;
1909 if (fgets(entry, sizeof(entry), fh) == NULL)
1910 break;
1911 entry_len = strlen(entry);
1913 /* check \n termination in case journal writing crashed mid-line */
1914 if (entry_len == 0)
1915 continue;
1916 else if (entry[entry_len - 1] != '\n')
1917 {
1918 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1919 ++fail_cnt;
1920 continue;
1921 }
1923 entry[entry_len - 1] = '\0';
1925 if (handle_request(NULL, now, entry, entry_len) == 0)
1926 ++entry_cnt;
1927 else
1928 ++fail_cnt;
1929 }
1931 fclose(fh);
1933 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1934 entry_cnt, fail_cnt);
1936 return entry_cnt > 0 ? 1 : 0;
1937 } /* }}} static int journal_replay */
1939 static void journal_init(void) /* {{{ */
1940 {
1941 int had_journal = 0;
1943 if (journal_cur == NULL) return;
1945 pthread_mutex_lock(&journal_lock);
1947 RRDD_LOG(LOG_INFO, "checking for journal files");
1949 had_journal += journal_replay(journal_old);
1950 had_journal += journal_replay(journal_cur);
1952 /* it must have been a crash. start a flush */
1953 if (had_journal && config_flush_at_shutdown)
1954 flush_old_values(-1);
1956 pthread_mutex_unlock(&journal_lock);
1957 journal_rotate();
1959 RRDD_LOG(LOG_INFO, "journal processing complete");
1961 } /* }}} static void journal_init */
1963 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1964 {
1965 assert(sock != NULL);
1967 free(sock->rbuf); sock->rbuf = NULL;
1968 free(sock->wbuf); sock->wbuf = NULL;
1969 free(sock);
1970 } /* }}} void free_listen_socket */
1972 static void close_connection(listen_socket_t *sock) /* {{{ */
1973 {
1974 if (sock->fd >= 0)
1975 {
1976 close(sock->fd);
1977 sock->fd = -1;
1978 }
1980 free_listen_socket(sock);
1982 } /* }}} void close_connection */
1984 static void *connection_thread_main (void *args) /* {{{ */
1985 {
1986 listen_socket_t *sock;
1987 int fd;
1989 sock = (listen_socket_t *) args;
1990 fd = sock->fd;
1992 /* init read buffers */
1993 sock->next_read = sock->next_cmd = 0;
1994 sock->rbuf = malloc(RBUF_SIZE);
1995 if (sock->rbuf == NULL)
1996 {
1997 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1998 close_connection(sock);
1999 return NULL;
2000 }
2002 pthread_mutex_lock (&connection_threads_lock);
2003 connection_threads_num++;
2004 pthread_mutex_unlock (&connection_threads_lock);
2006 while (do_shutdown == 0)
2007 {
2008 char *cmd;
2009 ssize_t cmd_len;
2010 ssize_t rbytes;
2011 time_t now;
2013 struct pollfd pollfd;
2014 int status;
2016 pollfd.fd = fd;
2017 pollfd.events = POLLIN | POLLPRI;
2018 pollfd.revents = 0;
2020 status = poll (&pollfd, 1, /* timeout = */ 500);
2021 if (do_shutdown)
2022 break;
2023 else if (status == 0) /* timeout */
2024 continue;
2025 else if (status < 0) /* error */
2026 {
2027 status = errno;
2028 if (status != EINTR)
2029 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2030 continue;
2031 }
2033 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2034 break;
2035 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2036 {
2037 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2038 "poll(2) returned something unexpected: %#04hx",
2039 pollfd.revents);
2040 break;
2041 }
2043 rbytes = read(fd, sock->rbuf + sock->next_read,
2044 RBUF_SIZE - sock->next_read);
2045 if (rbytes < 0)
2046 {
2047 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2048 break;
2049 }
2050 else if (rbytes == 0)
2051 break; /* eof */
2053 sock->next_read += rbytes;
2055 if (sock->batch_start)
2056 now = sock->batch_start;
2057 else
2058 now = time(NULL);
2060 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2061 {
2062 status = handle_request (sock, now, cmd, cmd_len+1);
2063 if (status != 0)
2064 goto out_close;
2065 }
2066 }
2068 out_close:
2069 close_connection(sock);
2071 /* Remove this thread from the connection threads list */
2072 pthread_mutex_lock (&connection_threads_lock);
2073 connection_threads_num--;
2074 if (connection_threads_num <= 0)
2075 pthread_cond_broadcast(&connection_threads_done);
2076 pthread_mutex_unlock (&connection_threads_lock);
2078 return (NULL);
2079 } /* }}} void *connection_thread_main */
2081 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2082 {
2083 int fd;
2084 struct sockaddr_un sa;
2085 listen_socket_t *temp;
2086 int status;
2087 const char *path;
2089 path = sock->addr;
2090 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2091 path += strlen("unix:");
2093 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2094 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2095 if (temp == NULL)
2096 {
2097 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2098 return (-1);
2099 }
2100 listen_fds = temp;
2101 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2103 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2104 if (fd < 0)
2105 {
2106 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2107 rrd_strerror(errno));
2108 return (-1);
2109 }
2111 memset (&sa, 0, sizeof (sa));
2112 sa.sun_family = AF_UNIX;
2113 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2115 /* if we've gotten this far, we own the pid file. any daemon started
2116 * with the same args must not be alive. therefore, ensure that we can
2117 * create the socket...
2118 */
2119 unlink(path);
2121 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2122 if (status != 0)
2123 {
2124 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2125 path, rrd_strerror(errno));
2126 close (fd);
2127 return (-1);
2128 }
2130 status = listen (fd, /* backlog = */ 10);
2131 if (status != 0)
2132 {
2133 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2134 path, rrd_strerror(errno));
2135 close (fd);
2136 unlink (path);
2137 return (-1);
2138 }
2140 listen_fds[listen_fds_num].fd = fd;
2141 listen_fds[listen_fds_num].family = PF_UNIX;
2142 strncpy(listen_fds[listen_fds_num].addr, path,
2143 sizeof (listen_fds[listen_fds_num].addr) - 1);
2144 listen_fds_num++;
2146 return (0);
2147 } /* }}} int open_listen_socket_unix */
/* Resolve sock->addr and open a listening TCP socket for every address
 * returned by getaddrinfo(), appending each one to listen_fds.
 *
 * Accepted address forms:
 *   "[<ipv6>]" or "[<ipv6>]:<port>"
 *   "<host-or-ipv4>" or "<host-or-ipv4>:<port>"  (port only split off when
 *                                                 the string contains a '.')
 * Without a port, RRDCACHED_DEFAULT_PORT is used.
 *
 * Returns -1 on a malformed address, a resolver failure or a listen(2)
 * failure; 0 otherwise.  Addresses whose socket(2)/bind(2) fail are
 * skipped, so 0 does not guarantee that a socket was opened. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy; the parsing below writes NULs into it */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* split at the last ':' (standard strrchr instead of legacy rindex) */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* the new slot only counts once listen_fds_num is incremented below */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2271 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2272 {
2273 assert(sock != NULL);
2274 assert(sock->addr != NULL);
2276 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2277 || sock->addr[0] == '/')
2278 return (open_listen_socket_unix(sock));
2279 else
2280 return (open_listen_socket_network(sock));
2281 } /* }}} int open_listen_socket */
2283 static int close_listen_sockets (void) /* {{{ */
2284 {
2285 size_t i;
2287 for (i = 0; i < listen_fds_num; i++)
2288 {
2289 close (listen_fds[i].fd);
2291 if (listen_fds[i].family == PF_UNIX)
2292 unlink(listen_fds[i].addr);
2293 }
2295 free (listen_fds);
2296 listen_fds = NULL;
2297 listen_fds_num = 0;
2299 return (0);
2300 } /* }}} int close_listen_sockets */
2302 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2303 {
2304 struct pollfd *pollfds;
2305 int pollfds_num;
2306 int status;
2307 int i;
2309 if (listen_fds_num < 1)
2310 {
2311 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2312 return (NULL);
2313 }
2315 pollfds_num = listen_fds_num;
2316 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2317 if (pollfds == NULL)
2318 {
2319 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2320 return (NULL);
2321 }
2322 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2324 RRDD_LOG(LOG_INFO, "listening for connections");
2326 while (do_shutdown == 0)
2327 {
2328 for (i = 0; i < pollfds_num; i++)
2329 {
2330 pollfds[i].fd = listen_fds[i].fd;
2331 pollfds[i].events = POLLIN | POLLPRI;
2332 pollfds[i].revents = 0;
2333 }
2335 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2336 if (do_shutdown)
2337 break;
2338 else if (status == 0) /* timeout */
2339 continue;
2340 else if (status < 0) /* error */
2341 {
2342 status = errno;
2343 if (status != EINTR)
2344 {
2345 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2346 }
2347 continue;
2348 }
2350 for (i = 0; i < pollfds_num; i++)
2351 {
2352 listen_socket_t *client_sock;
2353 struct sockaddr_storage client_sa;
2354 socklen_t client_sa_size;
2355 pthread_t tid;
2356 pthread_attr_t attr;
2358 if (pollfds[i].revents == 0)
2359 continue;
2361 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2362 {
2363 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2364 "poll(2) returned something unexpected for listen FD #%i.",
2365 pollfds[i].fd);
2366 continue;
2367 }
2369 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2370 if (client_sock == NULL)
2371 {
2372 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2373 continue;
2374 }
2375 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2377 client_sa_size = sizeof (client_sa);
2378 client_sock->fd = accept (pollfds[i].fd,
2379 (struct sockaddr *) &client_sa, &client_sa_size);
2380 if (client_sock->fd < 0)
2381 {
2382 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2383 free(client_sock);
2384 continue;
2385 }
2387 pthread_attr_init (&attr);
2388 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2390 status = pthread_create (&tid, &attr, connection_thread_main,
2391 client_sock);
2392 if (status != 0)
2393 {
2394 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2395 close_connection(client_sock);
2396 continue;
2397 }
2398 } /* for (pollfds_num) */
2399 } /* while (do_shutdown == 0) */
2401 RRDD_LOG(LOG_INFO, "starting shutdown");
2403 close_listen_sockets ();
2405 pthread_mutex_lock (&connection_threads_lock);
2406 while (connection_threads_num > 0)
2407 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2408 pthread_mutex_unlock (&connection_threads_lock);
2410 free(pollfds);
2412 return (NULL);
2413 } /* }}} void *listen_thread_main */
2415 static int daemonize (void) /* {{{ */
2416 {
2417 int pid_fd;
2418 char *base_dir;
2420 daemon_uid = geteuid();
2422 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2423 if (pid_fd < 0)
2424 pid_fd = check_pidfile();
2425 if (pid_fd < 0)
2426 return pid_fd;
2428 /* open all the listen sockets */
2429 if (config_listen_address_list_len > 0)
2430 {
2431 for (size_t i = 0; i < config_listen_address_list_len; i++)
2432 open_listen_socket (config_listen_address_list[i]);
2434 rrd_free_ptrs((void ***) &config_listen_address_list,
2435 &config_listen_address_list_len);
2436 }
2437 else
2438 {
2439 listen_socket_t sock;
2440 memset(&sock, 0, sizeof(sock));
2441 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2442 open_listen_socket (&sock);
2443 }
2445 if (listen_fds_num < 1)
2446 {
2447 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2448 goto error;
2449 }
2451 if (!stay_foreground)
2452 {
2453 pid_t child;
2455 child = fork ();
2456 if (child < 0)
2457 {
2458 fprintf (stderr, "daemonize: fork(2) failed.\n");
2459 goto error;
2460 }
2461 else if (child > 0)
2462 exit(0);
2464 /* Become session leader */
2465 setsid ();
2467 /* Open the first three file descriptors to /dev/null */
2468 close (2);
2469 close (1);
2470 close (0);
2472 open ("/dev/null", O_RDWR);
2473 if (dup(0) == -1 || dup(0) == -1){
2474 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2475 }
2476 } /* if (!stay_foreground) */
2478 /* Change into the /tmp directory. */
2479 base_dir = (config_base_dir != NULL)
2480 ? config_base_dir
2481 : "/tmp";
2483 if (chdir (base_dir) != 0)
2484 {
2485 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2486 goto error;
2487 }
2489 install_signal_handlers();
2491 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2492 RRDD_LOG(LOG_INFO, "starting up");
2494 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2495 (GDestroyNotify) free_cache_item);
2496 if (cache_tree == NULL)
2497 {
2498 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2499 goto error;
2500 }
2502 return write_pidfile (pid_fd);
2504 error:
2505 remove_pidfile();
2506 return -1;
2507 } /* }}} int daemonize */
2509 static int cleanup (void) /* {{{ */
2510 {
2511 do_shutdown++;
2513 pthread_cond_broadcast (&flush_cond);
2514 pthread_join (flush_thread, NULL);
2516 pthread_cond_broadcast (&queue_cond);
2517 for (int i = 0; i < config_queue_threads; i++)
2518 pthread_join (queue_threads[i], NULL);
2520 if (config_flush_at_shutdown)
2521 {
2522 assert(cache_queue_head == NULL);
2523 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2524 }
2526 journal_done();
2527 remove_pidfile ();
2529 free(queue_threads);
2530 free(config_base_dir);
2531 free(config_pid_file);
2532 free(journal_cur);
2533 free(journal_old);
2535 pthread_mutex_lock(&cache_lock);
2536 g_tree_destroy(cache_tree);
2538 RRDD_LOG(LOG_INFO, "goodbye");
2539 closelog ();
2541 return (0);
2542 } /* }}} int cleanup */
2544 static int read_options (int argc, char **argv) /* {{{ */
2545 {
2546 int option;
2547 int status = 0;
2549 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2550 {
2551 switch (option)
2552 {
2553 case 'g':
2554 stay_foreground=1;
2555 break;
2557 case 'L':
2558 case 'l':
2559 {
2560 listen_socket_t *new;
2562 new = malloc(sizeof(listen_socket_t));
2563 if (new == NULL)
2564 {
2565 fprintf(stderr, "read_options: malloc failed.\n");
2566 return(2);
2567 }
2568 memset(new, 0, sizeof(listen_socket_t));
2570 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2571 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2573 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2574 &config_listen_address_list_len, new))
2575 {
2576 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2577 return (2);
2578 }
2579 }
2580 break;
2582 case 'f':
2583 {
2584 int temp;
2586 temp = atoi (optarg);
2587 if (temp > 0)
2588 config_flush_interval = temp;
2589 else
2590 {
2591 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2592 status = 3;
2593 }
2594 }
2595 break;
2597 case 'w':
2598 {
2599 int temp;
2601 temp = atoi (optarg);
2602 if (temp > 0)
2603 config_write_interval = temp;
2604 else
2605 {
2606 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2607 status = 2;
2608 }
2609 }
2610 break;
2612 case 'z':
2613 {
2614 int temp;
2616 temp = atoi(optarg);
2617 if (temp > 0)
2618 config_write_jitter = temp;
2619 else
2620 {
2621 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2622 status = 2;
2623 }
2625 break;
2626 }
2628 case 't':
2629 {
2630 int threads;
2631 threads = atoi(optarg);
2632 if (threads >= 1)
2633 config_queue_threads = threads;
2634 else
2635 {
2636 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2637 return 1;
2638 }
2639 }
2640 break;
2642 case 'B':
2643 config_write_base_only = 1;
2644 break;
2646 case 'b':
2647 {
2648 size_t len;
2649 char base_realpath[PATH_MAX];
2651 if (config_base_dir != NULL)
2652 free (config_base_dir);
2653 config_base_dir = strdup (optarg);
2654 if (config_base_dir == NULL)
2655 {
2656 fprintf (stderr, "read_options: strdup failed.\n");
2657 return (3);
2658 }
2660 /* make sure that the base directory is not resolved via
2661 * symbolic links. this makes some performance-enhancing
2662 * assumptions possible (we don't have to resolve paths
2663 * that start with a "/")
2664 */
2665 if (realpath(config_base_dir, base_realpath) == NULL)
2666 {
2667 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2668 return 5;
2669 }
2670 else if (strncmp(config_base_dir,
2671 base_realpath, sizeof(base_realpath)) != 0)
2672 {
2673 fprintf(stderr,
2674 "Base directory (-b) resolved via file system links!\n"
2675 "Please consult rrdcached '-b' documentation!\n"
2676 "Consider specifying the real directory (%s)\n",
2677 base_realpath);
2678 return 5;
2679 }
2681 len = strlen (config_base_dir);
2682 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2683 {
2684 config_base_dir[len - 1] = 0;
2685 len--;
2686 }
2688 if (len < 1)
2689 {
2690 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2691 return (4);
2692 }
2694 _config_base_dir_len = len;
2695 }
2696 break;
2698 case 'p':
2699 {
2700 if (config_pid_file != NULL)
2701 free (config_pid_file);
2702 config_pid_file = strdup (optarg);
2703 if (config_pid_file == NULL)
2704 {
2705 fprintf (stderr, "read_options: strdup failed.\n");
2706 return (3);
2707 }
2708 }
2709 break;
2711 case 'F':
2712 config_flush_at_shutdown = 1;
2713 break;
2715 case 'j':
2716 {
2717 struct stat statbuf;
2718 const char *dir = optarg;
2720 status = stat(dir, &statbuf);
2721 if (status != 0)
2722 {
2723 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2724 return 6;
2725 }
2727 if (!S_ISDIR(statbuf.st_mode)
2728 || access(dir, R_OK|W_OK|X_OK) != 0)
2729 {
2730 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2731 errno ? rrd_strerror(errno) : "");
2732 return 6;
2733 }
2735 journal_cur = malloc(PATH_MAX + 1);
2736 journal_old = malloc(PATH_MAX + 1);
2737 if (journal_cur == NULL || journal_old == NULL)
2738 {
2739 fprintf(stderr, "malloc failure for journal files\n");
2740 return 6;
2741 }
2742 else
2743 {
2744 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2745 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2746 }
2747 }
2748 break;
2750 case 'h':
2751 case '?':
2752 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2753 "\n"
2754 "Usage: rrdcached [options]\n"
2755 "\n"
2756 "Valid options are:\n"
2757 " -l <address> Socket address to listen to.\n"
2758 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2759 " -w <seconds> Interval in which to write data.\n"
2760 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2761 " -t <threads> Number of write threads.\n"
2762 " -f <seconds> Interval in which to flush dead data.\n"
2763 " -p <file> Location of the PID-file.\n"
2764 " -b <dir> Base directory to change to.\n"
2765 " -B Restrict file access to paths within -b <dir>\n"
2766 " -g Do not fork and run in the foreground.\n"
2767 " -j <dir> Directory in which to create the journal files.\n"
2768 " -F Always flush all updates at shutdown\n"
2769 "\n"
2770 "For more information and a detailed description of all options "
2771 "please refer\n"
2772 "to the rrdcached(1) manual page.\n",
2773 VERSION);
2774 status = -1;
2775 break;
2776 } /* switch (option) */
2777 } /* while (getopt) */
2779 /* advise the user when values are not sane */
2780 if (config_flush_interval < 2 * config_write_interval)
2781 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2782 " 2x write interval (-w) !\n");
2783 if (config_write_jitter > config_write_interval)
2784 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2785 " write interval (-w) !\n");
2787 if (config_write_base_only && config_base_dir == NULL)
2788 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2789 " Consult the rrdcached documentation\n");
2791 if (journal_cur == NULL)
2792 config_flush_at_shutdown = 1;
2794 return (status);
2795 } /* }}} int read_options */
2797 int main (int argc, char **argv)
2798 {
2799 int status;
2801 status = read_options (argc, argv);
2802 if (status != 0)
2803 {
2804 if (status < 0)
2805 status = 0;
2806 return (status);
2807 }
2809 status = daemonize ();
2810 if (status != 0)
2811 {
2812 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2813 return (1);
2814 }
2816 journal_init();
2818 /* start the queue threads */
2819 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2820 if (queue_threads == NULL)
2821 {
2822 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2823 cleanup();
2824 return (1);
2825 }
2826 for (int i = 0; i < config_queue_threads; i++)
2827 {
2828 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2829 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2830 if (status != 0)
2831 {
2832 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2833 cleanup();
2834 return (1);
2835 }
2836 }
2838 /* start the flush thread */
2839 memset(&flush_thread, 0, sizeof(flush_thread));
2840 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2841 if (status != 0)
2842 {
2843 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2844 cleanup();
2845 return (1);
2846 }
2848 listen_thread_main (NULL);
2849 cleanup ();
2851 return (0);
2852 } /* int main */
2854 /*
2855 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2856 */