1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
115 /*
116 * Types
117 */
/* Privilege level attached to a listening socket.  The values are ordered
 * (PRIV_LOW < PRIV_HIGH) so that has_privilege() can compare them with
 * a plain ">=". */
typedef enum
{
  PRIV_LOW = 0,
  PRIV_HIGH = 1
} socket_privilege;
/* Status handed to send_response(): RESP_OK for success, RESP_ERR for an
 * error reply. */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK = 0
} response_code;
126 struct listen_socket_s
127 {
128 int fd;
129 char addr[PATH_MAX + 1];
130 int family;
131 socket_privilege privilege;
133 /* state for BATCH processing */
134 time_t batch_start;
135 int batch_cmd;
137 /* buffered IO */
138 char *rbuf;
139 off_t next_cmd;
140 off_t next_read;
142 char *wbuf;
143 ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
150 time_t now __attribute__((unused)),\
151 char *buffer __attribute__((unused)),\
152 size_t buffer_size __attribute__((unused))
154 #define HANDLER_PROTO struct command *cmd __attribute__((unused)),\
155 DISPATCH_PROTO
157 struct command {
158 char *cmd;
159 int (*handler)(HANDLER_PROTO);
160 socket_privilege min_priv;
162 char context; /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT (1<<0)
164 #define CMD_CONTEXT_BATCH (1<<1)
165 #define CMD_CONTEXT_JOURNAL (1<<2)
166 #define CMD_CONTEXT_ANY (0x7f)
168 char *syntax;
169 char *help;
170 };
/* One cached RRD file: its pending update strings plus the links that
 * place it in the write queue (a doubly linked list). */
typedef struct cache_item_s cache_item_t;

struct cache_item_s
{
  char *file;
  char **values;              /* pending update strings */
  int values_num;             /* number of entries in `values' */
  time_t last_flush_time;
  time_t last_update_stamp;
#define CI_FLAGS_IN_TREE  (1 << 0)
#define CI_FLAGS_IN_QUEUE (1 << 1)
  int flags;                  /* bitwise OR of CI_FLAGS_* */
  pthread_cond_t flushed;     /* broadcast once the values were written */
  cache_item_t *prev;
  cache_item_t *next;
};
/* User data handed to tree_callback_flush(): the reference times going in
 * and the list of tree keys collected for removal coming out. */
typedef struct callback_flush_data_s
{
  time_t now;
  time_t abs_timeout;
  char **keys;
  size_t keys_num;
} callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() inserts at. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* max length of socket command or response */
#define CMD_MAX   4096
/* size derived from CMD_MAX: room for two maximum-length commands */
#define RBUF_SIZE (2 * CMD_MAX)
209 /*
210 * Variables
211 */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
218 static int do_shutdown = 0;
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
231 /* Cache stuff */
232 static GTree *cache_tree = NULL;
233 static cache_item_t *cache_queue_head = NULL;
234 static cache_item_t *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
237 static int config_write_interval = 300;
238 static int config_write_jitter = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
246 static listen_socket_t **config_listen_address_list = NULL;
247 static int config_listen_address_list_len = 0;
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
267 /* prototypes for forward refernces */
268 static int handle_request_help (HANDLER_PROTO);
270 /*
271 * Functions
272 */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276 do_shutdown++;
277 pthread_cond_broadcast(&flush_cond);
278 pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
/* SIGINT: begin shutdown, leaving `config_flush_at_shutdown' untouched. */
static void sig_int_handler (
    int s __attribute__((unused))) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: begin shutdown, leaving `config_flush_at_shutdown' untouched. */
static void sig_term_handler (
    int s __attribute__((unused))) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293 config_flush_at_shutdown = 1;
294 sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299 config_flush_at_shutdown = 0;
300 sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
303 static void install_signal_handlers(void) /* {{{ */
304 {
305 /* These structures are static, because `sigaction' behaves weird if the are
306 * overwritten.. */
307 static struct sigaction sa_int;
308 static struct sigaction sa_term;
309 static struct sigaction sa_pipe;
310 static struct sigaction sa_usr1;
311 static struct sigaction sa_usr2;
313 /* Install signal handlers */
314 memset (&sa_int, 0, sizeof (sa_int));
315 sa_int.sa_handler = sig_int_handler;
316 sigaction (SIGINT, &sa_int, NULL);
318 memset (&sa_term, 0, sizeof (sa_term));
319 sa_term.sa_handler = sig_term_handler;
320 sigaction (SIGTERM, &sa_term, NULL);
322 memset (&sa_pipe, 0, sizeof (sa_pipe));
323 sa_pipe.sa_handler = SIG_IGN;
324 sigaction (SIGPIPE, &sa_pipe, NULL);
326 memset (&sa_pipe, 0, sizeof (sa_usr1));
327 sa_usr1.sa_handler = sig_usr1_handler;
328 sigaction (SIGUSR1, &sa_usr1, NULL);
330 memset (&sa_usr2, 0, sizeof (sa_usr2));
331 sa_usr2.sa_handler = sig_usr2_handler;
332 sigaction (SIGUSR2, &sa_usr2, NULL);
334 } /* }}} void install_signal_handlers */
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338 int fd;
339 char *file;
341 file = (config_pid_file != NULL)
342 ? config_pid_file
343 : LOCALSTATEDIR "/run/rrdcached.pid";
345 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346 if (fd < 0)
347 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348 action, file, rrd_strerror(errno));
350 return(fd);
351 } /* }}} static int open_pidfile */
/*
 * check_pidfile:
 *   Check an existing pid file to see whether a daemon is running.
 *
 *   Returns an open, truncated descriptor for the PID file when the pid
 *   recorded in it is stale, and a negative value otherwise (file could not
 *   be opened or read, its content is not a positive pid, or a process
 *   with that pid can be signalled).
 *
 *   Changes from the previous version: the descriptor is closed on every
 *   failure path (it used to leak when the read failed or the pid was
 *   invalid), and the bytes read are NUL-terminated before being parsed.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t got;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator atoi() relies on */
  got = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (got <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[got] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  /* stale file: rewind and empty it so the caller can write a fresh pid */
  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) != 0)
    fprintf(stderr, "rrdcached: could not truncate stale PID file (%s)\n",
            rrd_strerror(errno));

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile:
 *   Writes our pid, followed by a newline, to the descriptor `fd'.
 *   The descriptor is always closed - through fclose() on success and
 *   through close() when fdopen() fails.  Returns 0 on success, -1 when
 *   the stream could not be created.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
412 static int remove_pidfile (void) /* {{{ */
413 {
414 char *file;
415 int status;
417 file = (config_pid_file != NULL)
418 ? config_pid_file
419 : LOCALSTATEDIR "/run/rrdcached.pid";
421 status = unlink (file);
422 if (status == 0)
423 return (0);
424 return (errno);
425 } /* }}} int remove_pidfile */
427 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
428 {
429 char *eol;
431 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
432 sock->next_read - sock->next_cmd);
434 if (eol == NULL)
435 {
436 /* no commands left, move remainder back to front of rbuf */
437 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
438 sock->next_read - sock->next_cmd);
439 sock->next_read -= sock->next_cmd;
440 sock->next_cmd = 0;
441 *len = 0;
442 return NULL;
443 }
444 else
445 {
446 char *cmd = sock->rbuf + sock->next_cmd;
447 *eol = '\0';
449 sock->next_cmd = eol - sock->rbuf + 1;
451 if (eol > sock->rbuf && *(eol-1) == '\r')
452 *(--eol) = '\0'; /* handle "\r\n" EOL */
454 *len = eol - cmd;
456 return cmd;
457 }
459 /* NOTREACHED */
460 assert(1==0);
461 }
463 /* add the characters directly to the write buffer */
464 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
465 {
466 char *new_buf;
468 assert(sock != NULL);
470 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
471 if (new_buf == NULL)
472 {
473 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
474 return -1;
475 }
477 strncpy(new_buf + sock->wbuf_len, str, len + 1);
479 sock->wbuf = new_buf;
480 sock->wbuf_len += len;
482 return 0;
483 } /* }}} static int add_to_wbuf */
485 /* add the text to the "extra" info that's sent after the status line */
486 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
487 {
488 va_list argp;
489 char buffer[CMD_MAX];
490 int len;
492 if (sock == NULL) return 0; /* journal replay mode */
493 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
495 va_start(argp, fmt);
496 #ifdef HAVE_VSNPRINTF
497 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
498 #else
499 len = vsprintf(buffer, fmt, argp);
500 #endif
501 va_end(argp);
502 if (len < 0)
503 {
504 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
505 return -1;
506 }
508 return add_to_wbuf(sock, buffer, len);
509 } /* }}} static int add_response_info */
/*
 * count_lines:
 *   Returns the number of '\n' characters in `str'.  A NULL pointer
 *   counts as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  char *pos;

  if (str == NULL)
    return 0;

  for (pos = strchr(str, '\n'); pos != NULL; pos = strchr(pos + 1, '\n'))
    newlines++;

  return newlines;
} /* }}} static int count_lines */
527 /* send the response back to the user.
528 * returns 0 on success, -1 on error
529 * write buffer is always zeroed after this call */
530 static int send_response (listen_socket_t *sock, response_code rc,
531 char *fmt, ...) /* {{{ */
532 {
533 va_list argp;
534 char buffer[CMD_MAX];
535 int lines;
536 ssize_t wrote;
537 int rclen, len;
539 if (sock == NULL) return rc; /* journal replay mode */
541 if (sock->batch_start)
542 {
543 if (rc == RESP_OK)
544 return rc; /* no response on success during BATCH */
545 lines = sock->batch_cmd;
546 }
547 else if (rc == RESP_OK)
548 lines = count_lines(sock->wbuf);
549 else
550 lines = -1;
552 rclen = sprintf(buffer, "%d ", lines);
553 va_start(argp, fmt);
554 #ifdef HAVE_VSNPRINTF
555 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
556 #else
557 len = vsprintf(buffer+rclen, fmt, argp);
558 #endif
559 va_end(argp);
560 if (len < 0)
561 return -1;
563 len += rclen;
565 /* append the result to the wbuf, don't write to the user */
566 if (sock->batch_start)
567 return add_to_wbuf(sock, buffer, len);
569 /* first write must be complete */
570 if (len != write(sock->fd, buffer, len))
571 {
572 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
573 return -1;
574 }
576 if (sock->wbuf != NULL && rc == RESP_OK)
577 {
578 wrote = 0;
579 while (wrote < sock->wbuf_len)
580 {
581 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
582 if (wb <= 0)
583 {
584 RRDD_LOG(LOG_INFO, "send_response: could not write results");
585 return -1;
586 }
587 wrote += wb;
588 }
589 }
591 free(sock->wbuf); sock->wbuf = NULL;
592 sock->wbuf_len = 0;
594 return 0;
595 } /* }}} */
597 static void wipe_ci_values(cache_item_t *ci, time_t when)
598 {
599 ci->values = NULL;
600 ci->values_num = 0;
602 ci->last_flush_time = when;
603 if (config_write_jitter > 0)
604 ci->last_flush_time += (rrd_random() % config_write_jitter);
605 }
607 /* remove_from_queue
608 * remove a "cache_item_t" item from the queue.
609 * must hold 'cache_lock' when calling this
610 */
611 static void remove_from_queue(cache_item_t *ci) /* {{{ */
612 {
613 if (ci == NULL) return;
614 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
616 if (ci->prev == NULL)
617 cache_queue_head = ci->next; /* reset head */
618 else
619 ci->prev->next = ci->next;
621 if (ci->next == NULL)
622 cache_queue_tail = ci->prev; /* reset the tail */
623 else
624 ci->next->prev = ci->prev;
626 ci->next = ci->prev = NULL;
627 ci->flags &= ~CI_FLAGS_IN_QUEUE;
629 pthread_mutex_lock (&stats_lock);
630 assert (stats_queue_length > 0);
631 stats_queue_length--;
632 pthread_mutex_unlock (&stats_lock);
634 } /* }}} static void remove_from_queue */
636 /* free the resources associated with the cache_item_t
637 * must hold cache_lock when calling this function
638 */
639 static void *free_cache_item(cache_item_t *ci) /* {{{ */
640 {
641 if (ci == NULL) return NULL;
643 remove_from_queue(ci);
645 for (int i=0; i < ci->values_num; i++)
646 free(ci->values[i]);
648 free (ci->values);
649 free (ci->file);
651 /* in case anyone is waiting */
652 pthread_cond_broadcast(&ci->flushed);
654 free (ci);
656 return NULL;
657 } /* }}} static void *free_cache_item */
659 /*
660 * enqueue_cache_item:
661 * `cache_lock' must be acquired before calling this function!
662 */
663 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
664 queue_side_t side)
665 {
666 if (ci == NULL)
667 return (-1);
669 if (ci->values_num == 0)
670 return (0);
672 if (side == HEAD)
673 {
674 if (cache_queue_head == ci)
675 return 0;
677 /* remove if further down in queue */
678 remove_from_queue(ci);
680 ci->prev = NULL;
681 ci->next = cache_queue_head;
682 if (ci->next != NULL)
683 ci->next->prev = ci;
684 cache_queue_head = ci;
686 if (cache_queue_tail == NULL)
687 cache_queue_tail = cache_queue_head;
688 }
689 else /* (side == TAIL) */
690 {
691 /* We don't move values back in the list.. */
692 if (ci->flags & CI_FLAGS_IN_QUEUE)
693 return (0);
695 assert (ci->next == NULL);
696 assert (ci->prev == NULL);
698 ci->prev = cache_queue_tail;
700 if (cache_queue_tail == NULL)
701 cache_queue_head = ci;
702 else
703 cache_queue_tail->next = ci;
705 cache_queue_tail = ci;
706 }
708 ci->flags |= CI_FLAGS_IN_QUEUE;
710 pthread_cond_signal(&queue_cond);
711 pthread_mutex_lock (&stats_lock);
712 stats_queue_length++;
713 pthread_mutex_unlock (&stats_lock);
715 return (0);
716 } /* }}} int enqueue_cache_item */
718 /*
719 * tree_callback_flush:
720 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
721 * while this is in progress.
722 */
723 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
724 gpointer data)
725 {
726 cache_item_t *ci;
727 callback_flush_data_t *cfd;
729 ci = (cache_item_t *) value;
730 cfd = (callback_flush_data_t *) data;
732 if (ci->flags & CI_FLAGS_IN_QUEUE)
733 return FALSE;
735 if ((ci->last_flush_time <= cfd->abs_timeout)
736 && (ci->values_num > 0))
737 {
738 enqueue_cache_item (ci, TAIL);
739 }
740 else if ((do_shutdown != 0)
741 && (ci->values_num > 0))
742 {
743 enqueue_cache_item (ci, TAIL);
744 }
745 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
746 && (ci->values_num <= 0))
747 {
748 char **temp;
750 temp = (char **) rrd_realloc (cfd->keys,
751 sizeof (char *) * (cfd->keys_num + 1));
752 if (temp == NULL)
753 {
754 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
755 return (FALSE);
756 }
757 cfd->keys = temp;
758 /* Make really sure this points to the _same_ place */
759 assert ((char *) key == ci->file);
760 cfd->keys[cfd->keys_num] = (char *) key;
761 cfd->keys_num++;
762 }
764 return (FALSE);
765 } /* }}} gboolean tree_callback_flush */
767 static int flush_old_values (int max_age)
768 {
769 callback_flush_data_t cfd;
770 size_t k;
772 memset (&cfd, 0, sizeof (cfd));
773 /* Pass the current time as user data so that we don't need to call
774 * `time' for each node. */
775 cfd.now = time (NULL);
776 cfd.keys = NULL;
777 cfd.keys_num = 0;
779 if (max_age > 0)
780 cfd.abs_timeout = cfd.now - max_age;
781 else
782 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
784 /* `tree_callback_flush' will return the keys of all values that haven't
785 * been touched in the last `config_flush_interval' seconds in `cfd'.
786 * The char*'s in this array point to the same memory as ci->file, so we
787 * don't need to free them separately. */
788 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
790 for (k = 0; k < cfd.keys_num; k++)
791 {
792 /* should never fail, since we have held the cache_lock
793 * the entire time */
794 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
795 }
797 if (cfd.keys != NULL)
798 {
799 free (cfd.keys);
800 cfd.keys = NULL;
801 }
803 return (0);
804 } /* int flush_old_values */
806 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
807 {
808 struct timeval now;
809 struct timespec next_flush;
810 int status;
812 gettimeofday (&now, NULL);
813 next_flush.tv_sec = now.tv_sec + config_flush_interval;
814 next_flush.tv_nsec = 1000 * now.tv_usec;
816 pthread_mutex_lock(&cache_lock);
818 while (!do_shutdown)
819 {
820 gettimeofday (&now, NULL);
821 if ((now.tv_sec > next_flush.tv_sec)
822 || ((now.tv_sec == next_flush.tv_sec)
823 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
824 {
825 /* Flush all values that haven't been written in the last
826 * `config_write_interval' seconds. */
827 flush_old_values (config_write_interval);
829 /* Determine the time of the next cache flush. */
830 next_flush.tv_sec =
831 now.tv_sec + next_flush.tv_sec % config_flush_interval;
833 /* unlock the cache while we rotate so we don't block incoming
834 * updates if the fsync() blocks on disk I/O */
835 pthread_mutex_unlock(&cache_lock);
836 journal_rotate();
837 pthread_mutex_lock(&cache_lock);
838 }
840 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
841 if (status != 0 && status != ETIMEDOUT)
842 {
843 RRDD_LOG (LOG_ERR, "flush_thread_main: "
844 "pthread_cond_timedwait returned %i.", status);
845 }
846 }
848 if (config_flush_at_shutdown)
849 flush_old_values (-1); /* flush everything */
851 pthread_mutex_unlock(&cache_lock);
853 return NULL;
854 } /* void *flush_thread_main */
856 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
857 {
858 pthread_mutex_lock (&cache_lock);
860 while (!do_shutdown
861 || (cache_queue_head != NULL && config_flush_at_shutdown))
862 {
863 cache_item_t *ci;
864 char *file;
865 char **values;
866 int values_num;
867 int status;
868 int i;
870 /* Now, check if there's something to store away. If not, wait until
871 * something comes in. if we are shutting down, do not wait around. */
872 if (cache_queue_head == NULL && !do_shutdown)
873 {
874 status = pthread_cond_wait (&queue_cond, &cache_lock);
875 if ((status != 0) && (status != ETIMEDOUT))
876 {
877 RRDD_LOG (LOG_ERR, "queue_thread_main: "
878 "pthread_cond_wait returned %i.", status);
879 }
880 }
882 /* Check if a value has arrived. This may be NULL if we timed out or there
883 * was an interrupt such as a signal. */
884 if (cache_queue_head == NULL)
885 continue;
887 ci = cache_queue_head;
889 /* copy the relevant parts */
890 file = strdup (ci->file);
891 if (file == NULL)
892 {
893 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
894 continue;
895 }
897 assert(ci->values != NULL);
898 assert(ci->values_num > 0);
900 values = ci->values;
901 values_num = ci->values_num;
903 wipe_ci_values(ci, time(NULL));
904 remove_from_queue(ci);
906 pthread_mutex_unlock (&cache_lock);
908 rrd_clear_error ();
909 status = rrd_update_r (file, NULL, values_num, (void *) values);
910 if (status != 0)
911 {
912 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
913 "rrd_update_r (%s) failed with status %i. (%s)",
914 file, status, rrd_get_error());
915 }
917 journal_write("wrote", file);
918 pthread_cond_broadcast(&ci->flushed);
920 for (i = 0; i < values_num; i++)
921 free (values[i]);
923 free(values);
924 free(file);
926 if (status == 0)
927 {
928 pthread_mutex_lock (&stats_lock);
929 stats_updates_written++;
930 stats_data_sets_written += values_num;
931 pthread_mutex_unlock (&stats_lock);
932 }
934 pthread_mutex_lock (&cache_lock);
935 }
936 pthread_mutex_unlock (&cache_lock);
938 return (NULL);
939 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 *   Splits the first field off a NUL-terminated command buffer.  A field
 *   ends at an unescaped space or at the terminating NUL; a backslash
 *   makes the following character literal.  The field is unescaped IN
 *   PLACE, so the buffer is modified.
 *
 *   On success returns 0, stores the field in `*field_ret' and advances
 *   `*buffer_ret' / `*buffer_size_ret' past the field and its delimiter.
 *   Returns -1, leaving the three output values untouched, when the buffer
 *   is empty or ends before a delimiter is found.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const text = *buffer_ret;
  const size_t text_size = *buffer_size_ret;
  size_t rd = 0; /* next character to read */
  size_t wr = 0; /* next position of the unescaped field */

  if (text_size <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (text[text_size - 1] == '\0');

  for (;;)
  {
    char ch;

    /* ran off the end without seeing a delimiter */
    if (rd >= text_size)
      return (-1);

    ch = text[rd];

    /* Check for end-of-field or end-of-buffer */
    if (ch == ' ' || ch == '\0')
    {
      text[wr] = 0;
      rd++;
      break;
    }

    /* Handle escaped characters. */
    if (ch == '\\')
    {
      if (rd >= (text_size - 1))
        return (-1);
      rd++;
      ch = text[rd];
    }

    text[wr] = ch;
    wr++;
    rd++;
  }

  *buffer_ret = text + rd;
  *buffer_size_ret = text_size - rd;
  *field_ret = text;

  return (0);
} /* }}} int buffer_get_field */
1004 /* if we're restricting writes to the base directory,
1005 * check whether the file falls within the dir
1006 * returns 1 if OK, otherwise 0
1007 */
1008 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1009 {
1010 assert(file != NULL);
1012 if (!config_write_base_only
1013 || sock == NULL /* journal replay */
1014 || config_base_dir == NULL)
1015 return 1;
1017 if (strstr(file, "../") != NULL) goto err;
1019 /* relative paths without "../" are ok */
1020 if (*file != '/') return 1;
1022 /* file must be of the format base + "/" + <1+ char filename> */
1023 if (strlen(file) < _config_base_dir_len + 2) goto err;
1024 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1025 if (*(file + _config_base_dir_len) != '/') goto err;
1027 return 1;
1029 err:
1030 if (sock != NULL && sock->fd >= 0)
1031 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 return 0;
1034 } /* }}} static int check_file_access */
1036 /* when using a base dir, convert relative paths to absolute paths.
1037 * if necessary, modifies the "filename" pointer to point
1038 * to the new path created in "tmp". "tmp" is provided
1039 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1040 *
1041 * this allows us to optimize for the expected case (absolute path)
1042 * with a no-op.
1043 */
1044 static void get_abs_path(char **filename, char *tmp)
1045 {
1046 assert(tmp != NULL);
1047 assert(filename != NULL && *filename != NULL);
1049 if (config_base_dir == NULL || **filename == '/')
1050 return;
1052 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1053 *filename = tmp;
1054 } /* }}} static int get_abs_path */
1056 /* returns 1 if we have the required privilege level,
1057 * otherwise issue an error to the user on sock */
1058 static int has_privilege (listen_socket_t *sock, /* {{{ */
1059 socket_privilege priv)
1060 {
1061 if (sock == NULL) /* journal replay */
1062 return 1;
1064 if (sock->privilege >= priv)
1065 return 1;
1067 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1068 } /* }}} static int has_privilege */
1070 static int flush_file (const char *filename) /* {{{ */
1071 {
1072 cache_item_t *ci;
1074 pthread_mutex_lock (&cache_lock);
1076 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1077 if (ci == NULL)
1078 {
1079 pthread_mutex_unlock (&cache_lock);
1080 return (ENOENT);
1081 }
1083 if (ci->values_num > 0)
1084 {
1085 /* Enqueue at head */
1086 enqueue_cache_item (ci, HEAD);
1087 pthread_cond_wait(&ci->flushed, &cache_lock);
1088 }
1090 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1091 * may have been purged during our cond_wait() */
1093 pthread_mutex_unlock(&cache_lock);
1095 return (0);
1096 } /* }}} int flush_file */
1098 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1099 {
1100 char *err = "Syntax error.\n";
1102 if (cmd && cmd->syntax)
1103 err = cmd->syntax;
1105 return send_response(sock, RESP_ERR, "Usage: %s", err);
1106 } /* }}} static int syntax_error() */
1108 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1109 {
1110 uint64_t copy_queue_length;
1111 uint64_t copy_updates_received;
1112 uint64_t copy_flush_received;
1113 uint64_t copy_updates_written;
1114 uint64_t copy_data_sets_written;
1115 uint64_t copy_journal_bytes;
1116 uint64_t copy_journal_rotate;
1118 uint64_t tree_nodes_number;
1119 uint64_t tree_depth;
1121 pthread_mutex_lock (&stats_lock);
1122 copy_queue_length = stats_queue_length;
1123 copy_updates_received = stats_updates_received;
1124 copy_flush_received = stats_flush_received;
1125 copy_updates_written = stats_updates_written;
1126 copy_data_sets_written = stats_data_sets_written;
1127 copy_journal_bytes = stats_journal_bytes;
1128 copy_journal_rotate = stats_journal_rotate;
1129 pthread_mutex_unlock (&stats_lock);
1131 pthread_mutex_lock (&cache_lock);
1132 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1133 tree_depth = (uint64_t) g_tree_height (cache_tree);
1134 pthread_mutex_unlock (&cache_lock);
1136 add_response_info(sock,
1137 "QueueLength: %"PRIu64"\n", copy_queue_length);
1138 add_response_info(sock,
1139 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1140 add_response_info(sock,
1141 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1142 add_response_info(sock,
1143 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1144 add_response_info(sock,
1145 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1146 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1147 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1148 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1149 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1151 send_response(sock, RESP_OK, "Statistics follow\n");
1153 return (0);
1154 } /* }}} int handle_request_stats */
1156 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1157 {
1158 char *file, file_tmp[PATH_MAX];
1159 int status;
1161 status = buffer_get_field (&buffer, &buffer_size, &file);
1162 if (status != 0)
1163 {
1164 return syntax_error(sock,cmd);
1165 }
1166 else
1167 {
1168 pthread_mutex_lock(&stats_lock);
1169 stats_flush_received++;
1170 pthread_mutex_unlock(&stats_lock);
1172 get_abs_path(&file, file_tmp);
1173 if (!check_file_access(file, sock)) return 0;
1175 status = flush_file (file);
1176 if (status == 0)
1177 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1178 else if (status == ENOENT)
1179 {
1180 /* no file in our tree; see whether it exists at all */
1181 struct stat statbuf;
1183 memset(&statbuf, 0, sizeof(statbuf));
1184 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1185 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1186 else
1187 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1188 }
1189 else if (status < 0)
1190 return send_response(sock, RESP_ERR, "Internal error.\n");
1191 else
1192 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1193 }
1195 /* NOTREACHED */
1196 assert(1==0);
1197 } /* }}} int handle_request_flush */
1199 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1200 {
1201 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1203 pthread_mutex_lock(&cache_lock);
1204 flush_old_values(-1);
1205 pthread_mutex_unlock(&cache_lock);
1207 return send_response(sock, RESP_OK, "Started flush.\n");
1208 } /* }}} static int handle_request_flushall */
1210 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1211 {
1212 int status;
1213 char *file, file_tmp[PATH_MAX];
1214 cache_item_t *ci;
1216 status = buffer_get_field(&buffer, &buffer_size, &file);
1217 if (status != 0)
1218 return syntax_error(sock,cmd);
1220 get_abs_path(&file, file_tmp);
1222 pthread_mutex_lock(&cache_lock);
1223 ci = g_tree_lookup(cache_tree, file);
1224 if (ci == NULL)
1225 {
1226 pthread_mutex_unlock(&cache_lock);
1227 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1228 }
1230 for (int i=0; i < ci->values_num; i++)
1231 add_response_info(sock, "%s\n", ci->values[i]);
1233 pthread_mutex_unlock(&cache_lock);
1234 return send_response(sock, RESP_OK, "updates pending\n");
1235 } /* }}} static int handle_request_pending */
1237 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1238 {
1239 int status;
1240 gboolean found;
1241 char *file, file_tmp[PATH_MAX];
1243 status = buffer_get_field(&buffer, &buffer_size, &file);
1244 if (status != 0)
1245 return syntax_error(sock,cmd);
1247 get_abs_path(&file, file_tmp);
1248 if (!check_file_access(file, sock)) return 0;
1250 pthread_mutex_lock(&cache_lock);
1251 found = g_tree_remove(cache_tree, file);
1252 pthread_mutex_unlock(&cache_lock);
1254 if (found == TRUE)
1255 {
1256 if (sock != NULL)
1257 journal_write("forget", file);
1259 return send_response(sock, RESP_OK, "Gone!\n");
1260 }
1261 else
1262 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1264 /* NOTREACHED */
1265 assert(1==0);
1266 } /* }}} static int handle_request_forget */
1268 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1269 {
1270 cache_item_t *ci;
1272 pthread_mutex_lock(&cache_lock);
1274 ci = cache_queue_head;
1275 while (ci != NULL)
1276 {
1277 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1278 ci = ci->next;
1279 }
1281 pthread_mutex_unlock(&cache_lock);
1283 return send_response(sock, RESP_OK, "in queue.\n");
1284 } /* }}} int handle_request_queue */
1286 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1287 {
1288 char *file, file_tmp[PATH_MAX];
1289 int values_num = 0;
1290 int status;
1291 char orig_buf[CMD_MAX];
1293 cache_item_t *ci;
1295 /* save it for the journal later */
1296 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1298 status = buffer_get_field (&buffer, &buffer_size, &file);
1299 if (status != 0)
1300 return syntax_error(sock,cmd);
1302 pthread_mutex_lock(&stats_lock);
1303 stats_updates_received++;
1304 pthread_mutex_unlock(&stats_lock);
1306 get_abs_path(&file, file_tmp);
1307 if (!check_file_access(file, sock)) return 0;
1309 pthread_mutex_lock (&cache_lock);
1310 ci = g_tree_lookup (cache_tree, file);
1312 if (ci == NULL) /* {{{ */
1313 {
1314 struct stat statbuf;
1316 /* don't hold the lock while we setup; stat(2) might block */
1317 pthread_mutex_unlock(&cache_lock);
1319 memset (&statbuf, 0, sizeof (statbuf));
1320 status = stat (file, &statbuf);
1321 if (status != 0)
1322 {
1323 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1325 status = errno;
1326 if (status == ENOENT)
1327 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1328 else
1329 return send_response(sock, RESP_ERR,
1330 "stat failed with error %i.\n", status);
1331 }
1332 if (!S_ISREG (statbuf.st_mode))
1333 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1335 if (access(file, R_OK|W_OK) != 0)
1336 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1337 file, rrd_strerror(errno));
1339 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1340 if (ci == NULL)
1341 {
1342 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1344 return send_response(sock, RESP_ERR, "malloc failed.\n");
1345 }
1346 memset (ci, 0, sizeof (cache_item_t));
1348 ci->file = strdup (file);
1349 if (ci->file == NULL)
1350 {
1351 free (ci);
1352 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1354 return send_response(sock, RESP_ERR, "strdup failed.\n");
1355 }
1357 wipe_ci_values(ci, now);
1358 ci->flags = CI_FLAGS_IN_TREE;
1359 pthread_cond_init(&ci->flushed, NULL);
1361 pthread_mutex_lock(&cache_lock);
1362 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1363 } /* }}} */
1364 assert (ci != NULL);
1366 /* don't re-write updates in replay mode */
1367 if (sock != NULL)
1368 journal_write("update", orig_buf);
1370 while (buffer_size > 0)
1371 {
1372 char **temp;
1373 char *value;
1374 time_t stamp;
1375 char *eostamp;
1377 status = buffer_get_field (&buffer, &buffer_size, &value);
1378 if (status != 0)
1379 {
1380 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1381 break;
1382 }
1384 /* make sure update time is always moving forward */
1385 stamp = strtol(value, &eostamp, 10);
1386 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1387 {
1388 pthread_mutex_unlock(&cache_lock);
1389 return send_response(sock, RESP_ERR,
1390 "Cannot find timestamp in '%s'!\n", value);
1391 }
1392 else if (stamp <= ci->last_update_stamp)
1393 {
1394 pthread_mutex_unlock(&cache_lock);
1395 return send_response(sock, RESP_ERR,
1396 "illegal attempt to update using time %ld when last"
1397 " update time is %ld (minimum one second step)\n",
1398 stamp, ci->last_update_stamp);
1399 }
1400 else
1401 ci->last_update_stamp = stamp;
1403 temp = (char **) rrd_realloc (ci->values,
1404 sizeof (char *) * (ci->values_num + 1));
1405 if (temp == NULL)
1406 {
1407 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1408 continue;
1409 }
1410 ci->values = temp;
1412 ci->values[ci->values_num] = strdup (value);
1413 if (ci->values[ci->values_num] == NULL)
1414 {
1415 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1416 continue;
1417 }
1418 ci->values_num++;
1420 values_num++;
1421 }
1423 if (((now - ci->last_flush_time) >= config_write_interval)
1424 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1425 && (ci->values_num > 0))
1426 {
1427 enqueue_cache_item (ci, TAIL);
1428 }
1430 pthread_mutex_unlock (&cache_lock);
1432 if (values_num < 1)
1433 return send_response(sock, RESP_ERR, "No values updated.\n");
1434 else
1435 return send_response(sock, RESP_OK,
1436 "errors, enqueued %i value(s).\n", values_num);
1438 /* NOTREACHED */
1439 assert(1==0);
1441 } /* }}} int handle_request_update */
1443 /* we came across a "WROTE" entry during journal replay.
1444 * throw away any values that we have accumulated for this file
1445 */
1446 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1447 {
1448 int i;
1449 cache_item_t *ci;
1450 const char *file = buffer;
1452 pthread_mutex_lock(&cache_lock);
1454 ci = g_tree_lookup(cache_tree, file);
1455 if (ci == NULL)
1456 {
1457 pthread_mutex_unlock(&cache_lock);
1458 return (0);
1459 }
1461 if (ci->values)
1462 {
1463 for (i=0; i < ci->values_num; i++)
1464 free(ci->values[i]);
1466 free(ci->values);
1467 }
1469 wipe_ci_values(ci, now);
1470 remove_from_queue(ci);
1472 pthread_mutex_unlock(&cache_lock);
1473 return (0);
1474 } /* }}} int handle_request_wrote */
1476 /* start "BATCH" processing */
1477 static int batch_start (HANDLER_PROTO) /* {{{ */
1478 {
1479 int status;
1480 if (sock->batch_start)
1481 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1483 status = send_response(sock, RESP_OK,
1484 "Go ahead. End with dot '.' on its own line.\n");
1485 sock->batch_start = time(NULL);
1486 sock->batch_cmd = 0;
1488 return status;
1489 } /* }}} static int batch_start */
1491 /* finish "BATCH" processing and return results to the client */
1492 static int batch_done (HANDLER_PROTO) /* {{{ */
1493 {
1494 assert(sock->batch_start);
1495 sock->batch_start = 0;
1496 sock->batch_cmd = 0;
1497 return send_response(sock, RESP_OK, "errors\n");
1498 } /* }}} static int batch_done */
/* QUIT handler: does no work itself and simply reports -1.  The connection
 * thread leaves its loop and closes the socket on any non-zero handler
 * status. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
1505 struct command COMMANDS[] = {
1506 {
1507 "UPDATE",
1508 handle_request_update,
1509 PRIV_HIGH,
1510 CMD_CONTEXT_ANY,
1511 "UPDATE <filename> <values> [<values> ...]\n"
1512 ,
1513 "Adds the given file to the internal cache if it is not yet known and\n"
1514 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1515 "for details.\n"
1516 "\n"
1517 "Each <values> has the following form:\n"
1518 " <values> = <time>:<value>[:<value>[...]]\n"
1519 "See the rrdupdate(1) manpage for details.\n"
1520 },
1521 {
1522 "WROTE",
1523 handle_request_wrote,
1524 PRIV_HIGH,
1525 CMD_CONTEXT_JOURNAL,
1526 NULL,
1527 NULL
1528 },
1529 {
1530 "FLUSH",
1531 handle_request_flush,
1532 PRIV_LOW,
1533 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1534 "FLUSH <filename>\n"
1535 ,
1536 "Adds the given filename to the head of the update queue and returns\n"
1537 "after it has been dequeued.\n"
1538 },
1539 {
1540 "FLUSHALL",
1541 handle_request_flushall,
1542 PRIV_HIGH,
1543 CMD_CONTEXT_CLIENT,
1544 "FLUSHALL\n"
1545 ,
1546 "Triggers writing of all pending updates. Returns immediately.\n"
1547 },
1548 {
1549 "PENDING",
1550 handle_request_pending,
1551 PRIV_HIGH,
1552 CMD_CONTEXT_CLIENT,
1553 "PENDING <filename>\n"
1554 ,
1555 "Shows any 'pending' updates for a file, in order.\n"
1556 "The updates shown have not yet been written to the underlying RRD file.\n"
1557 },
1558 {
1559 "FORGET",
1560 handle_request_forget,
1561 PRIV_HIGH,
1562 CMD_CONTEXT_ANY,
1563 "FORGET <filename>\n"
1564 ,
1565 "Removes the file completely from the cache.\n"
1566 "Any pending updates for the file will be lost.\n"
1567 },
1568 {
1569 "QUEUE",
1570 handle_request_queue,
1571 PRIV_LOW,
1572 CMD_CONTEXT_CLIENT,
1573 "QUEUE\n"
1574 ,
1575 "Shows all files in the output queue.\n"
1576 "The output is zero or more lines in the following format:\n"
1577 "(where <num_vals> is the number of values to be written)\n"
1578 "\n"
1579 "<num_vals> <filename>\n"
1580 },
1581 {
1582 "STATS",
1583 handle_request_stats,
1584 PRIV_LOW,
1585 CMD_CONTEXT_CLIENT,
1586 "STATS\n"
1587 ,
1588 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1589 "a description of the values.\n"
1590 },
1591 {
1592 "HELP",
1593 handle_request_help,
1594 PRIV_LOW,
1595 CMD_CONTEXT_CLIENT,
1596 "HELP [<command>]\n",
1597 NULL, /* special! */
1598 },
1599 {
1600 "BATCH",
1601 batch_start,
1602 PRIV_LOW,
1603 CMD_CONTEXT_CLIENT,
1604 "BATCH\n"
1605 ,
1606 "The 'BATCH' command permits the client to initiate a bulk load\n"
1607 " of commands to rrdcached.\n"
1608 "\n"
1609 "Usage:\n"
1610 "\n"
1611 " client: BATCH\n"
1612 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1613 " client: command #1\n"
1614 " client: command #2\n"
1615 " client: ... and so on\n"
1616 " client: .\n"
1617 " server: 2 errors\n"
1618 " server: 7 message for command #7\n"
1619 " server: 9 message for command #9\n"
1620 "\n"
1621 "For more information, consult the rrdcached(1) documentation.\n"
1622 },
1623 {
1624 ".", /* BATCH terminator */
1625 batch_done,
1626 PRIV_LOW,
1627 CMD_CONTEXT_BATCH,
1628 NULL,
1629 NULL
1630 },
1631 {
1632 "QUIT",
1633 handle_request_quit,
1634 PRIV_LOW,
1635 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1636 "QUIT\n"
1637 ,
1638 "Disconnect from rrdcached.\n"
1639 },
1640 {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */
1641 };
1643 static struct command *find_command(char *cmd)
1644 {
1645 struct command *c = COMMANDS;
1647 while (c->cmd != NULL)
1648 {
1649 if (strcasecmp(cmd, c->cmd) == 0)
1650 break;
1651 c++;
1652 }
1654 if (c->cmd == NULL)
1655 return NULL;
1656 else
1657 return c;
1658 }
1660 /* check whether commands are received in the expected context */
1661 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1662 {
1663 if (sock == NULL)
1664 return (cmd->context & CMD_CONTEXT_JOURNAL);
1665 else if (sock->batch_start)
1666 return (cmd->context & CMD_CONTEXT_BATCH);
1667 else
1668 return (cmd->context & CMD_CONTEXT_CLIENT);
1670 /* NOTREACHED */
1671 assert(1==0);
1672 }
1674 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1675 {
1676 int status;
1677 char *cmd_str;
1678 char *resp_txt;
1679 struct command *help = NULL;
1681 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1682 if (status == 0)
1683 help = find_command(cmd_str);
1685 if (help && (help->syntax || help->help))
1686 {
1687 char tmp[CMD_MAX];
1689 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1690 resp_txt = tmp;
1692 if (help->syntax)
1693 add_response_info(sock, "Usage: %s\n", help->syntax);
1695 if (help->help)
1696 add_response_info(sock, "%s\n", help->help);
1697 }
1698 else
1699 {
1700 help = COMMANDS;
1701 resp_txt = "Command overview\n";
1703 while (help->cmd)
1704 {
1705 if (help->syntax)
1706 add_response_info(sock, "%s", help->syntax);
1707 help++;
1708 }
1709 }
1711 return send_response(sock, RESP_OK, resp_txt);
1712 } /* }}} int handle_request_help */
1714 /* if sock==NULL, we are in journal replay mode */
1715 static int handle_request (DISPATCH_PROTO) /* {{{ */
1716 {
1717 char *buffer_ptr = buffer;
1718 char *cmd_str = NULL;
1719 struct command *cmd = NULL;
1720 int status;
1722 assert (buffer[buffer_size - 1] == '\0');
1724 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1725 if (status != 0)
1726 {
1727 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1728 return (-1);
1729 }
1731 if (sock != NULL && sock->batch_start)
1732 sock->batch_cmd++;
1734 cmd = find_command(cmd_str);
1735 if (!cmd)
1736 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1738 status = has_privilege(sock, cmd->min_priv);
1739 if (status <= 0)
1740 return status;
1742 if (!command_check_context(sock, cmd))
1743 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1745 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1746 } /* }}} int handle_request */
1748 /* MUST NOT hold journal_lock before calling this */
1749 static void journal_rotate(void) /* {{{ */
1750 {
1751 FILE *old_fh = NULL;
1752 int new_fd;
1754 if (journal_cur == NULL || journal_old == NULL)
1755 return;
1757 pthread_mutex_lock(&journal_lock);
1759 /* we rotate this way (rename before close) so that the we can release
1760 * the journal lock as fast as possible. Journal writes to the new
1761 * journal can proceed immediately after the new file is opened. The
1762 * fclose can then block without affecting new updates.
1763 */
1764 if (journal_fh != NULL)
1765 {
1766 old_fh = journal_fh;
1767 journal_fh = NULL;
1768 rename(journal_cur, journal_old);
1769 ++stats_journal_rotate;
1770 }
1772 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1773 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1774 if (new_fd >= 0)
1775 {
1776 journal_fh = fdopen(new_fd, "a");
1777 if (journal_fh == NULL)
1778 close(new_fd);
1779 }
1781 pthread_mutex_unlock(&journal_lock);
1783 if (old_fh != NULL)
1784 fclose(old_fh);
1786 if (journal_fh == NULL)
1787 {
1788 RRDD_LOG(LOG_CRIT,
1789 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1790 journal_cur, rrd_strerror(errno));
1792 RRDD_LOG(LOG_ERR,
1793 "JOURNALING DISABLED: All values will be flushed at shutdown");
1794 config_flush_at_shutdown = 1;
1795 }
1797 } /* }}} static void journal_rotate */
1799 static void journal_done(void) /* {{{ */
1800 {
1801 if (journal_cur == NULL)
1802 return;
1804 pthread_mutex_lock(&journal_lock);
1805 if (journal_fh != NULL)
1806 {
1807 fclose(journal_fh);
1808 journal_fh = NULL;
1809 }
1811 if (config_flush_at_shutdown)
1812 {
1813 RRDD_LOG(LOG_INFO, "removing journals");
1814 unlink(journal_old);
1815 unlink(journal_cur);
1816 }
1817 else
1818 {
1819 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1820 "journals will be used at next startup");
1821 }
1823 pthread_mutex_unlock(&journal_lock);
1825 } /* }}} static void journal_done */
1827 static int journal_write(char *cmd, char *args) /* {{{ */
1828 {
1829 int chars;
1831 if (journal_fh == NULL)
1832 return 0;
1834 pthread_mutex_lock(&journal_lock);
1835 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1836 pthread_mutex_unlock(&journal_lock);
1838 if (chars > 0)
1839 {
1840 pthread_mutex_lock(&stats_lock);
1841 stats_journal_bytes += chars;
1842 pthread_mutex_unlock(&stats_lock);
1843 }
1845 return chars;
1846 } /* }}} static int journal_write */
1848 static int journal_replay (const char *file) /* {{{ */
1849 {
1850 FILE *fh;
1851 int entry_cnt = 0;
1852 int fail_cnt = 0;
1853 uint64_t line = 0;
1854 char entry[CMD_MAX];
1855 time_t now;
1857 if (file == NULL) return 0;
1859 {
1860 char *reason = "unknown error";
1861 int status = 0;
1862 struct stat statbuf;
1864 memset(&statbuf, 0, sizeof(statbuf));
1865 if (stat(file, &statbuf) != 0)
1866 {
1867 if (errno == ENOENT)
1868 return 0;
1870 reason = "stat error";
1871 status = errno;
1872 }
1873 else if (!S_ISREG(statbuf.st_mode))
1874 {
1875 reason = "not a regular file";
1876 status = EPERM;
1877 }
1878 if (statbuf.st_uid != daemon_uid)
1879 {
1880 reason = "not owned by daemon user";
1881 status = EACCES;
1882 }
1883 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1884 {
1885 reason = "must not be user/group writable";
1886 status = EACCES;
1887 }
1889 if (status != 0)
1890 {
1891 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1892 file, rrd_strerror(status), reason);
1893 return 0;
1894 }
1895 }
1897 fh = fopen(file, "r");
1898 if (fh == NULL)
1899 {
1900 if (errno != ENOENT)
1901 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1902 file, rrd_strerror(errno));
1903 return 0;
1904 }
1905 else
1906 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1908 now = time(NULL);
1910 while(!feof(fh))
1911 {
1912 size_t entry_len;
1914 ++line;
1915 if (fgets(entry, sizeof(entry), fh) == NULL)
1916 break;
1917 entry_len = strlen(entry);
1919 /* check \n termination in case journal writing crashed mid-line */
1920 if (entry_len == 0)
1921 continue;
1922 else if (entry[entry_len - 1] != '\n')
1923 {
1924 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1925 ++fail_cnt;
1926 continue;
1927 }
1929 entry[entry_len - 1] = '\0';
1931 if (handle_request(NULL, now, entry, entry_len) == 0)
1932 ++entry_cnt;
1933 else
1934 ++fail_cnt;
1935 }
1937 fclose(fh);
1939 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1940 entry_cnt, fail_cnt);
1942 return entry_cnt > 0 ? 1 : 0;
1943 } /* }}} static int journal_replay */
1945 static void journal_init(void) /* {{{ */
1946 {
1947 int had_journal = 0;
1949 if (journal_cur == NULL) return;
1951 pthread_mutex_lock(&journal_lock);
1953 RRDD_LOG(LOG_INFO, "checking for journal files");
1955 had_journal += journal_replay(journal_old);
1956 had_journal += journal_replay(journal_cur);
1958 /* it must have been a crash. start a flush */
1959 if (had_journal && config_flush_at_shutdown)
1960 flush_old_values(-1);
1962 pthread_mutex_unlock(&journal_lock);
1963 journal_rotate();
1965 RRDD_LOG(LOG_INFO, "journal processing complete");
1967 } /* }}} static void journal_init */
1969 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1970 {
1971 assert(sock != NULL);
1973 free(sock->rbuf); sock->rbuf = NULL;
1974 free(sock->wbuf); sock->wbuf = NULL;
1975 free(sock);
1976 } /* }}} void free_listen_socket */
1978 static void close_connection(listen_socket_t *sock) /* {{{ */
1979 {
1980 if (sock->fd >= 0)
1981 {
1982 close(sock->fd);
1983 sock->fd = -1;
1984 }
1986 free_listen_socket(sock);
1988 } /* }}} void close_connection */
1990 static void *connection_thread_main (void *args) /* {{{ */
1991 {
1992 listen_socket_t *sock;
1993 int fd;
1995 sock = (listen_socket_t *) args;
1996 fd = sock->fd;
1998 /* init read buffers */
1999 sock->next_read = sock->next_cmd = 0;
2000 sock->rbuf = malloc(RBUF_SIZE);
2001 if (sock->rbuf == NULL)
2002 {
2003 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2004 close_connection(sock);
2005 return NULL;
2006 }
2008 pthread_mutex_lock (&connection_threads_lock);
2009 connection_threads_num++;
2010 pthread_mutex_unlock (&connection_threads_lock);
2012 while (do_shutdown == 0)
2013 {
2014 char *cmd;
2015 ssize_t cmd_len;
2016 ssize_t rbytes;
2017 time_t now;
2019 struct pollfd pollfd;
2020 int status;
2022 pollfd.fd = fd;
2023 pollfd.events = POLLIN | POLLPRI;
2024 pollfd.revents = 0;
2026 status = poll (&pollfd, 1, /* timeout = */ 500);
2027 if (do_shutdown)
2028 break;
2029 else if (status == 0) /* timeout */
2030 continue;
2031 else if (status < 0) /* error */
2032 {
2033 status = errno;
2034 if (status != EINTR)
2035 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2036 continue;
2037 }
2039 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2040 break;
2041 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2042 {
2043 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2044 "poll(2) returned something unexpected: %#04hx",
2045 pollfd.revents);
2046 break;
2047 }
2049 rbytes = read(fd, sock->rbuf + sock->next_read,
2050 RBUF_SIZE - sock->next_read);
2051 if (rbytes < 0)
2052 {
2053 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2054 break;
2055 }
2056 else if (rbytes == 0)
2057 break; /* eof */
2059 sock->next_read += rbytes;
2061 if (sock->batch_start)
2062 now = sock->batch_start;
2063 else
2064 now = time(NULL);
2066 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2067 {
2068 status = handle_request (sock, now, cmd, cmd_len+1);
2069 if (status != 0)
2070 goto out_close;
2071 }
2072 }
2074 out_close:
2075 close_connection(sock);
2077 /* Remove this thread from the connection threads list */
2078 pthread_mutex_lock (&connection_threads_lock);
2079 connection_threads_num--;
2080 if (connection_threads_num <= 0)
2081 pthread_cond_broadcast(&connection_threads_done);
2082 pthread_mutex_unlock (&connection_threads_lock);
2084 return (NULL);
2085 } /* }}} void *connection_thread_main */
2087 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2088 {
2089 int fd;
2090 struct sockaddr_un sa;
2091 listen_socket_t *temp;
2092 int status;
2093 const char *path;
2095 path = sock->addr;
2096 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2097 path += strlen("unix:");
2099 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2100 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2101 if (temp == NULL)
2102 {
2103 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2104 return (-1);
2105 }
2106 listen_fds = temp;
2107 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2109 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2110 if (fd < 0)
2111 {
2112 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2113 rrd_strerror(errno));
2114 return (-1);
2115 }
2117 memset (&sa, 0, sizeof (sa));
2118 sa.sun_family = AF_UNIX;
2119 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2121 /* if we've gotten this far, we own the pid file. any daemon started
2122 * with the same args must not be alive. therefore, ensure that we can
2123 * create the socket...
2124 */
2125 unlink(path);
2127 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2128 if (status != 0)
2129 {
2130 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2131 path, rrd_strerror(errno));
2132 close (fd);
2133 return (-1);
2134 }
2136 status = listen (fd, /* backlog = */ 10);
2137 if (status != 0)
2138 {
2139 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2140 path, rrd_strerror(errno));
2141 close (fd);
2142 unlink (path);
2143 return (-1);
2144 }
2146 listen_fds[listen_fds_num].fd = fd;
2147 listen_fds[listen_fds_num].family = PF_UNIX;
2148 strncpy(listen_fds[listen_fds_num].addr, path,
2149 sizeof (listen_fds[listen_fds_num].addr) - 1);
2150 listen_fds_num++;
2152 return (0);
2153 } /* }}} int open_listen_socket_unix */
/* Opens listening TCP sockets for a network address.
 *
 * sock->addr may be "[ipv6]", "[ipv6]:port", "host", or "host:port"
 * (the port of the non-bracketed form is only split off when the address
 * contains a '.').  Without a port, RRDCACHED_DEFAULT_PORT is used.  One
 * listen_fds entry is added per address returned by getaddrinfo() that can
 * be bound and listened on.
 *
 * Returns -1 for a malformed address, a getaddrinfo() failure or a
 * listen() failure; returns 0 otherwise, even if no address could be
 * bound.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy; the parsing below writes NUL bytes into it */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* the last ':' separates host and port */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2277 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2278 {
2279 assert(sock != NULL);
2280 assert(sock->addr != NULL);
2282 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2283 || sock->addr[0] == '/')
2284 return (open_listen_socket_unix(sock));
2285 else
2286 return (open_listen_socket_network(sock));
2287 } /* }}} int open_listen_socket */
2289 static int close_listen_sockets (void) /* {{{ */
2290 {
2291 size_t i;
2293 for (i = 0; i < listen_fds_num; i++)
2294 {
2295 close (listen_fds[i].fd);
2297 if (listen_fds[i].family == PF_UNIX)
2298 unlink(listen_fds[i].addr);
2299 }
2301 free (listen_fds);
2302 listen_fds = NULL;
2303 listen_fds_num = 0;
2305 return (0);
2306 } /* }}} int close_listen_sockets */
2308 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2309 {
2310 struct pollfd *pollfds;
2311 int pollfds_num;
2312 int status;
2313 int i;
2315 if (listen_fds_num < 1)
2316 {
2317 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2318 return (NULL);
2319 }
2321 pollfds_num = listen_fds_num;
2322 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2323 if (pollfds == NULL)
2324 {
2325 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2326 return (NULL);
2327 }
2328 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2330 RRDD_LOG(LOG_INFO, "listening for connections");
2332 while (do_shutdown == 0)
2333 {
2334 for (i = 0; i < pollfds_num; i++)
2335 {
2336 pollfds[i].fd = listen_fds[i].fd;
2337 pollfds[i].events = POLLIN | POLLPRI;
2338 pollfds[i].revents = 0;
2339 }
2341 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2342 if (do_shutdown)
2343 break;
2344 else if (status == 0) /* timeout */
2345 continue;
2346 else if (status < 0) /* error */
2347 {
2348 status = errno;
2349 if (status != EINTR)
2350 {
2351 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2352 }
2353 continue;
2354 }
2356 for (i = 0; i < pollfds_num; i++)
2357 {
2358 listen_socket_t *client_sock;
2359 struct sockaddr_storage client_sa;
2360 socklen_t client_sa_size;
2361 pthread_t tid;
2362 pthread_attr_t attr;
2364 if (pollfds[i].revents == 0)
2365 continue;
2367 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2368 {
2369 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2370 "poll(2) returned something unexpected for listen FD #%i.",
2371 pollfds[i].fd);
2372 continue;
2373 }
2375 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2376 if (client_sock == NULL)
2377 {
2378 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2379 continue;
2380 }
2381 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2383 client_sa_size = sizeof (client_sa);
2384 client_sock->fd = accept (pollfds[i].fd,
2385 (struct sockaddr *) &client_sa, &client_sa_size);
2386 if (client_sock->fd < 0)
2387 {
2388 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2389 free(client_sock);
2390 continue;
2391 }
2393 pthread_attr_init (&attr);
2394 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2396 status = pthread_create (&tid, &attr, connection_thread_main,
2397 client_sock);
2398 if (status != 0)
2399 {
2400 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2401 close_connection(client_sock);
2402 continue;
2403 }
2404 } /* for (pollfds_num) */
2405 } /* while (do_shutdown == 0) */
2407 RRDD_LOG(LOG_INFO, "starting shutdown");
2409 close_listen_sockets ();
2411 pthread_mutex_lock (&connection_threads_lock);
2412 while (connection_threads_num > 0)
2413 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2414 pthread_mutex_unlock (&connection_threads_lock);
2416 free(pollfds);
2418 return (NULL);
2419 } /* }}} void *listen_thread_main */
2421 static int daemonize (void) /* {{{ */
2422 {
2423 int pid_fd;
2424 char *base_dir;
2426 daemon_uid = geteuid();
2428 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2429 if (pid_fd < 0)
2430 pid_fd = check_pidfile();
2431 if (pid_fd < 0)
2432 return pid_fd;
2434 /* open all the listen sockets */
2435 if (config_listen_address_list_len > 0)
2436 {
2437 for (int i = 0; i < config_listen_address_list_len; i++)
2438 {
2439 open_listen_socket (config_listen_address_list[i]);
2440 free_listen_socket (config_listen_address_list[i]);
2441 }
2443 free(config_listen_address_list);
2444 }
2445 else
2446 {
2447 listen_socket_t sock;
2448 memset(&sock, 0, sizeof(sock));
2449 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2450 open_listen_socket (&sock);
2451 }
2453 if (listen_fds_num < 1)
2454 {
2455 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2456 goto error;
2457 }
2459 if (!stay_foreground)
2460 {
2461 pid_t child;
2463 child = fork ();
2464 if (child < 0)
2465 {
2466 fprintf (stderr, "daemonize: fork(2) failed.\n");
2467 goto error;
2468 }
2469 else if (child > 0)
2470 exit(0);
2472 /* Become session leader */
2473 setsid ();
2475 /* Open the first three file descriptors to /dev/null */
2476 close (2);
2477 close (1);
2478 close (0);
2480 open ("/dev/null", O_RDWR);
2481 dup (0);
2482 dup (0);
2483 } /* if (!stay_foreground) */
2485 /* Change into the /tmp directory. */
2486 base_dir = (config_base_dir != NULL)
2487 ? config_base_dir
2488 : "/tmp";
2490 if (chdir (base_dir) != 0)
2491 {
2492 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2493 goto error;
2494 }
2496 install_signal_handlers();
2498 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2499 RRDD_LOG(LOG_INFO, "starting up");
2501 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2502 (GDestroyNotify) free_cache_item);
2503 if (cache_tree == NULL)
2504 {
2505 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2506 goto error;
2507 }
2509 return write_pidfile (pid_fd);
2511 error:
2512 remove_pidfile();
2513 return -1;
2514 } /* }}} int daemonize */
2516 static int cleanup (void) /* {{{ */
2517 {
2518 do_shutdown++;
2520 pthread_cond_broadcast (&flush_cond);
2521 pthread_join (flush_thread, NULL);
2523 pthread_cond_broadcast (&queue_cond);
2524 for (int i = 0; i < config_queue_threads; i++)
2525 pthread_join (queue_threads[i], NULL);
2527 if (config_flush_at_shutdown)
2528 {
2529 assert(cache_queue_head == NULL);
2530 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2531 }
2533 journal_done();
2534 remove_pidfile ();
2536 free(queue_threads);
2537 free(config_base_dir);
2538 free(config_pid_file);
2539 free(journal_cur);
2540 free(journal_old);
2542 pthread_mutex_lock(&cache_lock);
2543 g_tree_destroy(cache_tree);
2545 RRDD_LOG(LOG_INFO, "goodbye");
2546 closelog ();
2548 return (0);
2549 } /* }}} int cleanup */
2551 static int read_options (int argc, char **argv) /* {{{ */
2552 {
2553 int option;
2554 int status = 0;
2556 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2557 {
2558 switch (option)
2559 {
2560 case 'g':
2561 stay_foreground=1;
2562 break;
2564 case 'L':
2565 case 'l':
2566 {
2567 listen_socket_t **temp;
2568 listen_socket_t *new;
2570 new = malloc(sizeof(listen_socket_t));
2571 if (new == NULL)
2572 {
2573 fprintf(stderr, "read_options: malloc failed.\n");
2574 return(2);
2575 }
2576 memset(new, 0, sizeof(listen_socket_t));
2578 temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2579 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2580 if (temp == NULL)
2581 {
2582 fprintf (stderr, "read_options: realloc failed.\n");
2583 return (2);
2584 }
2585 config_listen_address_list = temp;
2587 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2588 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2590 temp[config_listen_address_list_len] = new;
2591 config_listen_address_list_len++;
2592 }
2593 break;
2595 case 'f':
2596 {
2597 int temp;
2599 temp = atoi (optarg);
2600 if (temp > 0)
2601 config_flush_interval = temp;
2602 else
2603 {
2604 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2605 status = 3;
2606 }
2607 }
2608 break;
2610 case 'w':
2611 {
2612 int temp;
2614 temp = atoi (optarg);
2615 if (temp > 0)
2616 config_write_interval = temp;
2617 else
2618 {
2619 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2620 status = 2;
2621 }
2622 }
2623 break;
2625 case 'z':
2626 {
2627 int temp;
2629 temp = atoi(optarg);
2630 if (temp > 0)
2631 config_write_jitter = temp;
2632 else
2633 {
2634 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2635 status = 2;
2636 }
2638 break;
2639 }
2641 case 't':
2642 {
2643 int threads;
2644 threads = atoi(optarg);
2645 if (threads >= 1)
2646 config_queue_threads = threads;
2647 else
2648 {
2649 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2650 return 1;
2651 }
2652 }
2653 break;
2655 case 'B':
2656 config_write_base_only = 1;
2657 break;
2659 case 'b':
2660 {
2661 size_t len;
2662 char base_realpath[PATH_MAX];
2664 if (config_base_dir != NULL)
2665 free (config_base_dir);
2666 config_base_dir = strdup (optarg);
2667 if (config_base_dir == NULL)
2668 {
2669 fprintf (stderr, "read_options: strdup failed.\n");
2670 return (3);
2671 }
2673 /* make sure that the base directory is not resolved via
2674 * symbolic links. this makes some performance-enhancing
2675 * assumptions possible (we don't have to resolve paths
2676 * that start with a "/")
2677 */
2678 if (realpath(config_base_dir, base_realpath) == NULL)
2679 {
2680 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2681 return 5;
2682 }
2683 else if (strncmp(config_base_dir,
2684 base_realpath, sizeof(base_realpath)) != 0)
2685 {
2686 fprintf(stderr,
2687 "Base directory (-b) resolved via file system links!\n"
2688 "Please consult rrdcached '-b' documentation!\n"
2689 "Consider specifying the real directory (%s)\n",
2690 base_realpath);
2691 return 5;
2692 }
2694 len = strlen (config_base_dir);
2695 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2696 {
2697 config_base_dir[len - 1] = 0;
2698 len--;
2699 }
2701 if (len < 1)
2702 {
2703 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2704 return (4);
2705 }
2707 _config_base_dir_len = len;
2708 }
2709 break;
2711 case 'p':
2712 {
2713 if (config_pid_file != NULL)
2714 free (config_pid_file);
2715 config_pid_file = strdup (optarg);
2716 if (config_pid_file == NULL)
2717 {
2718 fprintf (stderr, "read_options: strdup failed.\n");
2719 return (3);
2720 }
2721 }
2722 break;
2724 case 'F':
2725 config_flush_at_shutdown = 1;
2726 break;
2728 case 'j':
2729 {
2730 struct stat statbuf;
2731 const char *dir = optarg;
2733 status = stat(dir, &statbuf);
2734 if (status != 0)
2735 {
2736 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2737 return 6;
2738 }
2740 if (!S_ISDIR(statbuf.st_mode)
2741 || access(dir, R_OK|W_OK|X_OK) != 0)
2742 {
2743 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2744 errno ? rrd_strerror(errno) : "");
2745 return 6;
2746 }
2748 journal_cur = malloc(PATH_MAX + 1);
2749 journal_old = malloc(PATH_MAX + 1);
2750 if (journal_cur == NULL || journal_old == NULL)
2751 {
2752 fprintf(stderr, "malloc failure for journal files\n");
2753 return 6;
2754 }
2755 else
2756 {
2757 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2758 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2759 }
2760 }
2761 break;
2763 case 'h':
2764 case '?':
2765 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2766 "\n"
2767 "Usage: rrdcached [options]\n"
2768 "\n"
2769 "Valid options are:\n"
2770 " -l <address> Socket address to listen to.\n"
2771 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2772 " -w <seconds> Interval in which to write data.\n"
2773 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2774 " -t <threads> Number of write threads.\n"
2775 " -f <seconds> Interval in which to flush dead data.\n"
2776 " -p <file> Location of the PID-file.\n"
2777 " -b <dir> Base directory to change to.\n"
2778 " -B Restrict file access to paths within -b <dir>\n"
2779 " -g Do not fork and run in the foreground.\n"
2780 " -j <dir> Directory in which to create the journal files.\n"
2781 " -F Always flush all updates at shutdown\n"
2782 "\n"
2783 "For more information and a detailed description of all options "
2784 "please refer\n"
2785 "to the rrdcached(1) manual page.\n",
2786 VERSION);
2787 status = -1;
2788 break;
2789 } /* switch (option) */
2790 } /* while (getopt) */
2792 /* advise the user when values are not sane */
2793 if (config_flush_interval < 2 * config_write_interval)
2794 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2795 " 2x write interval (-w) !\n");
2796 if (config_write_jitter > config_write_interval)
2797 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2798 " write interval (-w) !\n");
2800 if (config_write_base_only && config_base_dir == NULL)
2801 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2802 " Consult the rrdcached documentation\n");
2804 if (journal_cur == NULL)
2805 config_flush_at_shutdown = 1;
2807 return (status);
2808 } /* }}} int read_options */
2810 int main (int argc, char **argv)
2811 {
2812 int status;
2814 status = read_options (argc, argv);
2815 if (status != 0)
2816 {
2817 if (status < 0)
2818 status = 0;
2819 return (status);
2820 }
2822 status = daemonize ();
2823 if (status != 0)
2824 {
2825 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2826 return (1);
2827 }
2829 journal_init();
2831 /* start the queue threads */
2832 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2833 if (queue_threads == NULL)
2834 {
2835 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2836 cleanup();
2837 return (1);
2838 }
2839 for (int i = 0; i < config_queue_threads; i++)
2840 {
2841 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2842 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2843 if (status != 0)
2844 {
2845 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2846 cleanup();
2847 return (1);
2848 }
2849 }
2851 /* start the flush thread */
2852 memset(&flush_thread, 0, sizeof(flush_thread));
2853 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2854 if (status != 0)
2855 {
2856 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2857 cleanup();
2858 return (1);
2859 }
2861 listen_thread_main (NULL);
2862 cleanup ();
2864 return (0);
2865 } /* int main */
2867 /*
2868 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2869 */