16b5df59ebf769d8d4884d5a885feb339347558d
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
109 #include <glib-2.0/glib.h>
110 /* }}} */
/* Log a printf-style message through syslog(3) at the given severity. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC do not understand __attribute__; expand it to
 * nothing so annotations such as __attribute__((unused)) still compile. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
/*
 * Types
 */
/* Status given to send_response(): outside BATCH mode RESP_OK reports the
 * number of queued extra-info lines, RESP_ERR reports -1. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;

/* Per-socket state.  Judging by the buffer fields this is used for client
 * connections as well as listening sockets -- TODO confirm. */
struct listen_socket_s
{
  int fd;                   /* descriptor that send_response() writes to */
  char addr[PATH_MAX + 1];  /* presumably the socket path/address -- verify */
  int family;               /* presumably the address family (AF_*) -- verify */

  /* state for BATCH processing */
  time_t batch_start;       /* non-zero while a BATCH is in progress */
  int batch_cmd;            /* command number reported for BATCH errors */

  /* buffered IO */
  char *rbuf;               /* received bytes; parsed by next_cmd() */
  off_t next_cmd;           /* offset of the first unparsed byte in rbuf */
  off_t next_read;          /* offset just past the last received byte */

  char *wbuf;               /* extra response text queued for the client */
  ssize_t wbuf_len;         /* bytes used in wbuf, excluding the trailing NUL */

  uint32_t permissions;     /* presumably a bitmask of allowed commands -- verify */
};
typedef struct listen_socket_s listen_socket_t;

struct command_s;
typedef struct command_s command_t;
/* note: guard against "unused" warnings in the handlers */
/* Common parameter list of every command handler. */
#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
                       time_t now __attribute__((unused)),\
                       char *buffer __attribute__((unused)),\
                       size_t buffer_size __attribute__((unused))

#define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
                      DISPATCH_PROTO

/* One protocol command: keyword, handler and usage/help text. */
struct command_s {
  char   *cmd;                      /* command keyword */
  int (*handler)(HANDLER_PROTO);

  char  context;                    /* where we expect to see it */
#define CMD_CONTEXT_CLIENT  (1<<0)
#define CMD_CONTEXT_BATCH   (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY     (0x7f)

  char *syntax;                     /* usage text sent by syntax_error() */
  char *help;
};

struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/* One cached RRD file and its not-yet-written update strings. */
struct cache_item_s
{
  char *file;                 /* file name; also the key in cache_tree */
  char **values;              /* pending update strings for rrd_update_r() */
  size_t values_num;
  time_t last_flush_time;     /* set by wipe_ci_values() (plus optional jitter) */
  time_t last_update_stamp;   /* not used in this part of the file -- TODO document */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;
  pthread_cond_t  flushed;    /* broadcast after a writer has processed the values */
  cache_item_t *prev;         /* write-queue links, valid while CI_FLAGS_IN_QUEUE */
  cache_item_t *next;
};

/* Scratch state for tree_callback_flush(), filled in by flush_old_values(). */
struct callback_flush_data_s
{
  time_t now;           /* time of this pass */
  time_t abs_timeout;   /* items last flushed at or before this are queued */
  char **keys;          /* idle items collected for removal from cache_tree */
  size_t keys_num;
};
typedef struct callback_flush_data_s callback_flush_data_t;

/* Which end of the write queue enqueue_cache_item() inserts at. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;

/* describe a set of journal files */
typedef struct {
  char **files;
  size_t files_num;
} journal_set;

/* max length of socket command or response */
#define CMD_MAX 4096
/* presumably the size of listen_socket_t.rbuf (two maximum-length commands) */
#define RBUF_SIZE (CMD_MAX*2)
/*
 * Variables
 */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Daemon life cycle: sig_common() moves RUNNING -> FLUSHING, and the flush
 * thread moves FLUSHING -> SHUTDOWN when it is done.
 * NOTE(review): also written from signal handlers; a plain enum object is
 * not guaranteed to be written atomically there (volatile sig_atomic_t
 * would be). */
enum {
  RUNNING,   /* normal operation */
  FLUSHING,  /* flushing remaining values */
  SHUTDOWN   /* shutting down */
} state = RUNNING;

static pthread_t *queue_threads;
/* signalled by enqueue_cache_item() and broadcast by sig_common() */
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

static pthread_t flush_thread;
/* wakes flush_thread_main() before its timeout (see sig_common()) */
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
/* file name -> cache_item_t tree plus the write queue; guarded by cache_lock */
static GTree          *cache_tree = NULL;
static cache_item_t   *cache_queue_head = NULL;
static cache_item_t   *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

static int config_write_interval = 300;   /* seconds */
static int config_write_jitter   = 0;     /* seconds */
static int config_flush_interval = 3600;  /* seconds */
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* statistics counters; read and written under stats_lock */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL;   /* current journal file handle */
static long  journal_size = 0;    /* current journal size */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
282 /*
283 * Functions
284 */
285 static void sig_common (const char *sig) /* {{{ */
286 {
287 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
288 state = FLUSHING;
289 pthread_cond_broadcast(&flush_cond);
290 pthread_cond_broadcast(&queue_cond);
291 } /* }}} void sig_common */
293 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
294 {
295 sig_common("INT");
296 } /* }}} void sig_int_handler */
298 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
299 {
300 sig_common("TERM");
301 } /* }}} void sig_term_handler */
303 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
304 {
305 config_flush_at_shutdown = 1;
306 sig_common("USR1");
307 } /* }}} void sig_usr1_handler */
309 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
310 {
311 config_flush_at_shutdown = 0;
312 sig_common("USR2");
313 } /* }}} void sig_usr2_handler */
315 static void install_signal_handlers(void) /* {{{ */
316 {
317 /* These structures are static, because `sigaction' behaves weird if the are
318 * overwritten.. */
319 static struct sigaction sa_int;
320 static struct sigaction sa_term;
321 static struct sigaction sa_pipe;
322 static struct sigaction sa_usr1;
323 static struct sigaction sa_usr2;
325 /* Install signal handlers */
326 memset (&sa_int, 0, sizeof (sa_int));
327 sa_int.sa_handler = sig_int_handler;
328 sigaction (SIGINT, &sa_int, NULL);
330 memset (&sa_term, 0, sizeof (sa_term));
331 sa_term.sa_handler = sig_term_handler;
332 sigaction (SIGTERM, &sa_term, NULL);
334 memset (&sa_pipe, 0, sizeof (sa_pipe));
335 sa_pipe.sa_handler = SIG_IGN;
336 sigaction (SIGPIPE, &sa_pipe, NULL);
338 memset (&sa_pipe, 0, sizeof (sa_usr1));
339 sa_usr1.sa_handler = sig_usr1_handler;
340 sigaction (SIGUSR1, &sa_usr1, NULL);
342 memset (&sa_usr2, 0, sizeof (sa_usr2));
343 sa_usr2.sa_handler = sig_usr2_handler;
344 sigaction (SIGUSR2, &sa_usr2, NULL);
346 } /* }}} void install_signal_handlers */
348 static int open_pidfile(char *action, int oflag) /* {{{ */
349 {
350 int fd;
351 char *file;
353 file = (config_pid_file != NULL)
354 ? config_pid_file
355 : LOCALSTATEDIR "/run/rrdcached.pid";
357 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
358 if (fd < 0)
359 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
360 action, file, rrd_strerror(errno));
362 return(fd);
363 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Opens the existing PID file read/write and parses the PID stored in it.
 * If that PID is another process we are allowed to signal, a competing
 * rrdcached may be running and -1 is returned.  Otherwise the stale file is
 * rewound and truncated, and its descriptor is returned so the caller can
 * write the new PID into it.
 *
 * Returns the descriptor on success, or a negative value on failure.  The
 * descriptor is closed on every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;
  long parsed;
  char *end;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* read() does not NUL-terminate: keep one byte for the terminator so the
   * parser below cannot run past the end of pid_str */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  /* strtol() instead of atoi(): atoi() has undefined behavior on overflow.
   * Trailing text such as the newline is accepted, as before. */
  errno = 0;
  parsed = strtol(pid_str, &end, 10);
  pid = (pid_t) parsed;
  if (end == pid_str || errno == ERANGE || parsed <= 0 || (long) pid != parsed)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  /* rewind before truncating so the new PID is written at offset 0 */
  if (lseek(pid_fd, 0, SEEK_SET) == (off_t) -1 || ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write the current process ID followed by a newline to `fd', then close it.
 * The descriptor is consumed: it is closed on every path.
 * Returns 0 on success, or -1 if the descriptor could not be wrapped in a
 * stdio stream or the PID could not be written out. */
static int write_pidfile (int fd) /* {{{ */
{
  pid_t pid;
  FILE *fh;
  int status = 0;

  pid = getpid ();

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) pid) < 0)
    status = -1;

  /* fclose() flushes the buffered PID; if it fails the PID may never have
   * reached the file, so report it rather than claiming success */
  if (fclose (fh) != 0)
    status = -1;

  if (status != 0)
    RRDD_LOG (LOG_ERR, "write_pidfile: writing the PID file failed.");

  return (status);
} /* }}} int write_pidfile */
430 static int remove_pidfile (void) /* {{{ */
431 {
432 char *file;
433 int status;
435 file = (config_pid_file != NULL)
436 ? config_pid_file
437 : LOCALSTATEDIR "/run/rrdcached.pid";
439 status = unlink (file);
440 if (status == 0)
441 return (0);
442 return (errno);
443 } /* }}} int remove_pidfile */
445 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
446 {
447 char *eol;
449 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
450 sock->next_read - sock->next_cmd);
452 if (eol == NULL)
453 {
454 /* no commands left, move remainder back to front of rbuf */
455 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
456 sock->next_read - sock->next_cmd);
457 sock->next_read -= sock->next_cmd;
458 sock->next_cmd = 0;
459 *len = 0;
460 return NULL;
461 }
462 else
463 {
464 char *cmd = sock->rbuf + sock->next_cmd;
465 *eol = '\0';
467 sock->next_cmd = eol - sock->rbuf + 1;
469 if (eol > sock->rbuf && *(eol-1) == '\r')
470 *(--eol) = '\0'; /* handle "\r\n" EOL */
472 *len = eol - cmd;
474 return cmd;
475 }
477 /* NOTREACHED */
478 assert(1==0);
479 } /* }}} char *next_cmd */
481 /* add the characters directly to the write buffer */
482 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
483 {
484 char *new_buf;
486 assert(sock != NULL);
488 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
489 if (new_buf == NULL)
490 {
491 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
492 return -1;
493 }
495 strncpy(new_buf + sock->wbuf_len, str, len + 1);
497 sock->wbuf = new_buf;
498 sock->wbuf_len += len;
500 return 0;
501 } /* }}} static int add_to_wbuf */
503 /* add the text to the "extra" info that's sent after the status line */
504 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
505 {
506 va_list argp;
507 char buffer[CMD_MAX];
508 int len;
510 if (sock == NULL) return 0; /* journal replay mode */
511 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
513 va_start(argp, fmt);
514 #ifdef HAVE_VSNPRINTF
515 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
516 #else
517 len = vsprintf(buffer, fmt, argp);
518 #endif
519 va_end(argp);
520 if (len < 0)
521 {
522 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
523 return -1;
524 }
526 return add_to_wbuf(sock, buffer, len);
527 } /* }}} static int add_response_info */
/* Return the number of '\n' characters in the NUL-terminated string `str'
 * (0 for NULL). */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  const char *p;

  if (str == NULL)
    return 0;

  for (p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      newlines++;
  }

  return newlines;
} /* }}} static int count_lines */
545 /* send the response back to the user.
546 * returns 0 on success, -1 on error
547 * write buffer is always zeroed after this call */
548 static int send_response (listen_socket_t *sock, response_code rc,
549 char *fmt, ...) /* {{{ */
550 {
551 va_list argp;
552 char buffer[CMD_MAX];
553 int lines;
554 ssize_t wrote;
555 int rclen, len;
557 if (sock == NULL) return rc; /* journal replay mode */
559 if (sock->batch_start)
560 {
561 if (rc == RESP_OK)
562 return rc; /* no response on success during BATCH */
563 lines = sock->batch_cmd;
564 }
565 else if (rc == RESP_OK)
566 lines = count_lines(sock->wbuf);
567 else
568 lines = -1;
570 rclen = sprintf(buffer, "%d ", lines);
571 va_start(argp, fmt);
572 #ifdef HAVE_VSNPRINTF
573 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
574 #else
575 len = vsprintf(buffer+rclen, fmt, argp);
576 #endif
577 va_end(argp);
578 if (len < 0)
579 return -1;
581 len += rclen;
583 /* append the result to the wbuf, don't write to the user */
584 if (sock->batch_start)
585 return add_to_wbuf(sock, buffer, len);
587 /* first write must be complete */
588 if (len != write(sock->fd, buffer, len))
589 {
590 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
591 return -1;
592 }
594 if (sock->wbuf != NULL && rc == RESP_OK)
595 {
596 wrote = 0;
597 while (wrote < sock->wbuf_len)
598 {
599 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
600 if (wb <= 0)
601 {
602 RRDD_LOG(LOG_INFO, "send_response: could not write results");
603 return -1;
604 }
605 wrote += wb;
606 }
607 }
609 free(sock->wbuf); sock->wbuf = NULL;
610 sock->wbuf_len = 0;
612 return 0;
613 } /* }}} */
615 static void wipe_ci_values(cache_item_t *ci, time_t when)
616 {
617 ci->values = NULL;
618 ci->values_num = 0;
620 ci->last_flush_time = when;
621 if (config_write_jitter > 0)
622 ci->last_flush_time += (rrd_random() % config_write_jitter);
623 }
625 /* remove_from_queue
626 * remove a "cache_item_t" item from the queue.
627 * must hold 'cache_lock' when calling this
628 */
629 static void remove_from_queue(cache_item_t *ci) /* {{{ */
630 {
631 if (ci == NULL) return;
632 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
634 if (ci->prev == NULL)
635 cache_queue_head = ci->next; /* reset head */
636 else
637 ci->prev->next = ci->next;
639 if (ci->next == NULL)
640 cache_queue_tail = ci->prev; /* reset the tail */
641 else
642 ci->next->prev = ci->prev;
644 ci->next = ci->prev = NULL;
645 ci->flags &= ~CI_FLAGS_IN_QUEUE;
647 pthread_mutex_lock (&stats_lock);
648 assert (stats_queue_length > 0);
649 stats_queue_length--;
650 pthread_mutex_unlock (&stats_lock);
652 } /* }}} static void remove_from_queue */
654 /* free the resources associated with the cache_item_t
655 * must hold cache_lock when calling this function
656 */
657 static void *free_cache_item(cache_item_t *ci) /* {{{ */
658 {
659 if (ci == NULL) return NULL;
661 remove_from_queue(ci);
663 for (size_t i=0; i < ci->values_num; i++)
664 free(ci->values[i]);
666 free (ci->values);
667 free (ci->file);
669 /* in case anyone is waiting */
670 pthread_cond_broadcast(&ci->flushed);
671 pthread_cond_destroy(&ci->flushed);
673 free (ci);
675 return NULL;
676 } /* }}} static void *free_cache_item */
678 /*
679 * enqueue_cache_item:
680 * `cache_lock' must be acquired before calling this function!
681 */
682 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
683 queue_side_t side)
684 {
685 if (ci == NULL)
686 return (-1);
688 if (ci->values_num == 0)
689 return (0);
691 if (side == HEAD)
692 {
693 if (cache_queue_head == ci)
694 return 0;
696 /* remove if further down in queue */
697 remove_from_queue(ci);
699 ci->prev = NULL;
700 ci->next = cache_queue_head;
701 if (ci->next != NULL)
702 ci->next->prev = ci;
703 cache_queue_head = ci;
705 if (cache_queue_tail == NULL)
706 cache_queue_tail = cache_queue_head;
707 }
708 else /* (side == TAIL) */
709 {
710 /* We don't move values back in the list.. */
711 if (ci->flags & CI_FLAGS_IN_QUEUE)
712 return (0);
714 assert (ci->next == NULL);
715 assert (ci->prev == NULL);
717 ci->prev = cache_queue_tail;
719 if (cache_queue_tail == NULL)
720 cache_queue_head = ci;
721 else
722 cache_queue_tail->next = ci;
724 cache_queue_tail = ci;
725 }
727 ci->flags |= CI_FLAGS_IN_QUEUE;
729 pthread_cond_signal(&queue_cond);
730 pthread_mutex_lock (&stats_lock);
731 stats_queue_length++;
732 pthread_mutex_unlock (&stats_lock);
734 return (0);
735 } /* }}} int enqueue_cache_item */
737 /*
738 * tree_callback_flush:
739 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
740 * while this is in progress.
741 */
742 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
743 gpointer data)
744 {
745 cache_item_t *ci;
746 callback_flush_data_t *cfd;
748 ci = (cache_item_t *) value;
749 cfd = (callback_flush_data_t *) data;
751 if (ci->flags & CI_FLAGS_IN_QUEUE)
752 return FALSE;
754 if (ci->values_num > 0
755 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
756 {
757 enqueue_cache_item (ci, TAIL);
758 }
759 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
760 && (ci->values_num <= 0))
761 {
762 assert ((char *) key == ci->file);
763 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
764 {
765 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
766 return (FALSE);
767 }
768 }
770 return (FALSE);
771 } /* }}} gboolean tree_callback_flush */
773 static int flush_old_values (int max_age)
774 {
775 callback_flush_data_t cfd;
776 size_t k;
778 memset (&cfd, 0, sizeof (cfd));
779 /* Pass the current time as user data so that we don't need to call
780 * `time' for each node. */
781 cfd.now = time (NULL);
782 cfd.keys = NULL;
783 cfd.keys_num = 0;
785 if (max_age > 0)
786 cfd.abs_timeout = cfd.now - max_age;
787 else
788 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
790 /* `tree_callback_flush' will return the keys of all values that haven't
791 * been touched in the last `config_flush_interval' seconds in `cfd'.
792 * The char*'s in this array point to the same memory as ci->file, so we
793 * don't need to free them separately. */
794 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
796 for (k = 0; k < cfd.keys_num; k++)
797 {
798 /* should never fail, since we have held the cache_lock
799 * the entire time */
800 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
801 }
803 if (cfd.keys != NULL)
804 {
805 free (cfd.keys);
806 cfd.keys = NULL;
807 }
809 return (0);
810 } /* int flush_old_values */
/* Body of the flush thread.
 *
 * Holds cache_lock except while waiting on flush_cond and while rotating
 * the journal.  Each time the absolute deadline `next_flush' passes, it
 * calls flush_old_values(config_write_interval) and journal_rotate(), then
 * sets the next deadline config_flush_interval seconds later.  The loop
 * ends once `state' is no longer RUNNING (sig_common() sets FLUSHING and
 * broadcasts flush_cond).  If config_flush_at_shutdown is set, every
 * remaining value is queued; finally state becomes SHUTDOWN, which lets
 * queue_thread_main() terminate.
 * NOTE(review): setting SHUTDOWN here does not itself broadcast queue_cond;
 * presumably the shutdown path wakes idle queue threads -- verify. */
static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int status;

  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock(&cache_lock);

  while (state == RUNNING)
  {
    gettimeofday (&now, NULL);
    /* has the deadline passed? (tv_usec is scaled to nanoseconds) */
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      RRDD_LOG(LOG_DEBUG, "flushing old values");

      /* Determine the time of the next cache flush. */
      next_flush.tv_sec = now.tv_sec + config_flush_interval;

      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* Sleep until the deadline or until signalled.  The timeout is an
     * absolute time; gettimeofday() matches the default CLOCK_REALTIME of
     * a condition variable created with PTHREAD_COND_INITIALIZER. */
    status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
    if (status != 0 && status != ETIMEDOUT)
    {
      RRDD_LOG (LOG_ERR, "flush_thread_main: "
                "pthread_cond_timedwait returned %i.", status);
    }
  }

  if (config_flush_at_shutdown)
    flush_old_values (-1); /* flush everything */

  state = SHUTDOWN;

  pthread_mutex_unlock(&cache_lock);

  return NULL;
} /* void *flush_thread_main */
/* Body of each queue (writer) thread.
 *
 * Repeatedly takes the item at the head of the write queue and detaches
 * its pending values under cache_lock.  With the lock released, it writes
 * them via rrd_update_r() and records a "wrote" journal entry.  It then
 * broadcasts the item's `flushed' condition if the item is still in the
 * tree (flush_file() waits on it).  Runs until state == SHUTDOWN and,
 * when config_flush_at_shutdown is set, until the queue is empty. */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  pthread_mutex_lock (&cache_lock);

  while (state != SHUTDOWN
         || (cache_queue_head != NULL && config_flush_at_shutdown))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    size_t values_num;
    int status;

    /* Now, check if there's something to store away. If not, wait until
     * something comes in. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_wait (&queue_cond, &cache_lock);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
                  "pthread_cond_wait returned %i.", status);
      }
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    /* a private copy of the name: ci may be freed (e.g. by FORGET) while
     * the lock is released below */
    file = strdup (ci->file);
    if (file == NULL)
    {
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      /* NOTE(review): the item stays at the head and cache_lock is still
       * held, so this retries at once -- a busy loop while out of memory. */
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* take over the values array; wipe_ci_values() only resets the
     * item's pointer, the array is freed below */
    values = ci->values;
    values_num = ci->values_num;

    wipe_ci_values(ci, time(NULL));
    remove_from_queue(ci);

    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    /* journaled even when rrd_update_r failed; the values are discarded
     * either way */
    journal_write("wrote", file);

    /* Search again in the tree. It's possible someone issued a "FORGET"
     * while we were writing the update values. */
    pthread_mutex_lock(&cache_lock);
    ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
    if (ci)
      pthread_cond_broadcast(&ci->flushed);
    pthread_mutex_unlock(&cache_lock);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    rrd_free_ptrs((void ***) &values, &values_num);
    free(file);

    /* the loop condition and queue checks run with the lock held */
    pthread_mutex_lock (&cache_lock);
  }
  pthread_mutex_unlock (&cache_lock);

  return (NULL);
} /* }}} void *queue_thread_main */
/* Split the next space-delimited field off a NUL-terminated command buffer.
 *
 * A backslash escapes the following character (e.g. "\ " for a literal
 * space).  Unescaping happens in place, so the returned field points into
 * the original buffer and is NUL-terminated there.  On success the buffer
 * cursor and remaining size are advanced past the delimiter and 0 is
 * returned.  Returns -1 if the buffer is empty or ends without a delimiter
 * (for example after a trailing backslash).
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  char *dst = *buffer_ret;  /* in-place unescaping: dst never passes src */
  size_t total = *buffer_size_ret;
  size_t in = 0;
  size_t out = 0;

  if (total <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[total - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= total)
      return (-1);  /* ran off the end without a delimiter */

    c = src[in];
    if (c == ' ' || c == '\0')
    {
      dst[out] = '\0';
      in++;
      break;
    }

    if (c == '\\')
    {
      if (in >= total - 1)
        return (-1);
      in++;
      c = src[in];
    }

    dst[out++] = c;
    in++;
  }

  *buffer_ret = src + in;
  *buffer_size_ret = total - in;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
1016 /* if we're restricting writes to the base directory,
1017 * check whether the file falls within the dir
1018 * returns 1 if OK, otherwise 0
1019 */
1020 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1021 {
1022 assert(file != NULL);
1024 if (!config_write_base_only
1025 || sock == NULL /* journal replay */
1026 || config_base_dir == NULL)
1027 return 1;
1029 if (strstr(file, "../") != NULL) goto err;
1031 /* relative paths without "../" are ok */
1032 if (*file != '/') return 1;
1034 /* file must be of the format base + "/" + <1+ char filename> */
1035 if (strlen(file) < _config_base_dir_len + 2) goto err;
1036 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1037 if (*(file + _config_base_dir_len) != '/') goto err;
1039 return 1;
1041 err:
1042 if (sock != NULL && sock->fd >= 0)
1043 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1045 return 0;
1046 } /* }}} static int check_file_access */
1048 /* when using a base dir, convert relative paths to absolute paths.
1049 * if necessary, modifies the "filename" pointer to point
1050 * to the new path created in "tmp". "tmp" is provided
1051 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1052 *
1053 * this allows us to optimize for the expected case (absolute path)
1054 * with a no-op.
1055 */
1056 static void get_abs_path(char **filename, char *tmp)
1057 {
1058 assert(tmp != NULL);
1059 assert(filename != NULL && *filename != NULL);
1061 if (config_base_dir == NULL || **filename == '/')
1062 return;
1064 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1065 *filename = tmp;
1066 } /* }}} static int get_abs_path */
/* Force the pending values of `filename' to be written.
 *
 * If the cached item has pending values, it is moved to the head of the
 * write queue and the caller blocks on its `flushed' condition until a queue
 * thread broadcasts it.
 * Returns 0 if the file is in the cache tree (whether or not it had
 * pending values), ENOENT otherwise. */
static int flush_file (const char *filename) /* {{{ */
{
  cache_item_t *ci;

  pthread_mutex_lock (&cache_lock);

  ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
  if (ci == NULL)
  {
    pthread_mutex_unlock (&cache_lock);
    return (ENOENT);
  }

  if (ci->values_num > 0)
  {
    /* Enqueue at head */
    enqueue_cache_item (ci, HEAD);
    /* NOTE(review): a single wait with no predicate loop -- a spurious
     * wakeup would return before the values were written. */
    pthread_cond_wait(&ci->flushed, &cache_lock);
  }

  /* DO NOT DO ANYTHING WITH ci HERE!! The entry
   * may have been purged during our cond_wait() */

  pthread_mutex_unlock(&cache_lock);

  return (0);
} /* }}} int flush_file */
1096 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1097 {
1098 char *err = "Syntax error.\n";
1100 if (cmd && cmd->syntax)
1101 err = cmd->syntax;
1103 return send_response(sock, RESP_ERR, "Usage: %s", err);
1104 } /* }}} static int syntax_error() */
1106 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1107 {
1108 uint64_t copy_queue_length;
1109 uint64_t copy_updates_received;
1110 uint64_t copy_flush_received;
1111 uint64_t copy_updates_written;
1112 uint64_t copy_data_sets_written;
1113 uint64_t copy_journal_bytes;
1114 uint64_t copy_journal_rotate;
1116 uint64_t tree_nodes_number;
1117 uint64_t tree_depth;
1119 pthread_mutex_lock (&stats_lock);
1120 copy_queue_length = stats_queue_length;
1121 copy_updates_received = stats_updates_received;
1122 copy_flush_received = stats_flush_received;
1123 copy_updates_written = stats_updates_written;
1124 copy_data_sets_written = stats_data_sets_written;
1125 copy_journal_bytes = stats_journal_bytes;
1126 copy_journal_rotate = stats_journal_rotate;
1127 pthread_mutex_unlock (&stats_lock);
1129 pthread_mutex_lock (&cache_lock);
1130 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1131 tree_depth = (uint64_t) g_tree_height (cache_tree);
1132 pthread_mutex_unlock (&cache_lock);
1134 add_response_info(sock,
1135 "QueueLength: %"PRIu64"\n", copy_queue_length);
1136 add_response_info(sock,
1137 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1138 add_response_info(sock,
1139 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1140 add_response_info(sock,
1141 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1142 add_response_info(sock,
1143 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1144 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1145 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1146 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1147 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1149 send_response(sock, RESP_OK, "Statistics follow\n");
1151 return (0);
1152 } /* }}} int handle_request_stats */
1154 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1155 {
1156 char *file, file_tmp[PATH_MAX];
1157 int status;
1159 status = buffer_get_field (&buffer, &buffer_size, &file);
1160 if (status != 0)
1161 {
1162 return syntax_error(sock,cmd);
1163 }
1164 else
1165 {
1166 pthread_mutex_lock(&stats_lock);
1167 stats_flush_received++;
1168 pthread_mutex_unlock(&stats_lock);
1170 get_abs_path(&file, file_tmp);
1171 if (!check_file_access(file, sock)) return 0;
1173 status = flush_file (file);
1174 if (status == 0)
1175 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1176 else if (status == ENOENT)
1177 {
1178 /* no file in our tree; see whether it exists at all */
1179 struct stat statbuf;
1181 memset(&statbuf, 0, sizeof(statbuf));
1182 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1183 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1184 else
1185 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1186 }
1187 else if (status < 0)
1188 return send_response(sock, RESP_ERR, "Internal error.\n");
1189 else
1190 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1191 }
1193 /* NOTREACHED */
1194 assert(1==0);
1195 } /* }}} int handle_request_flush */
1197 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1198 {
1199 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1201 pthread_mutex_lock(&cache_lock);
1202 flush_old_values(-1);
1203 pthread_mutex_unlock(&cache_lock);
1205 return send_response(sock, RESP_OK, "Started flush.\n");
1206 } /* }}} static int handle_request_flushall */
1208 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1209 {
1210 int status;
1211 char *file, file_tmp[PATH_MAX];
1212 cache_item_t *ci;
1214 status = buffer_get_field(&buffer, &buffer_size, &file);
1215 if (status != 0)
1216 return syntax_error(sock,cmd);
1218 get_abs_path(&file, file_tmp);
1220 pthread_mutex_lock(&cache_lock);
1221 ci = g_tree_lookup(cache_tree, file);
1222 if (ci == NULL)
1223 {
1224 pthread_mutex_unlock(&cache_lock);
1225 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1226 }
1228 for (size_t i=0; i < ci->values_num; i++)
1229 add_response_info(sock, "%s\n", ci->values[i]);
1231 pthread_mutex_unlock(&cache_lock);
1232 return send_response(sock, RESP_OK, "updates pending\n");
1233 } /* }}} static int handle_request_pending */
1235 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1236 {
1237 int status;
1238 gboolean found;
1239 char *file, file_tmp[PATH_MAX];
1241 status = buffer_get_field(&buffer, &buffer_size, &file);
1242 if (status != 0)
1243 return syntax_error(sock,cmd);
1245 get_abs_path(&file, file_tmp);
1246 if (!check_file_access(file, sock)) return 0;
1248 pthread_mutex_lock(&cache_lock);
1249 found = g_tree_remove(cache_tree, file);
1250 pthread_mutex_unlock(&cache_lock);
1252 if (found == TRUE)
1253 {
1254 if (sock != NULL)
1255 journal_write("forget", file);
1257 return send_response(sock, RESP_OK, "Gone!\n");
1258 }
1259 else
1260 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1262 /* NOTREACHED */
1263 assert(1==0);
1264 } /* }}} static int handle_request_forget */
1266 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1267 {
1268 cache_item_t *ci;
1270 pthread_mutex_lock(&cache_lock);
1272 ci = cache_queue_head;
1273 while (ci != NULL)
1274 {
1275 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1276 ci = ci->next;
1277 }
1279 pthread_mutex_unlock(&cache_lock);
1281 return send_response(sock, RESP_OK, "in queue.\n");
1282 } /* }}} int handle_request_queue */
1284 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1285 {
1286 char *file, file_tmp[PATH_MAX];
1287 int values_num = 0;
1288 int status;
1289 char orig_buf[CMD_MAX];
1291 cache_item_t *ci;
1293 /* save it for the journal later */
1294 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1296 status = buffer_get_field (&buffer, &buffer_size, &file);
1297 if (status != 0)
1298 return syntax_error(sock,cmd);
1300 pthread_mutex_lock(&stats_lock);
1301 stats_updates_received++;
1302 pthread_mutex_unlock(&stats_lock);
1304 get_abs_path(&file, file_tmp);
1305 if (!check_file_access(file, sock)) return 0;
1307 pthread_mutex_lock (&cache_lock);
1308 ci = g_tree_lookup (cache_tree, file);
1310 if (ci == NULL) /* {{{ */
1311 {
1312 struct stat statbuf;
1313 cache_item_t *tmp;
1315 /* don't hold the lock while we setup; stat(2) might block */
1316 pthread_mutex_unlock(&cache_lock);
1318 memset (&statbuf, 0, sizeof (statbuf));
1319 status = stat (file, &statbuf);
1320 if (status != 0)
1321 {
1322 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1324 status = errno;
1325 if (status == ENOENT)
1326 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1327 else
1328 return send_response(sock, RESP_ERR,
1329 "stat failed with error %i.\n", status);
1330 }
1331 if (!S_ISREG (statbuf.st_mode))
1332 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1334 if (access(file, R_OK|W_OK) != 0)
1335 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1336 file, rrd_strerror(errno));
1338 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1339 if (ci == NULL)
1340 {
1341 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1343 return send_response(sock, RESP_ERR, "malloc failed.\n");
1344 }
1345 memset (ci, 0, sizeof (cache_item_t));
1347 ci->file = strdup (file);
1348 if (ci->file == NULL)
1349 {
1350 free (ci);
1351 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1353 return send_response(sock, RESP_ERR, "strdup failed.\n");
1354 }
1356 wipe_ci_values(ci, now);
1357 ci->flags = CI_FLAGS_IN_TREE;
1358 pthread_cond_init(&ci->flushed, NULL);
1360 pthread_mutex_lock(&cache_lock);
1362 /* another UPDATE might have added this entry in the meantime */
1363 tmp = g_tree_lookup (cache_tree, file);
1364 if (tmp == NULL)
1365 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1366 else
1367 {
1368 free_cache_item (ci);
1369 ci = tmp;
1370 }
1372 /* state may have changed while we were unlocked */
1373 if (state == SHUTDOWN)
1374 return -1;
1375 } /* }}} */
1376 assert (ci != NULL);
1378 /* don't re-write updates in replay mode */
1379 if (sock != NULL)
1380 journal_write("update", orig_buf);
1382 while (buffer_size > 0)
1383 {
1384 char *value;
1385 time_t stamp;
1386 char *eostamp;
1388 status = buffer_get_field (&buffer, &buffer_size, &value);
1389 if (status != 0)
1390 {
1391 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1392 break;
1393 }
1395 /* make sure update time is always moving forward */
1396 stamp = strtol(value, &eostamp, 10);
1397 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1398 {
1399 pthread_mutex_unlock(&cache_lock);
1400 return send_response(sock, RESP_ERR,
1401 "Cannot find timestamp in '%s'!\n", value);
1402 }
1403 else if (stamp <= ci->last_update_stamp)
1404 {
1405 pthread_mutex_unlock(&cache_lock);
1406 return send_response(sock, RESP_ERR,
1407 "illegal attempt to update using time %ld when last"
1408 " update time is %ld (minimum one second step)\n",
1409 stamp, ci->last_update_stamp);
1410 }
1411 else
1412 ci->last_update_stamp = stamp;
1414 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1415 {
1416 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1417 continue;
1418 }
1420 values_num++;
1421 }
1423 if (((now - ci->last_flush_time) >= config_write_interval)
1424 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1425 && (ci->values_num > 0))
1426 {
1427 enqueue_cache_item (ci, TAIL);
1428 }
1430 pthread_mutex_unlock (&cache_lock);
1432 if (values_num < 1)
1433 return send_response(sock, RESP_ERR, "No values updated.\n");
1434 else
1435 return send_response(sock, RESP_OK,
1436 "errors, enqueued %i value(s).\n", values_num);
1438 /* NOTREACHED */
1439 assert(1==0);
1441 } /* }}} int handle_request_update */
1443 /* we came across a "WROTE" entry during journal replay.
1444 * throw away any values that we have accumulated for this file
1445 */
1446 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1447 {
1448 cache_item_t *ci;
1449 const char *file = buffer;
1451 pthread_mutex_lock(&cache_lock);
1453 ci = g_tree_lookup(cache_tree, file);
1454 if (ci == NULL)
1455 {
1456 pthread_mutex_unlock(&cache_lock);
1457 return (0);
1458 }
1460 if (ci->values)
1461 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1463 wipe_ci_values(ci, now);
1464 remove_from_queue(ci);
1466 pthread_mutex_unlock(&cache_lock);
1467 return (0);
1468 } /* }}} int handle_request_wrote */
1470 /* start "BATCH" processing */
1471 static int batch_start (HANDLER_PROTO) /* {{{ */
1472 {
1473 int status;
1474 if (sock->batch_start)
1475 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1477 status = send_response(sock, RESP_OK,
1478 "Go ahead. End with dot '.' on its own line.\n");
1479 sock->batch_start = time(NULL);
1480 sock->batch_cmd = 0;
1482 return status;
1483 } /* }}} static int batch_start */
1485 /* finish "BATCH" processing and return results to the client */
1486 static int batch_done (HANDLER_PROTO) /* {{{ */
1487 {
1488 assert(sock->batch_start);
1489 sock->batch_start = 0;
1490 sock->batch_cmd = 0;
1491 return send_response(sock, RESP_OK, "errors\n");
1492 } /* }}} static int batch_done */
/* Handle "QUIT".  No reply is sent: the non-zero return value makes
 * connection_thread_main() close the connection. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  int close_connection_now = -1;

  return (close_connection_now);
} /* }}} static int handle_request_quit */
/* Table of every protocol command.  Each entry is, in order:
 *   cmd     - command name, matched case-insensitively (find_command)
 *   handler - function invoked by handle_request()
 *   context - CMD_CONTEXT_* bits saying where the command is accepted
 *             (checked by command_check_context)
 *   syntax  - usage line; entries with NULL are hidden from HELP's overview
 *   help    - long help text shown by "HELP <command>"
 *
 * The array index doubles as a bit position in `listen_socket_t.permissions'
 * (see find_command_index); socket_permission_check() and
 * socket_permission_add() assert that the index is below 32, so this table
 * must not grow beyond 32 entries. */
static command_t list_of_commands[] = { /* {{{ */
  {
    "UPDATE",
    handle_request_update,
    CMD_CONTEXT_ANY,
    "UPDATE <filename> <values> [<values> ...]\n"
    ,
    "Adds the given file to the internal cache if it is not yet known and\n"
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
    "for details.\n"
    "\n"
    "Each <values> has the following form:\n"
    " <values> = <time>:<value>[:<value>[...]]\n"
    "See the rrdupdate(1) manpage for details.\n"
  },
  {
    /* accepted only while replaying the journal; no syntax, so HELP hides it */
    "WROTE",
    handle_request_wrote,
    CMD_CONTEXT_JOURNAL,
    NULL,
    NULL
  },
  {
    "FLUSH",
    handle_request_flush,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "FLUSH <filename>\n"
    ,
    "Adds the given filename to the head of the update queue and returns\n"
    "after it has been dequeued.\n"
  },
  {
    "FLUSHALL",
    handle_request_flushall,
    CMD_CONTEXT_CLIENT,
    "FLUSHALL\n"
    ,
    "Triggers writing of all pending updates. Returns immediately.\n"
  },
  {
    "PENDING",
    handle_request_pending,
    CMD_CONTEXT_CLIENT,
    "PENDING <filename>\n"
    ,
    "Shows any 'pending' updates for a file, in order.\n"
    "The updates shown have not yet been written to the underlying RRD file.\n"
  },
  {
    "FORGET",
    handle_request_forget,
    CMD_CONTEXT_ANY,
    "FORGET <filename>\n"
    ,
    "Removes the file completely from the cache.\n"
    "Any pending updates for the file will be lost.\n"
  },
  {
    "QUEUE",
    handle_request_queue,
    CMD_CONTEXT_CLIENT,
    "QUEUE\n"
    ,
    "Shows all files in the output queue.\n"
    "The output is zero or more lines in the following format:\n"
    "(where <num_vals> is the number of values to be written)\n"
    "\n"
    "<num_vals> <filename>\n"
  },
  {
    "STATS",
    handle_request_stats,
    CMD_CONTEXT_CLIENT,
    "STATS\n"
    ,
    "Returns some performance counters, see the rrdcached(1) manpage for\n"
    "a description of the values.\n"
  },
  {
    "HELP",
    handle_request_help,
    CMD_CONTEXT_CLIENT,
    "HELP [<command>]\n",
    NULL, /* special! */
  },
  {
    "BATCH",
    batch_start,
    CMD_CONTEXT_CLIENT,
    "BATCH\n"
    ,
    "The 'BATCH' command permits the client to initiate a bulk load\n"
    " of commands to rrdcached.\n"
    "\n"
    "Usage:\n"
    "\n"
    " client: BATCH\n"
    " server: 0 Go ahead. End with dot '.' on its own line.\n"
    " client: command #1\n"
    " client: command #2\n"
    " client: ... and so on\n"
    " client: .\n"
    " server: 2 errors\n"
    " server: 7 message for command #7\n"
    " server: 9 message for command #9\n"
    "\n"
    "For more information, consult the rrdcached(1) documentation.\n"
  },
  {
    ".", /* BATCH terminator */
    batch_done,
    CMD_CONTEXT_BATCH,
    NULL,
    NULL
  },
  {
    "QUIT",
    handle_request_quit,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "QUIT\n"
    ,
    "Disconnect from rrdcached.\n"
  }
}; /* }}} command_t list_of_commands[] */
/* number of entries in list_of_commands */
static size_t list_of_commands_len = sizeof (list_of_commands)
  / sizeof (list_of_commands[0]);
1626 static command_t *find_command(char *cmd)
1627 {
1628 size_t i;
1630 for (i = 0; i < list_of_commands_len; i++)
1631 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1632 return (&list_of_commands[i]);
1633 return NULL;
1634 }
1636 /* We currently use the index in the `list_of_commands' array as a bit position
1637 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1638 * outside these functions so that switching to a more elegant storage method
1639 * is easily possible. */
1640 static ssize_t find_command_index (const char *cmd) /* {{{ */
1641 {
1642 size_t i;
1644 for (i = 0; i < list_of_commands_len; i++)
1645 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1646 return ((ssize_t) i);
1647 return (-1);
1648 } /* }}} ssize_t find_command_index */
1650 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1651 const char *cmd)
1652 {
1653 ssize_t i;
1655 if (cmd == NULL)
1656 return (-1);
1658 if ((strcasecmp ("QUIT", cmd) == 0)
1659 || (strcasecmp ("HELP", cmd) == 0))
1660 return (1);
1661 else if (strcmp (".", cmd) == 0)
1662 cmd = "BATCH";
1664 i = find_command_index (cmd);
1665 if (i < 0)
1666 return (-1);
1667 assert (i < 32);
1669 if ((sock->permissions & (1 << i)) != 0)
1670 return (1);
1671 return (0);
1672 } /* }}} int socket_permission_check */
1674 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1675 const char *cmd)
1676 {
1677 ssize_t i;
1679 i = find_command_index (cmd);
1680 if (i < 0)
1681 return (-1);
1682 assert (i < 32);
1684 sock->permissions |= (1 << i);
1685 return (0);
1686 } /* }}} int socket_permission_add */
1688 /* check whether commands are received in the expected context */
1689 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1690 {
1691 if (sock == NULL)
1692 return (cmd->context & CMD_CONTEXT_JOURNAL);
1693 else if (sock->batch_start)
1694 return (cmd->context & CMD_CONTEXT_BATCH);
1695 else
1696 return (cmd->context & CMD_CONTEXT_CLIENT);
1698 /* NOTREACHED */
1699 assert(1==0);
1700 }
1702 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1703 {
1704 int status;
1705 char *cmd_str;
1706 char *resp_txt;
1707 command_t *help = NULL;
1709 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1710 if (status == 0)
1711 help = find_command(cmd_str);
1713 if (help && (help->syntax || help->help))
1714 {
1715 char tmp[CMD_MAX];
1717 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1718 resp_txt = tmp;
1720 if (help->syntax)
1721 add_response_info(sock, "Usage: %s\n", help->syntax);
1723 if (help->help)
1724 add_response_info(sock, "%s\n", help->help);
1725 }
1726 else
1727 {
1728 size_t i;
1730 resp_txt = "Command overview\n";
1732 for (i = 0; i < list_of_commands_len; i++)
1733 {
1734 if (list_of_commands[i].syntax == NULL)
1735 continue;
1736 add_response_info (sock, "%s", list_of_commands[i].syntax);
1737 }
1738 }
1740 return send_response(sock, RESP_OK, resp_txt);
1741 } /* }}} int handle_request_help */
1743 /* if sock==NULL, we are in journal replay mode */
1744 static int handle_request (DISPATCH_PROTO) /* {{{ */
1745 {
1746 char *buffer_ptr = buffer;
1747 char *cmd_str = NULL;
1748 command_t *cmd = NULL;
1749 int status;
1751 assert (buffer[buffer_size - 1] == '\0');
1753 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1754 if (status != 0)
1755 {
1756 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1757 return (-1);
1758 }
1760 if (sock != NULL && sock->batch_start)
1761 sock->batch_cmd++;
1763 cmd = find_command(cmd_str);
1764 if (!cmd)
1765 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1767 if (!socket_permission_check (sock, cmd->cmd))
1768 return send_response(sock, RESP_ERR, "Permission denied.\n");
1770 if (!command_check_context(sock, cmd))
1771 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1773 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1774 } /* }}} int handle_request */
1776 static void journal_set_free (journal_set *js) /* {{{ */
1777 {
1778 if (js == NULL)
1779 return;
1781 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1783 free(js);
1784 } /* }}} journal_set_free */
1786 static void journal_set_remove (journal_set *js) /* {{{ */
1787 {
1788 if (js == NULL)
1789 return;
1791 for (uint i=0; i < js->files_num; i++)
1792 {
1793 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1794 unlink(js->files[i]);
1795 }
1796 } /* }}} journal_set_remove */
1798 /* close current journal file handle.
1799 * MUST hold journal_lock before calling */
1800 static void journal_close(void) /* {{{ */
1801 {
1802 if (journal_fh != NULL)
1803 {
1804 if (fclose(journal_fh) != 0)
1805 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1806 }
1808 journal_fh = NULL;
1809 journal_size = 0;
1810 } /* }}} journal_close */
1812 /* MUST hold journal_lock before calling */
1813 static void journal_new_file(void) /* {{{ */
1814 {
1815 struct timeval now;
1816 int new_fd;
1817 char new_file[PATH_MAX + 1];
1819 assert(journal_dir != NULL);
1820 assert(journal_cur != NULL);
1822 journal_close();
1824 gettimeofday(&now, NULL);
1825 /* this format assures that the files sort in strcmp() order */
1826 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1827 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1829 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1830 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1831 if (new_fd < 0)
1832 goto error;
1834 journal_fh = fdopen(new_fd, "a");
1835 if (journal_fh == NULL)
1836 goto error;
1838 journal_size = ftell(journal_fh);
1839 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1841 /* record the file in the journal set */
1842 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1844 return;
1846 error:
1847 RRDD_LOG(LOG_CRIT,
1848 "JOURNALING DISABLED: Error while trying to create %s : %s",
1849 new_file, rrd_strerror(errno));
1850 RRDD_LOG(LOG_CRIT,
1851 "JOURNALING DISABLED: All values will be flushed at shutdown");
1853 close(new_fd);
1854 config_flush_at_shutdown = 1;
1856 } /* }}} journal_new_file */
1858 /* MUST NOT hold journal_lock before calling this */
1859 static void journal_rotate(void) /* {{{ */
1860 {
1861 journal_set *old_js = NULL;
1863 if (journal_dir == NULL)
1864 return;
1866 RRDD_LOG(LOG_DEBUG, "rotating journals");
1868 pthread_mutex_lock(&stats_lock);
1869 ++stats_journal_rotate;
1870 pthread_mutex_unlock(&stats_lock);
1872 pthread_mutex_lock(&journal_lock);
1874 journal_close();
1876 /* rotate the journal sets */
1877 old_js = journal_old;
1878 journal_old = journal_cur;
1879 journal_cur = calloc(1, sizeof(journal_set));
1881 if (journal_cur != NULL)
1882 journal_new_file();
1883 else
1884 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1886 pthread_mutex_unlock(&journal_lock);
1888 journal_set_remove(old_js);
1889 journal_set_free (old_js);
1891 } /* }}} static void journal_rotate */
1893 /* MUST hold journal_lock when calling */
1894 static void journal_done(void) /* {{{ */
1895 {
1896 if (journal_cur == NULL)
1897 return;
1899 journal_close();
1901 if (config_flush_at_shutdown)
1902 {
1903 RRDD_LOG(LOG_INFO, "removing journals");
1904 journal_set_remove(journal_old);
1905 journal_set_remove(journal_cur);
1906 }
1907 else
1908 {
1909 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1910 "journals will be used at next startup");
1911 }
1913 journal_set_free(journal_cur);
1914 journal_set_free(journal_old);
1915 free(journal_dir);
1917 } /* }}} static void journal_done */
1919 static int journal_write(char *cmd, char *args) /* {{{ */
1920 {
1921 int chars;
1923 if (journal_fh == NULL)
1924 return 0;
1926 pthread_mutex_lock(&journal_lock);
1927 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1928 journal_size += chars;
1930 if (journal_size > JOURNAL_MAX)
1931 journal_new_file();
1933 pthread_mutex_unlock(&journal_lock);
1935 if (chars > 0)
1936 {
1937 pthread_mutex_lock(&stats_lock);
1938 stats_journal_bytes += chars;
1939 pthread_mutex_unlock(&stats_lock);
1940 }
1942 return chars;
1943 } /* }}} static int journal_write */
1945 static int journal_replay (const char *file) /* {{{ */
1946 {
1947 FILE *fh;
1948 int entry_cnt = 0;
1949 int fail_cnt = 0;
1950 uint64_t line = 0;
1951 char entry[CMD_MAX];
1952 time_t now;
1954 if (file == NULL) return 0;
1956 {
1957 char *reason = "unknown error";
1958 int status = 0;
1959 struct stat statbuf;
1961 memset(&statbuf, 0, sizeof(statbuf));
1962 if (stat(file, &statbuf) != 0)
1963 {
1964 reason = "stat error";
1965 status = errno;
1966 }
1967 else if (!S_ISREG(statbuf.st_mode))
1968 {
1969 reason = "not a regular file";
1970 status = EPERM;
1971 }
1972 if (statbuf.st_uid != daemon_uid)
1973 {
1974 reason = "not owned by daemon user";
1975 status = EACCES;
1976 }
1977 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1978 {
1979 reason = "must not be user/group writable";
1980 status = EACCES;
1981 }
1983 if (status != 0)
1984 {
1985 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1986 file, rrd_strerror(status), reason);
1987 return 0;
1988 }
1989 }
1991 fh = fopen(file, "r");
1992 if (fh == NULL)
1993 {
1994 if (errno != ENOENT)
1995 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1996 file, rrd_strerror(errno));
1997 return 0;
1998 }
1999 else
2000 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2002 now = time(NULL);
2004 while(!feof(fh))
2005 {
2006 size_t entry_len;
2008 ++line;
2009 if (fgets(entry, sizeof(entry), fh) == NULL)
2010 break;
2011 entry_len = strlen(entry);
2013 /* check \n termination in case journal writing crashed mid-line */
2014 if (entry_len == 0)
2015 continue;
2016 else if (entry[entry_len - 1] != '\n')
2017 {
2018 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2019 ++fail_cnt;
2020 continue;
2021 }
2023 entry[entry_len - 1] = '\0';
2025 if (handle_request(NULL, now, entry, entry_len) == 0)
2026 ++entry_cnt;
2027 else
2028 ++fail_cnt;
2029 }
2031 fclose(fh);
2033 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2034 entry_cnt, fail_cnt);
2036 return entry_cnt > 0 ? 1 : 0;
2037 } /* }}} static int journal_replay */
/* qsort() comparator for an array of `char *' journal file names:
 * orders them with strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *lhs = *(char * const *) v1;
  const char *rhs = *(char * const *) v2;

  return strcmp(lhs, rhs);
}
2047 static void journal_init(void) /* {{{ */
2048 {
2049 int had_journal = 0;
2050 DIR *dir;
2051 struct dirent *dent;
2052 char path[PATH_MAX+1];
2054 if (journal_dir == NULL) return;
2056 pthread_mutex_lock(&journal_lock);
2058 journal_cur = calloc(1, sizeof(journal_set));
2059 if (journal_cur == NULL)
2060 {
2061 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2062 return;
2063 }
2065 RRDD_LOG(LOG_INFO, "checking for journal files");
2067 /* Handle old journal files during transition. This gives them the
2068 * correct sort order. TODO: remove after first release
2069 */
2070 {
2071 char old_path[PATH_MAX+1];
2072 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2073 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2074 rename(old_path, path);
2076 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2077 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2078 rename(old_path, path);
2079 }
2081 dir = opendir(journal_dir);
2082 while ((dent = readdir(dir)) != NULL)
2083 {
2084 /* looks like a journal file? */
2085 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2086 continue;
2088 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2090 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2091 {
2092 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2093 dent->d_name);
2094 break;
2095 }
2096 }
2097 closedir(dir);
2099 qsort(journal_cur->files, journal_cur->files_num,
2100 sizeof(journal_cur->files[0]), journal_sort);
2102 for (uint i=0; i < journal_cur->files_num; i++)
2103 had_journal += journal_replay(journal_cur->files[i]);
2105 journal_new_file();
2107 /* it must have been a crash. start a flush */
2108 if (had_journal && config_flush_at_shutdown)
2109 flush_old_values(-1);
2111 pthread_mutex_unlock(&journal_lock);
2113 RRDD_LOG(LOG_INFO, "journal processing complete");
2115 } /* }}} static void journal_init */
2117 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2118 {
2119 assert(sock != NULL);
2121 free(sock->rbuf); sock->rbuf = NULL;
2122 free(sock->wbuf); sock->wbuf = NULL;
2123 free(sock);
2124 } /* }}} void free_listen_socket */
2126 static void close_connection(listen_socket_t *sock) /* {{{ */
2127 {
2128 if (sock->fd >= 0)
2129 {
2130 close(sock->fd);
2131 sock->fd = -1;
2132 }
2134 free_listen_socket(sock);
2136 } /* }}} void close_connection */
2138 static void *connection_thread_main (void *args) /* {{{ */
2139 {
2140 listen_socket_t *sock;
2141 int fd;
2143 sock = (listen_socket_t *) args;
2144 fd = sock->fd;
2146 /* init read buffers */
2147 sock->next_read = sock->next_cmd = 0;
2148 sock->rbuf = malloc(RBUF_SIZE);
2149 if (sock->rbuf == NULL)
2150 {
2151 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2152 close_connection(sock);
2153 return NULL;
2154 }
2156 pthread_mutex_lock (&connection_threads_lock);
2157 connection_threads_num++;
2158 pthread_mutex_unlock (&connection_threads_lock);
2160 while (state == RUNNING)
2161 {
2162 char *cmd;
2163 ssize_t cmd_len;
2164 ssize_t rbytes;
2165 time_t now;
2167 struct pollfd pollfd;
2168 int status;
2170 pollfd.fd = fd;
2171 pollfd.events = POLLIN | POLLPRI;
2172 pollfd.revents = 0;
2174 status = poll (&pollfd, 1, /* timeout = */ 500);
2175 if (state != RUNNING)
2176 break;
2177 else if (status == 0) /* timeout */
2178 continue;
2179 else if (status < 0) /* error */
2180 {
2181 status = errno;
2182 if (status != EINTR)
2183 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2184 continue;
2185 }
2187 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2188 break;
2189 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2190 {
2191 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2192 "poll(2) returned something unexpected: %#04hx",
2193 pollfd.revents);
2194 break;
2195 }
2197 rbytes = read(fd, sock->rbuf + sock->next_read,
2198 RBUF_SIZE - sock->next_read);
2199 if (rbytes < 0)
2200 {
2201 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2202 break;
2203 }
2204 else if (rbytes == 0)
2205 break; /* eof */
2207 sock->next_read += rbytes;
2209 if (sock->batch_start)
2210 now = sock->batch_start;
2211 else
2212 now = time(NULL);
2214 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2215 {
2216 status = handle_request (sock, now, cmd, cmd_len+1);
2217 if (status != 0)
2218 goto out_close;
2219 }
2220 }
2222 out_close:
2223 close_connection(sock);
2225 /* Remove this thread from the connection threads list */
2226 pthread_mutex_lock (&connection_threads_lock);
2227 connection_threads_num--;
2228 if (connection_threads_num <= 0)
2229 pthread_cond_broadcast(&connection_threads_done);
2230 pthread_mutex_unlock (&connection_threads_lock);
2232 return (NULL);
2233 } /* }}} void *connection_thread_main */
2235 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2236 {
2237 int fd;
2238 struct sockaddr_un sa;
2239 listen_socket_t *temp;
2240 int status;
2241 const char *path;
2243 path = sock->addr;
2244 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2245 path += strlen("unix:");
2247 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2248 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2249 if (temp == NULL)
2250 {
2251 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2252 return (-1);
2253 }
2254 listen_fds = temp;
2255 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2257 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2258 if (fd < 0)
2259 {
2260 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2261 rrd_strerror(errno));
2262 return (-1);
2263 }
2265 memset (&sa, 0, sizeof (sa));
2266 sa.sun_family = AF_UNIX;
2267 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2269 /* if we've gotten this far, we own the pid file. any daemon started
2270 * with the same args must not be alive. therefore, ensure that we can
2271 * create the socket...
2272 */
2273 unlink(path);
2275 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2276 if (status != 0)
2277 {
2278 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2279 path, rrd_strerror(errno));
2280 close (fd);
2281 return (-1);
2282 }
2284 status = listen (fd, /* backlog = */ 10);
2285 if (status != 0)
2286 {
2287 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2288 path, rrd_strerror(errno));
2289 close (fd);
2290 unlink (path);
2291 return (-1);
2292 }
2294 listen_fds[listen_fds_num].fd = fd;
2295 listen_fds[listen_fds_num].family = PF_UNIX;
2296 strncpy(listen_fds[listen_fds_num].addr, path,
2297 sizeof (listen_fds[listen_fds_num].addr) - 1);
2298 listen_fds_num++;
2300 return (0);
2301 } /* }}} int open_listen_socket_unix */
/* Open listening TCP sockets for SOCK->addr, one per address returned by
 * getaddrinfo(), and append them to listen_fds.
 *
 * Accepted forms: "[ipv6]", "[ipv6]:port", "host", "host:port",
 * "a.b.c.d[:port]" and a bare IPv6 address (no port).  The port defaults
 * to RRDCACHED_DEFAULT_PORT.
 *
 * Returns 0 if at least one socket was opened, -1 otherwise (reported on
 * stderr). */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;
  int opened = 0;

  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Split off a ":port" suffix when the address contains a dot
     * (hostname or IPv4), or contains exactly one colon ("host:port").
     * A bare IPv6 address always has at least two colons and is left
     * untouched.  strrchr() replaces the legacy BSD rindex(). */
    char *colon = strrchr (addr, ':');
    if ((colon != NULL)
        && ((strchr (addr, '.') != NULL) || (strchr (addr, ':') == colon)))
    {
      *colon = 0;
      port = colon + 1;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
    opened++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);

  /* every candidate failed: do not report success without a socket */
  if (opened == 0)
  {
    fprintf (stderr, "rrdcached: no usable listening socket for %s\n",
             sock->addr);
    return (-1);
  }

  return (0);
} /* }}} static int open_listen_socket_network */
2425 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2426 {
2427 assert(sock != NULL);
2428 assert(sock->addr != NULL);
2430 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2431 || sock->addr[0] == '/')
2432 return (open_listen_socket_unix(sock));
2433 else
2434 return (open_listen_socket_network(sock));
2435 } /* }}} int open_listen_socket */
2437 static int close_listen_sockets (void) /* {{{ */
2438 {
2439 size_t i;
2441 for (i = 0; i < listen_fds_num; i++)
2442 {
2443 close (listen_fds[i].fd);
2445 if (listen_fds[i].family == PF_UNIX)
2446 unlink(listen_fds[i].addr);
2447 }
2449 free (listen_fds);
2450 listen_fds = NULL;
2451 listen_fds_num = 0;
2453 return (0);
2454 } /* }}} int close_listen_sockets */
2456 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2457 {
2458 struct pollfd *pollfds;
2459 int pollfds_num;
2460 int status;
2461 int i;
2463 if (listen_fds_num < 1)
2464 {
2465 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2466 return (NULL);
2467 }
2469 pollfds_num = listen_fds_num;
2470 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2471 if (pollfds == NULL)
2472 {
2473 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2474 return (NULL);
2475 }
2476 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2478 RRDD_LOG(LOG_INFO, "listening for connections");
2480 while (state == RUNNING)
2481 {
2482 for (i = 0; i < pollfds_num; i++)
2483 {
2484 pollfds[i].fd = listen_fds[i].fd;
2485 pollfds[i].events = POLLIN | POLLPRI;
2486 pollfds[i].revents = 0;
2487 }
2489 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2490 if (state != RUNNING)
2491 break;
2492 else if (status == 0) /* timeout */
2493 continue;
2494 else if (status < 0) /* error */
2495 {
2496 status = errno;
2497 if (status != EINTR)
2498 {
2499 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2500 }
2501 continue;
2502 }
2504 for (i = 0; i < pollfds_num; i++)
2505 {
2506 listen_socket_t *client_sock;
2507 struct sockaddr_storage client_sa;
2508 socklen_t client_sa_size;
2509 pthread_t tid;
2510 pthread_attr_t attr;
2512 if (pollfds[i].revents == 0)
2513 continue;
2515 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2516 {
2517 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2518 "poll(2) returned something unexpected for listen FD #%i.",
2519 pollfds[i].fd);
2520 continue;
2521 }
2523 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2524 if (client_sock == NULL)
2525 {
2526 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2527 continue;
2528 }
2529 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2531 client_sa_size = sizeof (client_sa);
2532 client_sock->fd = accept (pollfds[i].fd,
2533 (struct sockaddr *) &client_sa, &client_sa_size);
2534 if (client_sock->fd < 0)
2535 {
2536 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2537 free(client_sock);
2538 continue;
2539 }
2541 pthread_attr_init (&attr);
2542 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2544 status = pthread_create (&tid, &attr, connection_thread_main,
2545 client_sock);
2546 if (status != 0)
2547 {
2548 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2549 close_connection(client_sock);
2550 continue;
2551 }
2552 } /* for (pollfds_num) */
2553 } /* while (state == RUNNING) */
2555 RRDD_LOG(LOG_INFO, "starting shutdown");
2557 close_listen_sockets ();
2559 pthread_mutex_lock (&connection_threads_lock);
2560 while (connection_threads_num > 0)
2561 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2562 pthread_mutex_unlock (&connection_threads_lock);
2564 free(pollfds);
2566 return (NULL);
2567 } /* }}} void *listen_thread_main */
2569 static int daemonize (void) /* {{{ */
2570 {
2571 int pid_fd;
2572 char *base_dir;
2574 daemon_uid = geteuid();
2576 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2577 if (pid_fd < 0)
2578 pid_fd = check_pidfile();
2579 if (pid_fd < 0)
2580 return pid_fd;
2582 /* open all the listen sockets */
2583 if (config_listen_address_list_len > 0)
2584 {
2585 for (size_t i = 0; i < config_listen_address_list_len; i++)
2586 open_listen_socket (config_listen_address_list[i]);
2588 rrd_free_ptrs((void ***) &config_listen_address_list,
2589 &config_listen_address_list_len);
2590 }
2591 else
2592 {
2593 listen_socket_t sock;
2594 memset(&sock, 0, sizeof(sock));
2595 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2596 open_listen_socket (&sock);
2597 }
2599 if (listen_fds_num < 1)
2600 {
2601 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2602 goto error;
2603 }
2605 if (!stay_foreground)
2606 {
2607 pid_t child;
2609 child = fork ();
2610 if (child < 0)
2611 {
2612 fprintf (stderr, "daemonize: fork(2) failed.\n");
2613 goto error;
2614 }
2615 else if (child > 0)
2616 exit(0);
2618 /* Become session leader */
2619 setsid ();
2621 /* Open the first three file descriptors to /dev/null */
2622 close (2);
2623 close (1);
2624 close (0);
2626 open ("/dev/null", O_RDWR);
2627 if (dup(0) == -1 || dup(0) == -1){
2628 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2629 }
2630 } /* if (!stay_foreground) */
2632 /* Change into the /tmp directory. */
2633 base_dir = (config_base_dir != NULL)
2634 ? config_base_dir
2635 : "/tmp";
2637 if (chdir (base_dir) != 0)
2638 {
2639 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2640 goto error;
2641 }
2643 install_signal_handlers();
2645 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2646 RRDD_LOG(LOG_INFO, "starting up");
2648 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2649 (GDestroyNotify) free_cache_item);
2650 if (cache_tree == NULL)
2651 {
2652 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2653 goto error;
2654 }
2656 return write_pidfile (pid_fd);
2658 error:
2659 remove_pidfile();
2660 return -1;
2661 } /* }}} int daemonize */
2663 static int cleanup (void) /* {{{ */
2664 {
2665 pthread_cond_broadcast (&flush_cond);
2666 pthread_join (flush_thread, NULL);
2668 pthread_cond_broadcast (&queue_cond);
2669 for (int i = 0; i < config_queue_threads; i++)
2670 pthread_join (queue_threads[i], NULL);
2672 if (config_flush_at_shutdown)
2673 {
2674 assert(cache_queue_head == NULL);
2675 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2676 }
2678 free(queue_threads);
2679 free(config_base_dir);
2680 free(config_pid_file);
2682 pthread_mutex_lock(&cache_lock);
2683 g_tree_destroy(cache_tree);
2685 pthread_mutex_lock(&journal_lock);
2686 journal_done();
2688 RRDD_LOG(LOG_INFO, "goodbye");
2689 closelog ();
2691 remove_pidfile ();
2693 return (0);
2694 } /* }}} int cleanup */
2696 static int read_options (int argc, char **argv) /* {{{ */
2697 {
2698 int option;
2699 int status = 0;
2701 char **permissions = NULL;
2702 size_t permissions_len = 0;
2704 while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2705 {
2706 switch (option)
2707 {
2708 case 'g':
2709 stay_foreground=1;
2710 break;
2712 case 'l':
2713 {
2714 listen_socket_t *new;
2716 new = malloc(sizeof(listen_socket_t));
2717 if (new == NULL)
2718 {
2719 fprintf(stderr, "read_options: malloc failed.\n");
2720 return(2);
2721 }
2722 memset(new, 0, sizeof(listen_socket_t));
2724 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2726 /* Add permissions to the socket {{{ */
2727 if (permissions_len != 0)
2728 {
2729 size_t i;
2730 for (i = 0; i < permissions_len; i++)
2731 {
2732 status = socket_permission_add (new, permissions[i]);
2733 if (status != 0)
2734 {
2735 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2736 "socket failed. Most likely, this permission doesn't "
2737 "exist. Check your command line.\n", permissions[i]);
2738 status = 4;
2739 }
2740 }
2741 }
2742 else /* if (permissions_len == 0) */
2743 {
2744 /* Add permission for ALL commands to the socket. */
2745 size_t i;
2746 for (i = 0; i < list_of_commands_len; i++)
2747 {
2748 status = socket_permission_add (new, list_of_commands[i].cmd);
2749 if (status != 0)
2750 {
2751 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2752 "socket failed. This should never happen, ever! Sorry.\n",
2753 permissions[i]);
2754 status = 4;
2755 }
2756 }
2757 }
2758 /* }}} Done adding permissions. */
2760 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2761 &config_listen_address_list_len, new))
2762 {
2763 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2764 return (2);
2765 }
2766 }
2767 break;
2769 case 'P':
2770 {
2771 char *optcopy;
2772 char *saveptr;
2773 char *dummy;
2774 char *ptr;
2776 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2778 optcopy = strdup (optarg);
2779 dummy = optcopy;
2780 saveptr = NULL;
2781 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2782 {
2783 dummy = NULL;
2784 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2785 }
2787 free (optcopy);
2788 }
2789 break;
2791 case 'f':
2792 {
2793 int temp;
2795 temp = atoi (optarg);
2796 if (temp > 0)
2797 config_flush_interval = temp;
2798 else
2799 {
2800 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2801 status = 3;
2802 }
2803 }
2804 break;
2806 case 'w':
2807 {
2808 int temp;
2810 temp = atoi (optarg);
2811 if (temp > 0)
2812 config_write_interval = temp;
2813 else
2814 {
2815 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2816 status = 2;
2817 }
2818 }
2819 break;
2821 case 'z':
2822 {
2823 int temp;
2825 temp = atoi(optarg);
2826 if (temp > 0)
2827 config_write_jitter = temp;
2828 else
2829 {
2830 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2831 status = 2;
2832 }
2834 break;
2835 }
2837 case 't':
2838 {
2839 int threads;
2840 threads = atoi(optarg);
2841 if (threads >= 1)
2842 config_queue_threads = threads;
2843 else
2844 {
2845 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2846 return 1;
2847 }
2848 }
2849 break;
2851 case 'B':
2852 config_write_base_only = 1;
2853 break;
2855 case 'b':
2856 {
2857 size_t len;
2858 char base_realpath[PATH_MAX];
2860 if (config_base_dir != NULL)
2861 free (config_base_dir);
2862 config_base_dir = strdup (optarg);
2863 if (config_base_dir == NULL)
2864 {
2865 fprintf (stderr, "read_options: strdup failed.\n");
2866 return (3);
2867 }
2869 /* make sure that the base directory is not resolved via
2870 * symbolic links. this makes some performance-enhancing
2871 * assumptions possible (we don't have to resolve paths
2872 * that start with a "/")
2873 */
2874 if (realpath(config_base_dir, base_realpath) == NULL)
2875 {
2876 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2877 return 5;
2878 }
2879 else if (strncmp(config_base_dir,
2880 base_realpath, sizeof(base_realpath)) != 0)
2881 {
2882 fprintf(stderr,
2883 "Base directory (-b) resolved via file system links!\n"
2884 "Please consult rrdcached '-b' documentation!\n"
2885 "Consider specifying the real directory (%s)\n",
2886 base_realpath);
2887 return 5;
2888 }
2890 len = strlen (config_base_dir);
2891 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2892 {
2893 config_base_dir[len - 1] = 0;
2894 len--;
2895 }
2897 if (len < 1)
2898 {
2899 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2900 return (4);
2901 }
2903 _config_base_dir_len = len;
2904 }
2905 break;
2907 case 'p':
2908 {
2909 if (config_pid_file != NULL)
2910 free (config_pid_file);
2911 config_pid_file = strdup (optarg);
2912 if (config_pid_file == NULL)
2913 {
2914 fprintf (stderr, "read_options: strdup failed.\n");
2915 return (3);
2916 }
2917 }
2918 break;
2920 case 'F':
2921 config_flush_at_shutdown = 1;
2922 break;
2924 case 'j':
2925 {
2926 struct stat statbuf;
2927 const char *dir = journal_dir = strdup(optarg);
2929 status = stat(dir, &statbuf);
2930 if (status != 0)
2931 {
2932 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2933 return 6;
2934 }
2936 if (!S_ISDIR(statbuf.st_mode)
2937 || access(dir, R_OK|W_OK|X_OK) != 0)
2938 {
2939 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2940 errno ? rrd_strerror(errno) : "");
2941 return 6;
2942 }
2943 }
2944 break;
2946 case 'h':
2947 case '?':
2948 printf ("RRDCacheD %s\n"
2949 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
2950 "\n"
2951 "Usage: rrdcached [options]\n"
2952 "\n"
2953 "Valid options are:\n"
2954 " -l <address> Socket address to listen to.\n"
2955 " -P <perms> Sets the permissions to assign to all following "
2956 "sockets\n"
2957 " -w <seconds> Interval in which to write data.\n"
2958 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2959 " -t <threads> Number of write threads.\n"
2960 " -f <seconds> Interval in which to flush dead data.\n"
2961 " -p <file> Location of the PID-file.\n"
2962 " -b <dir> Base directory to change to.\n"
2963 " -B Restrict file access to paths within -b <dir>\n"
2964 " -g Do not fork and run in the foreground.\n"
2965 " -j <dir> Directory in which to create the journal files.\n"
2966 " -F Always flush all updates at shutdown\n"
2967 "\n"
2968 "For more information and a detailed description of all options "
2969 "please refer\n"
2970 "to the rrdcached(1) manual page.\n",
2971 VERSION);
2972 status = -1;
2973 break;
2974 } /* switch (option) */
2975 } /* while (getopt) */
2977 /* advise the user when values are not sane */
2978 if (config_flush_interval < 2 * config_write_interval)
2979 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2980 " 2x write interval (-w) !\n");
2981 if (config_write_jitter > config_write_interval)
2982 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2983 " write interval (-w) !\n");
2985 if (config_write_base_only && config_base_dir == NULL)
2986 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2987 " Consult the rrdcached documentation\n");
2989 if (journal_dir == NULL)
2990 config_flush_at_shutdown = 1;
2992 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2994 return (status);
2995 } /* }}} int read_options */
2997 int main (int argc, char **argv)
2998 {
2999 int status;
3001 status = read_options (argc, argv);
3002 if (status != 0)
3003 {
3004 if (status < 0)
3005 status = 0;
3006 return (status);
3007 }
3009 status = daemonize ();
3010 if (status != 0)
3011 {
3012 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3013 return (1);
3014 }
3016 journal_init();
3018 /* start the queue threads */
3019 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3020 if (queue_threads == NULL)
3021 {
3022 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3023 cleanup();
3024 return (1);
3025 }
3026 for (int i = 0; i < config_queue_threads; i++)
3027 {
3028 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3029 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3030 if (status != 0)
3031 {
3032 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3033 cleanup();
3034 return (1);
3035 }
3036 }
3038 /* start the flush thread */
3039 memset(&flush_thread, 0, sizeof(flush_thread));
3040 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3041 if (status != 0)
3042 {
3043 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3044 cleanup();
3045 return (1);
3046 }
3048 listen_thread_main (NULL);
3049 cleanup ();
3051 return (0);
3052 } /* int main */
3054 /*
3055 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3056 */