1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
#if 0
/*
 * First tell the compiler to stick to the C99 and POSIX standards as close as
 * possible.
 *
 * NOTE: this whole section is disabled by the surrounding `#if 0'; none of
 * the macros below take effect.  The build instead relies on the config
 * header selected in the include section below.
 */
#ifndef __STRICT_ANSI__ /* {{{ */
# define __STRICT_ANSI__
#endif

#ifndef _ISOC99_SOURCE
# define _ISOC99_SOURCE
#endif

#ifdef _POSIX_C_SOURCE
# undef _POSIX_C_SOURCE
#endif
#define _POSIX_C_SOURCE 200112L

/* Single UNIX needed for strdup. */
#ifdef _XOPEN_SOURCE
# undef _XOPEN_SOURCE
#endif
#define _XOPEN_SOURCE 500

#ifndef _REENTRANT
# define _REENTRANT
#endif

#ifndef _THREAD_SAFE
# define _THREAD_SAFE
#endif

#ifdef _GNU_SOURCE
# undef _GNU_SOURCE
#endif
/* }}} */
#endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
/* All daemon logging goes through syslog(3); `severity' is a LOG_* priority. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Non-GCC compilers do not know __attribute__; make it expand to nothing so
 * annotations such as __attribute__((unused)) still compile. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
/*
 * Types
 */

/* Privilege level of a socket.  Levels are compared with `>=' (see
 * has_privilege), so enumerators must stay ordered from least to most
 * privileged. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Outcome passed to send_response(); for RESP_ERR outside BATCH mode the
 * status line reports -1 lines. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/* State attached to a socket: address, privilege, BATCH bookkeeping and
 * the buffered input/output of the connection. */
struct listen_socket_s
{
  int fd;                       /* socket descriptor */
  char addr[PATH_MAX + 1];      /* presumably the listen address / socket path — TODO confirm */
  int family;                   /* presumably an AF_* address family */
  socket_privilege privilege;   /* checked against command min_priv */

  /* state for BATCH processing */
  time_t batch_start;           /* non-zero while inside a BATCH */
  int batch_cmd;                /* reported as the status-line count of errors during BATCH */

  /* buffered IO */
  char *rbuf;                   /* raw input; next_cmd/next_read are offsets into it */
  off_t next_cmd;               /* start of the next unparsed command */
  off_t next_read;              /* end of the valid data in rbuf */

  char *wbuf;                   /* pending response text, kept NUL-terminated */
  ssize_t wbuf_len;             /* length of wbuf excluding the terminator */
};
typedef struct listen_socket_s listen_socket_t;
struct command;
/* Parameter list shared by every request handler.
 * note: guard against "unused" warnings in the handlers */
#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
                       time_t now __attribute__((unused)),\
                       char *buffer __attribute__((unused)),\
                       size_t buffer_size __attribute__((unused))

/* Handlers additionally receive the table entry that dispatched them. */
#define HANDLER_PROTO struct command *cmd __attribute__((unused)),\
                      DISPATCH_PROTO

/* One entry of the command dispatch table. */
struct command {
  char *cmd;                        /* command name — presumably matched against client input */
  int (*handler)(HANDLER_PROTO);    /* implementation of the command */
  socket_privilege min_priv;        /* minimum socket privilege required */

  char context;         /* where we expect to see it (CMD_CONTEXT_* bits) */
#define CMD_CONTEXT_CLIENT  (1<<0)
#define CMD_CONTEXT_BATCH   (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY     (0x7f)

  char *syntax;         /* usage text, printed by syntax_error() */
  char *help;           /* presumably the text shown by HELP — TODO confirm */
};
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/* Cached, not yet written updates for one RRD file. */
struct cache_item_s
{
  char *file;                 /* RRD file path; also used as the key in cache_tree */
  char **values;              /* pending update strings */
  size_t values_num;          /* number of entries in `values' */
  time_t last_flush_time;     /* when values were last handed to a writer (plus jitter) */
  time_t last_update_stamp;
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)   /* item is linked into the write queue */
  int flags;
  pthread_cond_t flushed;     /* broadcast after a write of this file finished */
  cache_item_t *prev;         /* write-queue links */
  cache_item_t *next;
};
/* State threaded through g_tree_foreach() into tree_callback_flush(). */
struct callback_flush_data_s
{
  time_t now;           /* time at the start of the walk */
  time_t abs_timeout;   /* items with last_flush_time <= this are enqueued */
  char **keys;          /* keys of idle, empty items the caller will remove */
  size_t keys_num;
};
typedef struct callback_flush_data_s callback_flush_data_t;

/* End of the write queue at which enqueue_cache_item() inserts. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;

/* max length of socket command or response */
#define CMD_MAX 4096
/* room for two maximum-length commands; presumably the size of rbuf */
#define RBUF_SIZE (CMD_MAX*2)
/*
 * Variables
 */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Daemon life cycle.  Set to FLUSHING by the signal handlers (sig_common)
 * and to SHUTDOWN by flush_thread_main.
 * NOTE(review): written from signal handlers, yet neither static nor
 * `volatile sig_atomic_t' — confirm this is intentional. */
enum {
  RUNNING,              /* normal operation */
  FLUSHING,             /* flushing remaining values */
  SHUTDOWN              /* shutting down */
} state = RUNNING;

/* Writer threads; they sleep on queue_cond until work is enqueued. */
static pthread_t *queue_threads;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

/* Periodic flush thread; flush_cond wakes it early. */
static pthread_t flush_thread;
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff: cache_tree maps file name -> cache_item_t, and the queue is
 * a doubly linked list of items waiting to be written.  Both are accessed
 * with cache_lock held. */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

/* intervals are in seconds */
static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Counters reported by STATS; protected by stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
274 /*
275 * Functions
276 */
277 static void sig_common (const char *sig) /* {{{ */
278 {
279 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
280 state = FLUSHING;
281 pthread_cond_broadcast(&flush_cond);
282 pthread_cond_broadcast(&queue_cond);
283 } /* }}} void sig_common */
285 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
286 {
287 sig_common("INT");
288 } /* }}} void sig_int_handler */
290 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
291 {
292 sig_common("TERM");
293 } /* }}} void sig_term_handler */
295 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
296 {
297 config_flush_at_shutdown = 1;
298 sig_common("USR1");
299 } /* }}} void sig_usr1_handler */
301 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
302 {
303 config_flush_at_shutdown = 0;
304 sig_common("USR2");
305 } /* }}} void sig_usr2_handler */
307 static void install_signal_handlers(void) /* {{{ */
308 {
309 /* These structures are static, because `sigaction' behaves weird if the are
310 * overwritten.. */
311 static struct sigaction sa_int;
312 static struct sigaction sa_term;
313 static struct sigaction sa_pipe;
314 static struct sigaction sa_usr1;
315 static struct sigaction sa_usr2;
317 /* Install signal handlers */
318 memset (&sa_int, 0, sizeof (sa_int));
319 sa_int.sa_handler = sig_int_handler;
320 sigaction (SIGINT, &sa_int, NULL);
322 memset (&sa_term, 0, sizeof (sa_term));
323 sa_term.sa_handler = sig_term_handler;
324 sigaction (SIGTERM, &sa_term, NULL);
326 memset (&sa_pipe, 0, sizeof (sa_pipe));
327 sa_pipe.sa_handler = SIG_IGN;
328 sigaction (SIGPIPE, &sa_pipe, NULL);
330 memset (&sa_pipe, 0, sizeof (sa_usr1));
331 sa_usr1.sa_handler = sig_usr1_handler;
332 sigaction (SIGUSR1, &sa_usr1, NULL);
334 memset (&sa_usr2, 0, sizeof (sa_usr2));
335 sa_usr2.sa_handler = sig_usr2_handler;
336 sigaction (SIGUSR2, &sa_usr2, NULL);
338 } /* }}} void install_signal_handlers */
340 static int open_pidfile(char *action, int oflag) /* {{{ */
341 {
342 int fd;
343 char *file;
345 file = (config_pid_file != NULL)
346 ? config_pid_file
347 : LOCALSTATEDIR "/run/rrdcached.pid";
349 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
350 if (fd < 0)
351 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
352 action, file, rrd_strerror(errno));
354 return(fd);
355 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns a descriptor for the PID file after emptying it when the file is
 * stale, i.e. it does not name another process we can signal.  Returns a
 * negative value when the file cannot be opened, is empty or unparsable,
 * names another live process, or cannot be rewound/truncated.  The
 * descriptor is closed on every error path. */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  long pid_val;
  char pid_str[16];
  char *end;
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* read() does not NUL-terminate: reserve the last byte for the terminator
   * so parsing can never run past the buffer */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  errno = 0;
  pid_val = strtol(pid_str, &end, 10);
  pid = (pid_t) pid_val;
  if (end == pid_str || errno == ERANGE || pid_val <= 0
      || (long) pid != pid_val)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  /* rewind and empty the stale file */
  if (lseek(pid_fd, 0, SEEK_SET) == (off_t) -1 || ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Failed to truncate stale PID file. (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
401 static int write_pidfile (int fd) /* {{{ */
402 {
403 pid_t pid;
404 FILE *fh;
406 pid = getpid ();
408 fh = fdopen (fd, "w");
409 if (fh == NULL)
410 {
411 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
412 close(fd);
413 return (-1);
414 }
416 fprintf (fh, "%i\n", (int) pid);
417 fclose (fh);
419 return (0);
420 } /* }}} int write_pidfile */
422 static int remove_pidfile (void) /* {{{ */
423 {
424 char *file;
425 int status;
427 file = (config_pid_file != NULL)
428 ? config_pid_file
429 : LOCALSTATEDIR "/run/rrdcached.pid";
431 status = unlink (file);
432 if (status == 0)
433 return (0);
434 return (errno);
435 } /* }}} int remove_pidfile */
437 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
438 {
439 char *eol;
441 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
442 sock->next_read - sock->next_cmd);
444 if (eol == NULL)
445 {
446 /* no commands left, move remainder back to front of rbuf */
447 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
448 sock->next_read - sock->next_cmd);
449 sock->next_read -= sock->next_cmd;
450 sock->next_cmd = 0;
451 *len = 0;
452 return NULL;
453 }
454 else
455 {
456 char *cmd = sock->rbuf + sock->next_cmd;
457 *eol = '\0';
459 sock->next_cmd = eol - sock->rbuf + 1;
461 if (eol > sock->rbuf && *(eol-1) == '\r')
462 *(--eol) = '\0'; /* handle "\r\n" EOL */
464 *len = eol - cmd;
466 return cmd;
467 }
469 /* NOTREACHED */
470 assert(1==0);
471 }
473 /* add the characters directly to the write buffer */
474 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
475 {
476 char *new_buf;
478 assert(sock != NULL);
480 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
481 if (new_buf == NULL)
482 {
483 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
484 return -1;
485 }
487 strncpy(new_buf + sock->wbuf_len, str, len + 1);
489 sock->wbuf = new_buf;
490 sock->wbuf_len += len;
492 return 0;
493 } /* }}} static int add_to_wbuf */
495 /* add the text to the "extra" info that's sent after the status line */
496 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
497 {
498 va_list argp;
499 char buffer[CMD_MAX];
500 int len;
502 if (sock == NULL) return 0; /* journal replay mode */
503 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
505 va_start(argp, fmt);
506 #ifdef HAVE_VSNPRINTF
507 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
508 #else
509 len = vsprintf(buffer, fmt, argp);
510 #endif
511 va_end(argp);
512 if (len < 0)
513 {
514 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
515 return -1;
516 }
518 return add_to_wbuf(sock, buffer, len);
519 } /* }}} static int add_response_info */
/* Number of '\n' characters in `str'; a NULL string counts as 0 lines. */
static int count_lines(char *str) /* {{{ */
{
  int n = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    n++;

  return n;
} /* }}} static int count_lines */
537 /* send the response back to the user.
538 * returns 0 on success, -1 on error
539 * write buffer is always zeroed after this call */
540 static int send_response (listen_socket_t *sock, response_code rc,
541 char *fmt, ...) /* {{{ */
542 {
543 va_list argp;
544 char buffer[CMD_MAX];
545 int lines;
546 ssize_t wrote;
547 int rclen, len;
549 if (sock == NULL) return rc; /* journal replay mode */
551 if (sock->batch_start)
552 {
553 if (rc == RESP_OK)
554 return rc; /* no response on success during BATCH */
555 lines = sock->batch_cmd;
556 }
557 else if (rc == RESP_OK)
558 lines = count_lines(sock->wbuf);
559 else
560 lines = -1;
562 rclen = sprintf(buffer, "%d ", lines);
563 va_start(argp, fmt);
564 #ifdef HAVE_VSNPRINTF
565 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
566 #else
567 len = vsprintf(buffer+rclen, fmt, argp);
568 #endif
569 va_end(argp);
570 if (len < 0)
571 return -1;
573 len += rclen;
575 /* append the result to the wbuf, don't write to the user */
576 if (sock->batch_start)
577 return add_to_wbuf(sock, buffer, len);
579 /* first write must be complete */
580 if (len != write(sock->fd, buffer, len))
581 {
582 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
583 return -1;
584 }
586 if (sock->wbuf != NULL && rc == RESP_OK)
587 {
588 wrote = 0;
589 while (wrote < sock->wbuf_len)
590 {
591 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
592 if (wb <= 0)
593 {
594 RRDD_LOG(LOG_INFO, "send_response: could not write results");
595 return -1;
596 }
597 wrote += wb;
598 }
599 }
601 free(sock->wbuf); sock->wbuf = NULL;
602 sock->wbuf_len = 0;
604 return 0;
605 } /* }}} */
607 static void wipe_ci_values(cache_item_t *ci, time_t when)
608 {
609 ci->values = NULL;
610 ci->values_num = 0;
612 ci->last_flush_time = when;
613 if (config_write_jitter > 0)
614 ci->last_flush_time += (rrd_random() % config_write_jitter);
615 }
617 /* remove_from_queue
618 * remove a "cache_item_t" item from the queue.
619 * must hold 'cache_lock' when calling this
620 */
621 static void remove_from_queue(cache_item_t *ci) /* {{{ */
622 {
623 if (ci == NULL) return;
624 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
626 if (ci->prev == NULL)
627 cache_queue_head = ci->next; /* reset head */
628 else
629 ci->prev->next = ci->next;
631 if (ci->next == NULL)
632 cache_queue_tail = ci->prev; /* reset the tail */
633 else
634 ci->next->prev = ci->prev;
636 ci->next = ci->prev = NULL;
637 ci->flags &= ~CI_FLAGS_IN_QUEUE;
639 pthread_mutex_lock (&stats_lock);
640 assert (stats_queue_length > 0);
641 stats_queue_length--;
642 pthread_mutex_unlock (&stats_lock);
644 } /* }}} static void remove_from_queue */
646 /* free the resources associated with the cache_item_t
647 * must hold cache_lock when calling this function
648 */
649 static void *free_cache_item(cache_item_t *ci) /* {{{ */
650 {
651 if (ci == NULL) return NULL;
653 remove_from_queue(ci);
655 for (size_t i=0; i < ci->values_num; i++)
656 free(ci->values[i]);
658 free (ci->values);
659 free (ci->file);
661 /* in case anyone is waiting */
662 pthread_cond_broadcast(&ci->flushed);
663 pthread_cond_destroy(&ci->flushed);
665 free (ci);
667 return NULL;
668 } /* }}} static void *free_cache_item */
670 /*
671 * enqueue_cache_item:
672 * `cache_lock' must be acquired before calling this function!
673 */
674 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
675 queue_side_t side)
676 {
677 if (ci == NULL)
678 return (-1);
680 if (ci->values_num == 0)
681 return (0);
683 if (side == HEAD)
684 {
685 if (cache_queue_head == ci)
686 return 0;
688 /* remove if further down in queue */
689 remove_from_queue(ci);
691 ci->prev = NULL;
692 ci->next = cache_queue_head;
693 if (ci->next != NULL)
694 ci->next->prev = ci;
695 cache_queue_head = ci;
697 if (cache_queue_tail == NULL)
698 cache_queue_tail = cache_queue_head;
699 }
700 else /* (side == TAIL) */
701 {
702 /* We don't move values back in the list.. */
703 if (ci->flags & CI_FLAGS_IN_QUEUE)
704 return (0);
706 assert (ci->next == NULL);
707 assert (ci->prev == NULL);
709 ci->prev = cache_queue_tail;
711 if (cache_queue_tail == NULL)
712 cache_queue_head = ci;
713 else
714 cache_queue_tail->next = ci;
716 cache_queue_tail = ci;
717 }
719 ci->flags |= CI_FLAGS_IN_QUEUE;
721 pthread_cond_signal(&queue_cond);
722 pthread_mutex_lock (&stats_lock);
723 stats_queue_length++;
724 pthread_mutex_unlock (&stats_lock);
726 return (0);
727 } /* }}} int enqueue_cache_item */
729 /*
730 * tree_callback_flush:
731 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
732 * while this is in progress.
733 */
734 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
735 gpointer data)
736 {
737 cache_item_t *ci;
738 callback_flush_data_t *cfd;
740 ci = (cache_item_t *) value;
741 cfd = (callback_flush_data_t *) data;
743 if (ci->flags & CI_FLAGS_IN_QUEUE)
744 return FALSE;
746 if (ci->values_num > 0
747 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
748 {
749 enqueue_cache_item (ci, TAIL);
750 }
751 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
752 && (ci->values_num <= 0))
753 {
754 assert ((char *) key == ci->file);
755 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
756 {
757 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
758 return (FALSE);
759 }
760 }
762 return (FALSE);
763 } /* }}} gboolean tree_callback_flush */
765 static int flush_old_values (int max_age)
766 {
767 callback_flush_data_t cfd;
768 size_t k;
770 memset (&cfd, 0, sizeof (cfd));
771 /* Pass the current time as user data so that we don't need to call
772 * `time' for each node. */
773 cfd.now = time (NULL);
774 cfd.keys = NULL;
775 cfd.keys_num = 0;
777 if (max_age > 0)
778 cfd.abs_timeout = cfd.now - max_age;
779 else
780 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
782 /* `tree_callback_flush' will return the keys of all values that haven't
783 * been touched in the last `config_flush_interval' seconds in `cfd'.
784 * The char*'s in this array point to the same memory as ci->file, so we
785 * don't need to free them separately. */
786 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
788 for (k = 0; k < cfd.keys_num; k++)
789 {
790 /* should never fail, since we have held the cache_lock
791 * the entire time */
792 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
793 }
795 if (cfd.keys != NULL)
796 {
797 free (cfd.keys);
798 cfd.keys = NULL;
799 }
801 return (0);
802 } /* int flush_old_values */
804 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
805 {
806 struct timeval now;
807 struct timespec next_flush;
808 int status;
810 gettimeofday (&now, NULL);
811 next_flush.tv_sec = now.tv_sec + config_flush_interval;
812 next_flush.tv_nsec = 1000 * now.tv_usec;
814 pthread_mutex_lock(&cache_lock);
816 while (state == RUNNING)
817 {
818 gettimeofday (&now, NULL);
819 if ((now.tv_sec > next_flush.tv_sec)
820 || ((now.tv_sec == next_flush.tv_sec)
821 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
822 {
823 /* Flush all values that haven't been written in the last
824 * `config_write_interval' seconds. */
825 flush_old_values (config_write_interval);
827 /* Determine the time of the next cache flush. */
828 next_flush.tv_sec =
829 now.tv_sec + next_flush.tv_sec % config_flush_interval;
831 /* unlock the cache while we rotate so we don't block incoming
832 * updates if the fsync() blocks on disk I/O */
833 pthread_mutex_unlock(&cache_lock);
834 journal_rotate();
835 pthread_mutex_lock(&cache_lock);
836 }
838 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
839 if (status != 0 && status != ETIMEDOUT)
840 {
841 RRDD_LOG (LOG_ERR, "flush_thread_main: "
842 "pthread_cond_timedwait returned %i.", status);
843 }
844 }
846 if (config_flush_at_shutdown)
847 flush_old_values (-1); /* flush everything */
849 state = SHUTDOWN;
851 pthread_mutex_unlock(&cache_lock);
853 return NULL;
854 } /* void *flush_thread_main */
856 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
857 {
858 pthread_mutex_lock (&cache_lock);
860 while (state != SHUTDOWN
861 || (cache_queue_head != NULL && config_flush_at_shutdown))
862 {
863 cache_item_t *ci;
864 char *file;
865 char **values;
866 size_t values_num;
867 int status;
869 /* Now, check if there's something to store away. If not, wait until
870 * something comes in. */
871 if (cache_queue_head == NULL)
872 {
873 status = pthread_cond_wait (&queue_cond, &cache_lock);
874 if ((status != 0) && (status != ETIMEDOUT))
875 {
876 RRDD_LOG (LOG_ERR, "queue_thread_main: "
877 "pthread_cond_wait returned %i.", status);
878 }
879 }
881 /* Check if a value has arrived. This may be NULL if we timed out or there
882 * was an interrupt such as a signal. */
883 if (cache_queue_head == NULL)
884 continue;
886 ci = cache_queue_head;
888 /* copy the relevant parts */
889 file = strdup (ci->file);
890 if (file == NULL)
891 {
892 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
893 continue;
894 }
896 assert(ci->values != NULL);
897 assert(ci->values_num > 0);
899 values = ci->values;
900 values_num = ci->values_num;
902 wipe_ci_values(ci, time(NULL));
903 remove_from_queue(ci);
905 pthread_mutex_unlock (&cache_lock);
907 rrd_clear_error ();
908 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
909 if (status != 0)
910 {
911 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
912 "rrd_update_r (%s) failed with status %i. (%s)",
913 file, status, rrd_get_error());
914 }
916 journal_write("wrote", file);
918 /* Search again in the tree. It's possible someone issued a "FORGET"
919 * while we were writing the update values. */
920 pthread_mutex_lock(&cache_lock);
921 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
922 if (ci)
923 pthread_cond_broadcast(&ci->flushed);
924 pthread_mutex_unlock(&cache_lock);
926 rrd_free_ptrs((void ***) &values, &values_num);
927 free(file);
929 if (status == 0)
930 {
931 pthread_mutex_lock (&stats_lock);
932 stats_updates_written++;
933 stats_data_sets_written += values_num;
934 pthread_mutex_unlock (&stats_lock);
935 }
937 pthread_mutex_lock (&cache_lock);
938 }
939 pthread_mutex_unlock (&cache_lock);
941 return (NULL);
942 } /* }}} void *queue_thread_main */
/* Split the next space-delimited field off *buffer_ret, in place.
 *
 * A backslash makes the following byte literal (so "a\ b" yields "a b").
 * On success the field is NUL-terminated where it started (*field_ret),
 * *buffer_ret/*buffer_size_ret advance past the delimiter, and 0 is
 * returned.  Returns -1 for an empty buffer or when the data ends without
 * a delimiter (including a backslash as the last byte).  The buffer's last
 * byte must be '\0'. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
                             size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t size = *buffer_size_ret;
  size_t in = 0;    /* read cursor */
  size_t out = 0;   /* write cursor, never ahead of `in' */

  if (size == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[size - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= size)
      return (-1);

    c = src[in++];

    if (c == ' ' || c == '\0')
      break;

    if (c == '\\')
    {
      /* nothing left to escape */
      if (in >= size)
        return (-1);
      c = src[in++];
    }

    src[out++] = c;
  }

  src[out] = '\0';

  *buffer_ret = src + in;
  *buffer_size_ret = size - in;
  *field_ret = src;

  return (0);
} /* }}} int buffer_get_field */
1007 /* if we're restricting writes to the base directory,
1008 * check whether the file falls within the dir
1009 * returns 1 if OK, otherwise 0
1010 */
1011 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1012 {
1013 assert(file != NULL);
1015 if (!config_write_base_only
1016 || sock == NULL /* journal replay */
1017 || config_base_dir == NULL)
1018 return 1;
1020 if (strstr(file, "../") != NULL) goto err;
1022 /* relative paths without "../" are ok */
1023 if (*file != '/') return 1;
1025 /* file must be of the format base + "/" + <1+ char filename> */
1026 if (strlen(file) < _config_base_dir_len + 2) goto err;
1027 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1028 if (*(file + _config_base_dir_len) != '/') goto err;
1030 return 1;
1032 err:
1033 if (sock != NULL && sock->fd >= 0)
1034 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1036 return 0;
1037 } /* }}} static int check_file_access */
1039 /* when using a base dir, convert relative paths to absolute paths.
1040 * if necessary, modifies the "filename" pointer to point
1041 * to the new path created in "tmp". "tmp" is provided
1042 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1043 *
1044 * this allows us to optimize for the expected case (absolute path)
1045 * with a no-op.
1046 */
1047 static void get_abs_path(char **filename, char *tmp)
1048 {
1049 assert(tmp != NULL);
1050 assert(filename != NULL && *filename != NULL);
1052 if (config_base_dir == NULL || **filename == '/')
1053 return;
1055 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1056 *filename = tmp;
1057 } /* }}} static int get_abs_path */
1059 /* returns 1 if we have the required privilege level,
1060 * otherwise issue an error to the user on sock */
1061 static int has_privilege (listen_socket_t *sock, /* {{{ */
1062 socket_privilege priv)
1063 {
1064 if (sock == NULL) /* journal replay */
1065 return 1;
1067 if (sock->privilege >= priv)
1068 return 1;
1070 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1071 } /* }}} static int has_privilege */
1073 static int flush_file (const char *filename) /* {{{ */
1074 {
1075 cache_item_t *ci;
1077 pthread_mutex_lock (&cache_lock);
1079 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1080 if (ci == NULL)
1081 {
1082 pthread_mutex_unlock (&cache_lock);
1083 return (ENOENT);
1084 }
1086 if (ci->values_num > 0)
1087 {
1088 /* Enqueue at head */
1089 enqueue_cache_item (ci, HEAD);
1090 pthread_cond_wait(&ci->flushed, &cache_lock);
1091 }
1093 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1094 * may have been purged during our cond_wait() */
1096 pthread_mutex_unlock(&cache_lock);
1098 return (0);
1099 } /* }}} int flush_file */
1101 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1102 {
1103 char *err = "Syntax error.\n";
1105 if (cmd && cmd->syntax)
1106 err = cmd->syntax;
1108 return send_response(sock, RESP_ERR, "Usage: %s", err);
1109 } /* }}} static int syntax_error() */
1111 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1112 {
1113 uint64_t copy_queue_length;
1114 uint64_t copy_updates_received;
1115 uint64_t copy_flush_received;
1116 uint64_t copy_updates_written;
1117 uint64_t copy_data_sets_written;
1118 uint64_t copy_journal_bytes;
1119 uint64_t copy_journal_rotate;
1121 uint64_t tree_nodes_number;
1122 uint64_t tree_depth;
1124 pthread_mutex_lock (&stats_lock);
1125 copy_queue_length = stats_queue_length;
1126 copy_updates_received = stats_updates_received;
1127 copy_flush_received = stats_flush_received;
1128 copy_updates_written = stats_updates_written;
1129 copy_data_sets_written = stats_data_sets_written;
1130 copy_journal_bytes = stats_journal_bytes;
1131 copy_journal_rotate = stats_journal_rotate;
1132 pthread_mutex_unlock (&stats_lock);
1134 pthread_mutex_lock (&cache_lock);
1135 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1136 tree_depth = (uint64_t) g_tree_height (cache_tree);
1137 pthread_mutex_unlock (&cache_lock);
1139 add_response_info(sock,
1140 "QueueLength: %"PRIu64"\n", copy_queue_length);
1141 add_response_info(sock,
1142 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1143 add_response_info(sock,
1144 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1145 add_response_info(sock,
1146 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1147 add_response_info(sock,
1148 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1149 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1150 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1151 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1152 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1154 send_response(sock, RESP_OK, "Statistics follow\n");
1156 return (0);
1157 } /* }}} int handle_request_stats */
1159 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1160 {
1161 char *file, file_tmp[PATH_MAX];
1162 int status;
1164 status = buffer_get_field (&buffer, &buffer_size, &file);
1165 if (status != 0)
1166 {
1167 return syntax_error(sock,cmd);
1168 }
1169 else
1170 {
1171 pthread_mutex_lock(&stats_lock);
1172 stats_flush_received++;
1173 pthread_mutex_unlock(&stats_lock);
1175 get_abs_path(&file, file_tmp);
1176 if (!check_file_access(file, sock)) return 0;
1178 status = flush_file (file);
1179 if (status == 0)
1180 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1181 else if (status == ENOENT)
1182 {
1183 /* no file in our tree; see whether it exists at all */
1184 struct stat statbuf;
1186 memset(&statbuf, 0, sizeof(statbuf));
1187 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1188 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1189 else
1190 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1191 }
1192 else if (status < 0)
1193 return send_response(sock, RESP_ERR, "Internal error.\n");
1194 else
1195 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1196 }
1198 /* NOTREACHED */
1199 assert(1==0);
1200 } /* }}} int handle_request_flush */
1202 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1203 {
1204 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1206 pthread_mutex_lock(&cache_lock);
1207 flush_old_values(-1);
1208 pthread_mutex_unlock(&cache_lock);
1210 return send_response(sock, RESP_OK, "Started flush.\n");
1211 } /* }}} static int handle_request_flushall */
1213 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1214 {
1215 int status;
1216 char *file, file_tmp[PATH_MAX];
1217 cache_item_t *ci;
1219 status = buffer_get_field(&buffer, &buffer_size, &file);
1220 if (status != 0)
1221 return syntax_error(sock,cmd);
1223 get_abs_path(&file, file_tmp);
1225 pthread_mutex_lock(&cache_lock);
1226 ci = g_tree_lookup(cache_tree, file);
1227 if (ci == NULL)
1228 {
1229 pthread_mutex_unlock(&cache_lock);
1230 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1231 }
1233 for (size_t i=0; i < ci->values_num; i++)
1234 add_response_info(sock, "%s\n", ci->values[i]);
1236 pthread_mutex_unlock(&cache_lock);
1237 return send_response(sock, RESP_OK, "updates pending\n");
1238 } /* }}} static int handle_request_pending */
1240 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1241 {
1242 int status;
1243 gboolean found;
1244 char *file, file_tmp[PATH_MAX];
1246 status = buffer_get_field(&buffer, &buffer_size, &file);
1247 if (status != 0)
1248 return syntax_error(sock,cmd);
1250 get_abs_path(&file, file_tmp);
1251 if (!check_file_access(file, sock)) return 0;
1253 pthread_mutex_lock(&cache_lock);
1254 found = g_tree_remove(cache_tree, file);
1255 pthread_mutex_unlock(&cache_lock);
1257 if (found == TRUE)
1258 {
1259 if (sock != NULL)
1260 journal_write("forget", file);
1262 return send_response(sock, RESP_OK, "Gone!\n");
1263 }
1264 else
1265 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267 /* NOTREACHED */
1268 assert(1==0);
1269 } /* }}} static int handle_request_forget */
1271 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1272 {
1273 cache_item_t *ci;
1275 pthread_mutex_lock(&cache_lock);
1277 ci = cache_queue_head;
1278 while (ci != NULL)
1279 {
1280 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1281 ci = ci->next;
1282 }
1284 pthread_mutex_unlock(&cache_lock);
1286 return send_response(sock, RESP_OK, "in queue.\n");
1287 } /* }}} int handle_request_queue */
1289 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1290 {
1291 char *file, file_tmp[PATH_MAX];
1292 int values_num = 0;
1293 int status;
1294 char orig_buf[CMD_MAX];
1296 cache_item_t *ci;
1298 /* save it for the journal later */
1299 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1301 status = buffer_get_field (&buffer, &buffer_size, &file);
1302 if (status != 0)
1303 return syntax_error(sock,cmd);
1305 pthread_mutex_lock(&stats_lock);
1306 stats_updates_received++;
1307 pthread_mutex_unlock(&stats_lock);
1309 get_abs_path(&file, file_tmp);
1310 if (!check_file_access(file, sock)) return 0;
1312 pthread_mutex_lock (&cache_lock);
1313 ci = g_tree_lookup (cache_tree, file);
1315 if (ci == NULL) /* {{{ */
1316 {
1317 struct stat statbuf;
1318 cache_item_t *tmp;
1320 /* don't hold the lock while we setup; stat(2) might block */
1321 pthread_mutex_unlock(&cache_lock);
1323 memset (&statbuf, 0, sizeof (statbuf));
1324 status = stat (file, &statbuf);
1325 if (status != 0)
1326 {
1327 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1329 status = errno;
1330 if (status == ENOENT)
1331 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1332 else
1333 return send_response(sock, RESP_ERR,
1334 "stat failed with error %i.\n", status);
1335 }
1336 if (!S_ISREG (statbuf.st_mode))
1337 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1339 if (access(file, R_OK|W_OK) != 0)
1340 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1341 file, rrd_strerror(errno));
1343 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1344 if (ci == NULL)
1345 {
1346 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1348 return send_response(sock, RESP_ERR, "malloc failed.\n");
1349 }
1350 memset (ci, 0, sizeof (cache_item_t));
1352 ci->file = strdup (file);
1353 if (ci->file == NULL)
1354 {
1355 free (ci);
1356 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1358 return send_response(sock, RESP_ERR, "strdup failed.\n");
1359 }
1361 wipe_ci_values(ci, now);
1362 ci->flags = CI_FLAGS_IN_TREE;
1363 pthread_cond_init(&ci->flushed, NULL);
1365 pthread_mutex_lock(&cache_lock);
1367 /* another UPDATE might have added this entry in the meantime */
1368 tmp = g_tree_lookup (cache_tree, file);
1369 if (tmp == NULL)
1370 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1371 else
1372 {
1373 free_cache_item (ci);
1374 ci = tmp;
1375 }
1377 /* state may have changed while we were unlocked */
1378 if (state == SHUTDOWN)
1379 return -1;
1380 } /* }}} */
1381 assert (ci != NULL);
1383 /* don't re-write updates in replay mode */
1384 if (sock != NULL)
1385 journal_write("update", orig_buf);
1387 while (buffer_size > 0)
1388 {
1389 char *value;
1390 time_t stamp;
1391 char *eostamp;
1393 status = buffer_get_field (&buffer, &buffer_size, &value);
1394 if (status != 0)
1395 {
1396 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1397 break;
1398 }
1400 /* make sure update time is always moving forward */
1401 stamp = strtol(value, &eostamp, 10);
1402 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1403 {
1404 pthread_mutex_unlock(&cache_lock);
1405 return send_response(sock, RESP_ERR,
1406 "Cannot find timestamp in '%s'!\n", value);
1407 }
1408 else if (stamp <= ci->last_update_stamp)
1409 {
1410 pthread_mutex_unlock(&cache_lock);
1411 return send_response(sock, RESP_ERR,
1412 "illegal attempt to update using time %ld when last"
1413 " update time is %ld (minimum one second step)\n",
1414 stamp, ci->last_update_stamp);
1415 }
1416 else
1417 ci->last_update_stamp = stamp;
1419 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1420 {
1421 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1422 continue;
1423 }
1425 values_num++;
1426 }
1428 if (((now - ci->last_flush_time) >= config_write_interval)
1429 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1430 && (ci->values_num > 0))
1431 {
1432 enqueue_cache_item (ci, TAIL);
1433 }
1435 pthread_mutex_unlock (&cache_lock);
1437 if (values_num < 1)
1438 return send_response(sock, RESP_ERR, "No values updated.\n");
1439 else
1440 return send_response(sock, RESP_OK,
1441 "errors, enqueued %i value(s).\n", values_num);
1443 /* NOTREACHED */
1444 assert(1==0);
1446 } /* }}} int handle_request_update */
1448 /* we came across a "WROTE" entry during journal replay.
1449 * throw away any values that we have accumulated for this file
1450 */
1451 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1452 {
1453 cache_item_t *ci;
1454 const char *file = buffer;
1456 pthread_mutex_lock(&cache_lock);
1458 ci = g_tree_lookup(cache_tree, file);
1459 if (ci == NULL)
1460 {
1461 pthread_mutex_unlock(&cache_lock);
1462 return (0);
1463 }
1465 if (ci->values)
1466 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1468 wipe_ci_values(ci, now);
1469 remove_from_queue(ci);
1471 pthread_mutex_unlock(&cache_lock);
1472 return (0);
1473 } /* }}} int handle_request_wrote */
1475 /* start "BATCH" processing */
1476 static int batch_start (HANDLER_PROTO) /* {{{ */
1477 {
1478 int status;
1479 if (sock->batch_start)
1480 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1482 status = send_response(sock, RESP_OK,
1483 "Go ahead. End with dot '.' on its own line.\n");
1484 sock->batch_start = time(NULL);
1485 sock->batch_cmd = 0;
1487 return status;
1488 } /* }}} static int batch_start */
1490 /* finish "BATCH" processing and return results to the client */
1491 static int batch_done (HANDLER_PROTO) /* {{{ */
1492 {
1493 assert(sock->batch_start);
1494 sock->batch_start = 0;
1495 sock->batch_cmd = 0;
1496 return send_response(sock, RESP_OK, "errors\n");
1497 } /* }}} static int batch_done */
/* QUIT: a non-zero handler result makes connection_thread_main() close
 * the connection. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int disconnect = -1;

  return disconnect;
} /* }}} static int handle_request_quit */
/* Command dispatch table.
 *
 * Each entry lists, in order:
 *   - the command word (matched case-insensitively by find_command()),
 *   - its handler,
 *   - the minimum privilege required (checked with has_privilege() in
 *     handle_request()),
 *   - the contexts it is accepted in: plain client connection, inside a
 *     BATCH, and/or journal replay (see command_check_context()),
 *   - a syntax summary, and
 *   - longer help text.
 * HELP without an argument prints the syntax line of every entry that has
 * one.  "HELP <command>" prints both texts.  The all-NULL entry terminates
 * the table.
 */
struct command COMMANDS[] = {
  {
    "UPDATE",
    handle_request_update,
    PRIV_HIGH,
    CMD_CONTEXT_ANY,
    "UPDATE <filename> <values> [<values> ...]\n"
    ,
    "Adds the given file to the internal cache if it is not yet known and\n"
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
    "for details.\n"
    "\n"
    "Each <values> has the following form:\n"
    " <values> = <time>:<value>[:<value>[...]]\n"
    "See the rrdupdate(1) manpage for details.\n"
  },
  {
    /* journal replay only; no syntax/help, so HELP does not list it */
    "WROTE",
    handle_request_wrote,
    PRIV_HIGH,
    CMD_CONTEXT_JOURNAL,
    NULL,
    NULL
  },
  {
    "FLUSH",
    handle_request_flush,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "FLUSH <filename>\n"
    ,
    "Adds the given filename to the head of the update queue and returns\n"
    "after it has been dequeued.\n"
  },
  {
    "FLUSHALL",
    handle_request_flushall,
    PRIV_HIGH,
    CMD_CONTEXT_CLIENT,
    "FLUSHALL\n"
    ,
    "Triggers writing of all pending updates. Returns immediately.\n"
  },
  {
    "PENDING",
    handle_request_pending,
    PRIV_HIGH,
    CMD_CONTEXT_CLIENT,
    "PENDING <filename>\n"
    ,
    "Shows any 'pending' updates for a file, in order.\n"
    "The updates shown have not yet been written to the underlying RRD file.\n"
  },
  {
    "FORGET",
    handle_request_forget,
    PRIV_HIGH,
    CMD_CONTEXT_ANY,
    "FORGET <filename>\n"
    ,
    "Removes the file completely from the cache.\n"
    "Any pending updates for the file will be lost.\n"
  },
  {
    "QUEUE",
    handle_request_queue,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "QUEUE\n"
    ,
    "Shows all files in the output queue.\n"
    "The output is zero or more lines in the following format:\n"
    "(where <num_vals> is the number of values to be written)\n"
    "\n"
    "<num_vals> <filename>\n"
  },
  {
    "STATS",
    handle_request_stats,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "STATS\n"
    ,
    "Returns some performance counters, see the rrdcached(1) manpage for\n"
    "a description of the values.\n"
  },
  {
    "HELP",
    handle_request_help,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "HELP [<command>]\n",
    NULL, /* special! only the syntax line is shown for HELP itself */
  },
  {
    "BATCH",
    batch_start,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "BATCH\n"
    ,
    "The 'BATCH' command permits the client to initiate a bulk load\n"
    " of commands to rrdcached.\n"
    "\n"
    "Usage:\n"
    "\n"
    " client: BATCH\n"
    " server: 0 Go ahead. End with dot '.' on its own line.\n"
    " client: command #1\n"
    " client: command #2\n"
    " client: ... and so on\n"
    " client: .\n"
    " server: 2 errors\n"
    " server: 7 message for command #7\n"
    " server: 9 message for command #9\n"
    "\n"
    "For more information, consult the rrdcached(1) documentation.\n"
  },
  {
    ".", /* BATCH terminator */
    batch_done,
    PRIV_LOW,
    CMD_CONTEXT_BATCH,
    NULL,
    NULL
  },
  {
    "QUIT",
    handle_request_quit,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "QUIT\n"
    ,
    "Disconnect from rrdcached.\n"
  },
  {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */
};
1642 static struct command *find_command(char *cmd)
1643 {
1644 struct command *c = COMMANDS;
1646 while (c->cmd != NULL)
1647 {
1648 if (strcasecmp(cmd, c->cmd) == 0)
1649 break;
1650 c++;
1651 }
1653 if (c->cmd == NULL)
1654 return NULL;
1655 else
1656 return c;
1657 }
1659 /* check whether commands are received in the expected context */
1660 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1661 {
1662 if (sock == NULL)
1663 return (cmd->context & CMD_CONTEXT_JOURNAL);
1664 else if (sock->batch_start)
1665 return (cmd->context & CMD_CONTEXT_BATCH);
1666 else
1667 return (cmd->context & CMD_CONTEXT_CLIENT);
1669 /* NOTREACHED */
1670 assert(1==0);
1671 }
1673 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1674 {
1675 int status;
1676 char *cmd_str;
1677 char *resp_txt;
1678 struct command *help = NULL;
1680 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1681 if (status == 0)
1682 help = find_command(cmd_str);
1684 if (help && (help->syntax || help->help))
1685 {
1686 char tmp[CMD_MAX];
1688 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1689 resp_txt = tmp;
1691 if (help->syntax)
1692 add_response_info(sock, "Usage: %s\n", help->syntax);
1694 if (help->help)
1695 add_response_info(sock, "%s\n", help->help);
1696 }
1697 else
1698 {
1699 help = COMMANDS;
1700 resp_txt = "Command overview\n";
1702 while (help->cmd)
1703 {
1704 if (help->syntax)
1705 add_response_info(sock, "%s", help->syntax);
1706 help++;
1707 }
1708 }
1710 return send_response(sock, RESP_OK, resp_txt);
1711 } /* }}} int handle_request_help */
1713 /* if sock==NULL, we are in journal replay mode */
1714 static int handle_request (DISPATCH_PROTO) /* {{{ */
1715 {
1716 char *buffer_ptr = buffer;
1717 char *cmd_str = NULL;
1718 struct command *cmd = NULL;
1719 int status;
1721 assert (buffer[buffer_size - 1] == '\0');
1723 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1724 if (status != 0)
1725 {
1726 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1727 return (-1);
1728 }
1730 if (sock != NULL && sock->batch_start)
1731 sock->batch_cmd++;
1733 cmd = find_command(cmd_str);
1734 if (!cmd)
1735 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1737 status = has_privilege(sock, cmd->min_priv);
1738 if (status <= 0)
1739 return status;
1741 if (!command_check_context(sock, cmd))
1742 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1744 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1745 } /* }}} int handle_request */
1747 /* MUST NOT hold journal_lock before calling this */
1748 static void journal_rotate(void) /* {{{ */
1749 {
1750 FILE *old_fh = NULL;
1751 int new_fd;
1753 if (journal_cur == NULL || journal_old == NULL)
1754 return;
1756 pthread_mutex_lock(&journal_lock);
1758 /* we rotate this way (rename before close) so that the we can release
1759 * the journal lock as fast as possible. Journal writes to the new
1760 * journal can proceed immediately after the new file is opened. The
1761 * fclose can then block without affecting new updates.
1762 */
1763 if (journal_fh != NULL)
1764 {
1765 old_fh = journal_fh;
1766 journal_fh = NULL;
1767 rename(journal_cur, journal_old);
1768 ++stats_journal_rotate;
1769 }
1771 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1772 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1773 if (new_fd >= 0)
1774 {
1775 journal_fh = fdopen(new_fd, "a");
1776 if (journal_fh == NULL)
1777 close(new_fd);
1778 }
1780 pthread_mutex_unlock(&journal_lock);
1782 if (old_fh != NULL)
1783 fclose(old_fh);
1785 if (journal_fh == NULL)
1786 {
1787 RRDD_LOG(LOG_CRIT,
1788 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1789 journal_cur, rrd_strerror(errno));
1791 RRDD_LOG(LOG_ERR,
1792 "JOURNALING DISABLED: All values will be flushed at shutdown");
1793 config_flush_at_shutdown = 1;
1794 }
1796 } /* }}} static void journal_rotate */
1798 static void journal_done(void) /* {{{ */
1799 {
1800 if (journal_cur == NULL)
1801 return;
1803 pthread_mutex_lock(&journal_lock);
1804 if (journal_fh != NULL)
1805 {
1806 fclose(journal_fh);
1807 journal_fh = NULL;
1808 }
1810 if (config_flush_at_shutdown)
1811 {
1812 RRDD_LOG(LOG_INFO, "removing journals");
1813 unlink(journal_old);
1814 unlink(journal_cur);
1815 }
1816 else
1817 {
1818 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1819 "journals will be used at next startup");
1820 }
1822 pthread_mutex_unlock(&journal_lock);
1824 } /* }}} static void journal_done */
1826 static int journal_write(char *cmd, char *args) /* {{{ */
1827 {
1828 int chars;
1830 if (journal_fh == NULL)
1831 return 0;
1833 pthread_mutex_lock(&journal_lock);
1834 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1835 pthread_mutex_unlock(&journal_lock);
1837 if (chars > 0)
1838 {
1839 pthread_mutex_lock(&stats_lock);
1840 stats_journal_bytes += chars;
1841 pthread_mutex_unlock(&stats_lock);
1842 }
1844 return chars;
1845 } /* }}} static int journal_write */
1847 static int journal_replay (const char *file) /* {{{ */
1848 {
1849 FILE *fh;
1850 int entry_cnt = 0;
1851 int fail_cnt = 0;
1852 uint64_t line = 0;
1853 char entry[CMD_MAX];
1854 time_t now;
1856 if (file == NULL) return 0;
1858 {
1859 char *reason = "unknown error";
1860 int status = 0;
1861 struct stat statbuf;
1863 memset(&statbuf, 0, sizeof(statbuf));
1864 if (stat(file, &statbuf) != 0)
1865 {
1866 if (errno == ENOENT)
1867 return 0;
1869 reason = "stat error";
1870 status = errno;
1871 }
1872 else if (!S_ISREG(statbuf.st_mode))
1873 {
1874 reason = "not a regular file";
1875 status = EPERM;
1876 }
1877 if (statbuf.st_uid != daemon_uid)
1878 {
1879 reason = "not owned by daemon user";
1880 status = EACCES;
1881 }
1882 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1883 {
1884 reason = "must not be user/group writable";
1885 status = EACCES;
1886 }
1888 if (status != 0)
1889 {
1890 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1891 file, rrd_strerror(status), reason);
1892 return 0;
1893 }
1894 }
1896 fh = fopen(file, "r");
1897 if (fh == NULL)
1898 {
1899 if (errno != ENOENT)
1900 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1901 file, rrd_strerror(errno));
1902 return 0;
1903 }
1904 else
1905 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1907 now = time(NULL);
1909 while(!feof(fh))
1910 {
1911 size_t entry_len;
1913 ++line;
1914 if (fgets(entry, sizeof(entry), fh) == NULL)
1915 break;
1916 entry_len = strlen(entry);
1918 /* check \n termination in case journal writing crashed mid-line */
1919 if (entry_len == 0)
1920 continue;
1921 else if (entry[entry_len - 1] != '\n')
1922 {
1923 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1924 ++fail_cnt;
1925 continue;
1926 }
1928 entry[entry_len - 1] = '\0';
1930 if (handle_request(NULL, now, entry, entry_len) == 0)
1931 ++entry_cnt;
1932 else
1933 ++fail_cnt;
1934 }
1936 fclose(fh);
1938 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1939 entry_cnt, fail_cnt);
1941 return entry_cnt > 0 ? 1 : 0;
1942 } /* }}} static int journal_replay */
1944 static void journal_init(void) /* {{{ */
1945 {
1946 int had_journal = 0;
1948 if (journal_cur == NULL) return;
1950 pthread_mutex_lock(&journal_lock);
1952 RRDD_LOG(LOG_INFO, "checking for journal files");
1954 had_journal += journal_replay(journal_old);
1955 had_journal += journal_replay(journal_cur);
1957 /* it must have been a crash. start a flush */
1958 if (had_journal && config_flush_at_shutdown)
1959 flush_old_values(-1);
1961 pthread_mutex_unlock(&journal_lock);
1962 journal_rotate();
1964 RRDD_LOG(LOG_INFO, "journal processing complete");
1966 } /* }}} static void journal_init */
1968 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1969 {
1970 assert(sock != NULL);
1972 free(sock->rbuf); sock->rbuf = NULL;
1973 free(sock->wbuf); sock->wbuf = NULL;
1974 free(sock);
1975 } /* }}} void free_listen_socket */
1977 static void close_connection(listen_socket_t *sock) /* {{{ */
1978 {
1979 if (sock->fd >= 0)
1980 {
1981 close(sock->fd);
1982 sock->fd = -1;
1983 }
1985 free_listen_socket(sock);
1987 } /* }}} void close_connection */
/* Per-client thread started by listen_thread_main().
 *
 * args is the client's listen_socket_t; this thread takes ownership of it
 * and frees it via close_connection() before returning.  Allocates the
 * read buffer, then polls the descriptor with a 500 ms timeout while state
 * is RUNNING.  Received bytes are appended to sock->rbuf, and every complete
 * command returned by next_cmd() is passed to handle_request().  The loop
 * ends on hangup, EOF, a read error, an unexpected poll event, a non-zero
 * handler result, or a state change.  connection_threads_num counts live
 * threads so that listen_thread_main() can wait for them at shutdown.
 */
static void *connection_thread_main (void *args) /* {{{ */
{
  listen_socket_t *sock;
  int fd;

  sock = (listen_socket_t *) args;
  fd = sock->fd;

  /* init read buffers */
  sock->next_read = sock->next_cmd = 0;
  sock->rbuf = malloc(RBUF_SIZE);
  if (sock->rbuf == NULL)
  {
    RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
    close_connection(sock);
    return NULL;
  }

  pthread_mutex_lock (&connection_threads_lock);
  connection_threads_num++;
  pthread_mutex_unlock (&connection_threads_lock);

  while (state == RUNNING)
  {
    char *cmd;
    ssize_t cmd_len;
    ssize_t rbytes;
    time_t now;

    struct pollfd pollfd;
    int status;

    pollfd.fd = fd;
    pollfd.events = POLLIN | POLLPRI;
    pollfd.revents = 0;

    /* short timeout so a change of `state' is noticed promptly */
    status = poll (&pollfd, 1, /* timeout = */ 500);
    if (state != RUNNING)
      break;
    else if (status == 0) /* timeout */
      continue;
    else if (status < 0) /* error */
    {
      status = errno;
      if (status != EINTR)
        RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
      continue;
    }

    if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
      break;
    else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
    {
      RRDD_LOG (LOG_WARNING, "connection_thread_main: "
          "poll(2) returned something unexpected: %#04hx",
          pollfd.revents);
      break;
    }

    /* append to whatever is already buffered; note that a full buffer
     * makes this read size 0, which is then treated as EOF below */
    rbytes = read(fd, sock->rbuf + sock->next_read,
        RBUF_SIZE - sock->next_read);
    if (rbytes < 0)
    {
      RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
      break;
    }
    else if (rbytes == 0)
      break; /* eof */

    sock->next_read += rbytes;

    /* inside a BATCH, every command uses the batch start time as "now" */
    if (sock->batch_start)
      now = sock->batch_start;
    else
      now = time(NULL);

    /* cmd_len excludes the terminator; handle_request() expects the size
     * including it */
    while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
    {
      status = handle_request (sock, now, cmd, cmd_len+1);
      if (status != 0)
        goto out_close;
    }
  }

out_close:
  close_connection(sock);

  /* Remove this thread from the connection threads list */
  pthread_mutex_lock (&connection_threads_lock);
  connection_threads_num--;
  if (connection_threads_num <= 0)
    pthread_cond_broadcast(&connection_threads_done);
  pthread_mutex_unlock (&connection_threads_lock);

  return (NULL);
} /* }}} void *connection_thread_main */
2086 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2087 {
2088 int fd;
2089 struct sockaddr_un sa;
2090 listen_socket_t *temp;
2091 int status;
2092 const char *path;
2094 path = sock->addr;
2095 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2096 path += strlen("unix:");
2098 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2099 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2100 if (temp == NULL)
2101 {
2102 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2103 return (-1);
2104 }
2105 listen_fds = temp;
2106 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2108 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2109 if (fd < 0)
2110 {
2111 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2112 rrd_strerror(errno));
2113 return (-1);
2114 }
2116 memset (&sa, 0, sizeof (sa));
2117 sa.sun_family = AF_UNIX;
2118 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2120 /* if we've gotten this far, we own the pid file. any daemon started
2121 * with the same args must not be alive. therefore, ensure that we can
2122 * create the socket...
2123 */
2124 unlink(path);
2126 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2127 if (status != 0)
2128 {
2129 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2130 path, rrd_strerror(errno));
2131 close (fd);
2132 return (-1);
2133 }
2135 status = listen (fd, /* backlog = */ 10);
2136 if (status != 0)
2137 {
2138 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2139 path, rrd_strerror(errno));
2140 close (fd);
2141 unlink (path);
2142 return (-1);
2143 }
2145 listen_fds[listen_fds_num].fd = fd;
2146 listen_fds[listen_fds_num].family = PF_UNIX;
2147 strncpy(listen_fds[listen_fds_num].addr, path,
2148 sizeof (listen_fds[listen_fds_num].addr) - 1);
2149 listen_fds_num++;
2151 return (0);
2152 } /* }}} int open_listen_socket_unix */
/* Open listening TCP sockets for sock->addr and append them to listen_fds.
 *
 * Address forms:
 *   "[<ipv6>]" or "[<ipv6>]:<port>"
 *   "<name-or-ipv4>[:<port>]" -- port split off only if the text contains
 *                                a '.'
 * Anything else is passed to getaddrinfo() as-is.  The port defaults to
 * RRDCACHED_DEFAULT_PORT.  One socket is created for every address
 * getaddrinfo() returns; addresses whose realloc/socket()/bind() fails are
 * skipped.  Returns -1 on a malformed address, a getaddrinfo() failure or a
 * listen() failure, otherwise 0.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* strrchr() is the standard C equivalent of the legacy BSD rindex() */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
      port == NULL ? RRDCACHED_DEFAULT_PORT : port,
      &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
        addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
          "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
          rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
          sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
          sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2276 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2277 {
2278 assert(sock != NULL);
2279 assert(sock->addr != NULL);
2281 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2282 || sock->addr[0] == '/')
2283 return (open_listen_socket_unix(sock));
2284 else
2285 return (open_listen_socket_network(sock));
2286 } /* }}} int open_listen_socket */
2288 static int close_listen_sockets (void) /* {{{ */
2289 {
2290 size_t i;
2292 for (i = 0; i < listen_fds_num; i++)
2293 {
2294 close (listen_fds[i].fd);
2296 if (listen_fds[i].family == PF_UNIX)
2297 unlink(listen_fds[i].addr);
2298 }
2300 free (listen_fds);
2301 listen_fds = NULL;
2302 listen_fds_num = 0;
2304 return (0);
2305 } /* }}} int close_listen_sockets */
2307 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2308 {
2309 struct pollfd *pollfds;
2310 int pollfds_num;
2311 int status;
2312 int i;
2314 if (listen_fds_num < 1)
2315 {
2316 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2317 return (NULL);
2318 }
2320 pollfds_num = listen_fds_num;
2321 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2322 if (pollfds == NULL)
2323 {
2324 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2325 return (NULL);
2326 }
2327 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2329 RRDD_LOG(LOG_INFO, "listening for connections");
2331 while (state == RUNNING)
2332 {
2333 for (i = 0; i < pollfds_num; i++)
2334 {
2335 pollfds[i].fd = listen_fds[i].fd;
2336 pollfds[i].events = POLLIN | POLLPRI;
2337 pollfds[i].revents = 0;
2338 }
2340 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2341 if (state != RUNNING)
2342 break;
2343 else if (status == 0) /* timeout */
2344 continue;
2345 else if (status < 0) /* error */
2346 {
2347 status = errno;
2348 if (status != EINTR)
2349 {
2350 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2351 }
2352 continue;
2353 }
2355 for (i = 0; i < pollfds_num; i++)
2356 {
2357 listen_socket_t *client_sock;
2358 struct sockaddr_storage client_sa;
2359 socklen_t client_sa_size;
2360 pthread_t tid;
2361 pthread_attr_t attr;
2363 if (pollfds[i].revents == 0)
2364 continue;
2366 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2367 {
2368 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2369 "poll(2) returned something unexpected for listen FD #%i.",
2370 pollfds[i].fd);
2371 continue;
2372 }
2374 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2375 if (client_sock == NULL)
2376 {
2377 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2378 continue;
2379 }
2380 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2382 client_sa_size = sizeof (client_sa);
2383 client_sock->fd = accept (pollfds[i].fd,
2384 (struct sockaddr *) &client_sa, &client_sa_size);
2385 if (client_sock->fd < 0)
2386 {
2387 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2388 free(client_sock);
2389 continue;
2390 }
2392 pthread_attr_init (&attr);
2393 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2395 status = pthread_create (&tid, &attr, connection_thread_main,
2396 client_sock);
2397 if (status != 0)
2398 {
2399 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2400 close_connection(client_sock);
2401 continue;
2402 }
2403 } /* for (pollfds_num) */
2404 } /* while (state == RUNNING) */
2406 RRDD_LOG(LOG_INFO, "starting shutdown");
2408 close_listen_sockets ();
2410 pthread_mutex_lock (&connection_threads_lock);
2411 while (connection_threads_num > 0)
2412 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2413 pthread_mutex_unlock (&connection_threads_lock);
2415 free(pollfds);
2417 return (NULL);
2418 } /* }}} void *listen_thread_main */
/* Prepare the daemon process.
 *
 * Steps, in order:
 *   - record the effective uid in daemon_uid and claim the pid file;
 *   - open every configured listen address, or RRDCACHED_DEFAULT_ADDRESS
 *     if none was configured;
 *   - unless stay_foreground is set, fork (the parent exits), call
 *     setsid() and point descriptors 0-2 at /dev/null;
 *   - chdir to config_base_dir (default "/tmp");
 *   - install signal handlers, open syslog and create cache_tree.
 *
 * Returns write_pidfile()'s result on success.  If no pid file could be
 * obtained, returns the negative value from check_pidfile().  Other
 * failures remove the pid file and return -1.
 */
static int daemonize (void) /* {{{ */
{
  int pid_fd;
  char *base_dir;

  daemon_uid = geteuid();

  /* O_EXCL: creation fails if a pid file already exists; check_pidfile()
   * then handles the existing one (presumably a staleness check -- TODO
   * confirm) */
  pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
  if (pid_fd < 0)
    pid_fd = check_pidfile();
  if (pid_fd < 0)
    return pid_fd;

  /* open all the listen sockets */
  if (config_listen_address_list_len > 0)
  {
    for (size_t i = 0; i < config_listen_address_list_len; i++)
      open_listen_socket (config_listen_address_list[i]);

    /* the configured address list is not needed after this */
    rrd_free_ptrs((void ***) &config_listen_address_list,
        &config_listen_address_list_len);
  }
  else
  {
    listen_socket_t sock;
    memset(&sock, 0, sizeof(sock));
    strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
    open_listen_socket (&sock);
  }

  /* individual open failures are tolerated; give up only if none worked */
  if (listen_fds_num < 1)
  {
    fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
    goto error;
  }

  if (!stay_foreground)
  {
    pid_t child;

    child = fork ();
    if (child < 0)
    {
      fprintf (stderr, "daemonize: fork(2) failed.\n");
      goto error;
    }
    else if (child > 0)
      exit(0);

    /* Become session leader */
    setsid ();

    /* Open the first three file descriptors to /dev/null */
    close (2);
    close (1);
    close (0);

    /* open() returns the lowest free descriptor (0 here); the two dup(0)
     * calls then normally occupy 1 and 2 */
    open ("/dev/null", O_RDWR);
    if (dup(0) == -1 || dup(0) == -1){
      RRDD_LOG (LOG_ERR, "faild to run dup.\n");
    }
  } /* if (!stay_foreground) */

  /* Change into the base directory (config_base_dir, default /tmp). */
  base_dir = (config_base_dir != NULL)
    ? config_base_dir
    : "/tmp";

  if (chdir (base_dir) != 0)
  {
    fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
    goto error;
  }

  install_signal_handlers();

  openlog ("rrdcached", LOG_PID, LOG_DAEMON);
  RRDD_LOG(LOG_INFO, "starting up");

  /* keys are file path strings compared with strcmp() and not freed by the
   * tree (key destroy func is NULL); values are released with
   * free_cache_item() */
  cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
      (GDestroyNotify) free_cache_item);
  if (cache_tree == NULL)
  {
    RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
    goto error;
  }

  return write_pidfile (pid_fd);

error:
  remove_pidfile();
  return -1;
} /* }}} int daemonize */
2514 static int cleanup (void) /* {{{ */
2515 {
2516 pthread_cond_broadcast (&flush_cond);
2517 pthread_join (flush_thread, NULL);
2519 pthread_cond_broadcast (&queue_cond);
2520 for (int i = 0; i < config_queue_threads; i++)
2521 pthread_join (queue_threads[i], NULL);
2523 if (config_flush_at_shutdown)
2524 {
2525 assert(cache_queue_head == NULL);
2526 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2527 }
2529 journal_done();
2530 remove_pidfile ();
2532 free(queue_threads);
2533 free(config_base_dir);
2534 free(config_pid_file);
2535 free(journal_cur);
2536 free(journal_old);
2538 pthread_mutex_lock(&cache_lock);
2539 g_tree_destroy(cache_tree);
2541 RRDD_LOG(LOG_INFO, "goodbye");
2542 closelog ();
2544 return (0);
2545 } /* }}} int cleanup */
2547 static int read_options (int argc, char **argv) /* {{{ */
2548 {
2549 int option;
2550 int status = 0;
2552 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2553 {
2554 switch (option)
2555 {
2556 case 'g':
2557 stay_foreground=1;
2558 break;
2560 case 'L':
2561 case 'l':
2562 {
2563 listen_socket_t *new;
2565 new = malloc(sizeof(listen_socket_t));
2566 if (new == NULL)
2567 {
2568 fprintf(stderr, "read_options: malloc failed.\n");
2569 return(2);
2570 }
2571 memset(new, 0, sizeof(listen_socket_t));
2573 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2574 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2576 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2577 &config_listen_address_list_len, new))
2578 {
2579 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2580 return (2);
2581 }
2582 }
2583 break;
2585 case 'f':
2586 {
2587 int temp;
2589 temp = atoi (optarg);
2590 if (temp > 0)
2591 config_flush_interval = temp;
2592 else
2593 {
2594 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2595 status = 3;
2596 }
2597 }
2598 break;
2600 case 'w':
2601 {
2602 int temp;
2604 temp = atoi (optarg);
2605 if (temp > 0)
2606 config_write_interval = temp;
2607 else
2608 {
2609 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2610 status = 2;
2611 }
2612 }
2613 break;
2615 case 'z':
2616 {
2617 int temp;
2619 temp = atoi(optarg);
2620 if (temp > 0)
2621 config_write_jitter = temp;
2622 else
2623 {
2624 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2625 status = 2;
2626 }
2628 break;
2629 }
2631 case 't':
2632 {
2633 int threads;
2634 threads = atoi(optarg);
2635 if (threads >= 1)
2636 config_queue_threads = threads;
2637 else
2638 {
2639 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2640 return 1;
2641 }
2642 }
2643 break;
2645 case 'B':
2646 config_write_base_only = 1;
2647 break;
2649 case 'b':
2650 {
2651 size_t len;
2652 char base_realpath[PATH_MAX];
2654 if (config_base_dir != NULL)
2655 free (config_base_dir);
2656 config_base_dir = strdup (optarg);
2657 if (config_base_dir == NULL)
2658 {
2659 fprintf (stderr, "read_options: strdup failed.\n");
2660 return (3);
2661 }
2663 /* make sure that the base directory is not resolved via
2664 * symbolic links. this makes some performance-enhancing
2665 * assumptions possible (we don't have to resolve paths
2666 * that start with a "/")
2667 */
2668 if (realpath(config_base_dir, base_realpath) == NULL)
2669 {
2670 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2671 return 5;
2672 }
2673 else if (strncmp(config_base_dir,
2674 base_realpath, sizeof(base_realpath)) != 0)
2675 {
2676 fprintf(stderr,
2677 "Base directory (-b) resolved via file system links!\n"
2678 "Please consult rrdcached '-b' documentation!\n"
2679 "Consider specifying the real directory (%s)\n",
2680 base_realpath);
2681 return 5;
2682 }
2684 len = strlen (config_base_dir);
2685 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2686 {
2687 config_base_dir[len - 1] = 0;
2688 len--;
2689 }
2691 if (len < 1)
2692 {
2693 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2694 return (4);
2695 }
2697 _config_base_dir_len = len;
2698 }
2699 break;
2701 case 'p':
2702 {
2703 if (config_pid_file != NULL)
2704 free (config_pid_file);
2705 config_pid_file = strdup (optarg);
2706 if (config_pid_file == NULL)
2707 {
2708 fprintf (stderr, "read_options: strdup failed.\n");
2709 return (3);
2710 }
2711 }
2712 break;
2714 case 'F':
2715 config_flush_at_shutdown = 1;
2716 break;
2718 case 'j':
2719 {
2720 struct stat statbuf;
2721 const char *dir = optarg;
2723 status = stat(dir, &statbuf);
2724 if (status != 0)
2725 {
2726 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2727 return 6;
2728 }
2730 if (!S_ISDIR(statbuf.st_mode)
2731 || access(dir, R_OK|W_OK|X_OK) != 0)
2732 {
2733 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2734 errno ? rrd_strerror(errno) : "");
2735 return 6;
2736 }
2738 journal_cur = malloc(PATH_MAX + 1);
2739 journal_old = malloc(PATH_MAX + 1);
2740 if (journal_cur == NULL || journal_old == NULL)
2741 {
2742 fprintf(stderr, "malloc failure for journal files\n");
2743 return 6;
2744 }
2745 else
2746 {
2747 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2748 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2749 }
2750 }
2751 break;
2753 case 'h':
2754 case '?':
2755 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2756 "\n"
2757 "Usage: rrdcached [options]\n"
2758 "\n"
2759 "Valid options are:\n"
2760 " -l <address> Socket address to listen to.\n"
2761 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2762 " -w <seconds> Interval in which to write data.\n"
2763 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2764 " -t <threads> Number of write threads.\n"
2765 " -f <seconds> Interval in which to flush dead data.\n"
2766 " -p <file> Location of the PID-file.\n"
2767 " -b <dir> Base directory to change to.\n"
2768 " -B Restrict file access to paths within -b <dir>\n"
2769 " -g Do not fork and run in the foreground.\n"
2770 " -j <dir> Directory in which to create the journal files.\n"
2771 " -F Always flush all updates at shutdown\n"
2772 "\n"
2773 "For more information and a detailed description of all options "
2774 "please refer\n"
2775 "to the rrdcached(1) manual page.\n",
2776 VERSION);
2777 status = -1;
2778 break;
2779 } /* switch (option) */
2780 } /* while (getopt) */
2782 /* advise the user when values are not sane */
2783 if (config_flush_interval < 2 * config_write_interval)
2784 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2785 " 2x write interval (-w) !\n");
2786 if (config_write_jitter > config_write_interval)
2787 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2788 " write interval (-w) !\n");
2790 if (config_write_base_only && config_base_dir == NULL)
2791 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2792 " Consult the rrdcached documentation\n");
2794 if (journal_cur == NULL)
2795 config_flush_at_shutdown = 1;
2797 return (status);
2798 } /* }}} int read_options */
2800 int main (int argc, char **argv)
2801 {
2802 int status;
2804 status = read_options (argc, argv);
2805 if (status != 0)
2806 {
2807 if (status < 0)
2808 status = 0;
2809 return (status);
2810 }
2812 status = daemonize ();
2813 if (status != 0)
2814 {
2815 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2816 return (1);
2817 }
2819 journal_init();
2821 /* start the queue threads */
2822 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2823 if (queue_threads == NULL)
2824 {
2825 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2826 cleanup();
2827 return (1);
2828 }
2829 for (int i = 0; i < config_queue_threads; i++)
2830 {
2831 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2832 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2833 if (status != 0)
2834 {
2835 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2836 cleanup();
2837 return (1);
2838 }
2839 }
2841 /* start the flush thread */
2842 memset(&flush_thread, 0, sizeof(flush_thread));
2843 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2844 if (status != 0)
2845 {
2846 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2847 cleanup();
2848 return (1);
2849 }
2851 listen_thread_main (NULL);
2852 cleanup ();
2854 return (0);
2855 } /* int main */
2857 /*
2858 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2859 */