b4bc1ee06ef0319ce83eb79797857ea88de4f0c7
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <dirent.h>
95 #include <fcntl.h>
96 #include <signal.h>
97 #include <sys/un.h>
98 #include <netdb.h>
99 #include <poll.h>
100 #include <syslog.h>
101 #include <pthread.h>
102 #include <errno.h>
103 #include <assert.h>
104 #include <sys/time.h>
105 #include <time.h>
107 #include <glib-2.0/glib.h>
108 /* }}} */
/* All daemon log output is routed through syslog(3). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers that do not define __GNUC__ get `__attribute__' expanded to
 * nothing, so the annotations used below are simply dropped. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
116 /*
117 * Types
118 */
/* Outcome of a command as passed to send_response(). */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK  = 0
} response_code;
121 struct listen_socket_s
122 {
123 int fd;
124 char addr[PATH_MAX + 1];
125 int family;
127 /* state for BATCH processing */
128 time_t batch_start;
129 int batch_cmd;
131 /* buffered IO */
132 char *rbuf;
133 off_t next_cmd;
134 off_t next_read;
136 char *wbuf;
137 ssize_t wbuf_len;
139 uint32_t permissions;
140 };
141 typedef struct listen_socket_s listen_socket_t;
143 struct command_s;
144 typedef struct command_s command_t;
145 /* note: guard against "unused" warnings in the handlers */
146 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
147 time_t now __attribute__((unused)),\
148 char *buffer __attribute__((unused)),\
149 size_t buffer_size __attribute__((unused))
151 #define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
152 DISPATCH_PROTO
154 struct command_s {
155 char *cmd;
156 int (*handler)(HANDLER_PROTO);
158 char context; /* where we expect to see it */
159 #define CMD_CONTEXT_CLIENT (1<<0)
160 #define CMD_CONTEXT_BATCH (1<<1)
161 #define CMD_CONTEXT_JOURNAL (1<<2)
162 #define CMD_CONTEXT_ANY (0x7f)
164 char *syntax;
165 char *help;
166 };
/* Bits stored in cache_item_s.flags. */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/* One cached RRD file: its pending update strings plus the links that
 * place it in the doubly linked write queue. */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;                 /* file name; also the key in the cache tree */
  char **values;              /* pending update strings */
  size_t values_num;          /* number of entries in `values' */
  time_t last_flush_time;
  time_t last_update_stamp;
  int flags;                  /* CI_FLAGS_* */
  pthread_cond_t flushed;     /* broadcast after the item was written */
  cache_item_t *prev;         /* queue neighbour towards the head */
  cache_item_t *next;         /* queue neighbour towards the tail */
};
/* User data handed to tree_callback_flush() while walking the cache tree. */
typedef struct callback_flush_data_s
{
  time_t now;          /* time at which the walk started */
  time_t abs_timeout;  /* items last flushed at or before this are queued */
  char **keys;         /* keys of the entries to drop from the tree */
  size_t keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() inserts at. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* describe a set of journal files */
typedef struct {
  char **files;       /* array of file names */
  size_t files_num;   /* number of entries in `files' */
} journal_set;
/* max length of socket command or response */
#define CMD_MAX 4096
/* size derived from CMD_MAX: twice the longest command */
#define RBUF_SIZE (CMD_MAX*2)
211 /*
212 * Variables
213 */
214 static int stay_foreground = 0;
215 static uid_t daemon_uid;
217 static listen_socket_t *listen_fds = NULL;
218 static size_t listen_fds_num = 0;
220 enum {
221 RUNNING, /* normal operation */
222 FLUSHING, /* flushing remaining values */
223 SHUTDOWN /* shutting down */
224 } state = RUNNING;
226 static pthread_t *queue_threads;
227 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
228 static int config_queue_threads = 4;
230 static pthread_t flush_thread;
231 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
233 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
234 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
235 static int connection_threads_num = 0;
237 /* Cache stuff */
238 static GTree *cache_tree = NULL;
239 static cache_item_t *cache_queue_head = NULL;
240 static cache_item_t *cache_queue_tail = NULL;
241 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
243 static int config_write_interval = 300;
244 static int config_write_jitter = 0;
245 static int config_flush_interval = 3600;
246 static int config_flush_at_shutdown = 0;
247 static char *config_pid_file = NULL;
248 static char *config_base_dir = NULL;
249 static size_t _config_base_dir_len = 0;
250 static int config_write_base_only = 0;
252 static listen_socket_t **config_listen_address_list = NULL;
253 static size_t config_listen_address_list_len = 0;
255 static uint64_t stats_queue_length = 0;
256 static uint64_t stats_updates_received = 0;
257 static uint64_t stats_flush_received = 0;
258 static uint64_t stats_updates_written = 0;
259 static uint64_t stats_data_sets_written = 0;
260 static uint64_t stats_journal_bytes = 0;
261 static uint64_t stats_journal_rotate = 0;
262 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
264 /* Journaled updates */
265 #define JOURNAL_BASE "rrd.journal"
266 static journal_set *journal_cur = NULL;
267 static journal_set *journal_old = NULL;
268 static char *journal_dir = NULL;
269 static FILE *journal_fh = NULL; /* current journal file handle */
270 static long journal_size = 0; /* current journal size */
271 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
272 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
273 static int journal_write(char *cmd, char *args);
274 static void journal_done(void);
275 static void journal_rotate(void);
277 /* prototypes for forward refernces */
278 static int handle_request_help (HANDLER_PROTO);
280 /*
281 * Functions
282 */
283 static void sig_common (const char *sig) /* {{{ */
284 {
285 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
286 state = FLUSHING;
287 pthread_cond_broadcast(&flush_cond);
288 pthread_cond_broadcast(&queue_cond);
289 } /* }}} void sig_common */
/* SIGINT: start shutting down; the signal number itself is not used. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: start shutting down; the signal number itself is not used. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
301 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
302 {
303 config_flush_at_shutdown = 1;
304 sig_common("USR1");
305 } /* }}} void sig_usr1_handler */
307 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
308 {
309 config_flush_at_shutdown = 0;
310 sig_common("USR2");
311 } /* }}} void sig_usr2_handler */
313 static void install_signal_handlers(void) /* {{{ */
314 {
315 /* These structures are static, because `sigaction' behaves weird if the are
316 * overwritten.. */
317 static struct sigaction sa_int;
318 static struct sigaction sa_term;
319 static struct sigaction sa_pipe;
320 static struct sigaction sa_usr1;
321 static struct sigaction sa_usr2;
323 /* Install signal handlers */
324 memset (&sa_int, 0, sizeof (sa_int));
325 sa_int.sa_handler = sig_int_handler;
326 sigaction (SIGINT, &sa_int, NULL);
328 memset (&sa_term, 0, sizeof (sa_term));
329 sa_term.sa_handler = sig_term_handler;
330 sigaction (SIGTERM, &sa_term, NULL);
332 memset (&sa_pipe, 0, sizeof (sa_pipe));
333 sa_pipe.sa_handler = SIG_IGN;
334 sigaction (SIGPIPE, &sa_pipe, NULL);
336 memset (&sa_pipe, 0, sizeof (sa_usr1));
337 sa_usr1.sa_handler = sig_usr1_handler;
338 sigaction (SIGUSR1, &sa_usr1, NULL);
340 memset (&sa_usr2, 0, sizeof (sa_usr2));
341 sa_usr2.sa_handler = sig_usr2_handler;
342 sigaction (SIGUSR2, &sa_usr2, NULL);
344 } /* }}} void install_signal_handlers */
346 static int open_pidfile(char *action, int oflag) /* {{{ */
347 {
348 int fd;
349 char *file;
351 file = (config_pid_file != NULL)
352 ? config_pid_file
353 : LOCALSTATEDIR "/run/rrdcached.pid";
355 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
356 if (fd < 0)
357 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
358 action, file, rrd_strerror(errno));
360 return(fd);
361 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Opens the PID file read/write and parses the process id stored in it.
 * If a different process with that id can be signalled, another daemon is
 * assumed to be running and -1 is returned.  Otherwise the stale file is
 * truncated and its descriptor (positioned at offset 0) is returned.
 *
 * Returns a descriptor >= 0 on success, a negative value on error.  The
 * descriptor is closed on every error path after a successful open. */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* read() does not terminate the data; keep one byte for the '\0' that
   * atoi() needs */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write the id of the calling process, followed by a newline, to the
 * descriptor `fd'.  The descriptor is always consumed: it is closed via
 * fclose() on success and via close() when fdopen() fails.
 * Returns 0 on success, -1 if fdopen() failed. */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream != NULL)
  {
    fprintf (stream, "%i\n", (int) getpid ());
    fclose (stream);
    return (0);
  }

  RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
  close(fd);
  return (-1);
} /* }}} int write_pidfile */
428 static int remove_pidfile (void) /* {{{ */
429 {
430 char *file;
431 int status;
433 file = (config_pid_file != NULL)
434 ? config_pid_file
435 : LOCALSTATEDIR "/run/rrdcached.pid";
437 status = unlink (file);
438 if (status == 0)
439 return (0);
440 return (errno);
441 } /* }}} int remove_pidfile */
443 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
444 {
445 char *eol;
447 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
448 sock->next_read - sock->next_cmd);
450 if (eol == NULL)
451 {
452 /* no commands left, move remainder back to front of rbuf */
453 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
454 sock->next_read - sock->next_cmd);
455 sock->next_read -= sock->next_cmd;
456 sock->next_cmd = 0;
457 *len = 0;
458 return NULL;
459 }
460 else
461 {
462 char *cmd = sock->rbuf + sock->next_cmd;
463 *eol = '\0';
465 sock->next_cmd = eol - sock->rbuf + 1;
467 if (eol > sock->rbuf && *(eol-1) == '\r')
468 *(--eol) = '\0'; /* handle "\r\n" EOL */
470 *len = eol - cmd;
472 return cmd;
473 }
475 /* NOTREACHED */
476 assert(1==0);
477 } /* }}} char *next_cmd */
479 /* add the characters directly to the write buffer */
480 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
481 {
482 char *new_buf;
484 assert(sock != NULL);
486 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
487 if (new_buf == NULL)
488 {
489 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
490 return -1;
491 }
493 strncpy(new_buf + sock->wbuf_len, str, len + 1);
495 sock->wbuf = new_buf;
496 sock->wbuf_len += len;
498 return 0;
499 } /* }}} static int add_to_wbuf */
501 /* add the text to the "extra" info that's sent after the status line */
502 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
503 {
504 va_list argp;
505 char buffer[CMD_MAX];
506 int len;
508 if (sock == NULL) return 0; /* journal replay mode */
509 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
511 va_start(argp, fmt);
512 #ifdef HAVE_VSNPRINTF
513 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
514 #else
515 len = vsprintf(buffer, fmt, argp);
516 #endif
517 va_end(argp);
518 if (len < 0)
519 {
520 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
521 return -1;
522 }
524 return add_to_wbuf(sock, buffer, len);
525 } /* }}} static int add_response_info */
/* Count the newline characters in `str'; a NULL pointer counts as zero
 * lines. */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  char *p;

  if (str == NULL)
    return 0;

  /* hop from one '\n' to the next until none is left */
  for (p = strchr(str, '\n'); p != NULL; p = strchr(p + 1, '\n'))
    newlines++;

  return newlines;
} /* }}} static int count_lines */
543 /* send the response back to the user.
544 * returns 0 on success, -1 on error
545 * write buffer is always zeroed after this call */
546 static int send_response (listen_socket_t *sock, response_code rc,
547 char *fmt, ...) /* {{{ */
548 {
549 va_list argp;
550 char buffer[CMD_MAX];
551 int lines;
552 ssize_t wrote;
553 int rclen, len;
555 if (sock == NULL) return rc; /* journal replay mode */
557 if (sock->batch_start)
558 {
559 if (rc == RESP_OK)
560 return rc; /* no response on success during BATCH */
561 lines = sock->batch_cmd;
562 }
563 else if (rc == RESP_OK)
564 lines = count_lines(sock->wbuf);
565 else
566 lines = -1;
568 rclen = sprintf(buffer, "%d ", lines);
569 va_start(argp, fmt);
570 #ifdef HAVE_VSNPRINTF
571 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
572 #else
573 len = vsprintf(buffer+rclen, fmt, argp);
574 #endif
575 va_end(argp);
576 if (len < 0)
577 return -1;
579 len += rclen;
581 /* append the result to the wbuf, don't write to the user */
582 if (sock->batch_start)
583 return add_to_wbuf(sock, buffer, len);
585 /* first write must be complete */
586 if (len != write(sock->fd, buffer, len))
587 {
588 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
589 return -1;
590 }
592 if (sock->wbuf != NULL && rc == RESP_OK)
593 {
594 wrote = 0;
595 while (wrote < sock->wbuf_len)
596 {
597 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
598 if (wb <= 0)
599 {
600 RRDD_LOG(LOG_INFO, "send_response: could not write results");
601 return -1;
602 }
603 wrote += wb;
604 }
605 }
607 free(sock->wbuf); sock->wbuf = NULL;
608 sock->wbuf_len = 0;
610 return 0;
611 } /* }}} */
613 static void wipe_ci_values(cache_item_t *ci, time_t when)
614 {
615 ci->values = NULL;
616 ci->values_num = 0;
618 ci->last_flush_time = when;
619 if (config_write_jitter > 0)
620 ci->last_flush_time += (rrd_random() % config_write_jitter);
621 }
623 /* remove_from_queue
624 * remove a "cache_item_t" item from the queue.
625 * must hold 'cache_lock' when calling this
626 */
627 static void remove_from_queue(cache_item_t *ci) /* {{{ */
628 {
629 if (ci == NULL) return;
630 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
632 if (ci->prev == NULL)
633 cache_queue_head = ci->next; /* reset head */
634 else
635 ci->prev->next = ci->next;
637 if (ci->next == NULL)
638 cache_queue_tail = ci->prev; /* reset the tail */
639 else
640 ci->next->prev = ci->prev;
642 ci->next = ci->prev = NULL;
643 ci->flags &= ~CI_FLAGS_IN_QUEUE;
645 pthread_mutex_lock (&stats_lock);
646 assert (stats_queue_length > 0);
647 stats_queue_length--;
648 pthread_mutex_unlock (&stats_lock);
650 } /* }}} static void remove_from_queue */
652 /* free the resources associated with the cache_item_t
653 * must hold cache_lock when calling this function
654 */
655 static void *free_cache_item(cache_item_t *ci) /* {{{ */
656 {
657 if (ci == NULL) return NULL;
659 remove_from_queue(ci);
661 for (size_t i=0; i < ci->values_num; i++)
662 free(ci->values[i]);
664 free (ci->values);
665 free (ci->file);
667 /* in case anyone is waiting */
668 pthread_cond_broadcast(&ci->flushed);
669 pthread_cond_destroy(&ci->flushed);
671 free (ci);
673 return NULL;
674 } /* }}} static void *free_cache_item */
676 /*
677 * enqueue_cache_item:
678 * `cache_lock' must be acquired before calling this function!
679 */
680 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
681 queue_side_t side)
682 {
683 if (ci == NULL)
684 return (-1);
686 if (ci->values_num == 0)
687 return (0);
689 if (side == HEAD)
690 {
691 if (cache_queue_head == ci)
692 return 0;
694 /* remove if further down in queue */
695 remove_from_queue(ci);
697 ci->prev = NULL;
698 ci->next = cache_queue_head;
699 if (ci->next != NULL)
700 ci->next->prev = ci;
701 cache_queue_head = ci;
703 if (cache_queue_tail == NULL)
704 cache_queue_tail = cache_queue_head;
705 }
706 else /* (side == TAIL) */
707 {
708 /* We don't move values back in the list.. */
709 if (ci->flags & CI_FLAGS_IN_QUEUE)
710 return (0);
712 assert (ci->next == NULL);
713 assert (ci->prev == NULL);
715 ci->prev = cache_queue_tail;
717 if (cache_queue_tail == NULL)
718 cache_queue_head = ci;
719 else
720 cache_queue_tail->next = ci;
722 cache_queue_tail = ci;
723 }
725 ci->flags |= CI_FLAGS_IN_QUEUE;
727 pthread_cond_signal(&queue_cond);
728 pthread_mutex_lock (&stats_lock);
729 stats_queue_length++;
730 pthread_mutex_unlock (&stats_lock);
732 return (0);
733 } /* }}} int enqueue_cache_item */
735 /*
736 * tree_callback_flush:
737 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
738 * while this is in progress.
739 */
740 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
741 gpointer data)
742 {
743 cache_item_t *ci;
744 callback_flush_data_t *cfd;
746 ci = (cache_item_t *) value;
747 cfd = (callback_flush_data_t *) data;
749 if (ci->flags & CI_FLAGS_IN_QUEUE)
750 return FALSE;
752 if (ci->values_num > 0
753 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
754 {
755 enqueue_cache_item (ci, TAIL);
756 }
757 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
758 && (ci->values_num <= 0))
759 {
760 assert ((char *) key == ci->file);
761 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
762 {
763 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
764 return (FALSE);
765 }
766 }
768 return (FALSE);
769 } /* }}} gboolean tree_callback_flush */
771 static int flush_old_values (int max_age)
772 {
773 callback_flush_data_t cfd;
774 size_t k;
776 memset (&cfd, 0, sizeof (cfd));
777 /* Pass the current time as user data so that we don't need to call
778 * `time' for each node. */
779 cfd.now = time (NULL);
780 cfd.keys = NULL;
781 cfd.keys_num = 0;
783 if (max_age > 0)
784 cfd.abs_timeout = cfd.now - max_age;
785 else
786 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
788 /* `tree_callback_flush' will return the keys of all values that haven't
789 * been touched in the last `config_flush_interval' seconds in `cfd'.
790 * The char*'s in this array point to the same memory as ci->file, so we
791 * don't need to free them separately. */
792 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
794 for (k = 0; k < cfd.keys_num; k++)
795 {
796 /* should never fail, since we have held the cache_lock
797 * the entire time */
798 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
799 }
801 if (cfd.keys != NULL)
802 {
803 free (cfd.keys);
804 cfd.keys = NULL;
805 }
807 return (0);
808 } /* int flush_old_values */
810 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
811 {
812 struct timeval now;
813 struct timespec next_flush;
814 int status;
816 gettimeofday (&now, NULL);
817 next_flush.tv_sec = now.tv_sec + config_flush_interval;
818 next_flush.tv_nsec = 1000 * now.tv_usec;
820 pthread_mutex_lock(&cache_lock);
822 while (state == RUNNING)
823 {
824 gettimeofday (&now, NULL);
825 if ((now.tv_sec > next_flush.tv_sec)
826 || ((now.tv_sec == next_flush.tv_sec)
827 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
828 {
829 RRDD_LOG(LOG_DEBUG, "flushing old values");
831 /* Determine the time of the next cache flush. */
832 next_flush.tv_sec = now.tv_sec + config_flush_interval;
834 /* Flush all values that haven't been written in the last
835 * `config_write_interval' seconds. */
836 flush_old_values (config_write_interval);
838 /* unlock the cache while we rotate so we don't block incoming
839 * updates if the fsync() blocks on disk I/O */
840 pthread_mutex_unlock(&cache_lock);
841 journal_rotate();
842 pthread_mutex_lock(&cache_lock);
843 }
845 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
846 if (status != 0 && status != ETIMEDOUT)
847 {
848 RRDD_LOG (LOG_ERR, "flush_thread_main: "
849 "pthread_cond_timedwait returned %i.", status);
850 }
851 }
853 if (config_flush_at_shutdown)
854 flush_old_values (-1); /* flush everything */
856 state = SHUTDOWN;
858 pthread_mutex_unlock(&cache_lock);
860 return NULL;
861 } /* void *flush_thread_main */
863 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
864 {
865 pthread_mutex_lock (&cache_lock);
867 while (state != SHUTDOWN
868 || (cache_queue_head != NULL && config_flush_at_shutdown))
869 {
870 cache_item_t *ci;
871 char *file;
872 char **values;
873 size_t values_num;
874 int status;
876 /* Now, check if there's something to store away. If not, wait until
877 * something comes in. */
878 if (cache_queue_head == NULL)
879 {
880 status = pthread_cond_wait (&queue_cond, &cache_lock);
881 if ((status != 0) && (status != ETIMEDOUT))
882 {
883 RRDD_LOG (LOG_ERR, "queue_thread_main: "
884 "pthread_cond_wait returned %i.", status);
885 }
886 }
888 /* Check if a value has arrived. This may be NULL if we timed out or there
889 * was an interrupt such as a signal. */
890 if (cache_queue_head == NULL)
891 continue;
893 ci = cache_queue_head;
895 /* copy the relevant parts */
896 file = strdup (ci->file);
897 if (file == NULL)
898 {
899 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
900 continue;
901 }
903 assert(ci->values != NULL);
904 assert(ci->values_num > 0);
906 values = ci->values;
907 values_num = ci->values_num;
909 wipe_ci_values(ci, time(NULL));
910 remove_from_queue(ci);
912 pthread_mutex_unlock (&cache_lock);
914 rrd_clear_error ();
915 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
916 if (status != 0)
917 {
918 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
919 "rrd_update_r (%s) failed with status %i. (%s)",
920 file, status, rrd_get_error());
921 }
923 journal_write("wrote", file);
925 /* Search again in the tree. It's possible someone issued a "FORGET"
926 * while we were writing the update values. */
927 pthread_mutex_lock(&cache_lock);
928 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
929 if (ci)
930 pthread_cond_broadcast(&ci->flushed);
931 pthread_mutex_unlock(&cache_lock);
933 if (status == 0)
934 {
935 pthread_mutex_lock (&stats_lock);
936 stats_updates_written++;
937 stats_data_sets_written += values_num;
938 pthread_mutex_unlock (&stats_lock);
939 }
941 rrd_free_ptrs((void ***) &values, &values_num);
942 free(file);
944 pthread_mutex_lock (&cache_lock);
945 }
946 pthread_mutex_unlock (&cache_lock);
948 return (NULL);
949 } /* }}} void *queue_thread_main */
/* Split the next field off a command buffer.
 *
 * A field ends at the first unescaped space or at a NUL byte.  A backslash
 * escapes the following character; the unescaped field is written in
 * place, over the start of the buffer.
 *
 * On success, `*field_ret' points to the NUL-terminated field and
 * `*buffer_ret' / `*buffer_size_ret' are advanced past the field and its
 * delimiter; 0 is returned.  On failure (empty buffer, or no delimiter
 * found before the buffer ends) -1 is returned and the three output
 * arguments are left unchanged.
 *
 * The last byte of the buffer must be NUL. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const src = *buffer_ret;
  const size_t total = *buffer_size_ret;
  size_t rd = 0; /* next byte to read */
  size_t wr = 0; /* next byte to write; never ahead of `rd' */

  if (total <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[total - 1] == '\0');

  while (rd < total)
  {
    char c = src[rd];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
    {
      src[wr] = 0;
      rd++;

      *buffer_ret = src + rd;
      *buffer_size_ret = total - rd;
      *field_ret = src;
      return (0);
    }

    /* Handle escaped characters: take the byte after the backslash */
    if (c == '\\')
    {
      if (rd >= (total - 1))
        break;
      rd++;
      c = src[rd];
    }

    src[wr] = c;
    wr++;
    rd++;
  } /* while (rd < total) */

  /* ran off the end without seeing a delimiter */
  return (-1);
} /* }}} int buffer_get_field */
1014 /* if we're restricting writes to the base directory,
1015 * check whether the file falls within the dir
1016 * returns 1 if OK, otherwise 0
1017 */
1018 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1019 {
1020 assert(file != NULL);
1022 if (!config_write_base_only
1023 || sock == NULL /* journal replay */
1024 || config_base_dir == NULL)
1025 return 1;
1027 if (strstr(file, "../") != NULL) goto err;
1029 /* relative paths without "../" are ok */
1030 if (*file != '/') return 1;
1032 /* file must be of the format base + "/" + <1+ char filename> */
1033 if (strlen(file) < _config_base_dir_len + 2) goto err;
1034 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1035 if (*(file + _config_base_dir_len) != '/') goto err;
1037 return 1;
1039 err:
1040 if (sock != NULL && sock->fd >= 0)
1041 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1043 return 0;
1044 } /* }}} static int check_file_access */
1046 /* when using a base dir, convert relative paths to absolute paths.
1047 * if necessary, modifies the "filename" pointer to point
1048 * to the new path created in "tmp". "tmp" is provided
1049 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1050 *
1051 * this allows us to optimize for the expected case (absolute path)
1052 * with a no-op.
1053 */
1054 static void get_abs_path(char **filename, char *tmp)
1055 {
1056 assert(tmp != NULL);
1057 assert(filename != NULL && *filename != NULL);
1059 if (config_base_dir == NULL || **filename == '/')
1060 return;
1062 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1063 *filename = tmp;
1064 } /* }}} static int get_abs_path */
1066 static int flush_file (const char *filename) /* {{{ */
1067 {
1068 cache_item_t *ci;
1070 pthread_mutex_lock (&cache_lock);
1072 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1073 if (ci == NULL)
1074 {
1075 pthread_mutex_unlock (&cache_lock);
1076 return (ENOENT);
1077 }
1079 if (ci->values_num > 0)
1080 {
1081 /* Enqueue at head */
1082 enqueue_cache_item (ci, HEAD);
1083 pthread_cond_wait(&ci->flushed, &cache_lock);
1084 }
1086 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1087 * may have been purged during our cond_wait() */
1089 pthread_mutex_unlock(&cache_lock);
1091 return (0);
1092 } /* }}} int flush_file */
1094 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1095 {
1096 char *err = "Syntax error.\n";
1098 if (cmd && cmd->syntax)
1099 err = cmd->syntax;
1101 return send_response(sock, RESP_ERR, "Usage: %s", err);
1102 } /* }}} static int syntax_error() */
1104 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1105 {
1106 uint64_t copy_queue_length;
1107 uint64_t copy_updates_received;
1108 uint64_t copy_flush_received;
1109 uint64_t copy_updates_written;
1110 uint64_t copy_data_sets_written;
1111 uint64_t copy_journal_bytes;
1112 uint64_t copy_journal_rotate;
1114 uint64_t tree_nodes_number;
1115 uint64_t tree_depth;
1117 pthread_mutex_lock (&stats_lock);
1118 copy_queue_length = stats_queue_length;
1119 copy_updates_received = stats_updates_received;
1120 copy_flush_received = stats_flush_received;
1121 copy_updates_written = stats_updates_written;
1122 copy_data_sets_written = stats_data_sets_written;
1123 copy_journal_bytes = stats_journal_bytes;
1124 copy_journal_rotate = stats_journal_rotate;
1125 pthread_mutex_unlock (&stats_lock);
1127 pthread_mutex_lock (&cache_lock);
1128 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1129 tree_depth = (uint64_t) g_tree_height (cache_tree);
1130 pthread_mutex_unlock (&cache_lock);
1132 add_response_info(sock,
1133 "QueueLength: %"PRIu64"\n", copy_queue_length);
1134 add_response_info(sock,
1135 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1136 add_response_info(sock,
1137 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1138 add_response_info(sock,
1139 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1140 add_response_info(sock,
1141 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1142 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1143 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1144 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1145 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1147 send_response(sock, RESP_OK, "Statistics follow\n");
1149 return (0);
1150 } /* }}} int handle_request_stats */
1152 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1153 {
1154 char *file, file_tmp[PATH_MAX];
1155 int status;
1157 status = buffer_get_field (&buffer, &buffer_size, &file);
1158 if (status != 0)
1159 {
1160 return syntax_error(sock,cmd);
1161 }
1162 else
1163 {
1164 pthread_mutex_lock(&stats_lock);
1165 stats_flush_received++;
1166 pthread_mutex_unlock(&stats_lock);
1168 get_abs_path(&file, file_tmp);
1169 if (!check_file_access(file, sock)) return 0;
1171 status = flush_file (file);
1172 if (status == 0)
1173 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1174 else if (status == ENOENT)
1175 {
1176 /* no file in our tree; see whether it exists at all */
1177 struct stat statbuf;
1179 memset(&statbuf, 0, sizeof(statbuf));
1180 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1181 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1182 else
1183 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1184 }
1185 else if (status < 0)
1186 return send_response(sock, RESP_ERR, "Internal error.\n");
1187 else
1188 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1189 }
1191 /* NOTREACHED */
1192 assert(1==0);
1193 } /* }}} int handle_request_flush */
1195 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1196 {
1197 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1199 pthread_mutex_lock(&cache_lock);
1200 flush_old_values(-1);
1201 pthread_mutex_unlock(&cache_lock);
1203 return send_response(sock, RESP_OK, "Started flush.\n");
1204 } /* }}} static int handle_request_flushall */
1206 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1207 {
1208 int status;
1209 char *file, file_tmp[PATH_MAX];
1210 cache_item_t *ci;
1212 status = buffer_get_field(&buffer, &buffer_size, &file);
1213 if (status != 0)
1214 return syntax_error(sock,cmd);
1216 get_abs_path(&file, file_tmp);
1218 pthread_mutex_lock(&cache_lock);
1219 ci = g_tree_lookup(cache_tree, file);
1220 if (ci == NULL)
1221 {
1222 pthread_mutex_unlock(&cache_lock);
1223 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1224 }
1226 for (size_t i=0; i < ci->values_num; i++)
1227 add_response_info(sock, "%s\n", ci->values[i]);
1229 pthread_mutex_unlock(&cache_lock);
1230 return send_response(sock, RESP_OK, "updates pending\n");
1231 } /* }}} static int handle_request_pending */
1233 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1234 {
1235 int status;
1236 gboolean found;
1237 char *file, file_tmp[PATH_MAX];
1239 status = buffer_get_field(&buffer, &buffer_size, &file);
1240 if (status != 0)
1241 return syntax_error(sock,cmd);
1243 get_abs_path(&file, file_tmp);
1244 if (!check_file_access(file, sock)) return 0;
1246 pthread_mutex_lock(&cache_lock);
1247 found = g_tree_remove(cache_tree, file);
1248 pthread_mutex_unlock(&cache_lock);
1250 if (found == TRUE)
1251 {
1252 if (sock != NULL)
1253 journal_write("forget", file);
1255 return send_response(sock, RESP_OK, "Gone!\n");
1256 }
1257 else
1258 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1260 /* NOTREACHED */
1261 assert(1==0);
1262 } /* }}} static int handle_request_forget */
1264 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1265 {
1266 cache_item_t *ci;
1268 pthread_mutex_lock(&cache_lock);
1270 ci = cache_queue_head;
1271 while (ci != NULL)
1272 {
1273 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1274 ci = ci->next;
1275 }
1277 pthread_mutex_unlock(&cache_lock);
1279 return send_response(sock, RESP_OK, "in queue.\n");
1280 } /* }}} int handle_request_queue */
1282 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1283 {
1284 char *file, file_tmp[PATH_MAX];
1285 int values_num = 0;
1286 int status;
1287 char orig_buf[CMD_MAX];
1289 cache_item_t *ci;
1291 /* save it for the journal later */
1292 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1294 status = buffer_get_field (&buffer, &buffer_size, &file);
1295 if (status != 0)
1296 return syntax_error(sock,cmd);
1298 pthread_mutex_lock(&stats_lock);
1299 stats_updates_received++;
1300 pthread_mutex_unlock(&stats_lock);
1302 get_abs_path(&file, file_tmp);
1303 if (!check_file_access(file, sock)) return 0;
1305 pthread_mutex_lock (&cache_lock);
1306 ci = g_tree_lookup (cache_tree, file);
1308 if (ci == NULL) /* {{{ */
1309 {
1310 struct stat statbuf;
1311 cache_item_t *tmp;
1313 /* don't hold the lock while we setup; stat(2) might block */
1314 pthread_mutex_unlock(&cache_lock);
1316 memset (&statbuf, 0, sizeof (statbuf));
1317 status = stat (file, &statbuf);
1318 if (status != 0)
1319 {
1320 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1322 status = errno;
1323 if (status == ENOENT)
1324 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1325 else
1326 return send_response(sock, RESP_ERR,
1327 "stat failed with error %i.\n", status);
1328 }
1329 if (!S_ISREG (statbuf.st_mode))
1330 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1332 if (access(file, R_OK|W_OK) != 0)
1333 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1334 file, rrd_strerror(errno));
1336 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1337 if (ci == NULL)
1338 {
1339 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1341 return send_response(sock, RESP_ERR, "malloc failed.\n");
1342 }
1343 memset (ci, 0, sizeof (cache_item_t));
1345 ci->file = strdup (file);
1346 if (ci->file == NULL)
1347 {
1348 free (ci);
1349 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1351 return send_response(sock, RESP_ERR, "strdup failed.\n");
1352 }
1354 wipe_ci_values(ci, now);
1355 ci->flags = CI_FLAGS_IN_TREE;
1356 pthread_cond_init(&ci->flushed, NULL);
1358 pthread_mutex_lock(&cache_lock);
1360 /* another UPDATE might have added this entry in the meantime */
1361 tmp = g_tree_lookup (cache_tree, file);
1362 if (tmp == NULL)
1363 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1364 else
1365 {
1366 free_cache_item (ci);
1367 ci = tmp;
1368 }
1370 /* state may have changed while we were unlocked */
1371 if (state == SHUTDOWN)
1372 return -1;
1373 } /* }}} */
1374 assert (ci != NULL);
1376 /* don't re-write updates in replay mode */
1377 if (sock != NULL)
1378 journal_write("update", orig_buf);
1380 while (buffer_size > 0)
1381 {
1382 char *value;
1383 time_t stamp;
1384 char *eostamp;
1386 status = buffer_get_field (&buffer, &buffer_size, &value);
1387 if (status != 0)
1388 {
1389 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1390 break;
1391 }
1393 /* make sure update time is always moving forward */
1394 stamp = strtol(value, &eostamp, 10);
1395 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1396 {
1397 pthread_mutex_unlock(&cache_lock);
1398 return send_response(sock, RESP_ERR,
1399 "Cannot find timestamp in '%s'!\n", value);
1400 }
1401 else if (stamp <= ci->last_update_stamp)
1402 {
1403 pthread_mutex_unlock(&cache_lock);
1404 return send_response(sock, RESP_ERR,
1405 "illegal attempt to update using time %ld when last"
1406 " update time is %ld (minimum one second step)\n",
1407 stamp, ci->last_update_stamp);
1408 }
1409 else
1410 ci->last_update_stamp = stamp;
1412 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1413 {
1414 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1415 continue;
1416 }
1418 values_num++;
1419 }
1421 if (((now - ci->last_flush_time) >= config_write_interval)
1422 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1423 && (ci->values_num > 0))
1424 {
1425 enqueue_cache_item (ci, TAIL);
1426 }
1428 pthread_mutex_unlock (&cache_lock);
1430 if (values_num < 1)
1431 return send_response(sock, RESP_ERR, "No values updated.\n");
1432 else
1433 return send_response(sock, RESP_OK,
1434 "errors, enqueued %i value(s).\n", values_num);
1436 /* NOTREACHED */
1437 assert(1==0);
1439 } /* }}} int handle_request_update */
1441 /* we came across a "WROTE" entry during journal replay.
1442 * throw away any values that we have accumulated for this file
1443 */
1444 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1445 {
1446 cache_item_t *ci;
1447 const char *file = buffer;
1449 pthread_mutex_lock(&cache_lock);
1451 ci = g_tree_lookup(cache_tree, file);
1452 if (ci == NULL)
1453 {
1454 pthread_mutex_unlock(&cache_lock);
1455 return (0);
1456 }
1458 if (ci->values)
1459 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1461 wipe_ci_values(ci, now);
1462 remove_from_queue(ci);
1464 pthread_mutex_unlock(&cache_lock);
1465 return (0);
1466 } /* }}} int handle_request_wrote */
1468 /* start "BATCH" processing */
1469 static int batch_start (HANDLER_PROTO) /* {{{ */
1470 {
1471 int status;
1472 if (sock->batch_start)
1473 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1475 status = send_response(sock, RESP_OK,
1476 "Go ahead. End with dot '.' on its own line.\n");
1477 sock->batch_start = time(NULL);
1478 sock->batch_cmd = 0;
1480 return status;
1481 } /* }}} static int batch_start */
1483 /* finish "BATCH" processing and return results to the client */
1484 static int batch_done (HANDLER_PROTO) /* {{{ */
1485 {
1486 assert(sock->batch_start);
1487 sock->batch_start = 0;
1488 sock->batch_cmd = 0;
1489 return send_response(sock, RESP_OK, "errors\n");
1490 } /* }}} static int batch_done */
/* QUIT: always returns -1.  connection_thread_main() closes the connection
 * as soon as a request yields a non-zero status. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int disconnect = -1;

  return disconnect;
} /* }}} static int handle_request_quit */
1497 static command_t list_of_commands[] = { /* {{{ */
1498 {
1499 "UPDATE",
1500 handle_request_update,
1501 CMD_CONTEXT_ANY,
1502 "UPDATE <filename> <values> [<values> ...]\n"
1503 ,
1504 "Adds the given file to the internal cache if it is not yet known and\n"
1505 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1506 "for details.\n"
1507 "\n"
1508 "Each <values> has the following form:\n"
1509 " <values> = <time>:<value>[:<value>[...]]\n"
1510 "See the rrdupdate(1) manpage for details.\n"
1511 },
1512 {
1513 "WROTE",
1514 handle_request_wrote,
1515 CMD_CONTEXT_JOURNAL,
1516 NULL,
1517 NULL
1518 },
1519 {
1520 "FLUSH",
1521 handle_request_flush,
1522 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1523 "FLUSH <filename>\n"
1524 ,
1525 "Adds the given filename to the head of the update queue and returns\n"
1526 "after it has been dequeued.\n"
1527 },
1528 {
1529 "FLUSHALL",
1530 handle_request_flushall,
1531 CMD_CONTEXT_CLIENT,
1532 "FLUSHALL\n"
1533 ,
1534 "Triggers writing of all pending updates. Returns immediately.\n"
1535 },
1536 {
1537 "PENDING",
1538 handle_request_pending,
1539 CMD_CONTEXT_CLIENT,
1540 "PENDING <filename>\n"
1541 ,
1542 "Shows any 'pending' updates for a file, in order.\n"
1543 "The updates shown have not yet been written to the underlying RRD file.\n"
1544 },
1545 {
1546 "FORGET",
1547 handle_request_forget,
1548 CMD_CONTEXT_ANY,
1549 "FORGET <filename>\n"
1550 ,
1551 "Removes the file completely from the cache.\n"
1552 "Any pending updates for the file will be lost.\n"
1553 },
1554 {
1555 "QUEUE",
1556 handle_request_queue,
1557 CMD_CONTEXT_CLIENT,
1558 "QUEUE\n"
1559 ,
1560 "Shows all files in the output queue.\n"
1561 "The output is zero or more lines in the following format:\n"
1562 "(where <num_vals> is the number of values to be written)\n"
1563 "\n"
1564 "<num_vals> <filename>\n"
1565 },
1566 {
1567 "STATS",
1568 handle_request_stats,
1569 CMD_CONTEXT_CLIENT,
1570 "STATS\n"
1571 ,
1572 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1573 "a description of the values.\n"
1574 },
1575 {
1576 "HELP",
1577 handle_request_help,
1578 CMD_CONTEXT_CLIENT,
1579 "HELP [<command>]\n",
1580 NULL, /* special! */
1581 },
1582 {
1583 "BATCH",
1584 batch_start,
1585 CMD_CONTEXT_CLIENT,
1586 "BATCH\n"
1587 ,
1588 "The 'BATCH' command permits the client to initiate a bulk load\n"
1589 " of commands to rrdcached.\n"
1590 "\n"
1591 "Usage:\n"
1592 "\n"
1593 " client: BATCH\n"
1594 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1595 " client: command #1\n"
1596 " client: command #2\n"
1597 " client: ... and so on\n"
1598 " client: .\n"
1599 " server: 2 errors\n"
1600 " server: 7 message for command #7\n"
1601 " server: 9 message for command #9\n"
1602 "\n"
1603 "For more information, consult the rrdcached(1) documentation.\n"
1604 },
1605 {
1606 ".", /* BATCH terminator */
1607 batch_done,
1608 CMD_CONTEXT_BATCH,
1609 NULL,
1610 NULL
1611 },
1612 {
1613 "QUIT",
1614 handle_request_quit,
1615 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1616 "QUIT\n"
1617 ,
1618 "Disconnect from rrdcached.\n"
1619 }
1620 }; /* }}} command_t list_of_commands[] */
1621 static size_t list_of_commands_len = sizeof (list_of_commands)
1622 / sizeof (list_of_commands[0]);
1624 static command_t *find_command(char *cmd)
1625 {
1626 size_t i;
1628 for (i = 0; i < list_of_commands_len; i++)
1629 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1630 return (&list_of_commands[i]);
1631 return NULL;
1632 }
1634 /* We currently use the index in the `list_of_commands' array as a bit position
1635 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1636 * outside these functions so that switching to a more elegant storage method
1637 * is easily possible. */
1638 static ssize_t find_command_index (const char *cmd) /* {{{ */
1639 {
1640 size_t i;
1642 for (i = 0; i < list_of_commands_len; i++)
1643 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1644 return ((ssize_t) i);
1645 return (-1);
1646 } /* }}} ssize_t find_command_index */
1648 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1649 const char *cmd)
1650 {
1651 ssize_t i;
1653 if (cmd == NULL)
1654 return (-1);
1656 if ((strcasecmp ("QUIT", cmd) == 0)
1657 || (strcasecmp ("HELP", cmd) == 0))
1658 return (1);
1659 else if (strcmp (".", cmd) == 0)
1660 cmd = "BATCH";
1662 i = find_command_index (cmd);
1663 if (i < 0)
1664 return (-1);
1665 assert (i < 32);
1667 if ((sock->permissions & (1 << i)) != 0)
1668 return (1);
1669 return (0);
1670 } /* }}} int socket_permission_check */
1672 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1673 const char *cmd)
1674 {
1675 ssize_t i;
1677 i = find_command_index (cmd);
1678 if (i < 0)
1679 return (-1);
1680 assert (i < 32);
1682 sock->permissions |= (1 << i);
1683 return (0);
1684 } /* }}} int socket_permission_add */
1686 /* check whether commands are received in the expected context */
1687 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1688 {
1689 if (sock == NULL)
1690 return (cmd->context & CMD_CONTEXT_JOURNAL);
1691 else if (sock->batch_start)
1692 return (cmd->context & CMD_CONTEXT_BATCH);
1693 else
1694 return (cmd->context & CMD_CONTEXT_CLIENT);
1696 /* NOTREACHED */
1697 assert(1==0);
1698 }
1700 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1701 {
1702 int status;
1703 char *cmd_str;
1704 char *resp_txt;
1705 command_t *help = NULL;
1707 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1708 if (status == 0)
1709 help = find_command(cmd_str);
1711 if (help && (help->syntax || help->help))
1712 {
1713 char tmp[CMD_MAX];
1715 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1716 resp_txt = tmp;
1718 if (help->syntax)
1719 add_response_info(sock, "Usage: %s\n", help->syntax);
1721 if (help->help)
1722 add_response_info(sock, "%s\n", help->help);
1723 }
1724 else
1725 {
1726 size_t i;
1728 resp_txt = "Command overview\n";
1730 for (i = 0; i < list_of_commands_len; i++)
1731 {
1732 if (list_of_commands[i].syntax == NULL)
1733 continue;
1734 add_response_info (sock, "%s", list_of_commands[i].syntax);
1735 }
1736 }
1738 return send_response(sock, RESP_OK, resp_txt);
1739 } /* }}} int handle_request_help */
1741 /* if sock==NULL, we are in journal replay mode */
1742 static int handle_request (DISPATCH_PROTO) /* {{{ */
1743 {
1744 char *buffer_ptr = buffer;
1745 char *cmd_str = NULL;
1746 command_t *cmd = NULL;
1747 int status;
1749 assert (buffer[buffer_size - 1] == '\0');
1751 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1752 if (status != 0)
1753 {
1754 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1755 return (-1);
1756 }
1758 if (sock != NULL && sock->batch_start)
1759 sock->batch_cmd++;
1761 cmd = find_command(cmd_str);
1762 if (!cmd)
1763 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1765 if (!socket_permission_check (sock, cmd->cmd))
1766 return send_response(sock, RESP_ERR, "Permission denied.\n");
1768 if (!command_check_context(sock, cmd))
1769 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1771 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1772 } /* }}} int handle_request */
1774 static void journal_set_free (journal_set *js) /* {{{ */
1775 {
1776 if (js == NULL)
1777 return;
1779 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1781 free(js);
1782 } /* }}} journal_set_free */
1784 static void journal_set_remove (journal_set *js) /* {{{ */
1785 {
1786 if (js == NULL)
1787 return;
1789 for (uint i=0; i < js->files_num; i++)
1790 {
1791 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1792 unlink(js->files[i]);
1793 }
1794 } /* }}} journal_set_remove */
1796 /* close current journal file handle.
1797 * MUST hold journal_lock before calling */
1798 static void journal_close(void) /* {{{ */
1799 {
1800 if (journal_fh != NULL)
1801 {
1802 if (fclose(journal_fh) != 0)
1803 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1804 }
1806 journal_fh = NULL;
1807 journal_size = 0;
1808 } /* }}} journal_close */
1810 /* MUST hold journal_lock before calling */
1811 static void journal_new_file(void) /* {{{ */
1812 {
1813 struct timeval now;
1814 int new_fd;
1815 char new_file[PATH_MAX + 1];
1817 assert(journal_dir != NULL);
1818 assert(journal_cur != NULL);
1820 journal_close();
1822 gettimeofday(&now, NULL);
1823 /* this format assures that the files sort in strcmp() order */
1824 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1825 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1827 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1828 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1829 if (new_fd < 0)
1830 goto error;
1832 journal_fh = fdopen(new_fd, "a");
1833 if (journal_fh == NULL)
1834 goto error;
1836 journal_size = ftell(journal_fh);
1837 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1839 /* record the file in the journal set */
1840 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1842 return;
1844 error:
1845 RRDD_LOG(LOG_CRIT,
1846 "JOURNALING DISABLED: Error while trying to create %s : %s",
1847 new_file, rrd_strerror(errno));
1848 RRDD_LOG(LOG_CRIT,
1849 "JOURNALING DISABLED: All values will be flushed at shutdown");
1851 close(new_fd);
1852 config_flush_at_shutdown = 1;
1854 } /* }}} journal_new_file */
1856 /* MUST NOT hold journal_lock before calling this */
1857 static void journal_rotate(void) /* {{{ */
1858 {
1859 journal_set *old_js = NULL;
1861 if (journal_dir == NULL)
1862 return;
1864 RRDD_LOG(LOG_DEBUG, "rotating journals");
1866 pthread_mutex_lock(&stats_lock);
1867 ++stats_journal_rotate;
1868 pthread_mutex_unlock(&stats_lock);
1870 pthread_mutex_lock(&journal_lock);
1872 journal_close();
1874 /* rotate the journal sets */
1875 old_js = journal_old;
1876 journal_old = journal_cur;
1877 journal_cur = calloc(1, sizeof(journal_set));
1879 if (journal_cur != NULL)
1880 journal_new_file();
1881 else
1882 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1884 pthread_mutex_unlock(&journal_lock);
1886 journal_set_remove(old_js);
1887 journal_set_free (old_js);
1889 } /* }}} static void journal_rotate */
1891 /* MUST hold journal_lock when calling */
1892 static void journal_done(void) /* {{{ */
1893 {
1894 if (journal_cur == NULL)
1895 return;
1897 journal_close();
1899 if (config_flush_at_shutdown)
1900 {
1901 RRDD_LOG(LOG_INFO, "removing journals");
1902 journal_set_remove(journal_old);
1903 journal_set_remove(journal_cur);
1904 }
1905 else
1906 {
1907 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1908 "journals will be used at next startup");
1909 }
1911 journal_set_free(journal_cur);
1912 journal_set_free(journal_old);
1913 free(journal_dir);
1915 } /* }}} static void journal_done */
1917 static int journal_write(char *cmd, char *args) /* {{{ */
1918 {
1919 int chars;
1921 if (journal_fh == NULL)
1922 return 0;
1924 pthread_mutex_lock(&journal_lock);
1925 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1926 journal_size += chars;
1928 if (journal_size > JOURNAL_MAX)
1929 journal_new_file();
1931 pthread_mutex_unlock(&journal_lock);
1933 if (chars > 0)
1934 {
1935 pthread_mutex_lock(&stats_lock);
1936 stats_journal_bytes += chars;
1937 pthread_mutex_unlock(&stats_lock);
1938 }
1940 return chars;
1941 } /* }}} static int journal_write */
1943 static int journal_replay (const char *file) /* {{{ */
1944 {
1945 FILE *fh;
1946 int entry_cnt = 0;
1947 int fail_cnt = 0;
1948 uint64_t line = 0;
1949 char entry[CMD_MAX];
1950 time_t now;
1952 if (file == NULL) return 0;
1954 {
1955 char *reason = "unknown error";
1956 int status = 0;
1957 struct stat statbuf;
1959 memset(&statbuf, 0, sizeof(statbuf));
1960 if (stat(file, &statbuf) != 0)
1961 {
1962 reason = "stat error";
1963 status = errno;
1964 }
1965 else if (!S_ISREG(statbuf.st_mode))
1966 {
1967 reason = "not a regular file";
1968 status = EPERM;
1969 }
1970 if (statbuf.st_uid != daemon_uid)
1971 {
1972 reason = "not owned by daemon user";
1973 status = EACCES;
1974 }
1975 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1976 {
1977 reason = "must not be user/group writable";
1978 status = EACCES;
1979 }
1981 if (status != 0)
1982 {
1983 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1984 file, rrd_strerror(status), reason);
1985 return 0;
1986 }
1987 }
1989 fh = fopen(file, "r");
1990 if (fh == NULL)
1991 {
1992 if (errno != ENOENT)
1993 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1994 file, rrd_strerror(errno));
1995 return 0;
1996 }
1997 else
1998 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2000 now = time(NULL);
2002 while(!feof(fh))
2003 {
2004 size_t entry_len;
2006 ++line;
2007 if (fgets(entry, sizeof(entry), fh) == NULL)
2008 break;
2009 entry_len = strlen(entry);
2011 /* check \n termination in case journal writing crashed mid-line */
2012 if (entry_len == 0)
2013 continue;
2014 else if (entry[entry_len - 1] != '\n')
2015 {
2016 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2017 ++fail_cnt;
2018 continue;
2019 }
2021 entry[entry_len - 1] = '\0';
2023 if (handle_request(NULL, now, entry, entry_len) == 0)
2024 ++entry_cnt;
2025 else
2026 ++fail_cnt;
2027 }
2029 fclose(fh);
2031 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2032 entry_cnt, fail_cnt);
2034 return entry_cnt > 0 ? 1 : 0;
2035 } /* }}} static int journal_replay */
/* qsort() comparison callback for an array of C strings: both arguments
 * point to array elements (char *); the strings are ordered by strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *lhs = v1;
  const char *const *rhs = v2;

  return strcmp(lhs[0], rhs[0]);
}
2045 static void journal_init(void) /* {{{ */
2046 {
2047 int had_journal = 0;
2048 DIR *dir;
2049 struct dirent *dent;
2050 char path[PATH_MAX+1];
2052 if (journal_dir == NULL) return;
2054 pthread_mutex_lock(&journal_lock);
2056 journal_cur = calloc(1, sizeof(journal_set));
2057 if (journal_cur == NULL)
2058 {
2059 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2060 return;
2061 }
2063 RRDD_LOG(LOG_INFO, "checking for journal files");
2065 /* Handle old journal files during transition. This gives them the
2066 * correct sort order. TODO: remove after first release
2067 */
2068 {
2069 char old_path[PATH_MAX+1];
2070 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2071 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2072 rename(old_path, path);
2074 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2075 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2076 rename(old_path, path);
2077 }
2079 dir = opendir(journal_dir);
2080 while ((dent = readdir(dir)) != NULL)
2081 {
2082 /* looks like a journal file? */
2083 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2084 continue;
2086 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2088 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2089 {
2090 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2091 dent->d_name);
2092 break;
2093 }
2094 }
2095 closedir(dir);
2097 qsort(journal_cur->files, journal_cur->files_num,
2098 sizeof(journal_cur->files[0]), journal_sort);
2100 for (uint i=0; i < journal_cur->files_num; i++)
2101 had_journal += journal_replay(journal_cur->files[i]);
2103 journal_new_file();
2105 /* it must have been a crash. start a flush */
2106 if (had_journal && config_flush_at_shutdown)
2107 flush_old_values(-1);
2109 pthread_mutex_unlock(&journal_lock);
2111 RRDD_LOG(LOG_INFO, "journal processing complete");
2113 } /* }}} static void journal_init */
2115 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2116 {
2117 assert(sock != NULL);
2119 free(sock->rbuf); sock->rbuf = NULL;
2120 free(sock->wbuf); sock->wbuf = NULL;
2121 free(sock);
2122 } /* }}} void free_listen_socket */
2124 static void close_connection(listen_socket_t *sock) /* {{{ */
2125 {
2126 if (sock->fd >= 0)
2127 {
2128 close(sock->fd);
2129 sock->fd = -1;
2130 }
2132 free_listen_socket(sock);
2134 } /* }}} void close_connection */
2136 static void *connection_thread_main (void *args) /* {{{ */
2137 {
2138 listen_socket_t *sock;
2139 int fd;
2141 sock = (listen_socket_t *) args;
2142 fd = sock->fd;
2144 /* init read buffers */
2145 sock->next_read = sock->next_cmd = 0;
2146 sock->rbuf = malloc(RBUF_SIZE);
2147 if (sock->rbuf == NULL)
2148 {
2149 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2150 close_connection(sock);
2151 return NULL;
2152 }
2154 pthread_mutex_lock (&connection_threads_lock);
2155 connection_threads_num++;
2156 pthread_mutex_unlock (&connection_threads_lock);
2158 while (state == RUNNING)
2159 {
2160 char *cmd;
2161 ssize_t cmd_len;
2162 ssize_t rbytes;
2163 time_t now;
2165 struct pollfd pollfd;
2166 int status;
2168 pollfd.fd = fd;
2169 pollfd.events = POLLIN | POLLPRI;
2170 pollfd.revents = 0;
2172 status = poll (&pollfd, 1, /* timeout = */ 500);
2173 if (state != RUNNING)
2174 break;
2175 else if (status == 0) /* timeout */
2176 continue;
2177 else if (status < 0) /* error */
2178 {
2179 status = errno;
2180 if (status != EINTR)
2181 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2182 continue;
2183 }
2185 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2186 break;
2187 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2188 {
2189 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2190 "poll(2) returned something unexpected: %#04hx",
2191 pollfd.revents);
2192 break;
2193 }
2195 rbytes = read(fd, sock->rbuf + sock->next_read,
2196 RBUF_SIZE - sock->next_read);
2197 if (rbytes < 0)
2198 {
2199 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2200 break;
2201 }
2202 else if (rbytes == 0)
2203 break; /* eof */
2205 sock->next_read += rbytes;
2207 if (sock->batch_start)
2208 now = sock->batch_start;
2209 else
2210 now = time(NULL);
2212 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2213 {
2214 status = handle_request (sock, now, cmd, cmd_len+1);
2215 if (status != 0)
2216 goto out_close;
2217 }
2218 }
2220 out_close:
2221 close_connection(sock);
2223 /* Remove this thread from the connection threads list */
2224 pthread_mutex_lock (&connection_threads_lock);
2225 connection_threads_num--;
2226 if (connection_threads_num <= 0)
2227 pthread_cond_broadcast(&connection_threads_done);
2228 pthread_mutex_unlock (&connection_threads_lock);
2230 return (NULL);
2231 } /* }}} void *connection_thread_main */
2233 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2234 {
2235 int fd;
2236 struct sockaddr_un sa;
2237 listen_socket_t *temp;
2238 int status;
2239 const char *path;
2241 path = sock->addr;
2242 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2243 path += strlen("unix:");
2245 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2246 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2247 if (temp == NULL)
2248 {
2249 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2250 return (-1);
2251 }
2252 listen_fds = temp;
2253 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2255 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2256 if (fd < 0)
2257 {
2258 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2259 rrd_strerror(errno));
2260 return (-1);
2261 }
2263 memset (&sa, 0, sizeof (sa));
2264 sa.sun_family = AF_UNIX;
2265 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2267 /* if we've gotten this far, we own the pid file. any daemon started
2268 * with the same args must not be alive. therefore, ensure that we can
2269 * create the socket...
2270 */
2271 unlink(path);
2273 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2274 if (status != 0)
2275 {
2276 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2277 path, rrd_strerror(errno));
2278 close (fd);
2279 return (-1);
2280 }
2282 status = listen (fd, /* backlog = */ 10);
2283 if (status != 0)
2284 {
2285 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2286 path, rrd_strerror(errno));
2287 close (fd);
2288 unlink (path);
2289 return (-1);
2290 }
2292 listen_fds[listen_fds_num].fd = fd;
2293 listen_fds[listen_fds_num].family = PF_UNIX;
2294 strncpy(listen_fds[listen_fds_num].addr, path,
2295 sizeof (listen_fds[listen_fds_num].addr) - 1);
2296 listen_fds_num++;
2298 return (0);
2299 } /* }}} int open_listen_socket_unix */
/* Open listening TCP sockets for a network address and append them to
 * listen_fds.
 *
 * Accepted forms of sock->addr:
 *   "[<ipv6 address>]" or "[<ipv6 address>]:<port>"
 *   "<name or address containing a dot>" with an optional ":<port>"
 *   anything else is handed to getaddrinfo() unchanged
 * RRDCACHED_DEFAULT_PORT is used when no port is given.
 *
 * One socket is opened per address returned by getaddrinfo().  Addresses
 * for which realloc, socket(2) or bind(2) fail are skipped; a failing
 * listen(2) aborts the whole call.
 *
 * Returns 0 when the loop completes (even if every address was skipped),
 * -1 on a malformed address, getaddrinfo() failure or listen(2) failure. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* strrchr() is the standard C equivalent of the legacy rindex() */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    /* the slot only becomes visible once listen_fds_num is incremented */
    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2423 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2424 {
2425 assert(sock != NULL);
2426 assert(sock->addr != NULL);
2428 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2429 || sock->addr[0] == '/')
2430 return (open_listen_socket_unix(sock));
2431 else
2432 return (open_listen_socket_network(sock));
2433 } /* }}} int open_listen_socket */
2435 static int close_listen_sockets (void) /* {{{ */
2436 {
2437 size_t i;
2439 for (i = 0; i < listen_fds_num; i++)
2440 {
2441 close (listen_fds[i].fd);
2443 if (listen_fds[i].family == PF_UNIX)
2444 unlink(listen_fds[i].addr);
2445 }
2447 free (listen_fds);
2448 listen_fds = NULL;
2449 listen_fds_num = 0;
2451 return (0);
2452 } /* }}} int close_listen_sockets */
2454 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2455 {
2456 struct pollfd *pollfds;
2457 int pollfds_num;
2458 int status;
2459 int i;
2461 if (listen_fds_num < 1)
2462 {
2463 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2464 return (NULL);
2465 }
2467 pollfds_num = listen_fds_num;
2468 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2469 if (pollfds == NULL)
2470 {
2471 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2472 return (NULL);
2473 }
2474 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2476 RRDD_LOG(LOG_INFO, "listening for connections");
2478 while (state == RUNNING)
2479 {
2480 for (i = 0; i < pollfds_num; i++)
2481 {
2482 pollfds[i].fd = listen_fds[i].fd;
2483 pollfds[i].events = POLLIN | POLLPRI;
2484 pollfds[i].revents = 0;
2485 }
2487 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2488 if (state != RUNNING)
2489 break;
2490 else if (status == 0) /* timeout */
2491 continue;
2492 else if (status < 0) /* error */
2493 {
2494 status = errno;
2495 if (status != EINTR)
2496 {
2497 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2498 }
2499 continue;
2500 }
2502 for (i = 0; i < pollfds_num; i++)
2503 {
2504 listen_socket_t *client_sock;
2505 struct sockaddr_storage client_sa;
2506 socklen_t client_sa_size;
2507 pthread_t tid;
2508 pthread_attr_t attr;
2510 if (pollfds[i].revents == 0)
2511 continue;
2513 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2514 {
2515 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2516 "poll(2) returned something unexpected for listen FD #%i.",
2517 pollfds[i].fd);
2518 continue;
2519 }
2521 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2522 if (client_sock == NULL)
2523 {
2524 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2525 continue;
2526 }
2527 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2529 client_sa_size = sizeof (client_sa);
2530 client_sock->fd = accept (pollfds[i].fd,
2531 (struct sockaddr *) &client_sa, &client_sa_size);
2532 if (client_sock->fd < 0)
2533 {
2534 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2535 free(client_sock);
2536 continue;
2537 }
2539 pthread_attr_init (&attr);
2540 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2542 status = pthread_create (&tid, &attr, connection_thread_main,
2543 client_sock);
2544 if (status != 0)
2545 {
2546 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2547 close_connection(client_sock);
2548 continue;
2549 }
2550 } /* for (pollfds_num) */
2551 } /* while (state == RUNNING) */
2553 RRDD_LOG(LOG_INFO, "starting shutdown");
2555 close_listen_sockets ();
2557 pthread_mutex_lock (&connection_threads_lock);
2558 while (connection_threads_num > 0)
2559 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2560 pthread_mutex_unlock (&connection_threads_lock);
2562 free(pollfds);
2564 return (NULL);
2565 } /* }}} void *listen_thread_main */
2567 static int daemonize (void) /* {{{ */
2568 {
2569 int pid_fd;
2570 char *base_dir;
2572 daemon_uid = geteuid();
2574 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2575 if (pid_fd < 0)
2576 pid_fd = check_pidfile();
2577 if (pid_fd < 0)
2578 return pid_fd;
2580 /* open all the listen sockets */
2581 if (config_listen_address_list_len > 0)
2582 {
2583 for (size_t i = 0; i < config_listen_address_list_len; i++)
2584 open_listen_socket (config_listen_address_list[i]);
2586 rrd_free_ptrs((void ***) &config_listen_address_list,
2587 &config_listen_address_list_len);
2588 }
2589 else
2590 {
2591 listen_socket_t sock;
2592 memset(&sock, 0, sizeof(sock));
2593 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2594 open_listen_socket (&sock);
2595 }
2597 if (listen_fds_num < 1)
2598 {
2599 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2600 goto error;
2601 }
2603 if (!stay_foreground)
2604 {
2605 pid_t child;
2607 child = fork ();
2608 if (child < 0)
2609 {
2610 fprintf (stderr, "daemonize: fork(2) failed.\n");
2611 goto error;
2612 }
2613 else if (child > 0)
2614 exit(0);
2616 /* Become session leader */
2617 setsid ();
2619 /* Open the first three file descriptors to /dev/null */
2620 close (2);
2621 close (1);
2622 close (0);
2624 open ("/dev/null", O_RDWR);
2625 if (dup(0) == -1 || dup(0) == -1){
2626 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2627 }
2628 } /* if (!stay_foreground) */
2630 /* Change into the /tmp directory. */
2631 base_dir = (config_base_dir != NULL)
2632 ? config_base_dir
2633 : "/tmp";
2635 if (chdir (base_dir) != 0)
2636 {
2637 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2638 goto error;
2639 }
2641 install_signal_handlers();
2643 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2644 RRDD_LOG(LOG_INFO, "starting up");
2646 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2647 (GDestroyNotify) free_cache_item);
2648 if (cache_tree == NULL)
2649 {
2650 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2651 goto error;
2652 }
2654 return write_pidfile (pid_fd);
2656 error:
2657 remove_pidfile();
2658 return -1;
2659 } /* }}} int daemonize */
2661 static int cleanup (void) /* {{{ */
2662 {
2663 pthread_cond_broadcast (&flush_cond);
2664 pthread_join (flush_thread, NULL);
2666 pthread_cond_broadcast (&queue_cond);
2667 for (int i = 0; i < config_queue_threads; i++)
2668 pthread_join (queue_threads[i], NULL);
2670 if (config_flush_at_shutdown)
2671 {
2672 assert(cache_queue_head == NULL);
2673 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2674 }
2676 free(queue_threads);
2677 free(config_base_dir);
2678 free(config_pid_file);
2680 pthread_mutex_lock(&cache_lock);
2681 g_tree_destroy(cache_tree);
2683 pthread_mutex_lock(&journal_lock);
2684 journal_done();
2686 RRDD_LOG(LOG_INFO, "goodbye");
2687 closelog ();
2689 remove_pidfile ();
2691 return (0);
2692 } /* }}} int cleanup */
2694 static int read_options (int argc, char **argv) /* {{{ */
2695 {
2696 int option;
2697 int status = 0;
2699 char **permissions = NULL;
2700 size_t permissions_len = 0;
2702 while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2703 {
2704 switch (option)
2705 {
2706 case 'g':
2707 stay_foreground=1;
2708 break;
2710 case 'l':
2711 {
2712 listen_socket_t *new;
2714 new = malloc(sizeof(listen_socket_t));
2715 if (new == NULL)
2716 {
2717 fprintf(stderr, "read_options: malloc failed.\n");
2718 return(2);
2719 }
2720 memset(new, 0, sizeof(listen_socket_t));
2722 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2724 /* Add permissions to the socket {{{ */
2725 if (permissions_len != 0)
2726 {
2727 size_t i;
2728 for (i = 0; i < permissions_len; i++)
2729 {
2730 status = socket_permission_add (new, permissions[i]);
2731 if (status != 0)
2732 {
2733 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2734 "socket failed. Most likely, this permission doesn't "
2735 "exist. Check your command line.\n", permissions[i]);
2736 status = 4;
2737 }
2738 }
2739 }
2740 else /* if (permissions_len == 0) */
2741 {
2742 /* Add permission for ALL commands to the socket. */
2743 size_t i;
2744 for (i = 0; i < list_of_commands_len; i++)
2745 {
2746 status = socket_permission_add (new, list_of_commands[i].cmd);
2747 if (status != 0)
2748 {
2749 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2750 "socket failed. This should never happen, ever! Sorry.\n",
2751 permissions[i]);
2752 status = 4;
2753 }
2754 }
2755 }
2756 /* }}} Done adding permissions. */
2758 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2759 &config_listen_address_list_len, new))
2760 {
2761 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2762 return (2);
2763 }
2764 }
2765 break;
2767 case 'P':
2768 {
2769 char *optcopy;
2770 char *saveptr;
2771 char *dummy;
2772 char *ptr;
2774 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2776 optcopy = strdup (optarg);
2777 dummy = optcopy;
2778 saveptr = NULL;
2779 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2780 {
2781 dummy = NULL;
2782 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2783 }
2785 free (optcopy);
2786 }
2787 break;
2789 case 'f':
2790 {
2791 int temp;
2793 temp = atoi (optarg);
2794 if (temp > 0)
2795 config_flush_interval = temp;
2796 else
2797 {
2798 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2799 status = 3;
2800 }
2801 }
2802 break;
2804 case 'w':
2805 {
2806 int temp;
2808 temp = atoi (optarg);
2809 if (temp > 0)
2810 config_write_interval = temp;
2811 else
2812 {
2813 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2814 status = 2;
2815 }
2816 }
2817 break;
2819 case 'z':
2820 {
2821 int temp;
2823 temp = atoi(optarg);
2824 if (temp > 0)
2825 config_write_jitter = temp;
2826 else
2827 {
2828 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2829 status = 2;
2830 }
2832 break;
2833 }
2835 case 't':
2836 {
2837 int threads;
2838 threads = atoi(optarg);
2839 if (threads >= 1)
2840 config_queue_threads = threads;
2841 else
2842 {
2843 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2844 return 1;
2845 }
2846 }
2847 break;
2849 case 'B':
2850 config_write_base_only = 1;
2851 break;
2853 case 'b':
2854 {
2855 size_t len;
2856 char base_realpath[PATH_MAX];
2858 if (config_base_dir != NULL)
2859 free (config_base_dir);
2860 config_base_dir = strdup (optarg);
2861 if (config_base_dir == NULL)
2862 {
2863 fprintf (stderr, "read_options: strdup failed.\n");
2864 return (3);
2865 }
2867 /* make sure that the base directory is not resolved via
2868 * symbolic links. this makes some performance-enhancing
2869 * assumptions possible (we don't have to resolve paths
2870 * that start with a "/")
2871 */
2872 if (realpath(config_base_dir, base_realpath) == NULL)
2873 {
2874 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2875 return 5;
2876 }
2877 else if (strncmp(config_base_dir,
2878 base_realpath, sizeof(base_realpath)) != 0)
2879 {
2880 fprintf(stderr,
2881 "Base directory (-b) resolved via file system links!\n"
2882 "Please consult rrdcached '-b' documentation!\n"
2883 "Consider specifying the real directory (%s)\n",
2884 base_realpath);
2885 return 5;
2886 }
2888 len = strlen (config_base_dir);
2889 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2890 {
2891 config_base_dir[len - 1] = 0;
2892 len--;
2893 }
2895 if (len < 1)
2896 {
2897 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2898 return (4);
2899 }
2901 _config_base_dir_len = len;
2902 }
2903 break;
2905 case 'p':
2906 {
2907 if (config_pid_file != NULL)
2908 free (config_pid_file);
2909 config_pid_file = strdup (optarg);
2910 if (config_pid_file == NULL)
2911 {
2912 fprintf (stderr, "read_options: strdup failed.\n");
2913 return (3);
2914 }
2915 }
2916 break;
2918 case 'F':
2919 config_flush_at_shutdown = 1;
2920 break;
2922 case 'j':
2923 {
2924 struct stat statbuf;
2925 const char *dir = journal_dir = strdup(optarg);
2927 status = stat(dir, &statbuf);
2928 if (status != 0)
2929 {
2930 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2931 return 6;
2932 }
2934 if (!S_ISDIR(statbuf.st_mode)
2935 || access(dir, R_OK|W_OK|X_OK) != 0)
2936 {
2937 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2938 errno ? rrd_strerror(errno) : "");
2939 return 6;
2940 }
2941 }
2942 break;
2944 case 'h':
2945 case '?':
2946 printf ("RRDCacheD %s\n"
2947 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
2948 "\n"
2949 "Usage: rrdcached [options]\n"
2950 "\n"
2951 "Valid options are:\n"
2952 " -l <address> Socket address to listen to.\n"
2953 " -P <perms> Sets the permissions to assign to all following "
2954 "sockets\n"
2955 " -w <seconds> Interval in which to write data.\n"
2956 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2957 " -t <threads> Number of write threads.\n"
2958 " -f <seconds> Interval in which to flush dead data.\n"
2959 " -p <file> Location of the PID-file.\n"
2960 " -b <dir> Base directory to change to.\n"
2961 " -B Restrict file access to paths within -b <dir>\n"
2962 " -g Do not fork and run in the foreground.\n"
2963 " -j <dir> Directory in which to create the journal files.\n"
2964 " -F Always flush all updates at shutdown\n"
2965 "\n"
2966 "For more information and a detailed description of all options "
2967 "please refer\n"
2968 "to the rrdcached(1) manual page.\n",
2969 VERSION);
2970 status = -1;
2971 break;
2972 } /* switch (option) */
2973 } /* while (getopt) */
2975 /* advise the user when values are not sane */
2976 if (config_flush_interval < 2 * config_write_interval)
2977 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2978 " 2x write interval (-w) !\n");
2979 if (config_write_jitter > config_write_interval)
2980 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2981 " write interval (-w) !\n");
2983 if (config_write_base_only && config_base_dir == NULL)
2984 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2985 " Consult the rrdcached documentation\n");
2987 if (journal_dir == NULL)
2988 config_flush_at_shutdown = 1;
2990 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2992 return (status);
2993 } /* }}} int read_options */
2995 int main (int argc, char **argv)
2996 {
2997 int status;
2999 status = read_options (argc, argv);
3000 if (status != 0)
3001 {
3002 if (status < 0)
3003 status = 0;
3004 return (status);
3005 }
3007 status = daemonize ();
3008 if (status != 0)
3009 {
3010 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3011 return (1);
3012 }
3014 journal_init();
3016 /* start the queue threads */
3017 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3018 if (queue_threads == NULL)
3019 {
3020 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3021 cleanup();
3022 return (1);
3023 }
3024 for (int i = 0; i < config_queue_threads; i++)
3025 {
3026 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3027 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3028 if (status != 0)
3029 {
3030 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3031 cleanup();
3032 return (1);
3033 }
3034 }
3036 /* start the flush thread */
3037 memset(&flush_thread, 0, sizeof(flush_thread));
3038 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3039 if (status != 0)
3040 {
3041 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3042 cleanup();
3043 return (1);
3044 }
3046 listen_thread_main (NULL);
3047 cleanup ();
3049 return (0);
3050 } /* int main */
3052 /*
3053 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3054 */