f997d3ceddbd4060add089914cf95b80b86b3fcf
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
/* Log a message via syslog() at the given severity.  The variadic part is
 * handed to syslog() unchanged (format string followed by its arguments). */
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
/* When __GNUC__ is not defined, make `__attribute__((...))' annotations
 * expand to nothing so the declarations below still compile. */
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
115 /*
116 * Types
117 */
/* Privilege level attached to a socket.  has_privilege() compares levels
 * with `>=', so the declaration order (LOW before HIGH) is significant. */
118 typedef enum
119 {
120 PRIV_LOW,
121 PRIV_HIGH
122 } socket_privilege;
/* Result code handed to send_response(): RESP_OK for success, RESP_ERR for
 * failure. */
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/* Per-socket state: the descriptor, its privilege level, BATCH bookkeeping
 * and the buffers used for reading commands and assembling responses. */
126 struct listen_socket_s
127 {
/* descriptor that send_response() writes to */
128 int fd;
/* NOTE(review): presumably the address this socket is bound to and its
 * address family -- not used in the visible part of the file; confirm. */
129 char addr[PATH_MAX + 1];
130 int family;
/* compared against the required level in has_privilege() */
131 socket_privilege privilege;
133 /* state for BATCH processing */
/* non-zero while in BATCH mode (checked by add_response_info() and
 * send_response()) */
134 time_t batch_start;
/* reported as the leading number of an error response while in BATCH */
135 int batch_cmd;
137 /* buffered IO */
/* read buffer; next_cmd is the offset of the next unparsed command and
 * next_read the offset one past the last byte read (see next_cmd()) */
138 char *rbuf;
139 off_t next_cmd;
140 off_t next_read;
/* pending response text and its length (see add_to_wbuf()) */
142 char *wbuf;
143 ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command;
/* Parameter lists shared by all command handlers.  DISPATCH_PROTO carries
 * the socket, the current time and the remaining command buffer;
 * HANDLER_PROTO additionally prepends the command table entry. */
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
150 time_t now __attribute__((unused)),\
151 char *buffer __attribute__((unused)),\
152 size_t buffer_size __attribute__((unused))
154 #define HANDLER_PROTO struct command *cmd __attribute__((unused)),\
155 DISPATCH_PROTO
/* One entry of the command table: the command name, its handler, the
 * minimum socket privilege, the contexts it is valid in, and the usage and
 * help texts (syntax_error() reports `syntax' to the client). */
157 struct command {
158 char *cmd;
159 int (*handler)(HANDLER_PROTO);
160 socket_privilege min_priv;
162 char context; /* where we expect to see it */
/* bit flags for `context'; CMD_CONTEXT_ANY has all of the bits set */
163 #define CMD_CONTEXT_CLIENT (1<<0)
164 #define CMD_CONTEXT_BATCH (1<<1)
165 #define CMD_CONTEXT_JOURNAL (1<<2)
166 #define CMD_CONTEXT_ANY (0x7f)
168 char *syntax;
169 char *help;
170 };
/* One cached RRD file: the pending update strings plus the links that put
 * the item on the doubly linked write queue. */
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
/* file name; also used as the key in cache_tree (see tree_callback_flush) */
176 char *file;
/* pending update strings and how many there are */
177 char **values;
178 size_t values_num;
/* set by wipe_ci_values() whenever the pending values are handed off */
179 time_t last_flush_time;
180 time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183 int flags;
/* broadcast once the item's values have been written (see flush_file()) */
184 pthread_cond_t flushed;
/* neighbours on the write queue; both NULL while the item is not queued */
185 cache_item_t *prev;
186 cache_item_t *next;
187 };
/* User data passed to tree_callback_flush() by flush_old_values(). */
189 struct callback_flush_data_s
190 {
/* time at which the tree walk started */
191 time_t now;
/* items last flushed at or before this time are enqueued for writing */
192 time_t abs_timeout;
/* keys of idle, empty items collected for removal from the tree */
193 char **keys;
194 size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should insert at. */
198 enum queue_side_e
199 {
200 HEAD,
201 TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
/* read buffer size: twice the maximum command length */
207 #define RBUF_SIZE (CMD_MAX*2)
209 /*
210 * Variables
211 */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
/* incremented by the signal handlers; non-zero means "shut down" */
218 static int do_shutdown = 0;
/* writer threads and the condition they wait on for queued items */
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
/* periodic flush thread and the condition used to wake it early */
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
231 /* Cache stuff */
/* cache_lock protects the tree and the write queue below */
232 static GTree *cache_tree = NULL;
233 static cache_item_t *cache_queue_head = NULL;
234 static cache_item_t *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
237 static int config_write_interval = 300;
238 static int config_write_jitter = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
246 static listen_socket_t **config_listen_address_list = NULL;
247 static size_t config_listen_address_list_len = 0;
/* statistics counters; protected by stats_lock */
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
267 /* prototypes for forward references */
268 static int handle_request_help (HANDLER_PROTO);
270 /*
271 * Functions
272 */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276 do_shutdown++;
277 pthread_cond_broadcast(&flush_cond);
278 pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
/* SIGINT handler: request a shutdown; the signal number is ignored. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "INT";

  sig_common(name);
} /* }}} void sig_int_handler */
/* SIGTERM handler: request a shutdown; the signal number is ignored. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "TERM";

  sig_common(name);
} /* }}} void sig_term_handler */
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293 config_flush_at_shutdown = 1;
294 sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299 config_flush_at_shutdown = 0;
300 sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
303 static void install_signal_handlers(void) /* {{{ */
304 {
305 /* These structures are static, because `sigaction' behaves weird if the are
306 * overwritten.. */
307 static struct sigaction sa_int;
308 static struct sigaction sa_term;
309 static struct sigaction sa_pipe;
310 static struct sigaction sa_usr1;
311 static struct sigaction sa_usr2;
313 /* Install signal handlers */
314 memset (&sa_int, 0, sizeof (sa_int));
315 sa_int.sa_handler = sig_int_handler;
316 sigaction (SIGINT, &sa_int, NULL);
318 memset (&sa_term, 0, sizeof (sa_term));
319 sa_term.sa_handler = sig_term_handler;
320 sigaction (SIGTERM, &sa_term, NULL);
322 memset (&sa_pipe, 0, sizeof (sa_pipe));
323 sa_pipe.sa_handler = SIG_IGN;
324 sigaction (SIGPIPE, &sa_pipe, NULL);
326 memset (&sa_pipe, 0, sizeof (sa_usr1));
327 sa_usr1.sa_handler = sig_usr1_handler;
328 sigaction (SIGUSR1, &sa_usr1, NULL);
330 memset (&sa_usr2, 0, sizeof (sa_usr2));
331 sa_usr2.sa_handler = sig_usr2_handler;
332 sigaction (SIGUSR2, &sa_usr2, NULL);
334 } /* }}} void install_signal_handlers */
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338 int fd;
339 char *file;
341 file = (config_pid_file != NULL)
342 ? config_pid_file
343 : LOCALSTATEDIR "/run/rrdcached.pid";
345 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346 if (fd < 0)
347 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348 action, file, rrd_strerror(errno));
350 return(fd);
351 } /* }}} static int open_pidfile */
/*
 * check_pidfile: check an existing pid file to see whether a daemon is
 * already running.
 *
 * Returns the open, truncated pid file descriptor when the recorded process
 * is gone (or is this process), so the caller can write a new PID to it.
 * Returns a negative value when the file cannot be opened or read, holds no
 * valid PID, cannot be truncated, or names another live process.  The
 * descriptor is closed on every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not NUL-terminate */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile: write this process's PID, followed by a newline, to `fd'.
 * The descriptor is closed in every case.
 * Returns 0 on success, -1 if the descriptor cannot be wrapped in a stream.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
418 static int remove_pidfile (void) /* {{{ */
419 {
420 char *file;
421 int status;
423 file = (config_pid_file != NULL)
424 ? config_pid_file
425 : LOCALSTATEDIR "/run/rrdcached.pid";
427 status = unlink (file);
428 if (status == 0)
429 return (0);
430 return (errno);
431 } /* }}} int remove_pidfile */
433 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
434 {
435 char *eol;
437 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
438 sock->next_read - sock->next_cmd);
440 if (eol == NULL)
441 {
442 /* no commands left, move remainder back to front of rbuf */
443 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
444 sock->next_read - sock->next_cmd);
445 sock->next_read -= sock->next_cmd;
446 sock->next_cmd = 0;
447 *len = 0;
448 return NULL;
449 }
450 else
451 {
452 char *cmd = sock->rbuf + sock->next_cmd;
453 *eol = '\0';
455 sock->next_cmd = eol - sock->rbuf + 1;
457 if (eol > sock->rbuf && *(eol-1) == '\r')
458 *(--eol) = '\0'; /* handle "\r\n" EOL */
460 *len = eol - cmd;
462 return cmd;
463 }
465 /* NOTREACHED */
466 assert(1==0);
467 }
469 /* add the characters directly to the write buffer */
470 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
471 {
472 char *new_buf;
474 assert(sock != NULL);
476 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
477 if (new_buf == NULL)
478 {
479 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
480 return -1;
481 }
483 strncpy(new_buf + sock->wbuf_len, str, len + 1);
485 sock->wbuf = new_buf;
486 sock->wbuf_len += len;
488 return 0;
489 } /* }}} static int add_to_wbuf */
491 /* add the text to the "extra" info that's sent after the status line */
492 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
493 {
494 va_list argp;
495 char buffer[CMD_MAX];
496 int len;
498 if (sock == NULL) return 0; /* journal replay mode */
499 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
501 va_start(argp, fmt);
502 #ifdef HAVE_VSNPRINTF
503 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
504 #else
505 len = vsprintf(buffer, fmt, argp);
506 #endif
507 va_end(argp);
508 if (len < 0)
509 {
510 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
511 return -1;
512 }
514 return add_to_wbuf(sock, buffer, len);
515 } /* }}} static int add_response_info */
/*
 * count_lines: return the number of newline characters in `str'.
 * A NULL string counts as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  char *cursor;

  if (str == NULL)
    return 0;

  for (cursor = str; *cursor != '\0'; cursor++)
  {
    if (*cursor == '\n')
      newlines++;
  }

  return newlines;
} /* }}} static int count_lines */
533 /* send the response back to the user.
534 * returns 0 on success, -1 on error
535 * write buffer is always zeroed after this call */
536 static int send_response (listen_socket_t *sock, response_code rc,
537 char *fmt, ...) /* {{{ */
538 {
539 va_list argp;
540 char buffer[CMD_MAX];
541 int lines;
542 ssize_t wrote;
543 int rclen, len;
545 if (sock == NULL) return rc; /* journal replay mode */
547 if (sock->batch_start)
548 {
549 if (rc == RESP_OK)
550 return rc; /* no response on success during BATCH */
551 lines = sock->batch_cmd;
552 }
553 else if (rc == RESP_OK)
554 lines = count_lines(sock->wbuf);
555 else
556 lines = -1;
558 rclen = sprintf(buffer, "%d ", lines);
559 va_start(argp, fmt);
560 #ifdef HAVE_VSNPRINTF
561 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
562 #else
563 len = vsprintf(buffer+rclen, fmt, argp);
564 #endif
565 va_end(argp);
566 if (len < 0)
567 return -1;
569 len += rclen;
571 /* append the result to the wbuf, don't write to the user */
572 if (sock->batch_start)
573 return add_to_wbuf(sock, buffer, len);
575 /* first write must be complete */
576 if (len != write(sock->fd, buffer, len))
577 {
578 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
579 return -1;
580 }
582 if (sock->wbuf != NULL && rc == RESP_OK)
583 {
584 wrote = 0;
585 while (wrote < sock->wbuf_len)
586 {
587 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
588 if (wb <= 0)
589 {
590 RRDD_LOG(LOG_INFO, "send_response: could not write results");
591 return -1;
592 }
593 wrote += wb;
594 }
595 }
597 free(sock->wbuf); sock->wbuf = NULL;
598 sock->wbuf_len = 0;
600 return 0;
601 } /* }}} */
603 static void wipe_ci_values(cache_item_t *ci, time_t when)
604 {
605 ci->values = NULL;
606 ci->values_num = 0;
608 ci->last_flush_time = when;
609 if (config_write_jitter > 0)
610 ci->last_flush_time += (rrd_random() % config_write_jitter);
611 }
613 /* remove_from_queue
614 * remove a "cache_item_t" item from the queue.
615 * must hold 'cache_lock' when calling this
616 */
617 static void remove_from_queue(cache_item_t *ci) /* {{{ */
618 {
619 if (ci == NULL) return;
620 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
622 if (ci->prev == NULL)
623 cache_queue_head = ci->next; /* reset head */
624 else
625 ci->prev->next = ci->next;
627 if (ci->next == NULL)
628 cache_queue_tail = ci->prev; /* reset the tail */
629 else
630 ci->next->prev = ci->prev;
632 ci->next = ci->prev = NULL;
633 ci->flags &= ~CI_FLAGS_IN_QUEUE;
635 pthread_mutex_lock (&stats_lock);
636 assert (stats_queue_length > 0);
637 stats_queue_length--;
638 pthread_mutex_unlock (&stats_lock);
640 } /* }}} static void remove_from_queue */
642 /* free the resources associated with the cache_item_t
643 * must hold cache_lock when calling this function
644 */
645 static void *free_cache_item(cache_item_t *ci) /* {{{ */
646 {
647 if (ci == NULL) return NULL;
649 remove_from_queue(ci);
651 for (size_t i=0; i < ci->values_num; i++)
652 free(ci->values[i]);
654 free (ci->values);
655 free (ci->file);
657 /* in case anyone is waiting */
658 pthread_cond_broadcast(&ci->flushed);
660 free (ci);
662 return NULL;
663 } /* }}} static void *free_cache_item */
665 /*
666 * enqueue_cache_item:
667 * `cache_lock' must be acquired before calling this function!
668 */
669 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
670 queue_side_t side)
671 {
672 if (ci == NULL)
673 return (-1);
675 if (ci->values_num == 0)
676 return (0);
678 if (side == HEAD)
679 {
680 if (cache_queue_head == ci)
681 return 0;
683 /* remove if further down in queue */
684 remove_from_queue(ci);
686 ci->prev = NULL;
687 ci->next = cache_queue_head;
688 if (ci->next != NULL)
689 ci->next->prev = ci;
690 cache_queue_head = ci;
692 if (cache_queue_tail == NULL)
693 cache_queue_tail = cache_queue_head;
694 }
695 else /* (side == TAIL) */
696 {
697 /* We don't move values back in the list.. */
698 if (ci->flags & CI_FLAGS_IN_QUEUE)
699 return (0);
701 assert (ci->next == NULL);
702 assert (ci->prev == NULL);
704 ci->prev = cache_queue_tail;
706 if (cache_queue_tail == NULL)
707 cache_queue_head = ci;
708 else
709 cache_queue_tail->next = ci;
711 cache_queue_tail = ci;
712 }
714 ci->flags |= CI_FLAGS_IN_QUEUE;
716 pthread_cond_signal(&queue_cond);
717 pthread_mutex_lock (&stats_lock);
718 stats_queue_length++;
719 pthread_mutex_unlock (&stats_lock);
721 return (0);
722 } /* }}} int enqueue_cache_item */
724 /*
725 * tree_callback_flush:
726 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
727 * while this is in progress.
728 */
729 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
730 gpointer data)
731 {
732 cache_item_t *ci;
733 callback_flush_data_t *cfd;
735 ci = (cache_item_t *) value;
736 cfd = (callback_flush_data_t *) data;
738 if (ci->flags & CI_FLAGS_IN_QUEUE)
739 return FALSE;
741 if ((ci->last_flush_time <= cfd->abs_timeout)
742 && (ci->values_num > 0))
743 {
744 enqueue_cache_item (ci, TAIL);
745 }
746 else if ((do_shutdown != 0)
747 && (ci->values_num > 0))
748 {
749 enqueue_cache_item (ci, TAIL);
750 }
751 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
752 && (ci->values_num <= 0))
753 {
754 assert ((char *) key == ci->file);
755 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
756 {
757 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
758 return (FALSE);
759 }
760 }
762 return (FALSE);
763 } /* }}} gboolean tree_callback_flush */
765 static int flush_old_values (int max_age)
766 {
767 callback_flush_data_t cfd;
768 size_t k;
770 memset (&cfd, 0, sizeof (cfd));
771 /* Pass the current time as user data so that we don't need to call
772 * `time' for each node. */
773 cfd.now = time (NULL);
774 cfd.keys = NULL;
775 cfd.keys_num = 0;
777 if (max_age > 0)
778 cfd.abs_timeout = cfd.now - max_age;
779 else
780 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
782 /* `tree_callback_flush' will return the keys of all values that haven't
783 * been touched in the last `config_flush_interval' seconds in `cfd'.
784 * The char*'s in this array point to the same memory as ci->file, so we
785 * don't need to free them separately. */
786 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
788 for (k = 0; k < cfd.keys_num; k++)
789 {
790 /* should never fail, since we have held the cache_lock
791 * the entire time */
792 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
793 }
795 if (cfd.keys != NULL)
796 {
797 free (cfd.keys);
798 cfd.keys = NULL;
799 }
801 return (0);
802 } /* int flush_old_values */
804 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
805 {
806 struct timeval now;
807 struct timespec next_flush;
808 int status;
810 gettimeofday (&now, NULL);
811 next_flush.tv_sec = now.tv_sec + config_flush_interval;
812 next_flush.tv_nsec = 1000 * now.tv_usec;
814 pthread_mutex_lock(&cache_lock);
816 while (!do_shutdown)
817 {
818 gettimeofday (&now, NULL);
819 if ((now.tv_sec > next_flush.tv_sec)
820 || ((now.tv_sec == next_flush.tv_sec)
821 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
822 {
823 /* Flush all values that haven't been written in the last
824 * `config_write_interval' seconds. */
825 flush_old_values (config_write_interval);
827 /* Determine the time of the next cache flush. */
828 next_flush.tv_sec =
829 now.tv_sec + next_flush.tv_sec % config_flush_interval;
831 /* unlock the cache while we rotate so we don't block incoming
832 * updates if the fsync() blocks on disk I/O */
833 pthread_mutex_unlock(&cache_lock);
834 journal_rotate();
835 pthread_mutex_lock(&cache_lock);
836 }
838 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
839 if (status != 0 && status != ETIMEDOUT)
840 {
841 RRDD_LOG (LOG_ERR, "flush_thread_main: "
842 "pthread_cond_timedwait returned %i.", status);
843 }
844 }
846 if (config_flush_at_shutdown)
847 flush_old_values (-1); /* flush everything */
849 pthread_mutex_unlock(&cache_lock);
851 return NULL;
852 } /* void *flush_thread_main */
854 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
855 {
856 pthread_mutex_lock (&cache_lock);
858 while (!do_shutdown
859 || (cache_queue_head != NULL && config_flush_at_shutdown))
860 {
861 cache_item_t *ci;
862 char *file;
863 char **values;
864 size_t values_num;
865 int status;
867 /* Now, check if there's something to store away. If not, wait until
868 * something comes in. if we are shutting down, do not wait around. */
869 if (cache_queue_head == NULL && !do_shutdown)
870 {
871 status = pthread_cond_wait (&queue_cond, &cache_lock);
872 if ((status != 0) && (status != ETIMEDOUT))
873 {
874 RRDD_LOG (LOG_ERR, "queue_thread_main: "
875 "pthread_cond_wait returned %i.", status);
876 }
877 }
879 /* Check if a value has arrived. This may be NULL if we timed out or there
880 * was an interrupt such as a signal. */
881 if (cache_queue_head == NULL)
882 continue;
884 ci = cache_queue_head;
886 /* copy the relevant parts */
887 file = strdup (ci->file);
888 if (file == NULL)
889 {
890 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
891 continue;
892 }
894 assert(ci->values != NULL);
895 assert(ci->values_num > 0);
897 values = ci->values;
898 values_num = ci->values_num;
900 wipe_ci_values(ci, time(NULL));
901 remove_from_queue(ci);
903 pthread_mutex_unlock (&cache_lock);
905 rrd_clear_error ();
906 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
907 if (status != 0)
908 {
909 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
910 "rrd_update_r (%s) failed with status %i. (%s)",
911 file, status, rrd_get_error());
912 }
914 journal_write("wrote", file);
915 pthread_cond_broadcast(&ci->flushed);
917 rrd_free_ptrs((void ***) &values, &values_num);
918 free(file);
920 if (status == 0)
921 {
922 pthread_mutex_lock (&stats_lock);
923 stats_updates_written++;
924 stats_data_sets_written += values_num;
925 pthread_mutex_unlock (&stats_lock);
926 }
928 pthread_mutex_lock (&cache_lock);
929 }
930 pthread_mutex_unlock (&cache_lock);
932 return (NULL);
933 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Split the next field off the front of a NUL-terminated command buffer.
 *
 * A field ends at the first unescaped space or at the terminating NUL.  A
 * backslash makes the following character literal.  The field is unescaped
 * in place, so it starts where the buffer started.
 *
 * On success returns 0, stores the field in `*field_ret' and advances
 * `*buffer_ret' / `*buffer_size_ret' past the field and its delimiter.
 * Returns -1 (leaving the pointers untouched) when the buffer is empty or
 * the field is not properly terminated.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const text = *buffer_ret;
  const size_t text_size = *buffer_size_ret;
  size_t rd = 0; /* read position */
  size_t wr = 0; /* write position of the unescaped field */

  if (text_size == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (text[text_size - 1] == '\0');

  for (;;)
  {
    char c;

    if (rd >= text_size)
      return (-1); /* ran off the end without finding a delimiter */

    c = text[rd];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
      break;

    /* Handle escaped characters. */
    if (c == '\\')
    {
      if (rd >= (text_size - 1))
        return (-1);
      rd++;
      c = text[rd];
    }

    text[wr] = c;
    wr++;
    rd++;
  }

  text[wr] = 0;
  rd++; /* skip the delimiter */

  *buffer_ret = text + rd;
  *buffer_size_ret = text_size - rd;
  *field_ret = text;

  return (0);
} /* }}} int buffer_get_field */
998 /* if we're restricting writes to the base directory,
999 * check whether the file falls within the dir
1000 * returns 1 if OK, otherwise 0
1001 */
1002 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1003 {
1004 assert(file != NULL);
1006 if (!config_write_base_only
1007 || sock == NULL /* journal replay */
1008 || config_base_dir == NULL)
1009 return 1;
1011 if (strstr(file, "../") != NULL) goto err;
1013 /* relative paths without "../" are ok */
1014 if (*file != '/') return 1;
1016 /* file must be of the format base + "/" + <1+ char filename> */
1017 if (strlen(file) < _config_base_dir_len + 2) goto err;
1018 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1019 if (*(file + _config_base_dir_len) != '/') goto err;
1021 return 1;
1023 err:
1024 if (sock != NULL && sock->fd >= 0)
1025 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1027 return 0;
1028 } /* }}} static int check_file_access */
1030 /* when using a base dir, convert relative paths to absolute paths.
1031 * if necessary, modifies the "filename" pointer to point
1032 * to the new path created in "tmp". "tmp" is provided
1033 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1034 *
1035 * this allows us to optimize for the expected case (absolute path)
1036 * with a no-op.
1037 */
1038 static void get_abs_path(char **filename, char *tmp)
1039 {
1040 assert(tmp != NULL);
1041 assert(filename != NULL && *filename != NULL);
1043 if (config_base_dir == NULL || **filename == '/')
1044 return;
1046 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1047 *filename = tmp;
1048 } /* }}} static int get_abs_path */
1050 /* returns 1 if we have the required privilege level,
1051 * otherwise issue an error to the user on sock */
1052 static int has_privilege (listen_socket_t *sock, /* {{{ */
1053 socket_privilege priv)
1054 {
1055 if (sock == NULL) /* journal replay */
1056 return 1;
1058 if (sock->privilege >= priv)
1059 return 1;
1061 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1062 } /* }}} static int has_privilege */
1064 static int flush_file (const char *filename) /* {{{ */
1065 {
1066 cache_item_t *ci;
1068 pthread_mutex_lock (&cache_lock);
1070 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1071 if (ci == NULL)
1072 {
1073 pthread_mutex_unlock (&cache_lock);
1074 return (ENOENT);
1075 }
1077 if (ci->values_num > 0)
1078 {
1079 /* Enqueue at head */
1080 enqueue_cache_item (ci, HEAD);
1081 pthread_cond_wait(&ci->flushed, &cache_lock);
1082 }
1084 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1085 * may have been purged during our cond_wait() */
1087 pthread_mutex_unlock(&cache_lock);
1089 return (0);
1090 } /* }}} int flush_file */
1092 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1093 {
1094 char *err = "Syntax error.\n";
1096 if (cmd && cmd->syntax)
1097 err = cmd->syntax;
1099 return send_response(sock, RESP_ERR, "Usage: %s", err);
1100 } /* }}} static int syntax_error() */
1102 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1103 {
1104 uint64_t copy_queue_length;
1105 uint64_t copy_updates_received;
1106 uint64_t copy_flush_received;
1107 uint64_t copy_updates_written;
1108 uint64_t copy_data_sets_written;
1109 uint64_t copy_journal_bytes;
1110 uint64_t copy_journal_rotate;
1112 uint64_t tree_nodes_number;
1113 uint64_t tree_depth;
1115 pthread_mutex_lock (&stats_lock);
1116 copy_queue_length = stats_queue_length;
1117 copy_updates_received = stats_updates_received;
1118 copy_flush_received = stats_flush_received;
1119 copy_updates_written = stats_updates_written;
1120 copy_data_sets_written = stats_data_sets_written;
1121 copy_journal_bytes = stats_journal_bytes;
1122 copy_journal_rotate = stats_journal_rotate;
1123 pthread_mutex_unlock (&stats_lock);
1125 pthread_mutex_lock (&cache_lock);
1126 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1127 tree_depth = (uint64_t) g_tree_height (cache_tree);
1128 pthread_mutex_unlock (&cache_lock);
1130 add_response_info(sock,
1131 "QueueLength: %"PRIu64"\n", copy_queue_length);
1132 add_response_info(sock,
1133 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1134 add_response_info(sock,
1135 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1136 add_response_info(sock,
1137 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1138 add_response_info(sock,
1139 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1140 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1141 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1142 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1143 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1145 send_response(sock, RESP_OK, "Statistics follow\n");
1147 return (0);
1148 } /* }}} int handle_request_stats */
1150 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1151 {
1152 char *file, file_tmp[PATH_MAX];
1153 int status;
1155 status = buffer_get_field (&buffer, &buffer_size, &file);
1156 if (status != 0)
1157 {
1158 return syntax_error(sock,cmd);
1159 }
1160 else
1161 {
1162 pthread_mutex_lock(&stats_lock);
1163 stats_flush_received++;
1164 pthread_mutex_unlock(&stats_lock);
1166 get_abs_path(&file, file_tmp);
1167 if (!check_file_access(file, sock)) return 0;
1169 status = flush_file (file);
1170 if (status == 0)
1171 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1172 else if (status == ENOENT)
1173 {
1174 /* no file in our tree; see whether it exists at all */
1175 struct stat statbuf;
1177 memset(&statbuf, 0, sizeof(statbuf));
1178 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1179 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1180 else
1181 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1182 }
1183 else if (status < 0)
1184 return send_response(sock, RESP_ERR, "Internal error.\n");
1185 else
1186 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1187 }
1189 /* NOTREACHED */
1190 assert(1==0);
1191 } /* }}} int handle_request_flush */
1193 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1194 {
1195 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1197 pthread_mutex_lock(&cache_lock);
1198 flush_old_values(-1);
1199 pthread_mutex_unlock(&cache_lock);
1201 return send_response(sock, RESP_OK, "Started flush.\n");
1202 } /* }}} static int handle_request_flushall */
1204 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1205 {
1206 int status;
1207 char *file, file_tmp[PATH_MAX];
1208 cache_item_t *ci;
1210 status = buffer_get_field(&buffer, &buffer_size, &file);
1211 if (status != 0)
1212 return syntax_error(sock,cmd);
1214 get_abs_path(&file, file_tmp);
1216 pthread_mutex_lock(&cache_lock);
1217 ci = g_tree_lookup(cache_tree, file);
1218 if (ci == NULL)
1219 {
1220 pthread_mutex_unlock(&cache_lock);
1221 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1222 }
1224 for (size_t i=0; i < ci->values_num; i++)
1225 add_response_info(sock, "%s\n", ci->values[i]);
1227 pthread_mutex_unlock(&cache_lock);
1228 return send_response(sock, RESP_OK, "updates pending\n");
1229 } /* }}} static int handle_request_pending */
1231 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1232 {
1233 int status;
1234 gboolean found;
1235 char *file, file_tmp[PATH_MAX];
1237 status = buffer_get_field(&buffer, &buffer_size, &file);
1238 if (status != 0)
1239 return syntax_error(sock,cmd);
1241 get_abs_path(&file, file_tmp);
1242 if (!check_file_access(file, sock)) return 0;
1244 pthread_mutex_lock(&cache_lock);
1245 found = g_tree_remove(cache_tree, file);
1246 pthread_mutex_unlock(&cache_lock);
1248 if (found == TRUE)
1249 {
1250 if (sock != NULL)
1251 journal_write("forget", file);
1253 return send_response(sock, RESP_OK, "Gone!\n");
1254 }
1255 else
1256 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258 /* NOTREACHED */
1259 assert(1==0);
1260 } /* }}} static int handle_request_forget */
1262 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1263 {
1264 cache_item_t *ci;
1266 pthread_mutex_lock(&cache_lock);
1268 ci = cache_queue_head;
1269 while (ci != NULL)
1270 {
1271 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1272 ci = ci->next;
1273 }
1275 pthread_mutex_unlock(&cache_lock);
1277 return send_response(sock, RESP_OK, "in queue.\n");
1278 } /* }}} int handle_request_queue */
1280 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1281 {
1282 char *file, file_tmp[PATH_MAX];
1283 int values_num = 0;
1284 int status;
1285 char orig_buf[CMD_MAX];
1287 cache_item_t *ci;
1289 /* save it for the journal later */
1290 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1292 status = buffer_get_field (&buffer, &buffer_size, &file);
1293 if (status != 0)
1294 return syntax_error(sock,cmd);
1296 pthread_mutex_lock(&stats_lock);
1297 stats_updates_received++;
1298 pthread_mutex_unlock(&stats_lock);
1300 get_abs_path(&file, file_tmp);
1301 if (!check_file_access(file, sock)) return 0;
1303 pthread_mutex_lock (&cache_lock);
1304 ci = g_tree_lookup (cache_tree, file);
1306 if (ci == NULL) /* {{{ */
1307 {
1308 struct stat statbuf;
1310 /* don't hold the lock while we setup; stat(2) might block */
1311 pthread_mutex_unlock(&cache_lock);
1313 memset (&statbuf, 0, sizeof (statbuf));
1314 status = stat (file, &statbuf);
1315 if (status != 0)
1316 {
1317 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1319 status = errno;
1320 if (status == ENOENT)
1321 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1322 else
1323 return send_response(sock, RESP_ERR,
1324 "stat failed with error %i.\n", status);
1325 }
1326 if (!S_ISREG (statbuf.st_mode))
1327 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1329 if (access(file, R_OK|W_OK) != 0)
1330 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1331 file, rrd_strerror(errno));
1333 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1334 if (ci == NULL)
1335 {
1336 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1338 return send_response(sock, RESP_ERR, "malloc failed.\n");
1339 }
1340 memset (ci, 0, sizeof (cache_item_t));
1342 ci->file = strdup (file);
1343 if (ci->file == NULL)
1344 {
1345 free (ci);
1346 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1348 return send_response(sock, RESP_ERR, "strdup failed.\n");
1349 }
1351 wipe_ci_values(ci, now);
1352 ci->flags = CI_FLAGS_IN_TREE;
1353 pthread_cond_init(&ci->flushed, NULL);
1355 pthread_mutex_lock(&cache_lock);
1356 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1357 } /* }}} */
1358 assert (ci != NULL);
1360 /* don't re-write updates in replay mode */
1361 if (sock != NULL)
1362 journal_write("update", orig_buf);
1364 while (buffer_size > 0)
1365 {
1366 char *value;
1367 time_t stamp;
1368 char *eostamp;
1370 status = buffer_get_field (&buffer, &buffer_size, &value);
1371 if (status != 0)
1372 {
1373 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1374 break;
1375 }
1377 /* make sure update time is always moving forward */
1378 stamp = strtol(value, &eostamp, 10);
1379 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1380 {
1381 pthread_mutex_unlock(&cache_lock);
1382 return send_response(sock, RESP_ERR,
1383 "Cannot find timestamp in '%s'!\n", value);
1384 }
1385 else if (stamp <= ci->last_update_stamp)
1386 {
1387 pthread_mutex_unlock(&cache_lock);
1388 return send_response(sock, RESP_ERR,
1389 "illegal attempt to update using time %ld when last"
1390 " update time is %ld (minimum one second step)\n",
1391 stamp, ci->last_update_stamp);
1392 }
1393 else
1394 ci->last_update_stamp = stamp;
1396 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1397 {
1398 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1399 continue;
1400 }
1402 values_num++;
1403 }
1405 if (((now - ci->last_flush_time) >= config_write_interval)
1406 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1407 && (ci->values_num > 0))
1408 {
1409 enqueue_cache_item (ci, TAIL);
1410 }
1412 pthread_mutex_unlock (&cache_lock);
1414 if (values_num < 1)
1415 return send_response(sock, RESP_ERR, "No values updated.\n");
1416 else
1417 return send_response(sock, RESP_OK,
1418 "errors, enqueued %i value(s).\n", values_num);
1420 /* NOTREACHED */
1421 assert(1==0);
1423 } /* }}} int handle_request_update */
1425 /* we came across a "WROTE" entry during journal replay.
1426 * throw away any values that we have accumulated for this file
1427 */
1428 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1429 {
1430 cache_item_t *ci;
1431 const char *file = buffer;
1433 pthread_mutex_lock(&cache_lock);
1435 ci = g_tree_lookup(cache_tree, file);
1436 if (ci == NULL)
1437 {
1438 pthread_mutex_unlock(&cache_lock);
1439 return (0);
1440 }
1442 if (ci->values)
1443 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1445 wipe_ci_values(ci, now);
1446 remove_from_queue(ci);
1448 pthread_mutex_unlock(&cache_lock);
1449 return (0);
1450 } /* }}} int handle_request_wrote */
1452 /* start "BATCH" processing */
1453 static int batch_start (HANDLER_PROTO) /* {{{ */
1454 {
1455 int status;
1456 if (sock->batch_start)
1457 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1459 status = send_response(sock, RESP_OK,
1460 "Go ahead. End with dot '.' on its own line.\n");
1461 sock->batch_start = time(NULL);
1462 sock->batch_cmd = 0;
1464 return status;
1465 } /* }}} static int batch_start */
1467 /* finish "BATCH" processing and return results to the client */
1468 static int batch_done (HANDLER_PROTO) /* {{{ */
1469 {
1470 assert(sock->batch_start);
1471 sock->batch_start = 0;
1472 sock->batch_cmd = 0;
1473 return send_response(sock, RESP_OK, "errors\n");
1474 } /* }}} static int batch_done */
/*
 * QUIT handler.
 *
 * Sends nothing; it simply reports a non-zero status.  The connection
 * thread closes the connection whenever a handler returns non-zero.
 */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
1481 struct command COMMANDS[] = {
1482 {
1483 "UPDATE",
1484 handle_request_update,
1485 PRIV_HIGH,
1486 CMD_CONTEXT_ANY,
1487 "UPDATE <filename> <values> [<values> ...]\n"
1488 ,
1489 "Adds the given file to the internal cache if it is not yet known and\n"
1490 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1491 "for details.\n"
1492 "\n"
1493 "Each <values> has the following form:\n"
1494 " <values> = <time>:<value>[:<value>[...]]\n"
1495 "See the rrdupdate(1) manpage for details.\n"
1496 },
1497 {
1498 "WROTE",
1499 handle_request_wrote,
1500 PRIV_HIGH,
1501 CMD_CONTEXT_JOURNAL,
1502 NULL,
1503 NULL
1504 },
1505 {
1506 "FLUSH",
1507 handle_request_flush,
1508 PRIV_LOW,
1509 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1510 "FLUSH <filename>\n"
1511 ,
1512 "Adds the given filename to the head of the update queue and returns\n"
1513 "after it has been dequeued.\n"
1514 },
1515 {
1516 "FLUSHALL",
1517 handle_request_flushall,
1518 PRIV_HIGH,
1519 CMD_CONTEXT_CLIENT,
1520 "FLUSHALL\n"
1521 ,
1522 "Triggers writing of all pending updates. Returns immediately.\n"
1523 },
1524 {
1525 "PENDING",
1526 handle_request_pending,
1527 PRIV_HIGH,
1528 CMD_CONTEXT_CLIENT,
1529 "PENDING <filename>\n"
1530 ,
1531 "Shows any 'pending' updates for a file, in order.\n"
1532 "The updates shown have not yet been written to the underlying RRD file.\n"
1533 },
1534 {
1535 "FORGET",
1536 handle_request_forget,
1537 PRIV_HIGH,
1538 CMD_CONTEXT_ANY,
1539 "FORGET <filename>\n"
1540 ,
1541 "Removes the file completely from the cache.\n"
1542 "Any pending updates for the file will be lost.\n"
1543 },
1544 {
1545 "QUEUE",
1546 handle_request_queue,
1547 PRIV_LOW,
1548 CMD_CONTEXT_CLIENT,
1549 "QUEUE\n"
1550 ,
1551 "Shows all files in the output queue.\n"
1552 "The output is zero or more lines in the following format:\n"
1553 "(where <num_vals> is the number of values to be written)\n"
1554 "\n"
1555 "<num_vals> <filename>\n"
1556 },
1557 {
1558 "STATS",
1559 handle_request_stats,
1560 PRIV_LOW,
1561 CMD_CONTEXT_CLIENT,
1562 "STATS\n"
1563 ,
1564 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1565 "a description of the values.\n"
1566 },
1567 {
1568 "HELP",
1569 handle_request_help,
1570 PRIV_LOW,
1571 CMD_CONTEXT_CLIENT,
1572 "HELP [<command>]\n",
1573 NULL, /* special! */
1574 },
1575 {
1576 "BATCH",
1577 batch_start,
1578 PRIV_LOW,
1579 CMD_CONTEXT_CLIENT,
1580 "BATCH\n"
1581 ,
1582 "The 'BATCH' command permits the client to initiate a bulk load\n"
1583 " of commands to rrdcached.\n"
1584 "\n"
1585 "Usage:\n"
1586 "\n"
1587 " client: BATCH\n"
1588 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1589 " client: command #1\n"
1590 " client: command #2\n"
1591 " client: ... and so on\n"
1592 " client: .\n"
1593 " server: 2 errors\n"
1594 " server: 7 message for command #7\n"
1595 " server: 9 message for command #9\n"
1596 "\n"
1597 "For more information, consult the rrdcached(1) documentation.\n"
1598 },
1599 {
1600 ".", /* BATCH terminator */
1601 batch_done,
1602 PRIV_LOW,
1603 CMD_CONTEXT_BATCH,
1604 NULL,
1605 NULL
1606 },
1607 {
1608 "QUIT",
1609 handle_request_quit,
1610 PRIV_LOW,
1611 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1612 "QUIT\n"
1613 ,
1614 "Disconnect from rrdcached.\n"
1615 },
1616 {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */
1617 };
1619 static struct command *find_command(char *cmd)
1620 {
1621 struct command *c = COMMANDS;
1623 while (c->cmd != NULL)
1624 {
1625 if (strcasecmp(cmd, c->cmd) == 0)
1626 break;
1627 c++;
1628 }
1630 if (c->cmd == NULL)
1631 return NULL;
1632 else
1633 return c;
1634 }
1636 /* check whether commands are received in the expected context */
1637 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1638 {
1639 if (sock == NULL)
1640 return (cmd->context & CMD_CONTEXT_JOURNAL);
1641 else if (sock->batch_start)
1642 return (cmd->context & CMD_CONTEXT_BATCH);
1643 else
1644 return (cmd->context & CMD_CONTEXT_CLIENT);
1646 /* NOTREACHED */
1647 assert(1==0);
1648 }
1650 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1651 {
1652 int status;
1653 char *cmd_str;
1654 char *resp_txt;
1655 struct command *help = NULL;
1657 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1658 if (status == 0)
1659 help = find_command(cmd_str);
1661 if (help && (help->syntax || help->help))
1662 {
1663 char tmp[CMD_MAX];
1665 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1666 resp_txt = tmp;
1668 if (help->syntax)
1669 add_response_info(sock, "Usage: %s\n", help->syntax);
1671 if (help->help)
1672 add_response_info(sock, "%s\n", help->help);
1673 }
1674 else
1675 {
1676 help = COMMANDS;
1677 resp_txt = "Command overview\n";
1679 while (help->cmd)
1680 {
1681 if (help->syntax)
1682 add_response_info(sock, "%s", help->syntax);
1683 help++;
1684 }
1685 }
1687 return send_response(sock, RESP_OK, resp_txt);
1688 } /* }}} int handle_request_help */
1690 /* if sock==NULL, we are in journal replay mode */
1691 static int handle_request (DISPATCH_PROTO) /* {{{ */
1692 {
1693 char *buffer_ptr = buffer;
1694 char *cmd_str = NULL;
1695 struct command *cmd = NULL;
1696 int status;
1698 assert (buffer[buffer_size - 1] == '\0');
1700 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1701 if (status != 0)
1702 {
1703 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1704 return (-1);
1705 }
1707 if (sock != NULL && sock->batch_start)
1708 sock->batch_cmd++;
1710 cmd = find_command(cmd_str);
1711 if (!cmd)
1712 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1714 status = has_privilege(sock, cmd->min_priv);
1715 if (status <= 0)
1716 return status;
1718 if (!command_check_context(sock, cmd))
1719 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1721 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1722 } /* }}} int handle_request */
1724 /* MUST NOT hold journal_lock before calling this */
1725 static void journal_rotate(void) /* {{{ */
1726 {
1727 FILE *old_fh = NULL;
1728 int new_fd;
1730 if (journal_cur == NULL || journal_old == NULL)
1731 return;
1733 pthread_mutex_lock(&journal_lock);
1735 /* we rotate this way (rename before close) so that the we can release
1736 * the journal lock as fast as possible. Journal writes to the new
1737 * journal can proceed immediately after the new file is opened. The
1738 * fclose can then block without affecting new updates.
1739 */
1740 if (journal_fh != NULL)
1741 {
1742 old_fh = journal_fh;
1743 journal_fh = NULL;
1744 rename(journal_cur, journal_old);
1745 ++stats_journal_rotate;
1746 }
1748 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1749 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1750 if (new_fd >= 0)
1751 {
1752 journal_fh = fdopen(new_fd, "a");
1753 if (journal_fh == NULL)
1754 close(new_fd);
1755 }
1757 pthread_mutex_unlock(&journal_lock);
1759 if (old_fh != NULL)
1760 fclose(old_fh);
1762 if (journal_fh == NULL)
1763 {
1764 RRDD_LOG(LOG_CRIT,
1765 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1766 journal_cur, rrd_strerror(errno));
1768 RRDD_LOG(LOG_ERR,
1769 "JOURNALING DISABLED: All values will be flushed at shutdown");
1770 config_flush_at_shutdown = 1;
1771 }
1773 } /* }}} static void journal_rotate */
1775 static void journal_done(void) /* {{{ */
1776 {
1777 if (journal_cur == NULL)
1778 return;
1780 pthread_mutex_lock(&journal_lock);
1781 if (journal_fh != NULL)
1782 {
1783 fclose(journal_fh);
1784 journal_fh = NULL;
1785 }
1787 if (config_flush_at_shutdown)
1788 {
1789 RRDD_LOG(LOG_INFO, "removing journals");
1790 unlink(journal_old);
1791 unlink(journal_cur);
1792 }
1793 else
1794 {
1795 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1796 "journals will be used at next startup");
1797 }
1799 pthread_mutex_unlock(&journal_lock);
1801 } /* }}} static void journal_done */
1803 static int journal_write(char *cmd, char *args) /* {{{ */
1804 {
1805 int chars;
1807 if (journal_fh == NULL)
1808 return 0;
1810 pthread_mutex_lock(&journal_lock);
1811 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1812 pthread_mutex_unlock(&journal_lock);
1814 if (chars > 0)
1815 {
1816 pthread_mutex_lock(&stats_lock);
1817 stats_journal_bytes += chars;
1818 pthread_mutex_unlock(&stats_lock);
1819 }
1821 return chars;
1822 } /* }}} static int journal_write */
1824 static int journal_replay (const char *file) /* {{{ */
1825 {
1826 FILE *fh;
1827 int entry_cnt = 0;
1828 int fail_cnt = 0;
1829 uint64_t line = 0;
1830 char entry[CMD_MAX];
1831 time_t now;
1833 if (file == NULL) return 0;
1835 {
1836 char *reason = "unknown error";
1837 int status = 0;
1838 struct stat statbuf;
1840 memset(&statbuf, 0, sizeof(statbuf));
1841 if (stat(file, &statbuf) != 0)
1842 {
1843 if (errno == ENOENT)
1844 return 0;
1846 reason = "stat error";
1847 status = errno;
1848 }
1849 else if (!S_ISREG(statbuf.st_mode))
1850 {
1851 reason = "not a regular file";
1852 status = EPERM;
1853 }
1854 if (statbuf.st_uid != daemon_uid)
1855 {
1856 reason = "not owned by daemon user";
1857 status = EACCES;
1858 }
1859 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1860 {
1861 reason = "must not be user/group writable";
1862 status = EACCES;
1863 }
1865 if (status != 0)
1866 {
1867 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1868 file, rrd_strerror(status), reason);
1869 return 0;
1870 }
1871 }
1873 fh = fopen(file, "r");
1874 if (fh == NULL)
1875 {
1876 if (errno != ENOENT)
1877 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1878 file, rrd_strerror(errno));
1879 return 0;
1880 }
1881 else
1882 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1884 now = time(NULL);
1886 while(!feof(fh))
1887 {
1888 size_t entry_len;
1890 ++line;
1891 if (fgets(entry, sizeof(entry), fh) == NULL)
1892 break;
1893 entry_len = strlen(entry);
1895 /* check \n termination in case journal writing crashed mid-line */
1896 if (entry_len == 0)
1897 continue;
1898 else if (entry[entry_len - 1] != '\n')
1899 {
1900 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1901 ++fail_cnt;
1902 continue;
1903 }
1905 entry[entry_len - 1] = '\0';
1907 if (handle_request(NULL, now, entry, entry_len) == 0)
1908 ++entry_cnt;
1909 else
1910 ++fail_cnt;
1911 }
1913 fclose(fh);
1915 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1916 entry_cnt, fail_cnt);
1918 return entry_cnt > 0 ? 1 : 0;
1919 } /* }}} static int journal_replay */
1921 static void journal_init(void) /* {{{ */
1922 {
1923 int had_journal = 0;
1925 if (journal_cur == NULL) return;
1927 pthread_mutex_lock(&journal_lock);
1929 RRDD_LOG(LOG_INFO, "checking for journal files");
1931 had_journal += journal_replay(journal_old);
1932 had_journal += journal_replay(journal_cur);
1934 /* it must have been a crash. start a flush */
1935 if (had_journal && config_flush_at_shutdown)
1936 flush_old_values(-1);
1938 pthread_mutex_unlock(&journal_lock);
1939 journal_rotate();
1941 RRDD_LOG(LOG_INFO, "journal processing complete");
1943 } /* }}} static void journal_init */
1945 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1946 {
1947 assert(sock != NULL);
1949 free(sock->rbuf); sock->rbuf = NULL;
1950 free(sock->wbuf); sock->wbuf = NULL;
1951 free(sock);
1952 } /* }}} void free_listen_socket */
1954 static void close_connection(listen_socket_t *sock) /* {{{ */
1955 {
1956 if (sock->fd >= 0)
1957 {
1958 close(sock->fd);
1959 sock->fd = -1;
1960 }
1962 free_listen_socket(sock);
1964 } /* }}} void close_connection */
1966 static void *connection_thread_main (void *args) /* {{{ */
1967 {
1968 listen_socket_t *sock;
1969 int fd;
1971 sock = (listen_socket_t *) args;
1972 fd = sock->fd;
1974 /* init read buffers */
1975 sock->next_read = sock->next_cmd = 0;
1976 sock->rbuf = malloc(RBUF_SIZE);
1977 if (sock->rbuf == NULL)
1978 {
1979 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1980 close_connection(sock);
1981 return NULL;
1982 }
1984 pthread_mutex_lock (&connection_threads_lock);
1985 connection_threads_num++;
1986 pthread_mutex_unlock (&connection_threads_lock);
1988 while (do_shutdown == 0)
1989 {
1990 char *cmd;
1991 ssize_t cmd_len;
1992 ssize_t rbytes;
1993 time_t now;
1995 struct pollfd pollfd;
1996 int status;
1998 pollfd.fd = fd;
1999 pollfd.events = POLLIN | POLLPRI;
2000 pollfd.revents = 0;
2002 status = poll (&pollfd, 1, /* timeout = */ 500);
2003 if (do_shutdown)
2004 break;
2005 else if (status == 0) /* timeout */
2006 continue;
2007 else if (status < 0) /* error */
2008 {
2009 status = errno;
2010 if (status != EINTR)
2011 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2012 continue;
2013 }
2015 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2016 break;
2017 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2018 {
2019 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2020 "poll(2) returned something unexpected: %#04hx",
2021 pollfd.revents);
2022 break;
2023 }
2025 rbytes = read(fd, sock->rbuf + sock->next_read,
2026 RBUF_SIZE - sock->next_read);
2027 if (rbytes < 0)
2028 {
2029 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2030 break;
2031 }
2032 else if (rbytes == 0)
2033 break; /* eof */
2035 sock->next_read += rbytes;
2037 if (sock->batch_start)
2038 now = sock->batch_start;
2039 else
2040 now = time(NULL);
2042 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2043 {
2044 status = handle_request (sock, now, cmd, cmd_len+1);
2045 if (status != 0)
2046 goto out_close;
2047 }
2048 }
2050 out_close:
2051 close_connection(sock);
2053 /* Remove this thread from the connection threads list */
2054 pthread_mutex_lock (&connection_threads_lock);
2055 connection_threads_num--;
2056 if (connection_threads_num <= 0)
2057 pthread_cond_broadcast(&connection_threads_done);
2058 pthread_mutex_unlock (&connection_threads_lock);
2060 return (NULL);
2061 } /* }}} void *connection_thread_main */
2063 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2064 {
2065 int fd;
2066 struct sockaddr_un sa;
2067 listen_socket_t *temp;
2068 int status;
2069 const char *path;
2071 path = sock->addr;
2072 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2073 path += strlen("unix:");
2075 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2076 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2077 if (temp == NULL)
2078 {
2079 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2080 return (-1);
2081 }
2082 listen_fds = temp;
2083 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2085 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2086 if (fd < 0)
2087 {
2088 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2089 rrd_strerror(errno));
2090 return (-1);
2091 }
2093 memset (&sa, 0, sizeof (sa));
2094 sa.sun_family = AF_UNIX;
2095 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2097 /* if we've gotten this far, we own the pid file. any daemon started
2098 * with the same args must not be alive. therefore, ensure that we can
2099 * create the socket...
2100 */
2101 unlink(path);
2103 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2104 if (status != 0)
2105 {
2106 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2107 path, rrd_strerror(errno));
2108 close (fd);
2109 return (-1);
2110 }
2112 status = listen (fd, /* backlog = */ 10);
2113 if (status != 0)
2114 {
2115 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2116 path, rrd_strerror(errno));
2117 close (fd);
2118 unlink (path);
2119 return (-1);
2120 }
2122 listen_fds[listen_fds_num].fd = fd;
2123 listen_fds[listen_fds_num].family = PF_UNIX;
2124 strncpy(listen_fds[listen_fds_num].addr, path,
2125 sizeof (listen_fds[listen_fds_num].addr) - 1);
2126 listen_fds_num++;
2128 return (0);
2129 } /* }}} int open_listen_socket_unix */
/*
 * Open network listen sockets for one configured address and append them
 * to listen_fds.
 *
 * Accepted address forms in sock->addr:
 *   "[<ipv6>]" or "[<ipv6>]:<port>"
 *   "<host-or-ipv4>" or "<host-or-ipv4>:<port>" (only split at the last ':'
 *   when the address contains a '.')
 * Without a port, RRDCACHED_DEFAULT_PORT is used.
 *
 * One socket is opened per address returned by getaddrinfo().  Failures of
 * realloc, socket() or bind() skip that address; a listen() failure aborts.
 *
 * Returns 0 after iterating over all addresses (even if none could be
 * opened) and -1 on a malformed address, getaddrinfo() or listen() failure.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* strrchr() is the standard equivalent of the legacy rindex() */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* the new slot only counts once listen_fds_num is incremented below */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      /* message now ends ".\n" like the bind() message above; it used to
       * print the full stop after the newline */
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2253 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2254 {
2255 assert(sock != NULL);
2256 assert(sock->addr != NULL);
2258 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2259 || sock->addr[0] == '/')
2260 return (open_listen_socket_unix(sock));
2261 else
2262 return (open_listen_socket_network(sock));
2263 } /* }}} int open_listen_socket */
2265 static int close_listen_sockets (void) /* {{{ */
2266 {
2267 size_t i;
2269 for (i = 0; i < listen_fds_num; i++)
2270 {
2271 close (listen_fds[i].fd);
2273 if (listen_fds[i].family == PF_UNIX)
2274 unlink(listen_fds[i].addr);
2275 }
2277 free (listen_fds);
2278 listen_fds = NULL;
2279 listen_fds_num = 0;
2281 return (0);
2282 } /* }}} int close_listen_sockets */
2284 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2285 {
2286 struct pollfd *pollfds;
2287 int pollfds_num;
2288 int status;
2289 int i;
2291 if (listen_fds_num < 1)
2292 {
2293 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2294 return (NULL);
2295 }
2297 pollfds_num = listen_fds_num;
2298 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2299 if (pollfds == NULL)
2300 {
2301 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2302 return (NULL);
2303 }
2304 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2306 RRDD_LOG(LOG_INFO, "listening for connections");
2308 while (do_shutdown == 0)
2309 {
2310 for (i = 0; i < pollfds_num; i++)
2311 {
2312 pollfds[i].fd = listen_fds[i].fd;
2313 pollfds[i].events = POLLIN | POLLPRI;
2314 pollfds[i].revents = 0;
2315 }
2317 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2318 if (do_shutdown)
2319 break;
2320 else if (status == 0) /* timeout */
2321 continue;
2322 else if (status < 0) /* error */
2323 {
2324 status = errno;
2325 if (status != EINTR)
2326 {
2327 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2328 }
2329 continue;
2330 }
2332 for (i = 0; i < pollfds_num; i++)
2333 {
2334 listen_socket_t *client_sock;
2335 struct sockaddr_storage client_sa;
2336 socklen_t client_sa_size;
2337 pthread_t tid;
2338 pthread_attr_t attr;
2340 if (pollfds[i].revents == 0)
2341 continue;
2343 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2344 {
2345 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2346 "poll(2) returned something unexpected for listen FD #%i.",
2347 pollfds[i].fd);
2348 continue;
2349 }
2351 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2352 if (client_sock == NULL)
2353 {
2354 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2355 continue;
2356 }
2357 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2359 client_sa_size = sizeof (client_sa);
2360 client_sock->fd = accept (pollfds[i].fd,
2361 (struct sockaddr *) &client_sa, &client_sa_size);
2362 if (client_sock->fd < 0)
2363 {
2364 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2365 free(client_sock);
2366 continue;
2367 }
2369 pthread_attr_init (&attr);
2370 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2372 status = pthread_create (&tid, &attr, connection_thread_main,
2373 client_sock);
2374 if (status != 0)
2375 {
2376 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2377 close_connection(client_sock);
2378 continue;
2379 }
2380 } /* for (pollfds_num) */
2381 } /* while (do_shutdown == 0) */
2383 RRDD_LOG(LOG_INFO, "starting shutdown");
2385 close_listen_sockets ();
2387 pthread_mutex_lock (&connection_threads_lock);
2388 while (connection_threads_num > 0)
2389 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2390 pthread_mutex_unlock (&connection_threads_lock);
2392 free(pollfds);
2394 return (NULL);
2395 } /* }}} void *listen_thread_main */
2397 static int daemonize (void) /* {{{ */
2398 {
2399 int pid_fd;
2400 char *base_dir;
2402 daemon_uid = geteuid();
2404 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2405 if (pid_fd < 0)
2406 pid_fd = check_pidfile();
2407 if (pid_fd < 0)
2408 return pid_fd;
2410 /* open all the listen sockets */
2411 if (config_listen_address_list_len > 0)
2412 {
2413 for (size_t i = 0; i < config_listen_address_list_len; i++)
2414 open_listen_socket (config_listen_address_list[i]);
2416 rrd_free_ptrs((void ***) &config_listen_address_list,
2417 &config_listen_address_list_len);
2418 }
2419 else
2420 {
2421 listen_socket_t sock;
2422 memset(&sock, 0, sizeof(sock));
2423 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2424 open_listen_socket (&sock);
2425 }
2427 if (listen_fds_num < 1)
2428 {
2429 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2430 goto error;
2431 }
2433 if (!stay_foreground)
2434 {
2435 pid_t child;
2437 child = fork ();
2438 if (child < 0)
2439 {
2440 fprintf (stderr, "daemonize: fork(2) failed.\n");
2441 goto error;
2442 }
2443 else if (child > 0)
2444 exit(0);
2446 /* Become session leader */
2447 setsid ();
2449 /* Open the first three file descriptors to /dev/null */
2450 close (2);
2451 close (1);
2452 close (0);
2454 open ("/dev/null", O_RDWR);
2455 if (dup(0) == -1 || dup(0) == -1){
2456 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2457 }
2458 } /* if (!stay_foreground) */
2460 /* Change into the /tmp directory. */
2461 base_dir = (config_base_dir != NULL)
2462 ? config_base_dir
2463 : "/tmp";
2465 if (chdir (base_dir) != 0)
2466 {
2467 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2468 goto error;
2469 }
2471 install_signal_handlers();
2473 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2474 RRDD_LOG(LOG_INFO, "starting up");
2476 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2477 (GDestroyNotify) free_cache_item);
2478 if (cache_tree == NULL)
2479 {
2480 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2481 goto error;
2482 }
2484 return write_pidfile (pid_fd);
2486 error:
2487 remove_pidfile();
2488 return -1;
2489 } /* }}} int daemonize */
2491 static int cleanup (void) /* {{{ */
2492 {
2493 do_shutdown++;
2495 pthread_cond_broadcast (&flush_cond);
2496 pthread_join (flush_thread, NULL);
2498 pthread_cond_broadcast (&queue_cond);
2499 for (int i = 0; i < config_queue_threads; i++)
2500 pthread_join (queue_threads[i], NULL);
2502 if (config_flush_at_shutdown)
2503 {
2504 assert(cache_queue_head == NULL);
2505 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2506 }
2508 journal_done();
2509 remove_pidfile ();
2511 free(queue_threads);
2512 free(config_base_dir);
2513 free(config_pid_file);
2514 free(journal_cur);
2515 free(journal_old);
2517 pthread_mutex_lock(&cache_lock);
2518 g_tree_destroy(cache_tree);
2520 RRDD_LOG(LOG_INFO, "goodbye");
2521 closelog ();
2523 return (0);
2524 } /* }}} int cleanup */
2526 static int read_options (int argc, char **argv) /* {{{ */
2527 {
2528 int option;
2529 int status = 0;
2531 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2532 {
2533 switch (option)
2534 {
2535 case 'g':
2536 stay_foreground=1;
2537 break;
2539 case 'L':
2540 case 'l':
2541 {
2542 listen_socket_t *new;
2544 new = malloc(sizeof(listen_socket_t));
2545 if (new == NULL)
2546 {
2547 fprintf(stderr, "read_options: malloc failed.\n");
2548 return(2);
2549 }
2550 memset(new, 0, sizeof(listen_socket_t));
2552 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2553 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2555 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2556 &config_listen_address_list_len, new))
2557 {
2558 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2559 return (2);
2560 }
2561 }
2562 break;
2564 case 'f':
2565 {
2566 int temp;
2568 temp = atoi (optarg);
2569 if (temp > 0)
2570 config_flush_interval = temp;
2571 else
2572 {
2573 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2574 status = 3;
2575 }
2576 }
2577 break;
2579 case 'w':
2580 {
2581 int temp;
2583 temp = atoi (optarg);
2584 if (temp > 0)
2585 config_write_interval = temp;
2586 else
2587 {
2588 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2589 status = 2;
2590 }
2591 }
2592 break;
2594 case 'z':
2595 {
2596 int temp;
2598 temp = atoi(optarg);
2599 if (temp > 0)
2600 config_write_jitter = temp;
2601 else
2602 {
2603 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2604 status = 2;
2605 }
2607 break;
2608 }
2610 case 't':
2611 {
2612 int threads;
2613 threads = atoi(optarg);
2614 if (threads >= 1)
2615 config_queue_threads = threads;
2616 else
2617 {
2618 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2619 return 1;
2620 }
2621 }
2622 break;
2624 case 'B':
2625 config_write_base_only = 1;
2626 break;
2628 case 'b':
2629 {
2630 size_t len;
2631 char base_realpath[PATH_MAX];
2633 if (config_base_dir != NULL)
2634 free (config_base_dir);
2635 config_base_dir = strdup (optarg);
2636 if (config_base_dir == NULL)
2637 {
2638 fprintf (stderr, "read_options: strdup failed.\n");
2639 return (3);
2640 }
2642 /* make sure that the base directory is not resolved via
2643 * symbolic links. this makes some performance-enhancing
2644 * assumptions possible (we don't have to resolve paths
2645 * that start with a "/")
2646 */
2647 if (realpath(config_base_dir, base_realpath) == NULL)
2648 {
2649 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2650 return 5;
2651 }
2652 else if (strncmp(config_base_dir,
2653 base_realpath, sizeof(base_realpath)) != 0)
2654 {
2655 fprintf(stderr,
2656 "Base directory (-b) resolved via file system links!\n"
2657 "Please consult rrdcached '-b' documentation!\n"
2658 "Consider specifying the real directory (%s)\n",
2659 base_realpath);
2660 return 5;
2661 }
2663 len = strlen (config_base_dir);
2664 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2665 {
2666 config_base_dir[len - 1] = 0;
2667 len--;
2668 }
2670 if (len < 1)
2671 {
2672 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2673 return (4);
2674 }
2676 _config_base_dir_len = len;
2677 }
2678 break;
2680 case 'p':
2681 {
2682 if (config_pid_file != NULL)
2683 free (config_pid_file);
2684 config_pid_file = strdup (optarg);
2685 if (config_pid_file == NULL)
2686 {
2687 fprintf (stderr, "read_options: strdup failed.\n");
2688 return (3);
2689 }
2690 }
2691 break;
2693 case 'F':
2694 config_flush_at_shutdown = 1;
2695 break;
2697 case 'j':
2698 {
2699 struct stat statbuf;
2700 const char *dir = optarg;
2702 status = stat(dir, &statbuf);
2703 if (status != 0)
2704 {
2705 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2706 return 6;
2707 }
2709 if (!S_ISDIR(statbuf.st_mode)
2710 || access(dir, R_OK|W_OK|X_OK) != 0)
2711 {
2712 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2713 errno ? rrd_strerror(errno) : "");
2714 return 6;
2715 }
2717 journal_cur = malloc(PATH_MAX + 1);
2718 journal_old = malloc(PATH_MAX + 1);
2719 if (journal_cur == NULL || journal_old == NULL)
2720 {
2721 fprintf(stderr, "malloc failure for journal files\n");
2722 return 6;
2723 }
2724 else
2725 {
2726 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2727 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2728 }
2729 }
2730 break;
2732 case 'h':
2733 case '?':
2734 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2735 "\n"
2736 "Usage: rrdcached [options]\n"
2737 "\n"
2738 "Valid options are:\n"
2739 " -l <address> Socket address to listen to.\n"
2740 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2741 " -w <seconds> Interval in which to write data.\n"
2742 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2743 " -t <threads> Number of write threads.\n"
2744 " -f <seconds> Interval in which to flush dead data.\n"
2745 " -p <file> Location of the PID-file.\n"
2746 " -b <dir> Base directory to change to.\n"
2747 " -B Restrict file access to paths within -b <dir>\n"
2748 " -g Do not fork and run in the foreground.\n"
2749 " -j <dir> Directory in which to create the journal files.\n"
2750 " -F Always flush all updates at shutdown\n"
2751 "\n"
2752 "For more information and a detailed description of all options "
2753 "please refer\n"
2754 "to the rrdcached(1) manual page.\n",
2755 VERSION);
2756 status = -1;
2757 break;
2758 } /* switch (option) */
2759 } /* while (getopt) */
2761 /* advise the user when values are not sane */
2762 if (config_flush_interval < 2 * config_write_interval)
2763 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2764 " 2x write interval (-w) !\n");
2765 if (config_write_jitter > config_write_interval)
2766 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2767 " write interval (-w) !\n");
2769 if (config_write_base_only && config_base_dir == NULL)
2770 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2771 " Consult the rrdcached documentation\n");
2773 if (journal_cur == NULL)
2774 config_flush_at_shutdown = 1;
2776 return (status);
2777 } /* }}} int read_options */
2779 int main (int argc, char **argv)
2780 {
2781 int status;
2783 status = read_options (argc, argv);
2784 if (status != 0)
2785 {
2786 if (status < 0)
2787 status = 0;
2788 return (status);
2789 }
2791 status = daemonize ();
2792 if (status != 0)
2793 {
2794 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2795 return (1);
2796 }
2798 journal_init();
2800 /* start the queue threads */
2801 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2802 if (queue_threads == NULL)
2803 {
2804 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2805 cleanup();
2806 return (1);
2807 }
2808 for (int i = 0; i < config_queue_threads; i++)
2809 {
2810 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2811 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2812 if (status != 0)
2813 {
2814 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2815 cleanup();
2816 return (1);
2817 }
2818 }
2820 /* start the flush thread */
2821 memset(&flush_thread, 0, sizeof(flush_thread));
2822 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2823 if (status != 0)
2824 {
2825 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2826 cleanup();
2827 return (1);
2828 }
2830 listen_thread_main (NULL);
2831 cleanup ();
2833 return (0);
2834 } /* int main */
2836 /*
2837 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2838 */