1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <dirent.h>
95 #include <fcntl.h>
96 #include <signal.h>
97 #include <sys/un.h>
98 #include <netdb.h>
99 #include <poll.h>
100 #include <syslog.h>
101 #include <pthread.h>
102 #include <errno.h>
103 #include <assert.h>
104 #include <sys/time.h>
105 #include <time.h>
107 #include <glib-2.0/glib.h>
108 /* }}} */
/* Log a printf-style message through syslog() at the given severity. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* When the compiler does not define __GNUC__, expand __attribute__((...))
 * to nothing so the annotations used throughout this file still compile. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
116 /*
117 * Types
118 */
/* Privilege level attached to a socket. has_privilege() compares levels
 * with ">=", so the declaration order matters: PRIV_LOW < PRIV_HIGH. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Status handed to send_response(): RESP_OK for a success reply,
 * RESP_ERR for an error reply. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/* Per-socket state: descriptor, privilege, BATCH bookkeeping and the
 * read/write buffers used by next_cmd(), add_to_wbuf() and
 * send_response(). */
struct listen_socket_s
{
  int fd;                     /* descriptor that responses are written to */
  char addr[PATH_MAX + 1];    /* presumably the configured address -- TODO confirm */
  int family;                 /* presumably the socket address family -- TODO confirm */
  socket_privilege privilege; /* compared against a required level in has_privilege() */

  /* state for BATCH processing */
  time_t batch_start;         /* non-zero while in BATCH mode (see send_response) */
  int batch_cmd;              /* reported as the status number of errors during BATCH */

  /* buffered IO */
  char *rbuf;                 /* read buffer scanned by next_cmd() */
  off_t next_cmd;             /* offset in rbuf where the next command starts */
  off_t next_read;            /* offset in rbuf one past the last byte of data */

  char *wbuf;                 /* response text accumulated by add_to_wbuf() */
  ssize_t wbuf_len;           /* number of bytes accumulated in wbuf */
};
typedef struct listen_socket_s listen_socket_t;
struct command;
/* note: guard against "unused" warnings in the handlers */
/* Parameter list shared by the request dispatch functions: the client
 * socket (NULL during journal replay), a time value, and the remaining
 * argument text together with its size. */
#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
  time_t now __attribute__((unused)),\
  char *buffer __attribute__((unused)),\
  size_t buffer_size __attribute__((unused))

/* Parameter list of a command handler: the command table entry followed
 * by the DISPATCH_PROTO parameters. */
#define HANDLER_PROTO struct command *cmd __attribute__((unused)),\
  DISPATCH_PROTO

/* One entry describing a protocol command. */
struct command {
  char *cmd;                      /* command name */
  int (*handler)(HANDLER_PROTO);  /* function implementing the command */
  socket_privilege min_priv;      /* presumably the minimum privilege needed -- TODO confirm */

  char context; /* where we expect to see it */
  /* bit flags for `context' */
#define CMD_CONTEXT_CLIENT (1<<0)
#define CMD_CONTEXT_BATCH (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY (0x7f)

  char *syntax;                   /* usage text sent by syntax_error() */
  char *help;                     /* presumably the long help text -- TODO confirm */
};
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/* One cached file: its pending update strings plus the links that place it
 * in the write queue (cache_queue_head / cache_queue_tail). */
struct cache_item_s
{
  char *file;                /* file name; also the key of this item in cache_tree */
  char **values;             /* pending update strings, values_num entries */
  size_t values_num;
  time_t last_flush_time;    /* set by wipe_ci_values() when values are handed off */
  time_t last_update_stamp;  /* not used in this part of the file */
  /* bits for `flags' */
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;
  pthread_cond_t flushed;    /* broadcast after the item was written; flush_file() waits on it */
  cache_item_t *prev;        /* neighbour towards the queue head, NULL at the head */
  cache_item_t *next;        /* neighbour towards the queue tail, NULL at the tail */
};
/* User data passed to tree_callback_flush() by flush_old_values(). */
struct callback_flush_data_s
{
  time_t now;          /* time at which the tree walk started */
  time_t abs_timeout;  /* items last flushed at or before this time get enqueued */
  char **keys;         /* keys of items to drop from the tree after the walk */
  size_t keys_num;     /* number of entries in keys */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() inserts at. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;
/* describe a set of journal files */
typedef struct {
  char **files;      /* array of file names */
  size_t files_num;  /* number of entries in files */
} journal_set;

/* max length of socket command or response */
#define CMD_MAX 4096
/* twice CMD_MAX; presumably the size of the per-socket read buffer -- TODO confirm */
#define RBUF_SIZE (CMD_MAX*2)
216 /*
217 * Variables
218 */
/* Not referenced in this part of the file; presumably set while parsing
 * options -- TODO confirm. */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Global run state. The signal handlers move it to FLUSHING and
 * flush_thread_main() sets SHUTDOWN once it leaves its loop. */
enum {
  RUNNING, /* normal operation */
  FLUSHING, /* flushing remaining values */
  SHUTDOWN /* shutting down */
} state = RUNNING;

/* Writer threads: queue_cond is signalled by enqueue_cache_item() and
 * waited on in queue_thread_main(). */
static pthread_t *queue_threads;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

/* Flush thread: flush_cond is the condition flush_thread_main() sleeps on. */
static pthread_t flush_thread;
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
/* cache_tree maps file names to cache_item_t; the queue is a doubly linked
 * list of items waiting to be written. All three are guarded by cache_lock. */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

/* Configuration; the intervals are in seconds. */
static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Counters reported by handle_request_stats(); guarded by stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL; /* current journal file handle */
static long journal_size = 0; /* current journal size */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
285 /*
286 * Functions
287 */
/*
 * sig_common: shared body of the signal handlers below.
 * Logs the signal name, switches the global state to FLUSHING and wakes
 * every thread blocked on flush_cond or queue_cond.
 *
 * NOTE(review): syslog() and pthread_cond_broadcast() are not on the POSIX
 * list of async-signal-safe functions -- confirm this is acceptable here.
 */
static void sig_common (const char *sig) /* {{{ */
{
  RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
  state = FLUSHING;
  pthread_cond_broadcast(&flush_cond);
  pthread_cond_broadcast(&queue_cond);
} /* }}} void sig_common */
/* SIGINT handler: start the common flush/shutdown sequence. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM handler: start the common flush/shutdown sequence. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
/* SIGUSR1 handler: turn flush-at-shutdown on, then start the common
 * flush/shutdown sequence. */
static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 1;
  sig_common("USR1");
} /* }}} void sig_usr1_handler */
/* SIGUSR2 handler: turn flush-at-shutdown off, then start the common
 * flush/shutdown sequence. */
static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 0;
  sig_common("USR2");
} /* }}} void sig_usr2_handler */
318 static void install_signal_handlers(void) /* {{{ */
319 {
320 /* These structures are static, because `sigaction' behaves weird if the are
321 * overwritten.. */
322 static struct sigaction sa_int;
323 static struct sigaction sa_term;
324 static struct sigaction sa_pipe;
325 static struct sigaction sa_usr1;
326 static struct sigaction sa_usr2;
328 /* Install signal handlers */
329 memset (&sa_int, 0, sizeof (sa_int));
330 sa_int.sa_handler = sig_int_handler;
331 sigaction (SIGINT, &sa_int, NULL);
333 memset (&sa_term, 0, sizeof (sa_term));
334 sa_term.sa_handler = sig_term_handler;
335 sigaction (SIGTERM, &sa_term, NULL);
337 memset (&sa_pipe, 0, sizeof (sa_pipe));
338 sa_pipe.sa_handler = SIG_IGN;
339 sigaction (SIGPIPE, &sa_pipe, NULL);
341 memset (&sa_pipe, 0, sizeof (sa_usr1));
342 sa_usr1.sa_handler = sig_usr1_handler;
343 sigaction (SIGUSR1, &sa_usr1, NULL);
345 memset (&sa_usr2, 0, sizeof (sa_usr2));
346 sa_usr2.sa_handler = sig_usr2_handler;
347 sigaction (SIGUSR2, &sa_usr2, NULL);
349 } /* }}} void install_signal_handlers */
351 static int open_pidfile(char *action, int oflag) /* {{{ */
352 {
353 int fd;
354 char *file;
356 file = (config_pid_file != NULL)
357 ? config_pid_file
358 : LOCALSTATEDIR "/run/rrdcached.pid";
360 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
361 if (fd < 0)
362 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
363 action, file, rrd_strerror(errno));
365 return(fd);
366 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns the open pid file descriptor (truncated and rewound, ready for
 * write_pidfile) when the recorded pid is stale. Returns a negative value
 * when the file cannot be opened or read, holds no usable pid, names a
 * process we can signal, or cannot be truncated. The descriptor is closed
 * on every failure path after a successful open.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not NUL-terminate */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile:
 * Writes the pid of the calling process, followed by a newline, to the
 * already opened descriptor `fd'. The descriptor is closed in every case.
 * Returns 0 on success and -1 when the stream cannot be created or the
 * pid cannot be written out completely.
 */
static int write_pidfile (int fd) /* {{{ */
{
  pid_t pid;
  FILE *fh;

  pid = getpid ();

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) pid) < 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fprintf() failed.");
    fclose (fh);
    return (-1);
  }

  /* fclose() flushes the buffered pid; a failure here means the file may
   * not contain it. */
  if (fclose (fh) != 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fclose() failed.");
    return (-1);
  }

  return (0);
} /* }}} int write_pidfile */
433 static int remove_pidfile (void) /* {{{ */
434 {
435 char *file;
436 int status;
438 file = (config_pid_file != NULL)
439 ? config_pid_file
440 : LOCALSTATEDIR "/run/rrdcached.pid";
442 status = unlink (file);
443 if (status == 0)
444 return (0);
445 return (errno);
446 } /* }}} int remove_pidfile */
448 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
449 {
450 char *eol;
452 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
453 sock->next_read - sock->next_cmd);
455 if (eol == NULL)
456 {
457 /* no commands left, move remainder back to front of rbuf */
458 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
459 sock->next_read - sock->next_cmd);
460 sock->next_read -= sock->next_cmd;
461 sock->next_cmd = 0;
462 *len = 0;
463 return NULL;
464 }
465 else
466 {
467 char *cmd = sock->rbuf + sock->next_cmd;
468 *eol = '\0';
470 sock->next_cmd = eol - sock->rbuf + 1;
472 if (eol > sock->rbuf && *(eol-1) == '\r')
473 *(--eol) = '\0'; /* handle "\r\n" EOL */
475 *len = eol - cmd;
477 return cmd;
478 }
480 /* NOTREACHED */
481 assert(1==0);
482 }
484 /* add the characters directly to the write buffer */
485 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
486 {
487 char *new_buf;
489 assert(sock != NULL);
491 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
492 if (new_buf == NULL)
493 {
494 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
495 return -1;
496 }
498 strncpy(new_buf + sock->wbuf_len, str, len + 1);
500 sock->wbuf = new_buf;
501 sock->wbuf_len += len;
503 return 0;
504 } /* }}} static int add_to_wbuf */
506 /* add the text to the "extra" info that's sent after the status line */
507 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
508 {
509 va_list argp;
510 char buffer[CMD_MAX];
511 int len;
513 if (sock == NULL) return 0; /* journal replay mode */
514 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
516 va_start(argp, fmt);
517 #ifdef HAVE_VSNPRINTF
518 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
519 #else
520 len = vsprintf(buffer, fmt, argp);
521 #endif
522 va_end(argp);
523 if (len < 0)
524 {
525 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
526 return -1;
527 }
529 return add_to_wbuf(sock, buffer, len);
530 } /* }}} static int add_response_info */
/*
 * count_lines:
 * Returns the number of '\n' characters in `str'; 0 when `str' is NULL.
 */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  char *p;

  if (str == NULL)
    return 0;

  for (p = strchr(str, '\n'); p != NULL; p = strchr(p + 1, '\n'))
    newlines++;

  return newlines;
} /* }}} static int count_lines */
548 /* send the response back to the user.
549 * returns 0 on success, -1 on error
550 * write buffer is always zeroed after this call */
551 static int send_response (listen_socket_t *sock, response_code rc,
552 char *fmt, ...) /* {{{ */
553 {
554 va_list argp;
555 char buffer[CMD_MAX];
556 int lines;
557 ssize_t wrote;
558 int rclen, len;
560 if (sock == NULL) return rc; /* journal replay mode */
562 if (sock->batch_start)
563 {
564 if (rc == RESP_OK)
565 return rc; /* no response on success during BATCH */
566 lines = sock->batch_cmd;
567 }
568 else if (rc == RESP_OK)
569 lines = count_lines(sock->wbuf);
570 else
571 lines = -1;
573 rclen = sprintf(buffer, "%d ", lines);
574 va_start(argp, fmt);
575 #ifdef HAVE_VSNPRINTF
576 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
577 #else
578 len = vsprintf(buffer+rclen, fmt, argp);
579 #endif
580 va_end(argp);
581 if (len < 0)
582 return -1;
584 len += rclen;
586 /* append the result to the wbuf, don't write to the user */
587 if (sock->batch_start)
588 return add_to_wbuf(sock, buffer, len);
590 /* first write must be complete */
591 if (len != write(sock->fd, buffer, len))
592 {
593 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
594 return -1;
595 }
597 if (sock->wbuf != NULL && rc == RESP_OK)
598 {
599 wrote = 0;
600 while (wrote < sock->wbuf_len)
601 {
602 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
603 if (wb <= 0)
604 {
605 RRDD_LOG(LOG_INFO, "send_response: could not write results");
606 return -1;
607 }
608 wrote += wb;
609 }
610 }
612 free(sock->wbuf); sock->wbuf = NULL;
613 sock->wbuf_len = 0;
615 return 0;
616 } /* }}} */
618 static void wipe_ci_values(cache_item_t *ci, time_t when)
619 {
620 ci->values = NULL;
621 ci->values_num = 0;
623 ci->last_flush_time = when;
624 if (config_write_jitter > 0)
625 ci->last_flush_time += (rrd_random() % config_write_jitter);
626 }
628 /* remove_from_queue
629 * remove a "cache_item_t" item from the queue.
630 * must hold 'cache_lock' when calling this
631 */
632 static void remove_from_queue(cache_item_t *ci) /* {{{ */
633 {
634 if (ci == NULL) return;
635 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
637 if (ci->prev == NULL)
638 cache_queue_head = ci->next; /* reset head */
639 else
640 ci->prev->next = ci->next;
642 if (ci->next == NULL)
643 cache_queue_tail = ci->prev; /* reset the tail */
644 else
645 ci->next->prev = ci->prev;
647 ci->next = ci->prev = NULL;
648 ci->flags &= ~CI_FLAGS_IN_QUEUE;
650 pthread_mutex_lock (&stats_lock);
651 assert (stats_queue_length > 0);
652 stats_queue_length--;
653 pthread_mutex_unlock (&stats_lock);
655 } /* }}} static void remove_from_queue */
657 /* free the resources associated with the cache_item_t
658 * must hold cache_lock when calling this function
659 */
660 static void *free_cache_item(cache_item_t *ci) /* {{{ */
661 {
662 if (ci == NULL) return NULL;
664 remove_from_queue(ci);
666 for (size_t i=0; i < ci->values_num; i++)
667 free(ci->values[i]);
669 free (ci->values);
670 free (ci->file);
672 /* in case anyone is waiting */
673 pthread_cond_broadcast(&ci->flushed);
674 pthread_cond_destroy(&ci->flushed);
676 free (ci);
678 return NULL;
679 } /* }}} static void *free_cache_item */
681 /*
682 * enqueue_cache_item:
683 * `cache_lock' must be acquired before calling this function!
684 */
685 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
686 queue_side_t side)
687 {
688 if (ci == NULL)
689 return (-1);
691 if (ci->values_num == 0)
692 return (0);
694 if (side == HEAD)
695 {
696 if (cache_queue_head == ci)
697 return 0;
699 /* remove if further down in queue */
700 remove_from_queue(ci);
702 ci->prev = NULL;
703 ci->next = cache_queue_head;
704 if (ci->next != NULL)
705 ci->next->prev = ci;
706 cache_queue_head = ci;
708 if (cache_queue_tail == NULL)
709 cache_queue_tail = cache_queue_head;
710 }
711 else /* (side == TAIL) */
712 {
713 /* We don't move values back in the list.. */
714 if (ci->flags & CI_FLAGS_IN_QUEUE)
715 return (0);
717 assert (ci->next == NULL);
718 assert (ci->prev == NULL);
720 ci->prev = cache_queue_tail;
722 if (cache_queue_tail == NULL)
723 cache_queue_head = ci;
724 else
725 cache_queue_tail->next = ci;
727 cache_queue_tail = ci;
728 }
730 ci->flags |= CI_FLAGS_IN_QUEUE;
732 pthread_cond_signal(&queue_cond);
733 pthread_mutex_lock (&stats_lock);
734 stats_queue_length++;
735 pthread_mutex_unlock (&stats_lock);
737 return (0);
738 } /* }}} int enqueue_cache_item */
740 /*
741 * tree_callback_flush:
742 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
743 * while this is in progress.
744 */
745 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
746 gpointer data)
747 {
748 cache_item_t *ci;
749 callback_flush_data_t *cfd;
751 ci = (cache_item_t *) value;
752 cfd = (callback_flush_data_t *) data;
754 if (ci->flags & CI_FLAGS_IN_QUEUE)
755 return FALSE;
757 if (ci->values_num > 0
758 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
759 {
760 enqueue_cache_item (ci, TAIL);
761 }
762 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
763 && (ci->values_num <= 0))
764 {
765 assert ((char *) key == ci->file);
766 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
767 {
768 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
769 return (FALSE);
770 }
771 }
773 return (FALSE);
774 } /* }}} gboolean tree_callback_flush */
776 static int flush_old_values (int max_age)
777 {
778 callback_flush_data_t cfd;
779 size_t k;
781 memset (&cfd, 0, sizeof (cfd));
782 /* Pass the current time as user data so that we don't need to call
783 * `time' for each node. */
784 cfd.now = time (NULL);
785 cfd.keys = NULL;
786 cfd.keys_num = 0;
788 if (max_age > 0)
789 cfd.abs_timeout = cfd.now - max_age;
790 else
791 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
793 /* `tree_callback_flush' will return the keys of all values that haven't
794 * been touched in the last `config_flush_interval' seconds in `cfd'.
795 * The char*'s in this array point to the same memory as ci->file, so we
796 * don't need to free them separately. */
797 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
799 for (k = 0; k < cfd.keys_num; k++)
800 {
801 /* should never fail, since we have held the cache_lock
802 * the entire time */
803 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
804 }
806 if (cfd.keys != NULL)
807 {
808 free (cfd.keys);
809 cfd.keys = NULL;
810 }
812 return (0);
813 } /* int flush_old_values */
/*
 * flush_thread_main:
 * Body of the flush thread. While the daemon is RUNNING it wakes up every
 * `config_flush_interval' seconds (or earlier when flush_cond is
 * signalled), enqueues values older than `config_write_interval' and
 * rotates the journal. When the state leaves RUNNING it optionally
 * enqueues everything (config_flush_at_shutdown) and then sets the state
 * to SHUTDOWN. `cache_lock' is held except while the journal is rotated
 * and while waiting on the condition variable.
 */
static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int status;

  /* first deadline: one flush interval from now (usec converted to nsec) */
  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock(&cache_lock);

  while (state == RUNNING)
  {
    /* has the deadline passed? (compare seconds, then nanoseconds) */
    gettimeofday (&now, NULL);
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
            && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      RRDD_LOG(LOG_DEBUG, "flushing old values");

      /* Determine the time of the next cache flush. */
      next_flush.tv_sec = now.tv_sec + config_flush_interval;

      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* sleep until the deadline or until a signal handler broadcasts
     * flush_cond; cache_lock is released while waiting */
    status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
    if (status != 0 && status != ETIMEDOUT)
    {
      RRDD_LOG (LOG_ERR, "flush_thread_main: "
          "pthread_cond_timedwait returned %i.", status);
    }
  }

  if (config_flush_at_shutdown)
    flush_old_values (-1); /* flush everything */

  /* queue_thread_main() keeps running until it sees this state */
  state = SHUTDOWN;

  pthread_mutex_unlock(&cache_lock);

  return NULL;
} /* }}} void *flush_thread_main */
/*
 * queue_thread_main:
 * Body of a writer thread. Repeatedly takes the item at the head of the
 * write queue, detaches its pending values, and writes them with
 * rrd_update_r() while `cache_lock' is released. Afterwards it records a
 * "wrote" journal entry, wakes anyone waiting on the item's `flushed'
 * condition and updates the statistics. The loop ends once the state is
 * SHUTDOWN and, if config_flush_at_shutdown is set, the queue is empty.
 */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  pthread_mutex_lock (&cache_lock);

  while (state != SHUTDOWN
      || (cache_queue_head != NULL && config_flush_at_shutdown))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    size_t values_num;
    int status;

    /* Now, check if there's something to store away. If not, wait until
     * something comes in. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_wait (&queue_cond, &cache_lock);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_wait returned %i.", status);
      }
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      /* NOTE(review): the item is still at the queue head and cache_lock
       * is still held, so this `continue' retries immediately -- confirm
       * this cannot spin under memory pressure. */
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* take ownership of the value array; wipe_ci_values() only clears the
     * item's pointers, the strings are freed below after the write */
    values = ci->values;
    values_num = ci->values_num;

    wipe_ci_values(ci, time(NULL));
    remove_from_queue(ci);

    /* do the disk write without holding the cache lock */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    journal_write("wrote", file);

    /* Search again in the tree. It's possible someone issued a "FORGET"
     * while we were writing the update values. */
    pthread_mutex_lock(&cache_lock);
    ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
    if (ci)
      pthread_cond_broadcast(&ci->flushed);
    pthread_mutex_unlock(&cache_lock);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    rrd_free_ptrs((void ***) &values, &values_num);
    free(file);

    /* re-acquire for the loop condition and the next iteration */
    pthread_mutex_lock (&cache_lock);
  }
  pthread_mutex_unlock (&cache_lock);

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Splits the next space- or NUL-terminated field off the front of the
 * buffer. A backslash makes the following character literal (so "\ " is
 * a space inside the field); the field is unescaped in place.
 *
 * On success returns 0, stores the start of the (now NUL-terminated)
 * field in *field_ret and advances *buffer_ret / *buffer_size_ret past
 * the field and its delimiter. Returns -1, leaving the three outputs
 * untouched, when the buffer is empty or no terminator is found (for
 * instance after a trailing backslash).
 *
 * The buffer must end with a NUL byte at index size - 1.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const start = *buffer_ret;
  const size_t total = *buffer_size_ret;
  size_t rd = 0;       /* next byte to read */
  size_t wr = 0;       /* next byte of the unescaped field to write */
  int terminated = 0;

  if (total == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (start[total - 1] == '\0');

  while (!terminated && rd < total)
  {
    const char c = start[rd];

    if (c == ' ' || c == '\0')
    {
      /* end-of-field or end-of-buffer */
      start[wr] = 0;
      wr++;
      rd++;
      terminated = 1;
    }
    else
    {
      if (c == '\\')
      {
        /* escaped character: nothing may follow the backslash */
        if (rd >= (total - 1))
          break;
        rd++;
      }
      start[wr] = start[rd];
      wr++;
      rd++;
    }
  }

  if (!terminated)
    return (-1);

  *buffer_ret = start + rd;
  *buffer_size_ret = total - rd;
  *field_ret = start;

  return (0);
} /* }}} int buffer_get_field */
1019 /* if we're restricting writes to the base directory,
1020 * check whether the file falls within the dir
1021 * returns 1 if OK, otherwise 0
1022 */
1023 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1024 {
1025 assert(file != NULL);
1027 if (!config_write_base_only
1028 || sock == NULL /* journal replay */
1029 || config_base_dir == NULL)
1030 return 1;
1032 if (strstr(file, "../") != NULL) goto err;
1034 /* relative paths without "../" are ok */
1035 if (*file != '/') return 1;
1037 /* file must be of the format base + "/" + <1+ char filename> */
1038 if (strlen(file) < _config_base_dir_len + 2) goto err;
1039 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1040 if (*(file + _config_base_dir_len) != '/') goto err;
1042 return 1;
1044 err:
1045 if (sock != NULL && sock->fd >= 0)
1046 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1048 return 0;
1049 } /* }}} static int check_file_access */
1051 /* when using a base dir, convert relative paths to absolute paths.
1052 * if necessary, modifies the "filename" pointer to point
1053 * to the new path created in "tmp". "tmp" is provided
1054 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1055 *
1056 * this allows us to optimize for the expected case (absolute path)
1057 * with a no-op.
1058 */
1059 static void get_abs_path(char **filename, char *tmp)
1060 {
1061 assert(tmp != NULL);
1062 assert(filename != NULL && *filename != NULL);
1064 if (config_base_dir == NULL || **filename == '/')
1065 return;
1067 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1068 *filename = tmp;
1069 } /* }}} static int get_abs_path */
1071 /* returns 1 if we have the required privilege level,
1072 * otherwise issue an error to the user on sock */
1073 static int has_privilege (listen_socket_t *sock, /* {{{ */
1074 socket_privilege priv)
1075 {
1076 if (sock == NULL) /* journal replay */
1077 return 1;
1079 if (sock->privilege >= priv)
1080 return 1;
1082 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1083 } /* }}} static int has_privilege */
1085 static int flush_file (const char *filename) /* {{{ */
1086 {
1087 cache_item_t *ci;
1089 pthread_mutex_lock (&cache_lock);
1091 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1092 if (ci == NULL)
1093 {
1094 pthread_mutex_unlock (&cache_lock);
1095 return (ENOENT);
1096 }
1098 if (ci->values_num > 0)
1099 {
1100 /* Enqueue at head */
1101 enqueue_cache_item (ci, HEAD);
1102 pthread_cond_wait(&ci->flushed, &cache_lock);
1103 }
1105 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1106 * may have been purged during our cond_wait() */
1108 pthread_mutex_unlock(&cache_lock);
1110 return (0);
1111 } /* }}} int flush_file */
1113 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1114 {
1115 char *err = "Syntax error.\n";
1117 if (cmd && cmd->syntax)
1118 err = cmd->syntax;
1120 return send_response(sock, RESP_ERR, "Usage: %s", err);
1121 } /* }}} static int syntax_error() */
1123 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1124 {
1125 uint64_t copy_queue_length;
1126 uint64_t copy_updates_received;
1127 uint64_t copy_flush_received;
1128 uint64_t copy_updates_written;
1129 uint64_t copy_data_sets_written;
1130 uint64_t copy_journal_bytes;
1131 uint64_t copy_journal_rotate;
1133 uint64_t tree_nodes_number;
1134 uint64_t tree_depth;
1136 pthread_mutex_lock (&stats_lock);
1137 copy_queue_length = stats_queue_length;
1138 copy_updates_received = stats_updates_received;
1139 copy_flush_received = stats_flush_received;
1140 copy_updates_written = stats_updates_written;
1141 copy_data_sets_written = stats_data_sets_written;
1142 copy_journal_bytes = stats_journal_bytes;
1143 copy_journal_rotate = stats_journal_rotate;
1144 pthread_mutex_unlock (&stats_lock);
1146 pthread_mutex_lock (&cache_lock);
1147 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1148 tree_depth = (uint64_t) g_tree_height (cache_tree);
1149 pthread_mutex_unlock (&cache_lock);
1151 add_response_info(sock,
1152 "QueueLength: %"PRIu64"\n", copy_queue_length);
1153 add_response_info(sock,
1154 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1155 add_response_info(sock,
1156 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1157 add_response_info(sock,
1158 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1159 add_response_info(sock,
1160 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1161 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1162 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1163 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1164 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1166 send_response(sock, RESP_OK, "Statistics follow\n");
1168 return (0);
1169 } /* }}} int handle_request_stats */
1171 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1172 {
1173 char *file, file_tmp[PATH_MAX];
1174 int status;
1176 status = buffer_get_field (&buffer, &buffer_size, &file);
1177 if (status != 0)
1178 {
1179 return syntax_error(sock,cmd);
1180 }
1181 else
1182 {
1183 pthread_mutex_lock(&stats_lock);
1184 stats_flush_received++;
1185 pthread_mutex_unlock(&stats_lock);
1187 get_abs_path(&file, file_tmp);
1188 if (!check_file_access(file, sock)) return 0;
1190 status = flush_file (file);
1191 if (status == 0)
1192 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1193 else if (status == ENOENT)
1194 {
1195 /* no file in our tree; see whether it exists at all */
1196 struct stat statbuf;
1198 memset(&statbuf, 0, sizeof(statbuf));
1199 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1200 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1201 else
1202 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1203 }
1204 else if (status < 0)
1205 return send_response(sock, RESP_ERR, "Internal error.\n");
1206 else
1207 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1208 }
1210 /* NOTREACHED */
1211 assert(1==0);
1212 } /* }}} int handle_request_flush */
1214 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1215 {
1216 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1218 pthread_mutex_lock(&cache_lock);
1219 flush_old_values(-1);
1220 pthread_mutex_unlock(&cache_lock);
1222 return send_response(sock, RESP_OK, "Started flush.\n");
1223 } /* }}} static int handle_request_flushall */
1225 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1226 {
1227 int status;
1228 char *file, file_tmp[PATH_MAX];
1229 cache_item_t *ci;
1231 status = buffer_get_field(&buffer, &buffer_size, &file);
1232 if (status != 0)
1233 return syntax_error(sock,cmd);
1235 get_abs_path(&file, file_tmp);
1237 pthread_mutex_lock(&cache_lock);
1238 ci = g_tree_lookup(cache_tree, file);
1239 if (ci == NULL)
1240 {
1241 pthread_mutex_unlock(&cache_lock);
1242 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1243 }
1245 for (size_t i=0; i < ci->values_num; i++)
1246 add_response_info(sock, "%s\n", ci->values[i]);
1248 pthread_mutex_unlock(&cache_lock);
1249 return send_response(sock, RESP_OK, "updates pending\n");
1250 } /* }}} static int handle_request_pending */
1252 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1253 {
1254 int status;
1255 gboolean found;
1256 char *file, file_tmp[PATH_MAX];
1258 status = buffer_get_field(&buffer, &buffer_size, &file);
1259 if (status != 0)
1260 return syntax_error(sock,cmd);
1262 get_abs_path(&file, file_tmp);
1263 if (!check_file_access(file, sock)) return 0;
1265 pthread_mutex_lock(&cache_lock);
1266 found = g_tree_remove(cache_tree, file);
1267 pthread_mutex_unlock(&cache_lock);
1269 if (found == TRUE)
1270 {
1271 if (sock != NULL)
1272 journal_write("forget", file);
1274 return send_response(sock, RESP_OK, "Gone!\n");
1275 }
1276 else
1277 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1279 /* NOTREACHED */
1280 assert(1==0);
1281 } /* }}} static int handle_request_forget */
1283 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1284 {
1285 cache_item_t *ci;
1287 pthread_mutex_lock(&cache_lock);
1289 ci = cache_queue_head;
1290 while (ci != NULL)
1291 {
1292 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1293 ci = ci->next;
1294 }
1296 pthread_mutex_unlock(&cache_lock);
1298 return send_response(sock, RESP_OK, "in queue.\n");
1299 } /* }}} int handle_request_queue */
1301 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1302 {
1303 char *file, file_tmp[PATH_MAX];
1304 int values_num = 0;
1305 int status;
1306 char orig_buf[CMD_MAX];
1308 cache_item_t *ci;
1310 /* save it for the journal later */
1311 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1313 status = buffer_get_field (&buffer, &buffer_size, &file);
1314 if (status != 0)
1315 return syntax_error(sock,cmd);
1317 pthread_mutex_lock(&stats_lock);
1318 stats_updates_received++;
1319 pthread_mutex_unlock(&stats_lock);
1321 get_abs_path(&file, file_tmp);
1322 if (!check_file_access(file, sock)) return 0;
1324 pthread_mutex_lock (&cache_lock);
1325 ci = g_tree_lookup (cache_tree, file);
1327 if (ci == NULL) /* {{{ */
1328 {
1329 struct stat statbuf;
1330 cache_item_t *tmp;
1332 /* don't hold the lock while we setup; stat(2) might block */
1333 pthread_mutex_unlock(&cache_lock);
1335 memset (&statbuf, 0, sizeof (statbuf));
1336 status = stat (file, &statbuf);
1337 if (status != 0)
1338 {
1339 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1341 status = errno;
1342 if (status == ENOENT)
1343 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1344 else
1345 return send_response(sock, RESP_ERR,
1346 "stat failed with error %i.\n", status);
1347 }
1348 if (!S_ISREG (statbuf.st_mode))
1349 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1351 if (access(file, R_OK|W_OK) != 0)
1352 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1353 file, rrd_strerror(errno));
1355 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1356 if (ci == NULL)
1357 {
1358 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1360 return send_response(sock, RESP_ERR, "malloc failed.\n");
1361 }
1362 memset (ci, 0, sizeof (cache_item_t));
1364 ci->file = strdup (file);
1365 if (ci->file == NULL)
1366 {
1367 free (ci);
1368 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1370 return send_response(sock, RESP_ERR, "strdup failed.\n");
1371 }
1373 wipe_ci_values(ci, now);
1374 ci->flags = CI_FLAGS_IN_TREE;
1375 pthread_cond_init(&ci->flushed, NULL);
1377 pthread_mutex_lock(&cache_lock);
1379 /* another UPDATE might have added this entry in the meantime */
1380 tmp = g_tree_lookup (cache_tree, file);
1381 if (tmp == NULL)
1382 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1383 else
1384 {
1385 free_cache_item (ci);
1386 ci = tmp;
1387 }
1389 /* state may have changed while we were unlocked */
1390 if (state == SHUTDOWN)
1391 return -1;
1392 } /* }}} */
1393 assert (ci != NULL);
1395 /* don't re-write updates in replay mode */
1396 if (sock != NULL)
1397 journal_write("update", orig_buf);
1399 while (buffer_size > 0)
1400 {
1401 char *value;
1402 time_t stamp;
1403 char *eostamp;
1405 status = buffer_get_field (&buffer, &buffer_size, &value);
1406 if (status != 0)
1407 {
1408 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1409 break;
1410 }
1412 /* make sure update time is always moving forward */
1413 stamp = strtol(value, &eostamp, 10);
1414 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1415 {
1416 pthread_mutex_unlock(&cache_lock);
1417 return send_response(sock, RESP_ERR,
1418 "Cannot find timestamp in '%s'!\n", value);
1419 }
1420 else if (stamp <= ci->last_update_stamp)
1421 {
1422 pthread_mutex_unlock(&cache_lock);
1423 return send_response(sock, RESP_ERR,
1424 "illegal attempt to update using time %ld when last"
1425 " update time is %ld (minimum one second step)\n",
1426 stamp, ci->last_update_stamp);
1427 }
1428 else
1429 ci->last_update_stamp = stamp;
1431 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1432 {
1433 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1434 continue;
1435 }
1437 values_num++;
1438 }
1440 if (((now - ci->last_flush_time) >= config_write_interval)
1441 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1442 && (ci->values_num > 0))
1443 {
1444 enqueue_cache_item (ci, TAIL);
1445 }
1447 pthread_mutex_unlock (&cache_lock);
1449 if (values_num < 1)
1450 return send_response(sock, RESP_ERR, "No values updated.\n");
1451 else
1452 return send_response(sock, RESP_OK,
1453 "errors, enqueued %i value(s).\n", values_num);
1455 /* NOTREACHED */
1456 assert(1==0);
1458 } /* }}} int handle_request_update */
1460 /* we came across a "WROTE" entry during journal replay.
1461 * throw away any values that we have accumulated for this file
1462 */
1463 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1464 {
1465 cache_item_t *ci;
1466 const char *file = buffer;
1468 pthread_mutex_lock(&cache_lock);
1470 ci = g_tree_lookup(cache_tree, file);
1471 if (ci == NULL)
1472 {
1473 pthread_mutex_unlock(&cache_lock);
1474 return (0);
1475 }
1477 if (ci->values)
1478 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1480 wipe_ci_values(ci, now);
1481 remove_from_queue(ci);
1483 pthread_mutex_unlock(&cache_lock);
1484 return (0);
1485 } /* }}} int handle_request_wrote */
1487 /* start "BATCH" processing */
1488 static int batch_start (HANDLER_PROTO) /* {{{ */
1489 {
1490 int status;
1491 if (sock->batch_start)
1492 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1494 status = send_response(sock, RESP_OK,
1495 "Go ahead. End with dot '.' on its own line.\n");
1496 sock->batch_start = time(NULL);
1497 sock->batch_cmd = 0;
1499 return status;
1500 } /* }}} static int batch_start */
1502 /* finish "BATCH" processing and return results to the client */
1503 static int batch_done (HANDLER_PROTO) /* {{{ */
1504 {
1505 assert(sock->batch_start);
1506 sock->batch_start = 0;
1507 sock->batch_cmd = 0;
1508 return send_response(sock, RESP_OK, "errors\n");
1509 } /* }}} static int batch_done */
/* Handler for "QUIT". The non-zero return value makes
 * connection_thread_main() leave its command loop and close the
 * connection. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
/* Command dispatch table.
 *
 * find_command() scans this array with a case-insensitive comparison on the
 * command name and stops at the all-NULL entry that terminates it.
 *
 * Each entry holds, in order: the command name, the handler function, the
 * minimum privilege passed to has_privilege(), the mask of contexts checked
 * by command_check_context(), and two text fields. The text fields are
 * presumably the one-line syntax followed by the long help text, as printed
 * by handle_request_help() -- confirm against the struct declaration.
 * Entries whose text fields are both NULL get no individual help. */
struct command COMMANDS[] = {
  {
    "UPDATE",
    handle_request_update,
    PRIV_HIGH,
    CMD_CONTEXT_ANY,
    "UPDATE <filename> <values> [<values> ...]\n"
    ,
    "Adds the given file to the internal cache if it is not yet known and\n"
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
    "for details.\n"
    "\n"
    "Each <values> has the following form:\n"
    " <values> = <time>:<value>[:<value>[...]]\n"
    "See the rrdupdate(1) manpage for details.\n"
  },
  {
    /* only accepted in the journal context, i.e. not from clients */
    "WROTE",
    handle_request_wrote,
    PRIV_HIGH,
    CMD_CONTEXT_JOURNAL,
    NULL,
    NULL
  },
  {
    "FLUSH",
    handle_request_flush,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "FLUSH <filename>\n"
    ,
    "Adds the given filename to the head of the update queue and returns\n"
    "after it has been dequeued.\n"
  },
  {
    "FLUSHALL",
    handle_request_flushall,
    PRIV_HIGH,
    CMD_CONTEXT_CLIENT,
    "FLUSHALL\n"
    ,
    "Triggers writing of all pending updates. Returns immediately.\n"
  },
  {
    "PENDING",
    handle_request_pending,
    PRIV_HIGH,
    CMD_CONTEXT_CLIENT,
    "PENDING <filename>\n"
    ,
    "Shows any 'pending' updates for a file, in order.\n"
    "The updates shown have not yet been written to the underlying RRD file.\n"
  },
  {
    "FORGET",
    handle_request_forget,
    PRIV_HIGH,
    CMD_CONTEXT_ANY,
    "FORGET <filename>\n"
    ,
    "Removes the file completely from the cache.\n"
    "Any pending updates for the file will be lost.\n"
  },
  {
    "QUEUE",
    handle_request_queue,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "QUEUE\n"
    ,
    "Shows all files in the output queue.\n"
    "The output is zero or more lines in the following format:\n"
    "(where <num_vals> is the number of values to be written)\n"
    "\n"
    "<num_vals> <filename>\n"
  },
  {
    "STATS",
    handle_request_stats,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "STATS\n"
    ,
    "Returns some performance counters, see the rrdcached(1) manpage for\n"
    "a description of the values.\n"
  },
  {
    "HELP",
    handle_request_help,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "HELP [<command>]\n",
    NULL, /* special! */
  },
  {
    "BATCH",
    batch_start,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT,
    "BATCH\n"
    ,
    "The 'BATCH' command permits the client to initiate a bulk load\n"
    " of commands to rrdcached.\n"
    "\n"
    "Usage:\n"
    "\n"
    " client: BATCH\n"
    " server: 0 Go ahead. End with dot '.' on its own line.\n"
    " client: command #1\n"
    " client: command #2\n"
    " client: ... and so on\n"
    " client: .\n"
    " server: 2 errors\n"
    " server: 7 message for command #7\n"
    " server: 9 message for command #9\n"
    "\n"
    "For more information, consult the rrdcached(1) documentation.\n"
  },
  {
    ".", /* BATCH terminator */
    batch_done,
    PRIV_LOW,
    CMD_CONTEXT_BATCH,
    NULL,
    NULL
  },
  {
    "QUIT",
    handle_request_quit,
    PRIV_LOW,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "QUIT\n"
    ,
    "Disconnect from rrdcached.\n"
  },
  {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */
};
1654 static struct command *find_command(char *cmd)
1655 {
1656 struct command *c = COMMANDS;
1658 while (c->cmd != NULL)
1659 {
1660 if (strcasecmp(cmd, c->cmd) == 0)
1661 break;
1662 c++;
1663 }
1665 if (c->cmd == NULL)
1666 return NULL;
1667 else
1668 return c;
1669 }
1671 /* check whether commands are received in the expected context */
1672 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1673 {
1674 if (sock == NULL)
1675 return (cmd->context & CMD_CONTEXT_JOURNAL);
1676 else if (sock->batch_start)
1677 return (cmd->context & CMD_CONTEXT_BATCH);
1678 else
1679 return (cmd->context & CMD_CONTEXT_CLIENT);
1681 /* NOTREACHED */
1682 assert(1==0);
1683 }
1685 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1686 {
1687 int status;
1688 char *cmd_str;
1689 char *resp_txt;
1690 struct command *help = NULL;
1692 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1693 if (status == 0)
1694 help = find_command(cmd_str);
1696 if (help && (help->syntax || help->help))
1697 {
1698 char tmp[CMD_MAX];
1700 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1701 resp_txt = tmp;
1703 if (help->syntax)
1704 add_response_info(sock, "Usage: %s\n", help->syntax);
1706 if (help->help)
1707 add_response_info(sock, "%s\n", help->help);
1708 }
1709 else
1710 {
1711 help = COMMANDS;
1712 resp_txt = "Command overview\n";
1714 while (help->cmd)
1715 {
1716 if (help->syntax)
1717 add_response_info(sock, "%s", help->syntax);
1718 help++;
1719 }
1720 }
1722 return send_response(sock, RESP_OK, resp_txt);
1723 } /* }}} int handle_request_help */
1725 /* if sock==NULL, we are in journal replay mode */
1726 static int handle_request (DISPATCH_PROTO) /* {{{ */
1727 {
1728 char *buffer_ptr = buffer;
1729 char *cmd_str = NULL;
1730 struct command *cmd = NULL;
1731 int status;
1733 assert (buffer[buffer_size - 1] == '\0');
1735 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1736 if (status != 0)
1737 {
1738 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1739 return (-1);
1740 }
1742 if (sock != NULL && sock->batch_start)
1743 sock->batch_cmd++;
1745 cmd = find_command(cmd_str);
1746 if (!cmd)
1747 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1749 status = has_privilege(sock, cmd->min_priv);
1750 if (status <= 0)
1751 return status;
1753 if (!command_check_context(sock, cmd))
1754 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1756 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1757 } /* }}} int handle_request */
1759 static void journal_set_free (journal_set *js) /* {{{ */
1760 {
1761 if (js == NULL)
1762 return;
1764 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1766 free(js);
1767 } /* }}} journal_set_free */
1769 static void journal_set_remove (journal_set *js) /* {{{ */
1770 {
1771 if (js == NULL)
1772 return;
1774 for (uint i=0; i < js->files_num; i++)
1775 {
1776 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1777 unlink(js->files[i]);
1778 }
1779 } /* }}} journal_set_remove */
1781 /* close current journal file handle.
1782 * MUST hold journal_lock before calling */
1783 static void journal_close(void) /* {{{ */
1784 {
1785 if (journal_fh != NULL)
1786 {
1787 if (fclose(journal_fh) != 0)
1788 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1789 }
1791 journal_fh = NULL;
1792 journal_size = 0;
1793 } /* }}} journal_close */
1795 /* MUST hold journal_lock before calling */
1796 static void journal_new_file(void) /* {{{ */
1797 {
1798 struct timeval now;
1799 int new_fd;
1800 char new_file[PATH_MAX + 1];
1802 assert(journal_dir != NULL);
1803 assert(journal_cur != NULL);
1805 journal_close();
1807 gettimeofday(&now, NULL);
1808 /* this format assures that the files sort in strcmp() order */
1809 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1810 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1812 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1813 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1814 if (new_fd < 0)
1815 goto error;
1817 journal_fh = fdopen(new_fd, "a");
1818 if (journal_fh == NULL)
1819 goto error;
1821 journal_size = ftell(journal_fh);
1822 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1824 /* record the file in the journal set */
1825 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1827 return;
1829 error:
1830 RRDD_LOG(LOG_CRIT,
1831 "JOURNALING DISABLED: Error while trying to create %s : %s",
1832 new_file, rrd_strerror(errno));
1833 RRDD_LOG(LOG_CRIT,
1834 "JOURNALING DISABLED: All values will be flushed at shutdown");
1836 close(new_fd);
1837 config_flush_at_shutdown = 1;
1839 } /* }}} journal_new_file */
1841 /* MUST NOT hold journal_lock before calling this */
1842 static void journal_rotate(void) /* {{{ */
1843 {
1844 journal_set *old_js = NULL;
1846 if (journal_dir == NULL)
1847 return;
1849 RRDD_LOG(LOG_DEBUG, "rotating journals");
1851 pthread_mutex_lock(&stats_lock);
1852 ++stats_journal_rotate;
1853 pthread_mutex_unlock(&stats_lock);
1855 pthread_mutex_lock(&journal_lock);
1857 journal_close();
1859 /* rotate the journal sets */
1860 old_js = journal_old;
1861 journal_old = journal_cur;
1862 journal_cur = calloc(1, sizeof(journal_set));
1864 if (journal_cur != NULL)
1865 journal_new_file();
1866 else
1867 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1869 pthread_mutex_unlock(&journal_lock);
1871 journal_set_remove(old_js);
1872 journal_set_free (old_js);
1874 } /* }}} static void journal_rotate */
1876 /* MUST hold journal_lock when calling */
1877 static void journal_done(void) /* {{{ */
1878 {
1879 if (journal_cur == NULL)
1880 return;
1882 journal_close();
1884 if (config_flush_at_shutdown)
1885 {
1886 RRDD_LOG(LOG_INFO, "removing journals");
1887 journal_set_remove(journal_old);
1888 journal_set_remove(journal_cur);
1889 }
1890 else
1891 {
1892 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1893 "journals will be used at next startup");
1894 }
1896 journal_set_free(journal_cur);
1897 journal_set_free(journal_old);
1898 free(journal_dir);
1900 } /* }}} static void journal_done */
1902 static int journal_write(char *cmd, char *args) /* {{{ */
1903 {
1904 int chars;
1906 if (journal_fh == NULL)
1907 return 0;
1909 pthread_mutex_lock(&journal_lock);
1910 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1911 journal_size += chars;
1913 if (journal_size > JOURNAL_MAX)
1914 journal_new_file();
1916 pthread_mutex_unlock(&journal_lock);
1918 if (chars > 0)
1919 {
1920 pthread_mutex_lock(&stats_lock);
1921 stats_journal_bytes += chars;
1922 pthread_mutex_unlock(&stats_lock);
1923 }
1925 return chars;
1926 } /* }}} static int journal_write */
1928 static int journal_replay (const char *file) /* {{{ */
1929 {
1930 FILE *fh;
1931 int entry_cnt = 0;
1932 int fail_cnt = 0;
1933 uint64_t line = 0;
1934 char entry[CMD_MAX];
1935 time_t now;
1937 if (file == NULL) return 0;
1939 {
1940 char *reason = "unknown error";
1941 int status = 0;
1942 struct stat statbuf;
1944 memset(&statbuf, 0, sizeof(statbuf));
1945 if (stat(file, &statbuf) != 0)
1946 {
1947 reason = "stat error";
1948 status = errno;
1949 }
1950 else if (!S_ISREG(statbuf.st_mode))
1951 {
1952 reason = "not a regular file";
1953 status = EPERM;
1954 }
1955 if (statbuf.st_uid != daemon_uid)
1956 {
1957 reason = "not owned by daemon user";
1958 status = EACCES;
1959 }
1960 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1961 {
1962 reason = "must not be user/group writable";
1963 status = EACCES;
1964 }
1966 if (status != 0)
1967 {
1968 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1969 file, rrd_strerror(status), reason);
1970 return 0;
1971 }
1972 }
1974 fh = fopen(file, "r");
1975 if (fh == NULL)
1976 {
1977 if (errno != ENOENT)
1978 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1979 file, rrd_strerror(errno));
1980 return 0;
1981 }
1982 else
1983 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1985 now = time(NULL);
1987 while(!feof(fh))
1988 {
1989 size_t entry_len;
1991 ++line;
1992 if (fgets(entry, sizeof(entry), fh) == NULL)
1993 break;
1994 entry_len = strlen(entry);
1996 /* check \n termination in case journal writing crashed mid-line */
1997 if (entry_len == 0)
1998 continue;
1999 else if (entry[entry_len - 1] != '\n')
2000 {
2001 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2002 ++fail_cnt;
2003 continue;
2004 }
2006 entry[entry_len - 1] = '\0';
2008 if (handle_request(NULL, now, entry, entry_len) == 0)
2009 ++entry_cnt;
2010 else
2011 ++fail_cnt;
2012 }
2014 fclose(fh);
2016 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2017 entry_cnt, fail_cnt);
2019 return entry_cnt > 0 ? 1 : 0;
2020 } /* }}} static int journal_replay */
/* qsort() comparator for an array of C strings: both arguments point at
 * array elements (char *), which are compared with strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *lhs = v1;
  const char *const *rhs = v2;

  return strcmp(*lhs, *rhs);
}
2030 static void journal_init(void) /* {{{ */
2031 {
2032 int had_journal = 0;
2033 DIR *dir;
2034 struct dirent *dent;
2035 char path[PATH_MAX+1];
2037 if (journal_dir == NULL) return;
2039 pthread_mutex_lock(&journal_lock);
2041 journal_cur = calloc(1, sizeof(journal_set));
2042 if (journal_cur == NULL)
2043 {
2044 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2045 return;
2046 }
2048 RRDD_LOG(LOG_INFO, "checking for journal files");
2050 /* Handle old journal files during transition. This gives them the
2051 * correct sort order. TODO: remove after first release
2052 */
2053 {
2054 char old_path[PATH_MAX+1];
2055 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2056 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2057 rename(old_path, path);
2059 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2060 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2061 rename(old_path, path);
2062 }
2064 dir = opendir(journal_dir);
2065 while ((dent = readdir(dir)) != NULL)
2066 {
2067 /* looks like a journal file? */
2068 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2069 continue;
2071 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2073 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2074 {
2075 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2076 dent->d_name);
2077 break;
2078 }
2079 }
2080 closedir(dir);
2082 qsort(journal_cur->files, journal_cur->files_num,
2083 sizeof(journal_cur->files[0]), journal_sort);
2085 for (uint i=0; i < journal_cur->files_num; i++)
2086 had_journal += journal_replay(journal_cur->files[i]);
2088 journal_new_file();
2090 /* it must have been a crash. start a flush */
2091 if (had_journal && config_flush_at_shutdown)
2092 flush_old_values(-1);
2094 pthread_mutex_unlock(&journal_lock);
2096 RRDD_LOG(LOG_INFO, "journal processing complete");
2098 } /* }}} static void journal_init */
2100 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2101 {
2102 assert(sock != NULL);
2104 free(sock->rbuf); sock->rbuf = NULL;
2105 free(sock->wbuf); sock->wbuf = NULL;
2106 free(sock);
2107 } /* }}} void free_listen_socket */
2109 static void close_connection(listen_socket_t *sock) /* {{{ */
2110 {
2111 if (sock->fd >= 0)
2112 {
2113 close(sock->fd);
2114 sock->fd = -1;
2115 }
2117 free_listen_socket(sock);
2119 } /* }}} void close_connection */
2121 static void *connection_thread_main (void *args) /* {{{ */
2122 {
2123 listen_socket_t *sock;
2124 int fd;
2126 sock = (listen_socket_t *) args;
2127 fd = sock->fd;
2129 /* init read buffers */
2130 sock->next_read = sock->next_cmd = 0;
2131 sock->rbuf = malloc(RBUF_SIZE);
2132 if (sock->rbuf == NULL)
2133 {
2134 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2135 close_connection(sock);
2136 return NULL;
2137 }
2139 pthread_mutex_lock (&connection_threads_lock);
2140 connection_threads_num++;
2141 pthread_mutex_unlock (&connection_threads_lock);
2143 while (state == RUNNING)
2144 {
2145 char *cmd;
2146 ssize_t cmd_len;
2147 ssize_t rbytes;
2148 time_t now;
2150 struct pollfd pollfd;
2151 int status;
2153 pollfd.fd = fd;
2154 pollfd.events = POLLIN | POLLPRI;
2155 pollfd.revents = 0;
2157 status = poll (&pollfd, 1, /* timeout = */ 500);
2158 if (state != RUNNING)
2159 break;
2160 else if (status == 0) /* timeout */
2161 continue;
2162 else if (status < 0) /* error */
2163 {
2164 status = errno;
2165 if (status != EINTR)
2166 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2167 continue;
2168 }
2170 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2171 break;
2172 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2173 {
2174 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2175 "poll(2) returned something unexpected: %#04hx",
2176 pollfd.revents);
2177 break;
2178 }
2180 rbytes = read(fd, sock->rbuf + sock->next_read,
2181 RBUF_SIZE - sock->next_read);
2182 if (rbytes < 0)
2183 {
2184 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2185 break;
2186 }
2187 else if (rbytes == 0)
2188 break; /* eof */
2190 sock->next_read += rbytes;
2192 if (sock->batch_start)
2193 now = sock->batch_start;
2194 else
2195 now = time(NULL);
2197 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2198 {
2199 status = handle_request (sock, now, cmd, cmd_len+1);
2200 if (status != 0)
2201 goto out_close;
2202 }
2203 }
2205 out_close:
2206 close_connection(sock);
2208 /* Remove this thread from the connection threads list */
2209 pthread_mutex_lock (&connection_threads_lock);
2210 connection_threads_num--;
2211 if (connection_threads_num <= 0)
2212 pthread_cond_broadcast(&connection_threads_done);
2213 pthread_mutex_unlock (&connection_threads_lock);
2215 return (NULL);
2216 } /* }}} void *connection_thread_main */
2218 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2219 {
2220 int fd;
2221 struct sockaddr_un sa;
2222 listen_socket_t *temp;
2223 int status;
2224 const char *path;
2226 path = sock->addr;
2227 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2228 path += strlen("unix:");
2230 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2231 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2232 if (temp == NULL)
2233 {
2234 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2235 return (-1);
2236 }
2237 listen_fds = temp;
2238 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2240 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2241 if (fd < 0)
2242 {
2243 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2244 rrd_strerror(errno));
2245 return (-1);
2246 }
2248 memset (&sa, 0, sizeof (sa));
2249 sa.sun_family = AF_UNIX;
2250 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2252 /* if we've gotten this far, we own the pid file. any daemon started
2253 * with the same args must not be alive. therefore, ensure that we can
2254 * create the socket...
2255 */
2256 unlink(path);
2258 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2259 if (status != 0)
2260 {
2261 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2262 path, rrd_strerror(errno));
2263 close (fd);
2264 return (-1);
2265 }
2267 status = listen (fd, /* backlog = */ 10);
2268 if (status != 0)
2269 {
2270 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2271 path, rrd_strerror(errno));
2272 close (fd);
2273 unlink (path);
2274 return (-1);
2275 }
2277 listen_fds[listen_fds_num].fd = fd;
2278 listen_fds[listen_fds_num].family = PF_UNIX;
2279 strncpy(listen_fds[listen_fds_num].addr, path,
2280 sizeof (listen_fds[listen_fds_num].addr) - 1);
2281 listen_fds_num++;
2283 return (0);
2284 } /* }}} int open_listen_socket_unix */
/* Create listening network sockets for the address in sock.
 *
 * Accepted forms are "[<ipv6>]", "[<ipv6>]:<port>" and, for addresses that
 * contain a dot, "<host>" or "<host>:<port>". Without a port,
 * RRDCACHED_DEFAULT_PORT is used. For every address returned by
 * getaddrinfo() a socket is created, bound and put into listening state,
 * and listen_fds is grown by one element initialised as a copy of *sock.
 * Addresses for which socket() or bind() fails are skipped.
 *
 * Returns -1 for a malformed address, a getaddrinfo() failure or a listen()
 * failure, and 0 otherwise; errors are printed to stderr. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* strrchr() is the standard C spelling of the legacy rindex() */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      /* the period belongs before the newline, as in the bind() message */
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2408 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2409 {
2410 assert(sock != NULL);
2411 assert(sock->addr != NULL);
2413 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2414 || sock->addr[0] == '/')
2415 return (open_listen_socket_unix(sock));
2416 else
2417 return (open_listen_socket_network(sock));
2418 } /* }}} int open_listen_socket */
2420 static int close_listen_sockets (void) /* {{{ */
2421 {
2422 size_t i;
2424 for (i = 0; i < listen_fds_num; i++)
2425 {
2426 close (listen_fds[i].fd);
2428 if (listen_fds[i].family == PF_UNIX)
2429 unlink(listen_fds[i].addr);
2430 }
2432 free (listen_fds);
2433 listen_fds = NULL;
2434 listen_fds_num = 0;
2436 return (0);
2437 } /* }}} int close_listen_sockets */
2439 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2440 {
2441 struct pollfd *pollfds;
2442 int pollfds_num;
2443 int status;
2444 int i;
2446 if (listen_fds_num < 1)
2447 {
2448 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2449 return (NULL);
2450 }
2452 pollfds_num = listen_fds_num;
2453 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2454 if (pollfds == NULL)
2455 {
2456 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2457 return (NULL);
2458 }
2459 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2461 RRDD_LOG(LOG_INFO, "listening for connections");
2463 while (state == RUNNING)
2464 {
2465 for (i = 0; i < pollfds_num; i++)
2466 {
2467 pollfds[i].fd = listen_fds[i].fd;
2468 pollfds[i].events = POLLIN | POLLPRI;
2469 pollfds[i].revents = 0;
2470 }
2472 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2473 if (state != RUNNING)
2474 break;
2475 else if (status == 0) /* timeout */
2476 continue;
2477 else if (status < 0) /* error */
2478 {
2479 status = errno;
2480 if (status != EINTR)
2481 {
2482 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2483 }
2484 continue;
2485 }
2487 for (i = 0; i < pollfds_num; i++)
2488 {
2489 listen_socket_t *client_sock;
2490 struct sockaddr_storage client_sa;
2491 socklen_t client_sa_size;
2492 pthread_t tid;
2493 pthread_attr_t attr;
2495 if (pollfds[i].revents == 0)
2496 continue;
2498 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2499 {
2500 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2501 "poll(2) returned something unexpected for listen FD #%i.",
2502 pollfds[i].fd);
2503 continue;
2504 }
2506 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2507 if (client_sock == NULL)
2508 {
2509 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2510 continue;
2511 }
2512 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2514 client_sa_size = sizeof (client_sa);
2515 client_sock->fd = accept (pollfds[i].fd,
2516 (struct sockaddr *) &client_sa, &client_sa_size);
2517 if (client_sock->fd < 0)
2518 {
2519 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2520 free(client_sock);
2521 continue;
2522 }
2524 pthread_attr_init (&attr);
2525 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2527 status = pthread_create (&tid, &attr, connection_thread_main,
2528 client_sock);
2529 if (status != 0)
2530 {
2531 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2532 close_connection(client_sock);
2533 continue;
2534 }
2535 } /* for (pollfds_num) */
2536 } /* while (state == RUNNING) */
2538 RRDD_LOG(LOG_INFO, "starting shutdown");
2540 close_listen_sockets ();
2542 pthread_mutex_lock (&connection_threads_lock);
2543 while (connection_threads_num > 0)
2544 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2545 pthread_mutex_unlock (&connection_threads_lock);
2547 free(pollfds);
2549 return (NULL);
2550 } /* }}} void *listen_thread_main */
2552 static int daemonize (void) /* {{{ */
2553 {
2554 int pid_fd;
2555 char *base_dir;
2557 daemon_uid = geteuid();
2559 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2560 if (pid_fd < 0)
2561 pid_fd = check_pidfile();
2562 if (pid_fd < 0)
2563 return pid_fd;
2565 /* open all the listen sockets */
2566 if (config_listen_address_list_len > 0)
2567 {
2568 for (size_t i = 0; i < config_listen_address_list_len; i++)
2569 open_listen_socket (config_listen_address_list[i]);
2571 rrd_free_ptrs((void ***) &config_listen_address_list,
2572 &config_listen_address_list_len);
2573 }
2574 else
2575 {
2576 listen_socket_t sock;
2577 memset(&sock, 0, sizeof(sock));
2578 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2579 open_listen_socket (&sock);
2580 }
2582 if (listen_fds_num < 1)
2583 {
2584 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2585 goto error;
2586 }
2588 if (!stay_foreground)
2589 {
2590 pid_t child;
2592 child = fork ();
2593 if (child < 0)
2594 {
2595 fprintf (stderr, "daemonize: fork(2) failed.\n");
2596 goto error;
2597 }
2598 else if (child > 0)
2599 exit(0);
2601 /* Become session leader */
2602 setsid ();
2604 /* Open the first three file descriptors to /dev/null */
2605 close (2);
2606 close (1);
2607 close (0);
2609 open ("/dev/null", O_RDWR);
2610 if (dup(0) == -1 || dup(0) == -1){
2611 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2612 }
2613 } /* if (!stay_foreground) */
2615 /* Change into the /tmp directory. */
2616 base_dir = (config_base_dir != NULL)
2617 ? config_base_dir
2618 : "/tmp";
2620 if (chdir (base_dir) != 0)
2621 {
2622 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2623 goto error;
2624 }
2626 install_signal_handlers();
2628 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2629 RRDD_LOG(LOG_INFO, "starting up");
2631 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2632 (GDestroyNotify) free_cache_item);
2633 if (cache_tree == NULL)
2634 {
2635 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2636 goto error;
2637 }
2639 return write_pidfile (pid_fd);
2641 error:
2642 remove_pidfile();
2643 return -1;
2644 } /* }}} int daemonize */
2646 static int cleanup (void) /* {{{ */
2647 {
2648 pthread_cond_broadcast (&flush_cond);
2649 pthread_join (flush_thread, NULL);
2651 pthread_cond_broadcast (&queue_cond);
2652 for (int i = 0; i < config_queue_threads; i++)
2653 pthread_join (queue_threads[i], NULL);
2655 if (config_flush_at_shutdown)
2656 {
2657 assert(cache_queue_head == NULL);
2658 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2659 }
2661 free(queue_threads);
2662 free(config_base_dir);
2663 free(config_pid_file);
2665 pthread_mutex_lock(&cache_lock);
2666 g_tree_destroy(cache_tree);
2668 pthread_mutex_lock(&journal_lock);
2669 journal_done();
2671 RRDD_LOG(LOG_INFO, "goodbye");
2672 closelog ();
2674 remove_pidfile ();
2676 return (0);
2677 } /* }}} int cleanup */
2679 static int read_options (int argc, char **argv) /* {{{ */
2680 {
2681 int option;
2682 int status = 0;
2684 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2685 {
2686 switch (option)
2687 {
2688 case 'g':
2689 stay_foreground=1;
2690 break;
2692 case 'L':
2693 case 'l':
2694 {
2695 listen_socket_t *new;
2697 new = malloc(sizeof(listen_socket_t));
2698 if (new == NULL)
2699 {
2700 fprintf(stderr, "read_options: malloc failed.\n");
2701 return(2);
2702 }
2703 memset(new, 0, sizeof(listen_socket_t));
2705 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2706 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2708 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2709 &config_listen_address_list_len, new))
2710 {
2711 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2712 return (2);
2713 }
2714 }
2715 break;
2717 case 'f':
2718 {
2719 int temp;
2721 temp = atoi (optarg);
2722 if (temp > 0)
2723 config_flush_interval = temp;
2724 else
2725 {
2726 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2727 status = 3;
2728 }
2729 }
2730 break;
2732 case 'w':
2733 {
2734 int temp;
2736 temp = atoi (optarg);
2737 if (temp > 0)
2738 config_write_interval = temp;
2739 else
2740 {
2741 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2742 status = 2;
2743 }
2744 }
2745 break;
2747 case 'z':
2748 {
2749 int temp;
2751 temp = atoi(optarg);
2752 if (temp > 0)
2753 config_write_jitter = temp;
2754 else
2755 {
2756 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2757 status = 2;
2758 }
2760 break;
2761 }
2763 case 't':
2764 {
2765 int threads;
2766 threads = atoi(optarg);
2767 if (threads >= 1)
2768 config_queue_threads = threads;
2769 else
2770 {
2771 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2772 return 1;
2773 }
2774 }
2775 break;
2777 case 'B':
2778 config_write_base_only = 1;
2779 break;
2781 case 'b':
2782 {
2783 size_t len;
2784 char base_realpath[PATH_MAX];
2786 if (config_base_dir != NULL)
2787 free (config_base_dir);
2788 config_base_dir = strdup (optarg);
2789 if (config_base_dir == NULL)
2790 {
2791 fprintf (stderr, "read_options: strdup failed.\n");
2792 return (3);
2793 }
2795 /* make sure that the base directory is not resolved via
2796 * symbolic links. this makes some performance-enhancing
2797 * assumptions possible (we don't have to resolve paths
2798 * that start with a "/")
2799 */
2800 if (realpath(config_base_dir, base_realpath) == NULL)
2801 {
2802 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2803 return 5;
2804 }
2805 else if (strncmp(config_base_dir,
2806 base_realpath, sizeof(base_realpath)) != 0)
2807 {
2808 fprintf(stderr,
2809 "Base directory (-b) resolved via file system links!\n"
2810 "Please consult rrdcached '-b' documentation!\n"
2811 "Consider specifying the real directory (%s)\n",
2812 base_realpath);
2813 return 5;
2814 }
2816 len = strlen (config_base_dir);
2817 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2818 {
2819 config_base_dir[len - 1] = 0;
2820 len--;
2821 }
2823 if (len < 1)
2824 {
2825 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2826 return (4);
2827 }
2829 _config_base_dir_len = len;
2830 }
2831 break;
2833 case 'p':
2834 {
2835 if (config_pid_file != NULL)
2836 free (config_pid_file);
2837 config_pid_file = strdup (optarg);
2838 if (config_pid_file == NULL)
2839 {
2840 fprintf (stderr, "read_options: strdup failed.\n");
2841 return (3);
2842 }
2843 }
2844 break;
2846 case 'F':
2847 config_flush_at_shutdown = 1;
2848 break;
2850 case 'j':
2851 {
2852 struct stat statbuf;
2853 const char *dir = journal_dir = strdup(optarg);
2855 status = stat(dir, &statbuf);
2856 if (status != 0)
2857 {
2858 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2859 return 6;
2860 }
2862 if (!S_ISDIR(statbuf.st_mode)
2863 || access(dir, R_OK|W_OK|X_OK) != 0)
2864 {
2865 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2866 errno ? rrd_strerror(errno) : "");
2867 return 6;
2868 }
2869 }
2870 break;
2872 case 'h':
2873 case '?':
2874 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2875 "\n"
2876 "Usage: rrdcached [options]\n"
2877 "\n"
2878 "Valid options are:\n"
2879 " -l <address> Socket address to listen to.\n"
2880 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2881 " -w <seconds> Interval in which to write data.\n"
2882 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2883 " -t <threads> Number of write threads.\n"
2884 " -f <seconds> Interval in which to flush dead data.\n"
2885 " -p <file> Location of the PID-file.\n"
2886 " -b <dir> Base directory to change to.\n"
2887 " -B Restrict file access to paths within -b <dir>\n"
2888 " -g Do not fork and run in the foreground.\n"
2889 " -j <dir> Directory in which to create the journal files.\n"
2890 " -F Always flush all updates at shutdown\n"
2891 "\n"
2892 "For more information and a detailed description of all options "
2893 "please refer\n"
2894 "to the rrdcached(1) manual page.\n",
2895 VERSION);
2896 status = -1;
2897 break;
2898 } /* switch (option) */
2899 } /* while (getopt) */
2901 /* advise the user when values are not sane */
2902 if (config_flush_interval < 2 * config_write_interval)
2903 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2904 " 2x write interval (-w) !\n");
2905 if (config_write_jitter > config_write_interval)
2906 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2907 " write interval (-w) !\n");
2909 if (config_write_base_only && config_base_dir == NULL)
2910 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2911 " Consult the rrdcached documentation\n");
2913 if (journal_dir == NULL)
2914 config_flush_at_shutdown = 1;
2916 return (status);
2917 } /* }}} int read_options */
2919 int main (int argc, char **argv)
2920 {
2921 int status;
2923 status = read_options (argc, argv);
2924 if (status != 0)
2925 {
2926 if (status < 0)
2927 status = 0;
2928 return (status);
2929 }
2931 status = daemonize ();
2932 if (status != 0)
2933 {
2934 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2935 return (1);
2936 }
2938 journal_init();
2940 /* start the queue threads */
2941 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2942 if (queue_threads == NULL)
2943 {
2944 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2945 cleanup();
2946 return (1);
2947 }
2948 for (int i = 0; i < config_queue_threads; i++)
2949 {
2950 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2951 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2952 if (status != 0)
2953 {
2954 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2955 cleanup();
2956 return (1);
2957 }
2958 }
2960 /* start the flush thread */
2961 memset(&flush_thread, 0, sizeof(flush_thread));
2962 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2963 if (status != 0)
2964 {
2965 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2966 cleanup();
2967 return (1);
2968 }
2970 listen_thread_main (NULL);
2971 cleanup ();
2973 return (0);
2974 } /* int main */
2976 /*
2977 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2978 */