1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
/* Logging helper used throughout the daemon: forwards a severity and a
 * printf-style format plus arguments straight to syslog(3). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC-compatible ones do not understand
 * __attribute__; expand it to nothing so annotations such as
 * __attribute__((unused)) used below still compile. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
115 /*
116 * Types
117 */
/* Privilege level attached to a socket. Each command declares the minimum
 * level it needs (struct command.min_priv); has_privilege() compares a
 * socket's level against such a requirement. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Outcome of a request as reported by send_response(): with RESP_ERR the
 * status line carries -1 (outside BATCH), with RESP_OK it carries the
 * number of extra info lines that follow. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;

/* Socket state. Used for the listening sockets (listen_fds) and also passed
 * to every command handler as `sock', where it holds the per-connection
 * read/write buffers and BATCH state. */
struct listen_socket_s
{
  int fd;                     /* socket file descriptor */
  /* socket address / path string -- exact format set outside this view */
  char addr[PATH_MAX + 1];
  int family;                 /* presumably an AF_* constant -- TODO confirm */
  socket_privilege privilege;

  /* state for BATCH processing */
  time_t batch_start;         /* non-zero while a BATCH is in progress */
  int batch_cmd;              /* reported instead of -1 for errors in BATCH */

  /* buffered IO */
  char *rbuf;                 /* read buffer, split into lines by next_cmd() */
  off_t next_cmd;             /* offset in rbuf of the next unparsed command */
  off_t next_read;            /* offset in rbuf just past the buffered data */

  char *wbuf;                 /* pending response body, see add_to_wbuf() */
  ssize_t wbuf_len;           /* bytes used in wbuf, excluding the NUL */
};
typedef struct listen_socket_s listen_socket_t;
struct command;
/* Every request handler shares one signature, spelled out by these macros.
 * note: guard against "unused" warnings in the handlers */
#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
                       time_t now __attribute__((unused)),\
                       char *buffer __attribute__((unused)),\
                       size_t buffer_size __attribute__((unused))

#define HANDLER_PROTO struct command *cmd __attribute__((unused)),\
                      DISPATCH_PROTO

/* One entry of the command table: name, handler, required privilege, the
 * contexts in which it may appear, and usage/help text. */
struct command {
  char   *cmd;                 /* presumably the command keyword -- confirm */
  int (*handler)(HANDLER_PROTO);
  socket_privilege min_priv;   /* minimum socket privilege required */

  char    context;		/* where we expect to see it */
#define CMD_CONTEXT_CLIENT	(1<<0)
#define CMD_CONTEXT_BATCH	(1<<1)
#define CMD_CONTEXT_JOURNAL	(1<<2)
#define CMD_CONTEXT_ANY		(0x7f)

  char   *syntax;              /* usage text, printed by syntax_error() */
  char   *help;
};
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/* One cached RRD file: its pending update strings plus write-queue links.
 * Items are stored in the cache tree keyed by `file' and, while they have
 * values waiting to be written, sit in the doubly-linked write queue. */
struct cache_item_s
{
  char *file;                /* RRD path; also the key in the cache tree */
  char **values;             /* pending update strings for rrd_update_r() */
  size_t values_num;         /* number of entries in `values' */
  time_t last_flush_time;    /* set (plus optional jitter) by wipe_ci_values() */
  /* not read in this part of the file; presumably the time of the most
   * recent update -- confirm against handle_request_update */
  time_t last_update_stamp;
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;
  pthread_cond_t  flushed;   /* broadcast when the values have been written */
  cache_item_t *prev;        /* write-queue links; NULL when not queued */
  cache_item_t *next;
};

/* State passed to tree_callback_flush() by flush_old_values(). */
struct callback_flush_data_s
{
  time_t now;
  time_t abs_timeout;        /* items last flushed at/before this get queued */
  char **keys;               /* keys of idle items to remove from the tree */
  size_t keys_num;
};
typedef struct callback_flush_data_s callback_flush_data_t;

/* Which end of the write queue enqueue_cache_item() inserts at. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;

/* max length of socket command or response */
#define CMD_MAX 4096
#define RBUF_SIZE (CMD_MAX*2)
209 /*
210 * Variables
211 */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Incremented by the shutdown signal handlers (sig_common) and polled by the
 * worker threads.
 * NOTE(review): written from a signal handler but not declared
 * `volatile sig_atomic_t'. */
static int do_shutdown = 0;

/* Writer ("queue") threads, woken through queue_cond when work is queued. */
static pthread_t *queue_threads;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

/* Periodic flush thread, woken early through flush_cond. */
static pthread_t flush_thread;
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff: the tree of cached files and the write queue, all protected
 * by cache_lock. */
static GTree          *cache_tree = NULL;
static cache_item_t   *cache_queue_head = NULL;
static cache_item_t   *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

/* Seconds: max age of pending values before the flush thread queues them. */
static int config_write_interval = 300;
/* Seconds: random extra delay added to last_flush_time (wipe_ci_values). */
static int config_write_jitter   = 0;
/* Seconds: flush thread period, and idle time before empty entries expire. */
static int config_flush_interval = 3600;
/* Set by SIGUSR1, cleared by SIGUSR2: flush everything when shutting down. */
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Counters reported by the STATS command; protected by stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
270 /*
271 * Functions
272 */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276 do_shutdown++;
277 pthread_cond_broadcast(&flush_cond);
278 pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
/* SIGINT: request an orderly shutdown. */
static void sig_int_handler (int signum __attribute__((unused))) /* {{{ */
{
  sig_common ("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: request an orderly shutdown. */
static void sig_term_handler (int signum __attribute__((unused))) /* {{{ */
{
  sig_common ("TERM");
} /* }}} void sig_term_handler */
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293 config_flush_at_shutdown = 1;
294 sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299 config_flush_at_shutdown = 0;
300 sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
303 static void install_signal_handlers(void) /* {{{ */
304 {
305 /* These structures are static, because `sigaction' behaves weird if the are
306 * overwritten.. */
307 static struct sigaction sa_int;
308 static struct sigaction sa_term;
309 static struct sigaction sa_pipe;
310 static struct sigaction sa_usr1;
311 static struct sigaction sa_usr2;
313 /* Install signal handlers */
314 memset (&sa_int, 0, sizeof (sa_int));
315 sa_int.sa_handler = sig_int_handler;
316 sigaction (SIGINT, &sa_int, NULL);
318 memset (&sa_term, 0, sizeof (sa_term));
319 sa_term.sa_handler = sig_term_handler;
320 sigaction (SIGTERM, &sa_term, NULL);
322 memset (&sa_pipe, 0, sizeof (sa_pipe));
323 sa_pipe.sa_handler = SIG_IGN;
324 sigaction (SIGPIPE, &sa_pipe, NULL);
326 memset (&sa_pipe, 0, sizeof (sa_usr1));
327 sa_usr1.sa_handler = sig_usr1_handler;
328 sigaction (SIGUSR1, &sa_usr1, NULL);
330 memset (&sa_usr2, 0, sizeof (sa_usr2));
331 sa_usr2.sa_handler = sig_usr2_handler;
332 sigaction (SIGUSR2, &sa_usr2, NULL);
334 } /* }}} void install_signal_handlers */
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338 int fd;
339 char *file;
341 file = (config_pid_file != NULL)
342 ? config_pid_file
343 : LOCALSTATEDIR "/run/rrdcached.pid";
345 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346 if (fd < 0)
347 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348 action, file, rrd_strerror(errno));
350 return(fd);
351 } /* }}} static int open_pidfile */
/* Check an existing PID file to see whether another daemon is running.
 *
 * Returns a negative value, with the descriptor closed, when:
 *   - the file cannot be opened (open_pidfile() already printed why);
 *   - it cannot be read or is empty;
 *   - it does not hold a positive PID;
 *   - it names a signalable process other than ourselves.
 * Otherwise the PID is considered stale: the file is truncated, a note is
 * printed, and the open descriptor (positioned at offset 0) is returned for
 * the caller to reuse, e.g. with write_pidfile().
 *
 * NOTE(review): kill(pid, 0) fails with EPERM for a live process owned by
 * another user. Such a PID is treated as stale here.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* read(2) does not NUL-terminate; leave room so atoi() sees a C string */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  ftruncate(pid_fd, 0);

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write our PID followed by a newline to the already open descriptor `fd',
 * e.g. one returned by check_pidfile(). The descriptor is consumed: it is
 * closed on both success and failure.
 * Returns 0 on success, -1 if fdopen() fails. */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close (fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
412 static int remove_pidfile (void) /* {{{ */
413 {
414 char *file;
415 int status;
417 file = (config_pid_file != NULL)
418 ? config_pid_file
419 : LOCALSTATEDIR "/run/rrdcached.pid";
421 status = unlink (file);
422 if (status == 0)
423 return (0);
424 return (errno);
425 } /* }}} int remove_pidfile */
427 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
428 {
429 char *eol;
431 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
432 sock->next_read - sock->next_cmd);
434 if (eol == NULL)
435 {
436 /* no commands left, move remainder back to front of rbuf */
437 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
438 sock->next_read - sock->next_cmd);
439 sock->next_read -= sock->next_cmd;
440 sock->next_cmd = 0;
441 *len = 0;
442 return NULL;
443 }
444 else
445 {
446 char *cmd = sock->rbuf + sock->next_cmd;
447 *eol = '\0';
449 sock->next_cmd = eol - sock->rbuf + 1;
451 if (eol > sock->rbuf && *(eol-1) == '\r')
452 *(--eol) = '\0'; /* handle "\r\n" EOL */
454 *len = eol - cmd;
456 return cmd;
457 }
459 /* NOTREACHED */
460 assert(1==0);
461 }
463 /* add the characters directly to the write buffer */
464 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
465 {
466 char *new_buf;
468 assert(sock != NULL);
470 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
471 if (new_buf == NULL)
472 {
473 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
474 return -1;
475 }
477 strncpy(new_buf + sock->wbuf_len, str, len + 1);
479 sock->wbuf = new_buf;
480 sock->wbuf_len += len;
482 return 0;
483 } /* }}} static int add_to_wbuf */
485 /* add the text to the "extra" info that's sent after the status line */
486 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
487 {
488 va_list argp;
489 char buffer[CMD_MAX];
490 int len;
492 if (sock == NULL) return 0; /* journal replay mode */
493 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
495 va_start(argp, fmt);
496 #ifdef HAVE_VSNPRINTF
497 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
498 #else
499 len = vsprintf(buffer, fmt, argp);
500 #endif
501 va_end(argp);
502 if (len < 0)
503 {
504 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
505 return -1;
506 }
508 return add_to_wbuf(sock, buffer, len);
509 } /* }}} static int add_response_info */
/* Count the '\n' characters in `str' (0 for NULL). send_response() uses it
 * to tell the client how many info lines follow the status line. */
static int count_lines(char *str) /* {{{ */
{
  int n = 0;
  const char *p;

  if (str == NULL)
    return 0;

  for (p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      n++;
  }

  return n;
} /* }}} static int count_lines */
527 /* send the response back to the user.
528 * returns 0 on success, -1 on error
529 * write buffer is always zeroed after this call */
530 static int send_response (listen_socket_t *sock, response_code rc,
531 char *fmt, ...) /* {{{ */
532 {
533 va_list argp;
534 char buffer[CMD_MAX];
535 int lines;
536 ssize_t wrote;
537 int rclen, len;
539 if (sock == NULL) return rc; /* journal replay mode */
541 if (sock->batch_start)
542 {
543 if (rc == RESP_OK)
544 return rc; /* no response on success during BATCH */
545 lines = sock->batch_cmd;
546 }
547 else if (rc == RESP_OK)
548 lines = count_lines(sock->wbuf);
549 else
550 lines = -1;
552 rclen = sprintf(buffer, "%d ", lines);
553 va_start(argp, fmt);
554 #ifdef HAVE_VSNPRINTF
555 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
556 #else
557 len = vsprintf(buffer+rclen, fmt, argp);
558 #endif
559 va_end(argp);
560 if (len < 0)
561 return -1;
563 len += rclen;
565 /* append the result to the wbuf, don't write to the user */
566 if (sock->batch_start)
567 return add_to_wbuf(sock, buffer, len);
569 /* first write must be complete */
570 if (len != write(sock->fd, buffer, len))
571 {
572 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
573 return -1;
574 }
576 if (sock->wbuf != NULL && rc == RESP_OK)
577 {
578 wrote = 0;
579 while (wrote < sock->wbuf_len)
580 {
581 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
582 if (wb <= 0)
583 {
584 RRDD_LOG(LOG_INFO, "send_response: could not write results");
585 return -1;
586 }
587 wrote += wb;
588 }
589 }
591 free(sock->wbuf); sock->wbuf = NULL;
592 sock->wbuf_len = 0;
594 return 0;
595 } /* }}} */
597 static void wipe_ci_values(cache_item_t *ci, time_t when)
598 {
599 ci->values = NULL;
600 ci->values_num = 0;
602 ci->last_flush_time = when;
603 if (config_write_jitter > 0)
604 ci->last_flush_time += (rrd_random() % config_write_jitter);
605 }
607 /* remove_from_queue
608 * remove a "cache_item_t" item from the queue.
609 * must hold 'cache_lock' when calling this
610 */
611 static void remove_from_queue(cache_item_t *ci) /* {{{ */
612 {
613 if (ci == NULL) return;
614 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
616 if (ci->prev == NULL)
617 cache_queue_head = ci->next; /* reset head */
618 else
619 ci->prev->next = ci->next;
621 if (ci->next == NULL)
622 cache_queue_tail = ci->prev; /* reset the tail */
623 else
624 ci->next->prev = ci->prev;
626 ci->next = ci->prev = NULL;
627 ci->flags &= ~CI_FLAGS_IN_QUEUE;
629 pthread_mutex_lock (&stats_lock);
630 assert (stats_queue_length > 0);
631 stats_queue_length--;
632 pthread_mutex_unlock (&stats_lock);
634 } /* }}} static void remove_from_queue */
636 /* free the resources associated with the cache_item_t
637 * must hold cache_lock when calling this function
638 */
639 static void *free_cache_item(cache_item_t *ci) /* {{{ */
640 {
641 if (ci == NULL) return NULL;
643 remove_from_queue(ci);
645 for (size_t i=0; i < ci->values_num; i++)
646 free(ci->values[i]);
648 free (ci->values);
649 free (ci->file);
651 /* in case anyone is waiting */
652 pthread_cond_broadcast(&ci->flushed);
654 free (ci);
656 return NULL;
657 } /* }}} static void *free_cache_item */
/*
 * enqueue_cache_item:
 * `cache_lock' must be acquired before calling this function!
 *
 * Put `ci' on the write queue, signal `queue_cond' and increment
 * stats_queue_length.
 *   HEAD: move `ci' to the front, unlinking it first if it is already queued
 *         further down. flush_file() uses this to let a flush jump the queue.
 *   TAIL: append `ci' unless it is already queued anywhere (queued items
 *         are never moved backwards).
 * Items without pending values are not queued.
 * Returns -1 if `ci' is NULL, otherwise 0.
 */
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
    queue_side_t side)
{
  if (ci == NULL)
    return (-1);

  if (ci->values_num == 0)
    return (0);

  if (side == HEAD)
  {
    if (cache_queue_head == ci)
      return 0;

    /* remove if further down in queue. This decrements stats_queue_length;
     * the increment at the end of this function balances it. */
    remove_from_queue(ci);

    ci->prev = NULL;
    ci->next = cache_queue_head;
    if (ci->next != NULL)
      ci->next->prev = ci;
    cache_queue_head = ci;

    if (cache_queue_tail == NULL)
      cache_queue_tail = cache_queue_head;
  }
  else /* (side == TAIL) */
  {
    /* We don't move values back in the list.. */
    if (ci->flags & CI_FLAGS_IN_QUEUE)
      return (0);

    assert (ci->next == NULL);
    assert (ci->prev == NULL);

    ci->prev = cache_queue_tail;

    if (cache_queue_tail == NULL)
      cache_queue_head = ci;
    else
      cache_queue_tail->next = ci;

    cache_queue_tail = ci;
  }

  ci->flags |= CI_FLAGS_IN_QUEUE;

  /* wake a queue thread; it cannot run until our caller drops cache_lock */
  pthread_cond_signal(&queue_cond);
  pthread_mutex_lock (&stats_lock);
  stats_queue_length++;
  pthread_mutex_unlock (&stats_lock);

  return (0);
} /* }}} int enqueue_cache_item */
718 /*
719 * tree_callback_flush:
720 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
721 * while this is in progress.
722 */
723 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
724 gpointer data)
725 {
726 cache_item_t *ci;
727 callback_flush_data_t *cfd;
729 ci = (cache_item_t *) value;
730 cfd = (callback_flush_data_t *) data;
732 if (ci->flags & CI_FLAGS_IN_QUEUE)
733 return FALSE;
735 if ((ci->last_flush_time <= cfd->abs_timeout)
736 && (ci->values_num > 0))
737 {
738 enqueue_cache_item (ci, TAIL);
739 }
740 else if ((do_shutdown != 0)
741 && (ci->values_num > 0))
742 {
743 enqueue_cache_item (ci, TAIL);
744 }
745 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
746 && (ci->values_num <= 0))
747 {
748 assert ((char *) key == ci->file);
749 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
750 {
751 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
752 return (FALSE);
753 }
754 }
756 return (FALSE);
757 } /* }}} gboolean tree_callback_flush */
759 static int flush_old_values (int max_age)
760 {
761 callback_flush_data_t cfd;
762 size_t k;
764 memset (&cfd, 0, sizeof (cfd));
765 /* Pass the current time as user data so that we don't need to call
766 * `time' for each node. */
767 cfd.now = time (NULL);
768 cfd.keys = NULL;
769 cfd.keys_num = 0;
771 if (max_age > 0)
772 cfd.abs_timeout = cfd.now - max_age;
773 else
774 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
776 /* `tree_callback_flush' will return the keys of all values that haven't
777 * been touched in the last `config_flush_interval' seconds in `cfd'.
778 * The char*'s in this array point to the same memory as ci->file, so we
779 * don't need to free them separately. */
780 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
782 for (k = 0; k < cfd.keys_num; k++)
783 {
784 /* should never fail, since we have held the cache_lock
785 * the entire time */
786 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
787 }
789 if (cfd.keys != NULL)
790 {
791 free (cfd.keys);
792 cfd.keys = NULL;
793 }
795 return (0);
796 } /* int flush_old_values */
798 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
799 {
800 struct timeval now;
801 struct timespec next_flush;
802 int status;
804 gettimeofday (&now, NULL);
805 next_flush.tv_sec = now.tv_sec + config_flush_interval;
806 next_flush.tv_nsec = 1000 * now.tv_usec;
808 pthread_mutex_lock(&cache_lock);
810 while (!do_shutdown)
811 {
812 gettimeofday (&now, NULL);
813 if ((now.tv_sec > next_flush.tv_sec)
814 || ((now.tv_sec == next_flush.tv_sec)
815 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
816 {
817 /* Flush all values that haven't been written in the last
818 * `config_write_interval' seconds. */
819 flush_old_values (config_write_interval);
821 /* Determine the time of the next cache flush. */
822 next_flush.tv_sec =
823 now.tv_sec + next_flush.tv_sec % config_flush_interval;
825 /* unlock the cache while we rotate so we don't block incoming
826 * updates if the fsync() blocks on disk I/O */
827 pthread_mutex_unlock(&cache_lock);
828 journal_rotate();
829 pthread_mutex_lock(&cache_lock);
830 }
832 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
833 if (status != 0 && status != ETIMEDOUT)
834 {
835 RRDD_LOG (LOG_ERR, "flush_thread_main: "
836 "pthread_cond_timedwait returned %i.", status);
837 }
838 }
840 if (config_flush_at_shutdown)
841 flush_old_values (-1); /* flush everything */
843 pthread_mutex_unlock(&cache_lock);
845 return NULL;
846 } /* void *flush_thread_main */
848 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
849 {
850 pthread_mutex_lock (&cache_lock);
852 while (!do_shutdown
853 || (cache_queue_head != NULL && config_flush_at_shutdown))
854 {
855 cache_item_t *ci;
856 char *file;
857 char **values;
858 size_t values_num;
859 int status;
861 /* Now, check if there's something to store away. If not, wait until
862 * something comes in. if we are shutting down, do not wait around. */
863 if (cache_queue_head == NULL && !do_shutdown)
864 {
865 status = pthread_cond_wait (&queue_cond, &cache_lock);
866 if ((status != 0) && (status != ETIMEDOUT))
867 {
868 RRDD_LOG (LOG_ERR, "queue_thread_main: "
869 "pthread_cond_wait returned %i.", status);
870 }
871 }
873 /* Check if a value has arrived. This may be NULL if we timed out or there
874 * was an interrupt such as a signal. */
875 if (cache_queue_head == NULL)
876 continue;
878 ci = cache_queue_head;
880 /* copy the relevant parts */
881 file = strdup (ci->file);
882 if (file == NULL)
883 {
884 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
885 continue;
886 }
888 assert(ci->values != NULL);
889 assert(ci->values_num > 0);
891 values = ci->values;
892 values_num = ci->values_num;
894 wipe_ci_values(ci, time(NULL));
895 remove_from_queue(ci);
897 pthread_mutex_unlock (&cache_lock);
899 rrd_clear_error ();
900 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
901 if (status != 0)
902 {
903 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
904 "rrd_update_r (%s) failed with status %i. (%s)",
905 file, status, rrd_get_error());
906 }
908 journal_write("wrote", file);
909 pthread_cond_broadcast(&ci->flushed);
911 rrd_free_ptrs((void ***) &values, &values_num);
912 free(file);
914 if (status == 0)
915 {
916 pthread_mutex_lock (&stats_lock);
917 stats_updates_written++;
918 stats_data_sets_written += values_num;
919 pthread_mutex_unlock (&stats_lock);
920 }
922 pthread_mutex_lock (&cache_lock);
923 }
924 pthread_mutex_unlock (&cache_lock);
926 return (NULL);
927 } /* }}} void *queue_thread_main */
/* Split the next space-separated field off the front of a request buffer.
 *
 * The field is unescaped in place: "\x" yields a literal x, which lets
 * fields contain spaces. The field ends at the first unescaped ' ' or '\0',
 * which is replaced by '\0'. The buffer must be NUL-terminated within
 * *buffer_size_ret bytes; handle_request guarantees this.
 *
 * On success returns 0, points *field_ret at the field and advances
 * *buffer_ret / *buffer_size_ret past the field and its separator.
 * Returns -1 for an empty buffer, or when a backslash is the last character
 * before the terminating NUL; the output pointers are then left unchanged.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
                             size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t len = *buffer_size_ret;
  char *dst = src;   /* unescaping never outruns reading, so in-place is safe */
  size_t in = 0;
  size_t out = 0;

  if (len <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[len - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= len)
      return (-1);

    c = src[in++];
    if (c == ' ' || c == '\0')
    {
      dst[out] = '\0';
      break;
    }

    if (c == '\\')
    {
      if (in >= len)
        return (-1);
      c = src[in++];
    }

    dst[out++] = c;
  }

  *buffer_ret = src + in;
  *buffer_size_ret = len - in;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
992 /* if we're restricting writes to the base directory,
993 * check whether the file falls within the dir
994 * returns 1 if OK, otherwise 0
995 */
996 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
997 {
998 assert(file != NULL);
1000 if (!config_write_base_only
1001 || sock == NULL /* journal replay */
1002 || config_base_dir == NULL)
1003 return 1;
1005 if (strstr(file, "../") != NULL) goto err;
1007 /* relative paths without "../" are ok */
1008 if (*file != '/') return 1;
1010 /* file must be of the format base + "/" + <1+ char filename> */
1011 if (strlen(file) < _config_base_dir_len + 2) goto err;
1012 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1013 if (*(file + _config_base_dir_len) != '/') goto err;
1015 return 1;
1017 err:
1018 if (sock != NULL && sock->fd >= 0)
1019 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1021 return 0;
1022 } /* }}} static int check_file_access */
1024 /* when using a base dir, convert relative paths to absolute paths.
1025 * if necessary, modifies the "filename" pointer to point
1026 * to the new path created in "tmp". "tmp" is provided
1027 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1028 *
1029 * this allows us to optimize for the expected case (absolute path)
1030 * with a no-op.
1031 */
1032 static void get_abs_path(char **filename, char *tmp)
1033 {
1034 assert(tmp != NULL);
1035 assert(filename != NULL && *filename != NULL);
1037 if (config_base_dir == NULL || **filename == '/')
1038 return;
1040 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1041 *filename = tmp;
1042 } /* }}} static int get_abs_path */
1044 /* returns 1 if we have the required privilege level,
1045 * otherwise issue an error to the user on sock */
1046 static int has_privilege (listen_socket_t *sock, /* {{{ */
1047 socket_privilege priv)
1048 {
1049 if (sock == NULL) /* journal replay */
1050 return 1;
1052 if (sock->privilege >= priv)
1053 return 1;
1055 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1056 } /* }}} static int has_privilege */
1058 static int flush_file (const char *filename) /* {{{ */
1059 {
1060 cache_item_t *ci;
1062 pthread_mutex_lock (&cache_lock);
1064 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1065 if (ci == NULL)
1066 {
1067 pthread_mutex_unlock (&cache_lock);
1068 return (ENOENT);
1069 }
1071 if (ci->values_num > 0)
1072 {
1073 /* Enqueue at head */
1074 enqueue_cache_item (ci, HEAD);
1075 pthread_cond_wait(&ci->flushed, &cache_lock);
1076 }
1078 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1079 * may have been purged during our cond_wait() */
1081 pthread_mutex_unlock(&cache_lock);
1083 return (0);
1084 } /* }}} int flush_file */
1086 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1087 {
1088 char *err = "Syntax error.\n";
1090 if (cmd && cmd->syntax)
1091 err = cmd->syntax;
1093 return send_response(sock, RESP_ERR, "Usage: %s", err);
1094 } /* }}} static int syntax_error() */
1096 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1097 {
1098 uint64_t copy_queue_length;
1099 uint64_t copy_updates_received;
1100 uint64_t copy_flush_received;
1101 uint64_t copy_updates_written;
1102 uint64_t copy_data_sets_written;
1103 uint64_t copy_journal_bytes;
1104 uint64_t copy_journal_rotate;
1106 uint64_t tree_nodes_number;
1107 uint64_t tree_depth;
1109 pthread_mutex_lock (&stats_lock);
1110 copy_queue_length = stats_queue_length;
1111 copy_updates_received = stats_updates_received;
1112 copy_flush_received = stats_flush_received;
1113 copy_updates_written = stats_updates_written;
1114 copy_data_sets_written = stats_data_sets_written;
1115 copy_journal_bytes = stats_journal_bytes;
1116 copy_journal_rotate = stats_journal_rotate;
1117 pthread_mutex_unlock (&stats_lock);
1119 pthread_mutex_lock (&cache_lock);
1120 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1121 tree_depth = (uint64_t) g_tree_height (cache_tree);
1122 pthread_mutex_unlock (&cache_lock);
1124 add_response_info(sock,
1125 "QueueLength: %"PRIu64"\n", copy_queue_length);
1126 add_response_info(sock,
1127 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1128 add_response_info(sock,
1129 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1130 add_response_info(sock,
1131 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1132 add_response_info(sock,
1133 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1134 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1135 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1136 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1137 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1139 send_response(sock, RESP_OK, "Statistics follow\n");
1141 return (0);
1142 } /* }}} int handle_request_stats */
1144 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1145 {
1146 char *file, file_tmp[PATH_MAX];
1147 int status;
1149 status = buffer_get_field (&buffer, &buffer_size, &file);
1150 if (status != 0)
1151 {
1152 return syntax_error(sock,cmd);
1153 }
1154 else
1155 {
1156 pthread_mutex_lock(&stats_lock);
1157 stats_flush_received++;
1158 pthread_mutex_unlock(&stats_lock);
1160 get_abs_path(&file, file_tmp);
1161 if (!check_file_access(file, sock)) return 0;
1163 status = flush_file (file);
1164 if (status == 0)
1165 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1166 else if (status == ENOENT)
1167 {
1168 /* no file in our tree; see whether it exists at all */
1169 struct stat statbuf;
1171 memset(&statbuf, 0, sizeof(statbuf));
1172 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1173 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1174 else
1175 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1176 }
1177 else if (status < 0)
1178 return send_response(sock, RESP_ERR, "Internal error.\n");
1179 else
1180 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1181 }
1183 /* NOTREACHED */
1184 assert(1==0);
1185 } /* }}} int handle_request_flush */
1187 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1188 {
1189 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1191 pthread_mutex_lock(&cache_lock);
1192 flush_old_values(-1);
1193 pthread_mutex_unlock(&cache_lock);
1195 return send_response(sock, RESP_OK, "Started flush.\n");
1196 } /* }}} static int handle_request_flushall */
1198 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1199 {
1200 int status;
1201 char *file, file_tmp[PATH_MAX];
1202 cache_item_t *ci;
1204 status = buffer_get_field(&buffer, &buffer_size, &file);
1205 if (status != 0)
1206 return syntax_error(sock,cmd);
1208 get_abs_path(&file, file_tmp);
1210 pthread_mutex_lock(&cache_lock);
1211 ci = g_tree_lookup(cache_tree, file);
1212 if (ci == NULL)
1213 {
1214 pthread_mutex_unlock(&cache_lock);
1215 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1216 }
1218 for (size_t i=0; i < ci->values_num; i++)
1219 add_response_info(sock, "%s\n", ci->values[i]);
1221 pthread_mutex_unlock(&cache_lock);
1222 return send_response(sock, RESP_OK, "updates pending\n");
1223 } /* }}} static int handle_request_pending */
1225 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1226 {
1227 int status;
1228 gboolean found;
1229 char *file, file_tmp[PATH_MAX];
1231 status = buffer_get_field(&buffer, &buffer_size, &file);
1232 if (status != 0)
1233 return syntax_error(sock,cmd);
1235 get_abs_path(&file, file_tmp);
1236 if (!check_file_access(file, sock)) return 0;
1238 pthread_mutex_lock(&cache_lock);
1239 found = g_tree_remove(cache_tree, file);
1240 pthread_mutex_unlock(&cache_lock);
1242 if (found == TRUE)
1243 {
1244 if (sock != NULL)
1245 journal_write("forget", file);
1247 return send_response(sock, RESP_OK, "Gone!\n");
1248 }
1249 else
1250 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1252 /* NOTREACHED */
1253 assert(1==0);
1254 } /* }}} static int handle_request_forget */
1256 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1257 {
1258 cache_item_t *ci;
1260 pthread_mutex_lock(&cache_lock);
1262 ci = cache_queue_head;
1263 while (ci != NULL)
1264 {
1265 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1266 ci = ci->next;
1267 }
1269 pthread_mutex_unlock(&cache_lock);
1271 return send_response(sock, RESP_OK, "in queue.\n");
1272 } /* }}} int handle_request_queue */
1274 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1275 {
1276 char *file, file_tmp[PATH_MAX];
1277 int values_num = 0;
1278 int status;
1279 char orig_buf[CMD_MAX];
1281 cache_item_t *ci;
1283 /* save it for the journal later */
1284 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1286 status = buffer_get_field (&buffer, &buffer_size, &file);
1287 if (status != 0)
1288 return syntax_error(sock,cmd);
1290 pthread_mutex_lock(&stats_lock);
1291 stats_updates_received++;
1292 pthread_mutex_unlock(&stats_lock);
1294 get_abs_path(&file, file_tmp);
1295 if (!check_file_access(file, sock)) return 0;
1297 pthread_mutex_lock (&cache_lock);
1298 ci = g_tree_lookup (cache_tree, file);
1300 if (ci == NULL) /* {{{ */
1301 {
1302 struct stat statbuf;
1304 /* don't hold the lock while we setup; stat(2) might block */
1305 pthread_mutex_unlock(&cache_lock);
1307 memset (&statbuf, 0, sizeof (statbuf));
1308 status = stat (file, &statbuf);
1309 if (status != 0)
1310 {
1311 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1313 status = errno;
1314 if (status == ENOENT)
1315 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1316 else
1317 return send_response(sock, RESP_ERR,
1318 "stat failed with error %i.\n", status);
1319 }
1320 if (!S_ISREG (statbuf.st_mode))
1321 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1323 if (access(file, R_OK|W_OK) != 0)
1324 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1325 file, rrd_strerror(errno));
1327 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1328 if (ci == NULL)
1329 {
1330 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1332 return send_response(sock, RESP_ERR, "malloc failed.\n");
1333 }
1334 memset (ci, 0, sizeof (cache_item_t));
1336 ci->file = strdup (file);
1337 if (ci->file == NULL)
1338 {
1339 free (ci);
1340 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1342 return send_response(sock, RESP_ERR, "strdup failed.\n");
1343 }
1345 wipe_ci_values(ci, now);
1346 ci->flags = CI_FLAGS_IN_TREE;
1347 pthread_cond_init(&ci->flushed, NULL);
1349 pthread_mutex_lock(&cache_lock);
1350 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1351 } /* }}} */
1352 assert (ci != NULL);
1354 /* don't re-write updates in replay mode */
1355 if (sock != NULL)
1356 journal_write("update", orig_buf);
1358 while (buffer_size > 0)
1359 {
1360 char *value;
1361 time_t stamp;
1362 char *eostamp;
1364 status = buffer_get_field (&buffer, &buffer_size, &value);
1365 if (status != 0)
1366 {
1367 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1368 break;
1369 }
1371 /* make sure update time is always moving forward */
1372 stamp = strtol(value, &eostamp, 10);
1373 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1374 {
1375 pthread_mutex_unlock(&cache_lock);
1376 return send_response(sock, RESP_ERR,
1377 "Cannot find timestamp in '%s'!\n", value);
1378 }
1379 else if (stamp <= ci->last_update_stamp)
1380 {
1381 pthread_mutex_unlock(&cache_lock);
1382 return send_response(sock, RESP_ERR,
1383 "illegal attempt to update using time %ld when last"
1384 " update time is %ld (minimum one second step)\n",
1385 stamp, ci->last_update_stamp);
1386 }
1387 else
1388 ci->last_update_stamp = stamp;
1390 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1391 {
1392 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1393 continue;
1394 }
1396 values_num++;
1397 }
1399 if (((now - ci->last_flush_time) >= config_write_interval)
1400 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1401 && (ci->values_num > 0))
1402 {
1403 enqueue_cache_item (ci, TAIL);
1404 }
1406 pthread_mutex_unlock (&cache_lock);
1408 if (values_num < 1)
1409 return send_response(sock, RESP_ERR, "No values updated.\n");
1410 else
1411 return send_response(sock, RESP_OK,
1412 "errors, enqueued %i value(s).\n", values_num);
1414 /* NOTREACHED */
1415 assert(1==0);
1417 } /* }}} int handle_request_update */
1419 /* we came across a "WROTE" entry during journal replay.
1420 * throw away any values that we have accumulated for this file
1421 */
1422 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1423 {
1424 cache_item_t *ci;
1425 const char *file = buffer;
1427 pthread_mutex_lock(&cache_lock);
1429 ci = g_tree_lookup(cache_tree, file);
1430 if (ci == NULL)
1431 {
1432 pthread_mutex_unlock(&cache_lock);
1433 return (0);
1434 }
1436 if (ci->values)
1437 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1439 wipe_ci_values(ci, now);
1440 remove_from_queue(ci);
1442 pthread_mutex_unlock(&cache_lock);
1443 return (0);
1444 } /* }}} int handle_request_wrote */
1446 /* start "BATCH" processing */
1447 static int batch_start (HANDLER_PROTO) /* {{{ */
1448 {
1449 int status;
1450 if (sock->batch_start)
1451 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1453 status = send_response(sock, RESP_OK,
1454 "Go ahead. End with dot '.' on its own line.\n");
1455 sock->batch_start = time(NULL);
1456 sock->batch_cmd = 0;
1458 return status;
1459 } /* }}} static int batch_start */
1461 /* finish "BATCH" processing and return results to the client */
1462 static int batch_done (HANDLER_PROTO) /* {{{ */
1463 {
1464 assert(sock->batch_start);
1465 sock->batch_start = 0;
1466 sock->batch_cmd = 0;
1467 return send_response(sock, RESP_OK, "errors\n");
1468 } /* }}} static int batch_done */
/*
 * QUIT: sends no reply.  Returning non-zero propagates through
 * handle_request(), and connection_thread_main() closes the connection
 * on any non-zero status.
 */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int disconnect = -1;

  return disconnect;
} /* }}} static int handle_request_quit */
/*
 * Dispatch table for the text protocol, searched by find_command().
 *
 * Each entry holds, in order:
 *   - the command name (matched case-insensitively);
 *   - the handler;
 *   - the minimum privilege (checked with has_privilege() in
 *     handle_request());
 *   - the contexts in which the command is accepted (checked by
 *     command_check_context());
 *   - a syntax line;
 *   - a longer help text.
 *
 * Entries with a NULL syntax (WROTE, ".") are left out of the HELP
 * overview.  The all-NULL entry terminates the table.
 */
struct command COMMANDS[] = {
  {
    "UPDATE",
    handle_request_update,
    PRIV_HIGH,
    CMD_CONTEXT_ANY,
    "UPDATE <filename> <values> [<values> ...]\n"
    ,
    "Adds the given file to the internal cache if it is not yet known and\n"
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
    "for details.\n"
    "\n"
    "Each <values> has the following form:\n"
    " <values> = <time>:<value>[:<value>[...]]\n"
    "See the rrdupdate(1) manpage for details.\n"
  },
  {
    "WROTE",
    handle_request_wrote,
    PRIV_HIGH,
    CMD_CONTEXT_JOURNAL,   /* only meaningful while replaying the journal */
    NULL,
    NULL
  },
  {
    "FLUSH",
    handle_request_flush,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "FLUSH <filename>\n"
    ,
    "Adds the given filename to the head of the update queue and returns\n"
    "after it has been dequeued.\n"
  },
  {
    "FLUSHALL",
    handle_request_flushall,
    PRIV_HIGH,
    CMD_CONTEXT_CLIENT,
    "FLUSHALL\n"
    ,
    "Triggers writing of all pending updates. Returns immediately.\n"
  },
  {
    "PENDING",
    handle_request_pending,
    PRIV_HIGH,
    CMD_CONTEXT_CLIENT,
    "PENDING <filename>\n"
    ,
    "Shows any 'pending' updates for a file, in order.\n"
    "The updates shown have not yet been written to the underlying RRD file.\n"
  },
  {
    "FORGET",
    handle_request_forget,
    PRIV_HIGH,
    CMD_CONTEXT_ANY,
    "FORGET <filename>\n"
    ,
    "Removes the file completely from the cache.\n"
    "Any pending updates for the file will be lost.\n"
  },
  {
    "QUEUE",
    handle_request_queue,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "QUEUE\n"
    ,
    "Shows all files in the output queue.\n"
    "The output is zero or more lines in the following format:\n"
    "(where <num_vals> is the number of values to be written)\n"
    "\n"
    "<num_vals> <filename>\n"
  },
  {
    "STATS",
    handle_request_stats,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "STATS\n"
    ,
    "Returns some performance counters, see the rrdcached(1) manpage for\n"
    "a description of the values.\n"
  },
  {
    "HELP",
    handle_request_help,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "HELP [<command>]\n",
    NULL, /* special! */
  },
  {
    "BATCH",
    batch_start,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,    /* a nested BATCH is rejected by batch_start() */
    "BATCH\n"
    ,
    "The 'BATCH' command permits the client to initiate a bulk load\n"
    " of commands to rrdcached.\n"
    "\n"
    "Usage:\n"
    "\n"
    " client: BATCH\n"
    " server: 0 Go ahead. End with dot '.' on its own line.\n"
    " client: command #1\n"
    " client: command #2\n"
    " client: ... and so on\n"
    " client: .\n"
    " server: 2 errors\n"
    " server: 7 message for command #7\n"
    " server: 9 message for command #9\n"
    "\n"
    "For more information, consult the rrdcached(1) documentation.\n"
  },
  {
    ".", /* BATCH terminator */
    batch_done,
    PRIV_LOW,
    CMD_CONTEXT_BATCH,
    NULL,
    NULL
  },
  {
    "QUIT",
    handle_request_quit,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "QUIT\n"
    ,
    "Disconnect from rrdcached.\n"
  },
  {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */
};
1613 static struct command *find_command(char *cmd)
1614 {
1615 struct command *c = COMMANDS;
1617 while (c->cmd != NULL)
1618 {
1619 if (strcasecmp(cmd, c->cmd) == 0)
1620 break;
1621 c++;
1622 }
1624 if (c->cmd == NULL)
1625 return NULL;
1626 else
1627 return c;
1628 }
1630 /* check whether commands are received in the expected context */
1631 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1632 {
1633 if (sock == NULL)
1634 return (cmd->context & CMD_CONTEXT_JOURNAL);
1635 else if (sock->batch_start)
1636 return (cmd->context & CMD_CONTEXT_BATCH);
1637 else
1638 return (cmd->context & CMD_CONTEXT_CLIENT);
1640 /* NOTREACHED */
1641 assert(1==0);
1642 }
1644 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1645 {
1646 int status;
1647 char *cmd_str;
1648 char *resp_txt;
1649 struct command *help = NULL;
1651 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1652 if (status == 0)
1653 help = find_command(cmd_str);
1655 if (help && (help->syntax || help->help))
1656 {
1657 char tmp[CMD_MAX];
1659 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1660 resp_txt = tmp;
1662 if (help->syntax)
1663 add_response_info(sock, "Usage: %s\n", help->syntax);
1665 if (help->help)
1666 add_response_info(sock, "%s\n", help->help);
1667 }
1668 else
1669 {
1670 help = COMMANDS;
1671 resp_txt = "Command overview\n";
1673 while (help->cmd)
1674 {
1675 if (help->syntax)
1676 add_response_info(sock, "%s", help->syntax);
1677 help++;
1678 }
1679 }
1681 return send_response(sock, RESP_OK, resp_txt);
1682 } /* }}} int handle_request_help */
1684 /* if sock==NULL, we are in journal replay mode */
1685 static int handle_request (DISPATCH_PROTO) /* {{{ */
1686 {
1687 char *buffer_ptr = buffer;
1688 char *cmd_str = NULL;
1689 struct command *cmd = NULL;
1690 int status;
1692 assert (buffer[buffer_size - 1] == '\0');
1694 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1695 if (status != 0)
1696 {
1697 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1698 return (-1);
1699 }
1701 if (sock != NULL && sock->batch_start)
1702 sock->batch_cmd++;
1704 cmd = find_command(cmd_str);
1705 if (!cmd)
1706 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1708 status = has_privilege(sock, cmd->min_priv);
1709 if (status <= 0)
1710 return status;
1712 if (!command_check_context(sock, cmd))
1713 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1715 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1716 } /* }}} int handle_request */
1718 /* MUST NOT hold journal_lock before calling this */
1719 static void journal_rotate(void) /* {{{ */
1720 {
1721 FILE *old_fh = NULL;
1722 int new_fd;
1724 if (journal_cur == NULL || journal_old == NULL)
1725 return;
1727 pthread_mutex_lock(&journal_lock);
1729 /* we rotate this way (rename before close) so that the we can release
1730 * the journal lock as fast as possible. Journal writes to the new
1731 * journal can proceed immediately after the new file is opened. The
1732 * fclose can then block without affecting new updates.
1733 */
1734 if (journal_fh != NULL)
1735 {
1736 old_fh = journal_fh;
1737 journal_fh = NULL;
1738 rename(journal_cur, journal_old);
1739 ++stats_journal_rotate;
1740 }
1742 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1743 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1744 if (new_fd >= 0)
1745 {
1746 journal_fh = fdopen(new_fd, "a");
1747 if (journal_fh == NULL)
1748 close(new_fd);
1749 }
1751 pthread_mutex_unlock(&journal_lock);
1753 if (old_fh != NULL)
1754 fclose(old_fh);
1756 if (journal_fh == NULL)
1757 {
1758 RRDD_LOG(LOG_CRIT,
1759 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1760 journal_cur, rrd_strerror(errno));
1762 RRDD_LOG(LOG_ERR,
1763 "JOURNALING DISABLED: All values will be flushed at shutdown");
1764 config_flush_at_shutdown = 1;
1765 }
1767 } /* }}} static void journal_rotate */
1769 static void journal_done(void) /* {{{ */
1770 {
1771 if (journal_cur == NULL)
1772 return;
1774 pthread_mutex_lock(&journal_lock);
1775 if (journal_fh != NULL)
1776 {
1777 fclose(journal_fh);
1778 journal_fh = NULL;
1779 }
1781 if (config_flush_at_shutdown)
1782 {
1783 RRDD_LOG(LOG_INFO, "removing journals");
1784 unlink(journal_old);
1785 unlink(journal_cur);
1786 }
1787 else
1788 {
1789 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1790 "journals will be used at next startup");
1791 }
1793 pthread_mutex_unlock(&journal_lock);
1795 } /* }}} static void journal_done */
1797 static int journal_write(char *cmd, char *args) /* {{{ */
1798 {
1799 int chars;
1801 if (journal_fh == NULL)
1802 return 0;
1804 pthread_mutex_lock(&journal_lock);
1805 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1806 pthread_mutex_unlock(&journal_lock);
1808 if (chars > 0)
1809 {
1810 pthread_mutex_lock(&stats_lock);
1811 stats_journal_bytes += chars;
1812 pthread_mutex_unlock(&stats_lock);
1813 }
1815 return chars;
1816 } /* }}} static int journal_write */
1818 static int journal_replay (const char *file) /* {{{ */
1819 {
1820 FILE *fh;
1821 int entry_cnt = 0;
1822 int fail_cnt = 0;
1823 uint64_t line = 0;
1824 char entry[CMD_MAX];
1825 time_t now;
1827 if (file == NULL) return 0;
1829 {
1830 char *reason = "unknown error";
1831 int status = 0;
1832 struct stat statbuf;
1834 memset(&statbuf, 0, sizeof(statbuf));
1835 if (stat(file, &statbuf) != 0)
1836 {
1837 if (errno == ENOENT)
1838 return 0;
1840 reason = "stat error";
1841 status = errno;
1842 }
1843 else if (!S_ISREG(statbuf.st_mode))
1844 {
1845 reason = "not a regular file";
1846 status = EPERM;
1847 }
1848 if (statbuf.st_uid != daemon_uid)
1849 {
1850 reason = "not owned by daemon user";
1851 status = EACCES;
1852 }
1853 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1854 {
1855 reason = "must not be user/group writable";
1856 status = EACCES;
1857 }
1859 if (status != 0)
1860 {
1861 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1862 file, rrd_strerror(status), reason);
1863 return 0;
1864 }
1865 }
1867 fh = fopen(file, "r");
1868 if (fh == NULL)
1869 {
1870 if (errno != ENOENT)
1871 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1872 file, rrd_strerror(errno));
1873 return 0;
1874 }
1875 else
1876 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1878 now = time(NULL);
1880 while(!feof(fh))
1881 {
1882 size_t entry_len;
1884 ++line;
1885 if (fgets(entry, sizeof(entry), fh) == NULL)
1886 break;
1887 entry_len = strlen(entry);
1889 /* check \n termination in case journal writing crashed mid-line */
1890 if (entry_len == 0)
1891 continue;
1892 else if (entry[entry_len - 1] != '\n')
1893 {
1894 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1895 ++fail_cnt;
1896 continue;
1897 }
1899 entry[entry_len - 1] = '\0';
1901 if (handle_request(NULL, now, entry, entry_len) == 0)
1902 ++entry_cnt;
1903 else
1904 ++fail_cnt;
1905 }
1907 fclose(fh);
1909 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1910 entry_cnt, fail_cnt);
1912 return entry_cnt > 0 ? 1 : 0;
1913 } /* }}} static int journal_replay */
1915 static void journal_init(void) /* {{{ */
1916 {
1917 int had_journal = 0;
1919 if (journal_cur == NULL) return;
1921 pthread_mutex_lock(&journal_lock);
1923 RRDD_LOG(LOG_INFO, "checking for journal files");
1925 had_journal += journal_replay(journal_old);
1926 had_journal += journal_replay(journal_cur);
1928 /* it must have been a crash. start a flush */
1929 if (had_journal && config_flush_at_shutdown)
1930 flush_old_values(-1);
1932 pthread_mutex_unlock(&journal_lock);
1933 journal_rotate();
1935 RRDD_LOG(LOG_INFO, "journal processing complete");
1937 } /* }}} static void journal_init */
1939 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1940 {
1941 assert(sock != NULL);
1943 free(sock->rbuf); sock->rbuf = NULL;
1944 free(sock->wbuf); sock->wbuf = NULL;
1945 free(sock);
1946 } /* }}} void free_listen_socket */
1948 static void close_connection(listen_socket_t *sock) /* {{{ */
1949 {
1950 if (sock->fd >= 0)
1951 {
1952 close(sock->fd);
1953 sock->fd = -1;
1954 }
1956 free_listen_socket(sock);
1958 } /* }}} void close_connection */
1960 static void *connection_thread_main (void *args) /* {{{ */
1961 {
1962 listen_socket_t *sock;
1963 int fd;
1965 sock = (listen_socket_t *) args;
1966 fd = sock->fd;
1968 /* init read buffers */
1969 sock->next_read = sock->next_cmd = 0;
1970 sock->rbuf = malloc(RBUF_SIZE);
1971 if (sock->rbuf == NULL)
1972 {
1973 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1974 close_connection(sock);
1975 return NULL;
1976 }
1978 pthread_mutex_lock (&connection_threads_lock);
1979 connection_threads_num++;
1980 pthread_mutex_unlock (&connection_threads_lock);
1982 while (do_shutdown == 0)
1983 {
1984 char *cmd;
1985 ssize_t cmd_len;
1986 ssize_t rbytes;
1987 time_t now;
1989 struct pollfd pollfd;
1990 int status;
1992 pollfd.fd = fd;
1993 pollfd.events = POLLIN | POLLPRI;
1994 pollfd.revents = 0;
1996 status = poll (&pollfd, 1, /* timeout = */ 500);
1997 if (do_shutdown)
1998 break;
1999 else if (status == 0) /* timeout */
2000 continue;
2001 else if (status < 0) /* error */
2002 {
2003 status = errno;
2004 if (status != EINTR)
2005 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2006 continue;
2007 }
2009 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2010 break;
2011 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2012 {
2013 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2014 "poll(2) returned something unexpected: %#04hx",
2015 pollfd.revents);
2016 break;
2017 }
2019 rbytes = read(fd, sock->rbuf + sock->next_read,
2020 RBUF_SIZE - sock->next_read);
2021 if (rbytes < 0)
2022 {
2023 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2024 break;
2025 }
2026 else if (rbytes == 0)
2027 break; /* eof */
2029 sock->next_read += rbytes;
2031 if (sock->batch_start)
2032 now = sock->batch_start;
2033 else
2034 now = time(NULL);
2036 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2037 {
2038 status = handle_request (sock, now, cmd, cmd_len+1);
2039 if (status != 0)
2040 goto out_close;
2041 }
2042 }
2044 out_close:
2045 close_connection(sock);
2047 /* Remove this thread from the connection threads list */
2048 pthread_mutex_lock (&connection_threads_lock);
2049 connection_threads_num--;
2050 if (connection_threads_num <= 0)
2051 pthread_cond_broadcast(&connection_threads_done);
2052 pthread_mutex_unlock (&connection_threads_lock);
2054 return (NULL);
2055 } /* }}} void *connection_thread_main */
2057 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2058 {
2059 int fd;
2060 struct sockaddr_un sa;
2061 listen_socket_t *temp;
2062 int status;
2063 const char *path;
2065 path = sock->addr;
2066 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2067 path += strlen("unix:");
2069 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2070 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2071 if (temp == NULL)
2072 {
2073 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2074 return (-1);
2075 }
2076 listen_fds = temp;
2077 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2079 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2080 if (fd < 0)
2081 {
2082 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2083 rrd_strerror(errno));
2084 return (-1);
2085 }
2087 memset (&sa, 0, sizeof (sa));
2088 sa.sun_family = AF_UNIX;
2089 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2091 /* if we've gotten this far, we own the pid file. any daemon started
2092 * with the same args must not be alive. therefore, ensure that we can
2093 * create the socket...
2094 */
2095 unlink(path);
2097 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2098 if (status != 0)
2099 {
2100 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2101 path, rrd_strerror(errno));
2102 close (fd);
2103 return (-1);
2104 }
2106 status = listen (fd, /* backlog = */ 10);
2107 if (status != 0)
2108 {
2109 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2110 path, rrd_strerror(errno));
2111 close (fd);
2112 unlink (path);
2113 return (-1);
2114 }
2116 listen_fds[listen_fds_num].fd = fd;
2117 listen_fds[listen_fds_num].family = PF_UNIX;
2118 strncpy(listen_fds[listen_fds_num].addr, path,
2119 sizeof (listen_fds[listen_fds_num].addr) - 1);
2120 listen_fds_num++;
2122 return (0);
2123 } /* }}} int open_listen_socket_unix */
/*
 * Open TCP listen sockets for sock->addr.
 *
 * Accepted address forms:
 *   - "[ipv6]:port" or "[ipv6]";
 *   - "host:port" or "host" (hostname or IPv4 literal);
 *   - a bare IPv6 address, which cannot carry a port.
 * RRDCACHED_DEFAULT_PORT is used when no port is given.
 *
 * A socket is attempted for every address getaddrinfo(3) returns.
 * Addresses whose socket/bind/listen fails are reported on stderr and
 * skipped; successful ones are appended to listen_fds.
 *
 * Returns -1 on a malformed address or resolver failure, 0 otherwise.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Exactly one ':' separates a hostname or IPv4 literal from its port.
     * A bare IPv6 address (e.g. "::1", "::ffff:1.2.3.4") has several
     * colons and is passed through unchanged; use the bracketed form to
     * give it a port.  strrchr() replaces the obsolete rindex(). */
    char *colon = strchr (addr, ':');
    if (colon != NULL && colon == strrchr (addr, ':'))
    {
      *colon = 0;
      port = colon + 1;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      /* like socket/bind failures: skip this address, try the others */
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2247 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2248 {
2249 assert(sock != NULL);
2250 assert(sock->addr != NULL);
2252 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2253 || sock->addr[0] == '/')
2254 return (open_listen_socket_unix(sock));
2255 else
2256 return (open_listen_socket_network(sock));
2257 } /* }}} int open_listen_socket */
2259 static int close_listen_sockets (void) /* {{{ */
2260 {
2261 size_t i;
2263 for (i = 0; i < listen_fds_num; i++)
2264 {
2265 close (listen_fds[i].fd);
2267 if (listen_fds[i].family == PF_UNIX)
2268 unlink(listen_fds[i].addr);
2269 }
2271 free (listen_fds);
2272 listen_fds = NULL;
2273 listen_fds_num = 0;
2275 return (0);
2276 } /* }}} int close_listen_sockets */
2278 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2279 {
2280 struct pollfd *pollfds;
2281 int pollfds_num;
2282 int status;
2283 int i;
2285 if (listen_fds_num < 1)
2286 {
2287 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2288 return (NULL);
2289 }
2291 pollfds_num = listen_fds_num;
2292 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2293 if (pollfds == NULL)
2294 {
2295 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2296 return (NULL);
2297 }
2298 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2300 RRDD_LOG(LOG_INFO, "listening for connections");
2302 while (do_shutdown == 0)
2303 {
2304 for (i = 0; i < pollfds_num; i++)
2305 {
2306 pollfds[i].fd = listen_fds[i].fd;
2307 pollfds[i].events = POLLIN | POLLPRI;
2308 pollfds[i].revents = 0;
2309 }
2311 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2312 if (do_shutdown)
2313 break;
2314 else if (status == 0) /* timeout */
2315 continue;
2316 else if (status < 0) /* error */
2317 {
2318 status = errno;
2319 if (status != EINTR)
2320 {
2321 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2322 }
2323 continue;
2324 }
2326 for (i = 0; i < pollfds_num; i++)
2327 {
2328 listen_socket_t *client_sock;
2329 struct sockaddr_storage client_sa;
2330 socklen_t client_sa_size;
2331 pthread_t tid;
2332 pthread_attr_t attr;
2334 if (pollfds[i].revents == 0)
2335 continue;
2337 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2338 {
2339 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2340 "poll(2) returned something unexpected for listen FD #%i.",
2341 pollfds[i].fd);
2342 continue;
2343 }
2345 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2346 if (client_sock == NULL)
2347 {
2348 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2349 continue;
2350 }
2351 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2353 client_sa_size = sizeof (client_sa);
2354 client_sock->fd = accept (pollfds[i].fd,
2355 (struct sockaddr *) &client_sa, &client_sa_size);
2356 if (client_sock->fd < 0)
2357 {
2358 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2359 free(client_sock);
2360 continue;
2361 }
2363 pthread_attr_init (&attr);
2364 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2366 status = pthread_create (&tid, &attr, connection_thread_main,
2367 client_sock);
2368 if (status != 0)
2369 {
2370 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2371 close_connection(client_sock);
2372 continue;
2373 }
2374 } /* for (pollfds_num) */
2375 } /* while (do_shutdown == 0) */
2377 RRDD_LOG(LOG_INFO, "starting shutdown");
2379 close_listen_sockets ();
2381 pthread_mutex_lock (&connection_threads_lock);
2382 while (connection_threads_num > 0)
2383 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2384 pthread_mutex_unlock (&connection_threads_lock);
2386 free(pollfds);
2388 return (NULL);
2389 } /* }}} void *listen_thread_main */
/*
 * Prepare the process to run as the daemon.  Steps, in order:
 *   1. take the pid file;
 *   2. open all listen sockets;
 *   3. unless stay_foreground is set, fork, start a new session and point
 *      fds 0-2 at /dev/null;
 *   4. chdir to the base directory;
 *   5. install signal handlers and open syslog;
 *   6. create the cache tree;
 *   7. write the pid file.
 *
 * Returns write_pidfile()'s result on success.  On failure it returns a
 * negative value; after the pid file was taken, failures remove it again.
 * The parent process exits with status 0 after a successful fork.
 */
static int daemonize (void) /* {{{ */
{
  int pid_fd;
  char *base_dir;

  /* remembered for journal_replay()'s ownership check */
  daemon_uid = geteuid();

  /* create the pid file exclusively; if that fails, let check_pidfile()
   * decide whether an existing pid file may be reused */
  pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
  if (pid_fd < 0)
    pid_fd = check_pidfile();
  if (pid_fd < 0)
    return pid_fd;

  /* open all the listen sockets */
  if (config_listen_address_list_len > 0)
  {
    /* individual failures are reported by open_listen_socket() itself;
     * only a total failure is fatal (checked below) */
    for (size_t i = 0; i < config_listen_address_list_len; i++)
      open_listen_socket (config_listen_address_list[i]);

    rrd_free_ptrs((void ***) &config_listen_address_list,
                  &config_listen_address_list_len);
  }
  else
  {
    /* no -l/-L given: fall back to the compiled-in default address */
    listen_socket_t sock;
    memset(&sock, 0, sizeof(sock));
    strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
    open_listen_socket (&sock);
  }

  if (listen_fds_num < 1)
  {
    fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
    goto error;
  }

  if (!stay_foreground)
  {
    pid_t child;

    child = fork ();
    if (child < 0)
    {
      fprintf (stderr, "daemonize: fork(2) failed.\n");
      goto error;
    }
    else if (child > 0)
      exit(0);

    /* Become session leader */
    setsid ();

    /* Open the first three file descriptors to /dev/null.
     * open() returns the lowest free descriptor, so after closing 0-2 it
     * yields 0; each dup(0) then yields 1 and 2 in turn.  Order matters. */
    close (2);
    close (1);
    close (0);

    open ("/dev/null", O_RDWR);
    dup (0);
    dup (0);
  } /* if (!stay_foreground) */

  /* Change into the configured base directory, or /tmp if none is set.
   * NOTE(review): when not in the foreground, stderr is /dev/null by now,
   * so the chdir error message below is not visible anywhere. */
  base_dir = (config_base_dir != NULL)
    ? config_base_dir
    : "/tmp";

  if (chdir (base_dir) != 0)
  {
    fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
    goto error;
  }

  install_signal_handlers();

  openlog ("rrdcached", LOG_PID, LOG_DAEMON);
  RRDD_LOG(LOG_INFO, "starting up");

  /* keys are the items' own file strings (no key destroy function);
   * removing or replacing a value frees the item via free_cache_item */
  cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
                                (GDestroyNotify) free_cache_item);
  if (cache_tree == NULL)
  {
    RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
    goto error;
  }

  return write_pidfile (pid_fd);

error:
  remove_pidfile();
  return -1;
} /* }}} int daemonize */
/*
 * Orderly shutdown, in this order:
 *   1. raise do_shutdown and wake and join the flush thread and the queue
 *      threads;
 *   2. close (and possibly remove) the journal;
 *   3. remove the pid file;
 *   4. free configuration strings;
 *   5. destroy the cache tree;
 *   6. close syslog.
 * Always returns 0.
 */
static int cleanup (void) /* {{{ */
{
  do_shutdown++;

  /* wake the flush thread so it sees do_shutdown, then wait for it */
  pthread_cond_broadcast (&flush_cond);
  pthread_join (flush_thread, NULL);

  /* same for every queue (writer) thread */
  pthread_cond_broadcast (&queue_cond);
  for (int i = 0; i < config_queue_threads; i++)
    pthread_join (queue_threads[i], NULL);

  /* with flush-at-shutdown the writer threads are expected to have
   * emptied the queue before exiting */
  if (config_flush_at_shutdown)
  {
    assert(cache_queue_head == NULL);
    RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
  }

  /* must run before journal_cur/journal_old are freed below */
  journal_done();
  remove_pidfile ();

  free(queue_threads);
  free(config_base_dir);
  free(config_pid_file);
  free(journal_cur);
  free(journal_old);

  /* NOTE(review): cache_lock is taken and never released.  This is
   * presumably deliberate, so nothing can touch the tree after it is
   * destroyed.  Confirm before adding an unlock. */
  pthread_mutex_lock(&cache_lock);
  g_tree_destroy(cache_tree);

  RRDD_LOG(LOG_INFO, "goodbye");
  closelog ();

  return (0);
} /* }}} int cleanup */
2519 static int read_options (int argc, char **argv) /* {{{ */
2520 {
2521 int option;
2522 int status = 0;
2524 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2525 {
2526 switch (option)
2527 {
2528 case 'g':
2529 stay_foreground=1;
2530 break;
2532 case 'L':
2533 case 'l':
2534 {
2535 listen_socket_t *new;
2537 new = malloc(sizeof(listen_socket_t));
2538 if (new == NULL)
2539 {
2540 fprintf(stderr, "read_options: malloc failed.\n");
2541 return(2);
2542 }
2543 memset(new, 0, sizeof(listen_socket_t));
2545 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2546 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2548 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2549 &config_listen_address_list_len, new))
2550 {
2551 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2552 return (2);
2553 }
2554 }
2555 break;
2557 case 'f':
2558 {
2559 int temp;
2561 temp = atoi (optarg);
2562 if (temp > 0)
2563 config_flush_interval = temp;
2564 else
2565 {
2566 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2567 status = 3;
2568 }
2569 }
2570 break;
2572 case 'w':
2573 {
2574 int temp;
2576 temp = atoi (optarg);
2577 if (temp > 0)
2578 config_write_interval = temp;
2579 else
2580 {
2581 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2582 status = 2;
2583 }
2584 }
2585 break;
2587 case 'z':
2588 {
2589 int temp;
2591 temp = atoi(optarg);
2592 if (temp > 0)
2593 config_write_jitter = temp;
2594 else
2595 {
2596 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2597 status = 2;
2598 }
2600 break;
2601 }
2603 case 't':
2604 {
2605 int threads;
2606 threads = atoi(optarg);
2607 if (threads >= 1)
2608 config_queue_threads = threads;
2609 else
2610 {
2611 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2612 return 1;
2613 }
2614 }
2615 break;
2617 case 'B':
2618 config_write_base_only = 1;
2619 break;
2621 case 'b':
2622 {
2623 size_t len;
2624 char base_realpath[PATH_MAX];
2626 if (config_base_dir != NULL)
2627 free (config_base_dir);
2628 config_base_dir = strdup (optarg);
2629 if (config_base_dir == NULL)
2630 {
2631 fprintf (stderr, "read_options: strdup failed.\n");
2632 return (3);
2633 }
2635 /* make sure that the base directory is not resolved via
2636 * symbolic links. this makes some performance-enhancing
2637 * assumptions possible (we don't have to resolve paths
2638 * that start with a "/")
2639 */
2640 if (realpath(config_base_dir, base_realpath) == NULL)
2641 {
2642 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2643 return 5;
2644 }
2645 else if (strncmp(config_base_dir,
2646 base_realpath, sizeof(base_realpath)) != 0)
2647 {
2648 fprintf(stderr,
2649 "Base directory (-b) resolved via file system links!\n"
2650 "Please consult rrdcached '-b' documentation!\n"
2651 "Consider specifying the real directory (%s)\n",
2652 base_realpath);
2653 return 5;
2654 }
2656 len = strlen (config_base_dir);
2657 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2658 {
2659 config_base_dir[len - 1] = 0;
2660 len--;
2661 }
2663 if (len < 1)
2664 {
2665 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2666 return (4);
2667 }
2669 _config_base_dir_len = len;
2670 }
2671 break;
2673 case 'p':
2674 {
2675 if (config_pid_file != NULL)
2676 free (config_pid_file);
2677 config_pid_file = strdup (optarg);
2678 if (config_pid_file == NULL)
2679 {
2680 fprintf (stderr, "read_options: strdup failed.\n");
2681 return (3);
2682 }
2683 }
2684 break;
2686 case 'F':
2687 config_flush_at_shutdown = 1;
2688 break;
2690 case 'j':
2691 {
2692 struct stat statbuf;
2693 const char *dir = optarg;
2695 status = stat(dir, &statbuf);
2696 if (status != 0)
2697 {
2698 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2699 return 6;
2700 }
2702 if (!S_ISDIR(statbuf.st_mode)
2703 || access(dir, R_OK|W_OK|X_OK) != 0)
2704 {
2705 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2706 errno ? rrd_strerror(errno) : "");
2707 return 6;
2708 }
2710 journal_cur = malloc(PATH_MAX + 1);
2711 journal_old = malloc(PATH_MAX + 1);
2712 if (journal_cur == NULL || journal_old == NULL)
2713 {
2714 fprintf(stderr, "malloc failure for journal files\n");
2715 return 6;
2716 }
2717 else
2718 {
2719 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2720 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2721 }
2722 }
2723 break;
2725 case 'h':
2726 case '?':
2727 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2728 "\n"
2729 "Usage: rrdcached [options]\n"
2730 "\n"
2731 "Valid options are:\n"
2732 " -l <address> Socket address to listen to.\n"
2733 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2734 " -w <seconds> Interval in which to write data.\n"
2735 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2736 " -t <threads> Number of write threads.\n"
2737 " -f <seconds> Interval in which to flush dead data.\n"
2738 " -p <file> Location of the PID-file.\n"
2739 " -b <dir> Base directory to change to.\n"
2740 " -B Restrict file access to paths within -b <dir>\n"
2741 " -g Do not fork and run in the foreground.\n"
2742 " -j <dir> Directory in which to create the journal files.\n"
2743 " -F Always flush all updates at shutdown\n"
2744 "\n"
2745 "For more information and a detailed description of all options "
2746 "please refer\n"
2747 "to the rrdcached(1) manual page.\n",
2748 VERSION);
2749 status = -1;
2750 break;
2751 } /* switch (option) */
2752 } /* while (getopt) */
2754 /* advise the user when values are not sane */
2755 if (config_flush_interval < 2 * config_write_interval)
2756 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2757 " 2x write interval (-w) !\n");
2758 if (config_write_jitter > config_write_interval)
2759 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2760 " write interval (-w) !\n");
2762 if (config_write_base_only && config_base_dir == NULL)
2763 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2764 " Consult the rrdcached documentation\n");
2766 if (journal_cur == NULL)
2767 config_flush_at_shutdown = 1;
2769 return (status);
2770 } /* }}} int read_options */
2772 int main (int argc, char **argv)
2773 {
2774 int status;
2776 status = read_options (argc, argv);
2777 if (status != 0)
2778 {
2779 if (status < 0)
2780 status = 0;
2781 return (status);
2782 }
2784 status = daemonize ();
2785 if (status != 0)
2786 {
2787 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2788 return (1);
2789 }
2791 journal_init();
2793 /* start the queue threads */
2794 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2795 if (queue_threads == NULL)
2796 {
2797 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2798 cleanup();
2799 return (1);
2800 }
2801 for (int i = 0; i < config_queue_threads; i++)
2802 {
2803 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2804 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2805 if (status != 0)
2806 {
2807 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2808 cleanup();
2809 return (1);
2810 }
2811 }
2813 /* start the flush thread */
2814 memset(&flush_thread, 0, sizeof(flush_thread));
2815 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2816 if (status != 0)
2817 {
2818 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2819 cleanup();
2820 return (1);
2821 }
2823 listen_thread_main (NULL);
2824 cleanup ();
2826 return (0);
2827 } /* int main */
2829 /*
2830 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2831 */