1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 # include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
/*
 * RRDD_LOG: log a printf-style message with the given syslog severity.
 * When `stay_foreground' is non-zero the message is additionally printed to
 * stderr, followed by a newline. The variadic arguments are expanded twice in
 * that case, so they must not have side effects.
 */
#define RRDD_LOG(severity, ...) \
  do { \
    if (stay_foreground) { \
      fprintf(stderr, __VA_ARGS__); \
      fprintf(stderr, "\n"); } \
    syslog ((severity), __VA_ARGS__); \
  } while (0)
123 /*
124 * Types
125 */
/* Status passed to send_response(): RESP_OK for success, RESP_ERR for
 * failure. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/*
 * State kept for one socket: its descriptor and address, the BATCH
 * bookkeeping, and the buffers used by next_cmd() and add_to_wbuf().
 */
struct listen_socket_s
{
  int fd;                   /* descriptor that send_response() writes to */
  char addr[PATH_MAX + 1];
  int family;

  /* state for BATCH processing */
  time_t batch_start;       /* non-zero is treated as "in BATCH" */
  int batch_cmd;            /* reported as the leading number of an error
                             * response while in BATCH */

  /* buffered IO */
  char *rbuf;               /* read buffer scanned by next_cmd() */
  off_t next_cmd;           /* offset in rbuf of the next unparsed command */
  off_t next_read;          /* offset in rbuf one past the last valid byte */

  char *wbuf;               /* response text collected by add_to_wbuf() */
  ssize_t wbuf_len;         /* length of wbuf, not counting the terminator */

  uint32_t permissions;

  gid_t socket_group;
  mode_t socket_permissions;
};
typedef struct listen_socket_s listen_socket_t;
struct command_s;
typedef struct command_s command_t;
/* note: guard against "unused" warnings in the handlers */
/* Parameter list shared by the dispatch code: the socket, the current time,
 * and the (remaining) request buffer with its size. */
#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
  time_t UNUSED(now),\
  char UNUSED(*buffer),\
  size_t UNUSED(buffer_size)

/* Parameter list of every command handler: the matched command followed by
 * the DISPATCH_PROTO parameters. */
#define HANDLER_PROTO command_t UNUSED(*cmd),\
  DISPATCH_PROTO
/* Description of one protocol command. */
struct command_s {
  char *cmd;                       /* command name */
  int (*handler)(HANDLER_PROTO);   /* function implementing the command */

  char context; /* where we expect to see it */
#define CMD_CONTEXT_CLIENT (1<<0)
#define CMD_CONTEXT_BATCH (1<<1)
#define CMD_CONTEXT_JOURNAL (1<<2)
#define CMD_CONTEXT_ANY (0x7f)

  char *syntax;                    /* usage text sent by syntax_error() */
  char *help;
};
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/*
 * One cached file: the pending update values plus the links that place the
 * item in the write queue.
 */
struct cache_item_s
{
  char *file;               /* file name; also the item's key in cache_tree */
  char **values;            /* pending update strings */
  size_t values_num; /* number of valid pointers */
  size_t values_alloc; /* number of allocated pointers */
  time_t last_flush_time;   /* set by wipe_ci_values() */
  time_t last_update_stamp;
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;                /* combination of the CI_FLAGS_* bits */
  pthread_cond_t flushed;   /* broadcast once the item's values were written
                             * or the item is freed */
  cache_item_t *prev;       /* neighbour towards cache_queue_head */
  cache_item_t *next;       /* neighbour towards cache_queue_tail */
};
/* User data handed to tree_callback_flush() by flush_old_values(). */
struct callback_flush_data_s
{
  time_t now;           /* time at which the tree walk started */
  time_t abs_timeout;   /* items last flushed at or before this are queued */
  char **keys;          /* keys of the entries to remove after the walk */
  size_t keys_num;      /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Selects the end of the write queue used by enqueue_cache_item(). */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;
/* describe a set of journal files */
typedef struct {
  char **files;       /* file names */
  size_t files_num;   /* number of entries in `files' */
} journal_set;
/* max length of socket command or response */
#define CMD_MAX 4096
/* Twice CMD_MAX; presumably the size of the per-socket read buffer (the
 * allocation is not visible here -- confirm). */
#define RBUF_SIZE (CMD_MAX*2)
222 /*
223 * Variables
224 */
/* Non-zero makes RRDD_LOG copy every message to stderr. */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

static listen_socket_t default_socket;

/* Daemon life cycle. The signal handlers set FLUSHING; flush_thread_main
 * sets SHUTDOWN once it is done. */
enum {
  RUNNING, /* normal operation */
  FLUSHING, /* flushing remaining values */
  SHUTDOWN /* shutting down */
} state = RUNNING;

static pthread_t *queue_threads;
/* Signalled whenever an item is put on the write queue. */
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_queue_threads = 4;

static pthread_t flush_thread;
/* Waited on (with timeout) by flush_thread_main; broadcast on shutdown. */
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
static GTree *cache_tree = NULL;
/* Doubly linked write queue; items are taken from the head. */
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
/* Protects cache_tree and the write queue. */
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;
static size_t config_alloc_chunk = 1;

static listen_socket_t **config_listen_address_list = NULL;
static size_t config_listen_address_list_len = 0;

/* Counters reported by the STATS handler; protected by stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
/* Journaled updates */
/* A NULL socket means the command comes from a journal replay rather than
 * from a client. */
#define JOURNAL_REPLAY(s) ((s) == NULL)
#define JOURNAL_BASE "rrd.journal"
static journal_set *journal_cur = NULL;
static journal_set *journal_old = NULL;
static char *journal_dir = NULL;
static FILE *journal_fh = NULL; /* current journal file handle */
static long journal_size = 0; /* current journal size */
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);

/* prototypes for forward references */
static int handle_request_help (HANDLER_PROTO);
295 /*
296 * Functions
297 */
/*
 * sig_common:
 * Shared body of the shutdown signal handlers. Logs the signal name, moves
 * the daemon into the FLUSHING state and wakes every thread waiting on
 * `flush_cond' or `queue_cond'.
 *
 * NOTE(review): this runs in signal-handler context, yet RRDD_LOG (fprintf /
 * syslog) and pthread_cond_broadcast are not on the POSIX list of
 * async-signal-safe functions -- confirm this is acceptable.
 */
static void sig_common (const char *sig) /* {{{ */
{
  RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
  state = FLUSHING;   /* must be set before the waiters are woken */
  pthread_cond_broadcast(&flush_cond);
  pthread_cond_broadcast(&queue_cond);
} /* }}} void sig_common */
/* SIGINT handler: start the shutdown sequence via sig_common(). */
static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM handler: start the shutdown sequence via sig_common(). */
static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
/* SIGUSR1 handler: force `config_flush_at_shutdown' on, then start the
 * shutdown sequence. */
static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
{
  config_flush_at_shutdown = 1;
  sig_common("USR1");
} /* }}} void sig_usr1_handler */
/* SIGUSR2 handler: force `config_flush_at_shutdown' off, then start the
 * shutdown sequence. */
static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
{
  config_flush_at_shutdown = 0;
  sig_common("USR2");
} /* }}} void sig_usr2_handler */
328 static void install_signal_handlers(void) /* {{{ */
329 {
330 /* These structures are static, because `sigaction' behaves weird if the are
331 * overwritten.. */
332 static struct sigaction sa_int;
333 static struct sigaction sa_term;
334 static struct sigaction sa_pipe;
335 static struct sigaction sa_usr1;
336 static struct sigaction sa_usr2;
338 /* Install signal handlers */
339 memset (&sa_int, 0, sizeof (sa_int));
340 sa_int.sa_handler = sig_int_handler;
341 sigaction (SIGINT, &sa_int, NULL);
343 memset (&sa_term, 0, sizeof (sa_term));
344 sa_term.sa_handler = sig_term_handler;
345 sigaction (SIGTERM, &sa_term, NULL);
347 memset (&sa_pipe, 0, sizeof (sa_pipe));
348 sa_pipe.sa_handler = SIG_IGN;
349 sigaction (SIGPIPE, &sa_pipe, NULL);
351 memset (&sa_pipe, 0, sizeof (sa_usr1));
352 sa_usr1.sa_handler = sig_usr1_handler;
353 sigaction (SIGUSR1, &sa_usr1, NULL);
355 memset (&sa_usr2, 0, sizeof (sa_usr2));
356 sa_usr2.sa_handler = sig_usr2_handler;
357 sigaction (SIGUSR2, &sa_usr2, NULL);
359 } /* }}} void install_signal_handlers */
361 static int open_pidfile(char *action, int oflag) /* {{{ */
362 {
363 int fd;
364 const char *file;
365 char *file_copy, *dir;
367 file = (config_pid_file != NULL)
368 ? config_pid_file
369 : LOCALSTATEDIR "/run/rrdcached.pid";
371 /* dirname may modify its argument */
372 file_copy = strdup(file);
373 if (file_copy == NULL)
374 {
375 fprintf(stderr, "rrdcached: strdup(): %s\n",
376 rrd_strerror(errno));
377 return -1;
378 }
380 dir = dirname(file_copy);
381 if (rrd_mkdir_p(dir, 0777) != 0)
382 {
383 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
384 dir, rrd_strerror(errno));
385 return -1;
386 }
388 free(file_copy);
390 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
391 if (fd < 0)
392 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
393 action, file, rrd_strerror(errno));
395 return(fd);
396 } /* }}} static int open_pidfile */
/*
 * check_pidfile:
 * Check an existing pid file to see whether a daemon is running.
 *
 * Reads the PID stored in the file. If a process with that PID other than
 * ourselves can be signalled, it is assumed to be a competing rrdcached and
 * the call fails. Otherwise the file is considered stale and is truncated
 * so that it can be rewritten.
 *
 * Returns the open descriptor of the (now empty) PID file on success, or a
 * negative value on failure. The descriptor is closed on every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* Leave room for the terminator: atoi() needs a proper C string. */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile:
 * Writes our own process id, followed by a newline, to the already opened
 * PID file `fd'. The descriptor is consumed: it is closed on success (via
 * fclose) as well as on failure.
 *
 * Returns 0 on success and -1 if no stream could be attached to `fd'.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
463 static int remove_pidfile (void) /* {{{ */
464 {
465 char *file;
466 int status;
468 file = (config_pid_file != NULL)
469 ? config_pid_file
470 : LOCALSTATEDIR "/run/rrdcached.pid";
472 status = unlink (file);
473 if (status == 0)
474 return (0);
475 return (errno);
476 } /* }}} int remove_pidfile */
478 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
479 {
480 char *eol;
482 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
483 sock->next_read - sock->next_cmd);
485 if (eol == NULL)
486 {
487 /* no commands left, move remainder back to front of rbuf */
488 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
489 sock->next_read - sock->next_cmd);
490 sock->next_read -= sock->next_cmd;
491 sock->next_cmd = 0;
492 *len = 0;
493 return NULL;
494 }
495 else
496 {
497 char *cmd = sock->rbuf + sock->next_cmd;
498 *eol = '\0';
500 sock->next_cmd = eol - sock->rbuf + 1;
502 if (eol > sock->rbuf && *(eol-1) == '\r')
503 *(--eol) = '\0'; /* handle "\r\n" EOL */
505 *len = eol - cmd;
507 return cmd;
508 }
510 /* NOTREACHED */
511 assert(1==0);
512 } /* }}} char *next_cmd */
514 /* add the characters directly to the write buffer */
515 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
516 {
517 char *new_buf;
519 assert(sock != NULL);
521 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
522 if (new_buf == NULL)
523 {
524 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
525 return -1;
526 }
528 strncpy(new_buf + sock->wbuf_len, str, len + 1);
530 sock->wbuf = new_buf;
531 sock->wbuf_len += len;
533 return 0;
534 } /* }}} static int add_to_wbuf */
536 /* add the text to the "extra" info that's sent after the status line */
537 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
538 {
539 va_list argp;
540 char buffer[CMD_MAX];
541 int len;
543 if (JOURNAL_REPLAY(sock)) return 0;
544 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
546 va_start(argp, fmt);
547 #ifdef HAVE_VSNPRINTF
548 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
549 #else
550 len = vsprintf(buffer, fmt, argp);
551 #endif
552 va_end(argp);
553 if (len < 0)
554 {
555 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
556 return -1;
557 }
559 return add_to_wbuf(sock, buffer, len);
560 } /* }}} static int add_response_info */
/*
 * count_lines:
 * Returns the number of newline characters in `str'; a NULL pointer counts
 * as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  char *cursor;

  if (str == NULL)
    return 0;

  for (cursor = strchr(str, '\n');
       cursor != NULL;
       cursor = strchr(cursor + 1, '\n'))
    newlines++;

  return newlines;
} /* }}} static int count_lines */
578 /* send the response back to the user.
579 * returns 0 on success, -1 on error
580 * write buffer is always zeroed after this call */
581 static int send_response (listen_socket_t *sock, response_code rc,
582 char *fmt, ...) /* {{{ */
583 {
584 va_list argp;
585 char buffer[CMD_MAX];
586 int lines;
587 ssize_t wrote;
588 int rclen, len;
590 if (JOURNAL_REPLAY(sock)) return rc;
592 if (sock->batch_start)
593 {
594 if (rc == RESP_OK)
595 return rc; /* no response on success during BATCH */
596 lines = sock->batch_cmd;
597 }
598 else if (rc == RESP_OK)
599 lines = count_lines(sock->wbuf);
600 else
601 lines = -1;
603 rclen = sprintf(buffer, "%d ", lines);
604 va_start(argp, fmt);
605 #ifdef HAVE_VSNPRINTF
606 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
607 #else
608 len = vsprintf(buffer+rclen, fmt, argp);
609 #endif
610 va_end(argp);
611 if (len < 0)
612 return -1;
614 len += rclen;
616 /* append the result to the wbuf, don't write to the user */
617 if (sock->batch_start)
618 return add_to_wbuf(sock, buffer, len);
620 /* first write must be complete */
621 if (len != write(sock->fd, buffer, len))
622 {
623 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
624 return -1;
625 }
627 if (sock->wbuf != NULL && rc == RESP_OK)
628 {
629 wrote = 0;
630 while (wrote < sock->wbuf_len)
631 {
632 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
633 if (wb <= 0)
634 {
635 RRDD_LOG(LOG_INFO, "send_response: could not write results");
636 return -1;
637 }
638 wrote += wb;
639 }
640 }
642 free(sock->wbuf); sock->wbuf = NULL;
643 sock->wbuf_len = 0;
645 return 0;
646 } /* }}} */
648 static void wipe_ci_values(cache_item_t *ci, time_t when)
649 {
650 ci->values = NULL;
651 ci->values_num = 0;
652 ci->values_alloc = 0;
654 ci->last_flush_time = when;
655 if (config_write_jitter > 0)
656 ci->last_flush_time += (rrd_random() % config_write_jitter);
657 }
659 /* remove_from_queue
660 * remove a "cache_item_t" item from the queue.
661 * must hold 'cache_lock' when calling this
662 */
663 static void remove_from_queue(cache_item_t *ci) /* {{{ */
664 {
665 if (ci == NULL) return;
666 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
668 if (ci->prev == NULL)
669 cache_queue_head = ci->next; /* reset head */
670 else
671 ci->prev->next = ci->next;
673 if (ci->next == NULL)
674 cache_queue_tail = ci->prev; /* reset the tail */
675 else
676 ci->next->prev = ci->prev;
678 ci->next = ci->prev = NULL;
679 ci->flags &= ~CI_FLAGS_IN_QUEUE;
681 pthread_mutex_lock (&stats_lock);
682 assert (stats_queue_length > 0);
683 stats_queue_length--;
684 pthread_mutex_unlock (&stats_lock);
686 } /* }}} static void remove_from_queue */
688 /* free the resources associated with the cache_item_t
689 * must hold cache_lock when calling this function
690 */
691 static void *free_cache_item(cache_item_t *ci) /* {{{ */
692 {
693 if (ci == NULL) return NULL;
695 remove_from_queue(ci);
697 for (size_t i=0; i < ci->values_num; i++)
698 free(ci->values[i]);
700 free (ci->values);
701 free (ci->file);
703 /* in case anyone is waiting */
704 pthread_cond_broadcast(&ci->flushed);
705 pthread_cond_destroy(&ci->flushed);
707 free (ci);
709 return NULL;
710 } /* }}} static void *free_cache_item */
712 /*
713 * enqueue_cache_item:
714 * `cache_lock' must be acquired before calling this function!
715 */
716 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
717 queue_side_t side)
718 {
719 if (ci == NULL)
720 return (-1);
722 if (ci->values_num == 0)
723 return (0);
725 if (side == HEAD)
726 {
727 if (cache_queue_head == ci)
728 return 0;
730 /* remove if further down in queue */
731 remove_from_queue(ci);
733 ci->prev = NULL;
734 ci->next = cache_queue_head;
735 if (ci->next != NULL)
736 ci->next->prev = ci;
737 cache_queue_head = ci;
739 if (cache_queue_tail == NULL)
740 cache_queue_tail = cache_queue_head;
741 }
742 else /* (side == TAIL) */
743 {
744 /* We don't move values back in the list.. */
745 if (ci->flags & CI_FLAGS_IN_QUEUE)
746 return (0);
748 assert (ci->next == NULL);
749 assert (ci->prev == NULL);
751 ci->prev = cache_queue_tail;
753 if (cache_queue_tail == NULL)
754 cache_queue_head = ci;
755 else
756 cache_queue_tail->next = ci;
758 cache_queue_tail = ci;
759 }
761 ci->flags |= CI_FLAGS_IN_QUEUE;
763 pthread_cond_signal(&queue_cond);
764 pthread_mutex_lock (&stats_lock);
765 stats_queue_length++;
766 pthread_mutex_unlock (&stats_lock);
768 return (0);
769 } /* }}} int enqueue_cache_item */
771 /*
772 * tree_callback_flush:
773 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
774 * while this is in progress.
775 */
776 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
777 gpointer data)
778 {
779 cache_item_t *ci;
780 callback_flush_data_t *cfd;
782 ci = (cache_item_t *) value;
783 cfd = (callback_flush_data_t *) data;
785 if (ci->flags & CI_FLAGS_IN_QUEUE)
786 return FALSE;
788 if (ci->values_num > 0
789 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
790 {
791 enqueue_cache_item (ci, TAIL);
792 }
793 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
794 && (ci->values_num <= 0))
795 {
796 assert ((char *) key == ci->file);
797 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
798 {
799 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
800 return (FALSE);
801 }
802 }
804 return (FALSE);
805 } /* }}} gboolean tree_callback_flush */
807 static int flush_old_values (int max_age)
808 {
809 callback_flush_data_t cfd;
810 size_t k;
812 memset (&cfd, 0, sizeof (cfd));
813 /* Pass the current time as user data so that we don't need to call
814 * `time' for each node. */
815 cfd.now = time (NULL);
816 cfd.keys = NULL;
817 cfd.keys_num = 0;
819 if (max_age > 0)
820 cfd.abs_timeout = cfd.now - max_age;
821 else
822 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
824 /* `tree_callback_flush' will return the keys of all values that haven't
825 * been touched in the last `config_flush_interval' seconds in `cfd'.
826 * The char*'s in this array point to the same memory as ci->file, so we
827 * don't need to free them separately. */
828 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
830 for (k = 0; k < cfd.keys_num; k++)
831 {
832 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
833 /* should never fail, since we have held the cache_lock
834 * the entire time */
835 assert(status == TRUE);
836 }
838 if (cfd.keys != NULL)
839 {
840 free (cfd.keys);
841 cfd.keys = NULL;
842 }
844 return (0);
845 } /* int flush_old_values */
/*
 * flush_thread_main:
 * Thread body of the periodic flusher. While the daemon is RUNNING it wakes
 * up every `config_flush_interval' seconds (or when `flush_cond' is
 * broadcast), queues old values and rotates the journal. Once the state
 * leaves RUNNING it optionally queues everything that is left and sets the
 * state to SHUTDOWN. `cache_lock' is held except while waiting and while
 * the journal is rotated. Always returns NULL.
 */
static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int status;

  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock(&cache_lock);

  while (state == RUNNING)
  {
    gettimeofday (&now, NULL);
    /* Has the deadline passed? */
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      RRDD_LOG(LOG_DEBUG, "flushing old values");

      /* Determine the time of the next cache flush. */
      next_flush.tv_sec = now.tv_sec + config_flush_interval;

      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* Sleeps until the deadline or until a signal handler broadcasts
     * `flush_cond'; the mutex is released while waiting. */
    status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
    if (status != 0 && status != ETIMEDOUT)
    {
      RRDD_LOG (LOG_ERR, "flush_thread_main: "
          "pthread_cond_timedwait returned %i.", status);
    }
  }

  if (config_flush_at_shutdown)
    flush_old_values (-1); /* flush everything */

  /* Lets the queue threads leave their loop (see queue_thread_main). */
  state = SHUTDOWN;

  pthread_mutex_unlock(&cache_lock);

  return NULL;
} /* }}} void *flush_thread_main */
/*
 * queue_thread_main:
 * Thread body of a queue worker. Repeatedly takes the item at the head of
 * the write queue, takes over its values, writes them to the file with
 * rrd_update_r() and records the write in the journal. The loop runs until
 * the state is SHUTDOWN and, if `config_flush_at_shutdown' is set, until the
 * queue is empty as well. `cache_lock' is held except around the disk
 * write and the statistics update. Always returns NULL.
 */
static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
{
  pthread_mutex_lock (&cache_lock);

  while (state != SHUTDOWN
      || (cache_queue_head != NULL && config_flush_at_shutdown))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    size_t values_num;
    int status;

    /* Now, check if there's something to store away. If not, wait until
     * something comes in. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_wait (&queue_cond, &cache_lock);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_wait returned %i.", status);
      }
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      /* NOTE(review): the item stays at the head of the queue and the lock
       * is still held, so this retries immediately -- confirm intended. */
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* Take ownership of the value array; the item is reset below. */
    values = ci->values;
    values_num = ci->values_num;

    wipe_ci_values(ci, time(NULL));
    remove_from_queue(ci);

    /* Do the disk write without holding the cache lock. */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    journal_write("wrote", file);

    /* Search again in the tree. It's possible someone issued a "FORGET"
     * while we were writing the update values. */
    pthread_mutex_lock(&cache_lock);
    ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
    if (ci)
      pthread_cond_broadcast(&ci->flushed);   /* wakes flush_file() waiters */
    pthread_mutex_unlock(&cache_lock);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    rrd_free_ptrs((void ***) &values, &values_num);
    free(file);

    /* Re-acquire the lock for the loop condition and the next round. */
    pthread_mutex_lock (&cache_lock);
  }
  pthread_mutex_unlock (&cache_lock);

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Splits the next space-delimited field off the front of a request buffer.
 *
 * The field is unescaped in place: a backslash makes the following character
 * literal (so "\ " yields a space inside the field). The field ends at the
 * first unescaped space or at the terminating NUL and is NUL-terminated.
 *
 * buffer_ret:      in: start of the buffer; out: first byte after the field.
 * buffer_size_ret: in: buffer size including the final NUL; out: bytes left.
 * field_ret:       out: start of the extracted field.
 *
 * Returns 0 on success. Returns -1, leaving all arguments untouched, if the
 * buffer is empty or no field terminator was found.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t src_size = *buffer_size_ret;
  char *dst = src;        /* output overwrites the input in place */
  size_t rd = 0;          /* read position in src */
  size_t wr = 0;          /* write position in dst */
  int terminated = 0;

  if (src_size <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[src_size - 1] == '\0');

  while (rd < src_size)
  {
    char c = src[rd];

    /* end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
    {
      dst[wr] = 0;
      rd++;
      terminated = 1;
      break;
    }

    /* escaped character: skip the backslash, copy what follows */
    if (c == '\\')
    {
      if (rd >= (src_size - 1))
        break;
      rd++;
    }

    dst[wr] = src[rd];
    wr++;
    rd++;
  }

  if (!terminated)
    return (-1);

  *buffer_ret = src + rd;
  *buffer_size_ret = src_size - rd;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
1051 /* if we're restricting writes to the base directory,
1052 * check whether the file falls within the dir
1053 * returns 1 if OK, otherwise 0
1054 */
1055 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1056 {
1057 assert(file != NULL);
1059 if (!config_write_base_only
1060 || JOURNAL_REPLAY(sock)
1061 || config_base_dir == NULL)
1062 return 1;
1064 if (strstr(file, "../") != NULL) goto err;
1066 /* relative paths without "../" are ok */
1067 if (*file != '/') return 1;
1069 /* file must be of the format base + "/" + <1+ char filename> */
1070 if (strlen(file) < _config_base_dir_len + 2) goto err;
1071 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1072 if (*(file + _config_base_dir_len) != '/') goto err;
1074 return 1;
1076 err:
1077 if (sock != NULL && sock->fd >= 0)
1078 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1080 return 0;
1081 } /* }}} static int check_file_access */
1083 /* when using a base dir, convert relative paths to absolute paths.
1084 * if necessary, modifies the "filename" pointer to point
1085 * to the new path created in "tmp". "tmp" is provided
1086 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1087 *
1088 * this allows us to optimize for the expected case (absolute path)
1089 * with a no-op.
1090 */
1091 static void get_abs_path(char **filename, char *tmp)
1092 {
1093 assert(tmp != NULL);
1094 assert(filename != NULL && *filename != NULL);
1096 if (config_base_dir == NULL || **filename == '/')
1097 return;
1099 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1100 *filename = tmp;
1101 } /* }}} static int get_abs_path */
1103 static int flush_file (const char *filename) /* {{{ */
1104 {
1105 cache_item_t *ci;
1107 pthread_mutex_lock (&cache_lock);
1109 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1110 if (ci == NULL)
1111 {
1112 pthread_mutex_unlock (&cache_lock);
1113 return (ENOENT);
1114 }
1116 if (ci->values_num > 0)
1117 {
1118 /* Enqueue at head */
1119 enqueue_cache_item (ci, HEAD);
1120 pthread_cond_wait(&ci->flushed, &cache_lock);
1121 }
1123 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1124 * may have been purged during our cond_wait() */
1126 pthread_mutex_unlock(&cache_lock);
1128 return (0);
1129 } /* }}} int flush_file */
1131 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1132 {
1133 char *err = "Syntax error.\n";
1135 if (cmd && cmd->syntax)
1136 err = cmd->syntax;
1138 return send_response(sock, RESP_ERR, "Usage: %s", err);
1139 } /* }}} static int syntax_error() */
1141 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1142 {
1143 uint64_t copy_queue_length;
1144 uint64_t copy_updates_received;
1145 uint64_t copy_flush_received;
1146 uint64_t copy_updates_written;
1147 uint64_t copy_data_sets_written;
1148 uint64_t copy_journal_bytes;
1149 uint64_t copy_journal_rotate;
1151 uint64_t tree_nodes_number;
1152 uint64_t tree_depth;
1154 pthread_mutex_lock (&stats_lock);
1155 copy_queue_length = stats_queue_length;
1156 copy_updates_received = stats_updates_received;
1157 copy_flush_received = stats_flush_received;
1158 copy_updates_written = stats_updates_written;
1159 copy_data_sets_written = stats_data_sets_written;
1160 copy_journal_bytes = stats_journal_bytes;
1161 copy_journal_rotate = stats_journal_rotate;
1162 pthread_mutex_unlock (&stats_lock);
1164 pthread_mutex_lock (&cache_lock);
1165 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1166 tree_depth = (uint64_t) g_tree_height (cache_tree);
1167 pthread_mutex_unlock (&cache_lock);
1169 add_response_info(sock,
1170 "QueueLength: %"PRIu64"\n", copy_queue_length);
1171 add_response_info(sock,
1172 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1173 add_response_info(sock,
1174 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1175 add_response_info(sock,
1176 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1177 add_response_info(sock,
1178 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1179 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1180 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1181 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1182 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1184 send_response(sock, RESP_OK, "Statistics follow\n");
1186 return (0);
1187 } /* }}} int handle_request_stats */
1189 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1190 {
1191 char *file, file_tmp[PATH_MAX];
1192 int status;
1194 status = buffer_get_field (&buffer, &buffer_size, &file);
1195 if (status != 0)
1196 {
1197 return syntax_error(sock,cmd);
1198 }
1199 else
1200 {
1201 pthread_mutex_lock(&stats_lock);
1202 stats_flush_received++;
1203 pthread_mutex_unlock(&stats_lock);
1205 get_abs_path(&file, file_tmp);
1206 if (!check_file_access(file, sock)) return 0;
1208 status = flush_file (file);
1209 if (status == 0)
1210 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1211 else if (status == ENOENT)
1212 {
1213 /* no file in our tree; see whether it exists at all */
1214 struct stat statbuf;
1216 memset(&statbuf, 0, sizeof(statbuf));
1217 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1218 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1219 else
1220 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1221 }
1222 else if (status < 0)
1223 return send_response(sock, RESP_ERR, "Internal error.\n");
1224 else
1225 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1226 }
1228 /* NOTREACHED */
1229 assert(1==0);
1230 } /* }}} int handle_request_flush */
1232 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1233 {
1234 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236 pthread_mutex_lock(&cache_lock);
1237 flush_old_values(-1);
1238 pthread_mutex_unlock(&cache_lock);
1240 return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1244 {
1245 int status;
1246 char *file, file_tmp[PATH_MAX];
1247 cache_item_t *ci;
1249 status = buffer_get_field(&buffer, &buffer_size, &file);
1250 if (status != 0)
1251 return syntax_error(sock,cmd);
1253 get_abs_path(&file, file_tmp);
1255 pthread_mutex_lock(&cache_lock);
1256 ci = g_tree_lookup(cache_tree, file);
1257 if (ci == NULL)
1258 {
1259 pthread_mutex_unlock(&cache_lock);
1260 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1261 }
1263 for (size_t i=0; i < ci->values_num; i++)
1264 add_response_info(sock, "%s\n", ci->values[i]);
1266 pthread_mutex_unlock(&cache_lock);
1267 return send_response(sock, RESP_OK, "updates pending\n");
1268 } /* }}} static int handle_request_pending */
1270 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1271 {
1272 int status;
1273 gboolean found;
1274 char *file, file_tmp[PATH_MAX];
1276 status = buffer_get_field(&buffer, &buffer_size, &file);
1277 if (status != 0)
1278 return syntax_error(sock,cmd);
1280 get_abs_path(&file, file_tmp);
1281 if (!check_file_access(file, sock)) return 0;
1283 pthread_mutex_lock(&cache_lock);
1284 found = g_tree_remove(cache_tree, file);
1285 pthread_mutex_unlock(&cache_lock);
1287 if (found == TRUE)
1288 {
1289 if (!JOURNAL_REPLAY(sock))
1290 journal_write("forget", file);
1292 return send_response(sock, RESP_OK, "Gone!\n");
1293 }
1294 else
1295 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1297 /* NOTREACHED */
1298 assert(1==0);
1299 } /* }}} static int handle_request_forget */
1301 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1302 {
1303 cache_item_t *ci;
1305 pthread_mutex_lock(&cache_lock);
1307 ci = cache_queue_head;
1308 while (ci != NULL)
1309 {
1310 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1311 ci = ci->next;
1312 }
1314 pthread_mutex_unlock(&cache_lock);
1316 return send_response(sock, RESP_OK, "in queue.\n");
1317 } /* }}} int handle_request_queue */
1319 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1320 {
1321 char *file, file_tmp[PATH_MAX];
1322 int values_num = 0;
1323 int status;
1324 char orig_buf[CMD_MAX];
1326 cache_item_t *ci;
1328 /* save it for the journal later */
1329 if (!JOURNAL_REPLAY(sock))
1330 strncpy(orig_buf, buffer, buffer_size);
1332 status = buffer_get_field (&buffer, &buffer_size, &file);
1333 if (status != 0)
1334 return syntax_error(sock,cmd);
1336 pthread_mutex_lock(&stats_lock);
1337 stats_updates_received++;
1338 pthread_mutex_unlock(&stats_lock);
1340 get_abs_path(&file, file_tmp);
1341 if (!check_file_access(file, sock)) return 0;
1343 pthread_mutex_lock (&cache_lock);
1344 ci = g_tree_lookup (cache_tree, file);
1346 if (ci == NULL) /* {{{ */
1347 {
1348 struct stat statbuf;
1349 cache_item_t *tmp;
1351 /* don't hold the lock while we setup; stat(2) might block */
1352 pthread_mutex_unlock(&cache_lock);
1354 memset (&statbuf, 0, sizeof (statbuf));
1355 status = stat (file, &statbuf);
1356 if (status != 0)
1357 {
1358 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1360 status = errno;
1361 if (status == ENOENT)
1362 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1363 else
1364 return send_response(sock, RESP_ERR,
1365 "stat failed with error %i.\n", status);
1366 }
1367 if (!S_ISREG (statbuf.st_mode))
1368 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1370 if (access(file, R_OK|W_OK) != 0)
1371 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1372 file, rrd_strerror(errno));
1374 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1375 if (ci == NULL)
1376 {
1377 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1379 return send_response(sock, RESP_ERR, "malloc failed.\n");
1380 }
1381 memset (ci, 0, sizeof (cache_item_t));
1383 ci->file = strdup (file);
1384 if (ci->file == NULL)
1385 {
1386 free (ci);
1387 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1389 return send_response(sock, RESP_ERR, "strdup failed.\n");
1390 }
1392 wipe_ci_values(ci, now);
1393 ci->flags = CI_FLAGS_IN_TREE;
1394 pthread_cond_init(&ci->flushed, NULL);
1396 pthread_mutex_lock(&cache_lock);
1398 /* another UPDATE might have added this entry in the meantime */
1399 tmp = g_tree_lookup (cache_tree, file);
1400 if (tmp == NULL)
1401 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1402 else
1403 {
1404 free_cache_item (ci);
1405 ci = tmp;
1406 }
1408 /* state may have changed while we were unlocked */
1409 if (state == SHUTDOWN)
1410 return -1;
1411 } /* }}} */
1412 assert (ci != NULL);
1414 /* don't re-write updates in replay mode */
1415 if (!JOURNAL_REPLAY(sock))
1416 journal_write("update", orig_buf);
1418 while (buffer_size > 0)
1419 {
1420 char *value;
1421 time_t stamp;
1422 char *eostamp;
1424 status = buffer_get_field (&buffer, &buffer_size, &value);
1425 if (status != 0)
1426 {
1427 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1428 break;
1429 }
1431 /* make sure update time is always moving forward */
1432 stamp = strtol(value, &eostamp, 10);
1433 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434 {
1435 pthread_mutex_unlock(&cache_lock);
1436 return send_response(sock, RESP_ERR,
1437 "Cannot find timestamp in '%s'!\n", value);
1438 }
1439 else if (stamp <= ci->last_update_stamp)
1440 {
1441 pthread_mutex_unlock(&cache_lock);
1442 return send_response(sock, RESP_ERR,
1443 "illegal attempt to update using time %ld when last"
1444 " update time is %ld (minimum one second step)\n",
1445 stamp, ci->last_update_stamp);
1446 }
1447 else
1448 ci->last_update_stamp = stamp;
1450 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451 &ci->values_alloc, config_alloc_chunk))
1452 {
1453 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454 continue;
1455 }
1457 values_num++;
1458 }
1460 if (((now - ci->last_flush_time) >= config_write_interval)
1461 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462 && (ci->values_num > 0))
1463 {
1464 enqueue_cache_item (ci, TAIL);
1465 }
1467 pthread_mutex_unlock (&cache_lock);
1469 if (values_num < 1)
1470 return send_response(sock, RESP_ERR, "No values updated.\n");
1471 else
1472 return send_response(sock, RESP_OK,
1473 "errors, enqueued %i value(s).\n", values_num);
1475 /* NOTREACHED */
1476 assert(1==0);
1478 } /* }}} int handle_request_update */
1480 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1481 {
1482 char *file, file_tmp[PATH_MAX];
1483 char *cf;
1485 char *start_str;
1486 char *end_str;
1487 time_t start_tm;
1488 time_t end_tm;
1490 unsigned long step;
1491 unsigned long ds_cnt;
1492 char **ds_namv;
1493 rrd_value_t *data;
1495 int status;
1496 unsigned long i;
1497 time_t t;
1498 rrd_value_t *data_ptr;
1500 file = NULL;
1501 cf = NULL;
1502 start_str = NULL;
1503 end_str = NULL;
1505 /* Read the arguments */
1506 do /* while (0) */
1507 {
1508 status = buffer_get_field (&buffer, &buffer_size, &file);
1509 if (status != 0)
1510 break;
1512 status = buffer_get_field (&buffer, &buffer_size, &cf);
1513 if (status != 0)
1514 break;
1516 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1517 if (status != 0)
1518 {
1519 start_str = NULL;
1520 status = 0;
1521 break;
1522 }
1524 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1525 if (status != 0)
1526 {
1527 end_str = NULL;
1528 status = 0;
1529 break;
1530 }
1531 } while (0);
1533 if (status != 0)
1534 return (syntax_error(sock,cmd));
1536 get_abs_path(&file, file_tmp);
1537 if (!check_file_access(file, sock)) return 0;
1539 status = flush_file (file);
1540 if ((status != 0) && (status != ENOENT))
1541 return (send_response (sock, RESP_ERR,
1542 "flush_file (%s) failed with status %i.\n", file, status));
1544 t = time (NULL); /* "now" */
1546 /* Parse start time */
1547 if (start_str != NULL)
1548 {
1549 char *endptr;
1550 long value;
1552 endptr = NULL;
1553 errno = 0;
1554 value = strtol (start_str, &endptr, /* base = */ 0);
1555 if ((endptr == start_str) || (errno != 0))
1556 return (send_response(sock, RESP_ERR,
1557 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1558 start_str));
1560 if (value > 0)
1561 start_tm = (time_t) value;
1562 else
1563 start_tm = (time_t) (t + value);
1564 }
1565 else
1566 {
1567 start_tm = t - 86400;
1568 }
1570 /* Parse end time */
1571 if (end_str != NULL)
1572 {
1573 char *endptr;
1574 long value;
1576 endptr = NULL;
1577 errno = 0;
1578 value = strtol (end_str, &endptr, /* base = */ 0);
1579 if ((endptr == end_str) || (errno != 0))
1580 return (send_response(sock, RESP_ERR,
1581 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1582 end_str));
1584 if (value > 0)
1585 end_tm = (time_t) value;
1586 else
1587 end_tm = (time_t) (t + value);
1588 }
1589 else
1590 {
1591 end_tm = t;
1592 }
1594 step = -1;
1595 ds_cnt = 0;
1596 ds_namv = NULL;
1597 data = NULL;
1599 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1600 &ds_cnt, &ds_namv, &data);
1601 if (status != 0)
1602 return (send_response(sock, RESP_ERR,
1603 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1605 add_response_info (sock, "FlushVersion: %lu\n", 1);
1606 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1607 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1608 add_response_info (sock, "Step: %lu\n", step);
1609 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1611 #define SSTRCAT(buffer,str,buffer_fill) do { \
1612 size_t str_len = strlen (str); \
1613 if ((buffer_fill + str_len) > sizeof (buffer)) \
1614 str_len = sizeof (buffer) - buffer_fill; \
1615 if (str_len > 0) { \
1616 strncpy (buffer + buffer_fill, str, str_len); \
1617 buffer_fill += str_len; \
1618 assert (buffer_fill <= sizeof (buffer)); \
1619 if (buffer_fill == sizeof (buffer)) \
1620 buffer[buffer_fill - 1] = 0; \
1621 else \
1622 buffer[buffer_fill] = 0; \
1623 } \
1624 } while (0)
1626 { /* Add list of DS names */
1627 char linebuf[1024];
1628 size_t linebuf_fill;
1630 memset (linebuf, 0, sizeof (linebuf));
1631 linebuf_fill = 0;
1632 for (i = 0; i < ds_cnt; i++)
1633 {
1634 if (i > 0)
1635 SSTRCAT (linebuf, " ", linebuf_fill);
1636 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1637 rrd_freemem(ds_namv[i]);
1638 }
1639 rrd_freemem(ds_namv);
1640 add_response_info (sock, "DSName: %s\n", linebuf);
1641 }
1643 /* Add the actual data */
1644 assert (step > 0);
1645 data_ptr = data;
1646 for (t = start_tm + step; t <= end_tm; t += step)
1647 {
1648 char linebuf[1024];
1649 size_t linebuf_fill;
1650 char tmp[128];
1652 memset (linebuf, 0, sizeof (linebuf));
1653 linebuf_fill = 0;
1654 for (i = 0; i < ds_cnt; i++)
1655 {
1656 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1657 tmp[sizeof (tmp) - 1] = 0;
1658 SSTRCAT (linebuf, tmp, linebuf_fill);
1660 data_ptr++;
1661 }
1663 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1664 } /* for (t) */
1665 rrd_freemem(data);
1667 return (send_response (sock, RESP_OK, "Success\n"));
1668 #undef SSTRCAT
1669 } /* }}} int handle_request_fetch */
1671 /* we came across a "WROTE" entry during journal replay.
1672 * throw away any values that we have accumulated for this file
1673 */
1674 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1675 {
1676 cache_item_t *ci;
1677 const char *file = buffer;
1679 pthread_mutex_lock(&cache_lock);
1681 ci = g_tree_lookup(cache_tree, file);
1682 if (ci == NULL)
1683 {
1684 pthread_mutex_unlock(&cache_lock);
1685 return (0);
1686 }
1688 if (ci->values)
1689 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1691 wipe_ci_values(ci, now);
1692 remove_from_queue(ci);
1694 pthread_mutex_unlock(&cache_lock);
1695 return (0);
1696 } /* }}} int handle_request_wrote */
1698 /* start "BATCH" processing */
1699 static int batch_start (HANDLER_PROTO) /* {{{ */
1700 {
1701 int status;
1702 if (sock->batch_start)
1703 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1705 status = send_response(sock, RESP_OK,
1706 "Go ahead. End with dot '.' on its own line.\n");
1707 sock->batch_start = time(NULL);
1708 sock->batch_cmd = 0;
1710 return status;
1711 } /* }}} static int batch_start */
1713 /* finish "BATCH" processing and return results to the client */
1714 static int batch_done (HANDLER_PROTO) /* {{{ */
1715 {
1716 assert(sock->batch_start);
1717 sock->batch_start = 0;
1718 sock->batch_cmd = 0;
1719 return send_response(sock, RESP_OK, "errors\n");
1720 } /* }}} static int batch_done */
/* QUIT: nothing to do here.  The non-zero return value is the signal; the
 * connection thread closes the connection as soon as a handler returns
 * something other than 0. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  return (-1);
} /* }}} static int handle_request_quit */
/*
 * Table of all commands known to the daemon.
 *
 * Entries are initialised positionally.  Judging by how the members are
 * used elsewhere in this file the order is: command name (`cmd'), handler
 * function (`handler'), bitmask of contexts in which the command is
 * accepted (`context'), one-line usage string (`syntax') and long help
 * text (`help').
 *
 * NOTE: the position of an entry in this array doubles as its bit position
 * in `listen_socket_t.permissions' (see find_command_index), so entries
 * must not be reordered carelessly and there must be fewer than 32 of them.
 *
 * Entries whose `syntax' is NULL are left out of the HELP overview.
 */
static command_t list_of_commands[] = { /* {{{ */
  {
    "UPDATE",
    handle_request_update,
    CMD_CONTEXT_ANY,
    "UPDATE <filename> <values> [<values> ...]\n"
    ,
    "Adds the given file to the internal cache if it is not yet known and\n"
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
    "for details.\n"
    "\n"
    "Each <values> has the following form:\n"
    " <values> = <time>:<value>[:<value>[...]]\n"
    "See the rrdupdate(1) manpage for details.\n"
  },
  {
    /* journal-only command, no help text */
    "WROTE",
    handle_request_wrote,
    CMD_CONTEXT_JOURNAL,
    NULL,
    NULL
  },
  {
    "FLUSH",
    handle_request_flush,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "FLUSH <filename>\n"
    ,
    "Adds the given filename to the head of the update queue and returns\n"
    "after it has been dequeued.\n"
  },
  {
    "FLUSHALL",
    handle_request_flushall,
    CMD_CONTEXT_CLIENT,
    "FLUSHALL\n"
    ,
    "Triggers writing of all pending updates. Returns immediately.\n"
  },
  {
    "PENDING",
    handle_request_pending,
    CMD_CONTEXT_CLIENT,
    "PENDING <filename>\n"
    ,
    "Shows any 'pending' updates for a file, in order.\n"
    "The updates shown have not yet been written to the underlying RRD file.\n"
  },
  {
    "FORGET",
    handle_request_forget,
    CMD_CONTEXT_ANY,
    "FORGET <filename>\n"
    ,
    "Removes the file completely from the cache.\n"
    "Any pending updates for the file will be lost.\n"
  },
  {
    "QUEUE",
    handle_request_queue,
    CMD_CONTEXT_CLIENT,
    "QUEUE\n"
    ,
    "Shows all files in the output queue.\n"
    "The output is zero or more lines in the following format:\n"
    "(where <num_vals> is the number of values to be written)\n"
    "\n"
    "<num_vals> <filename>\n"
  },
  {
    "STATS",
    handle_request_stats,
    CMD_CONTEXT_CLIENT,
    "STATS\n"
    ,
    "Returns some performance counters, see the rrdcached(1) manpage for\n"
    "a description of the values.\n"
  },
  {
    "HELP",
    handle_request_help,
    CMD_CONTEXT_CLIENT,
    "HELP [<command>]\n",
    NULL, /* special! */
  },
  {
    "BATCH",
    batch_start,
    CMD_CONTEXT_CLIENT,
    "BATCH\n"
    ,
    "The 'BATCH' command permits the client to initiate a bulk load\n"
    " of commands to rrdcached.\n"
    "\n"
    "Usage:\n"
    "\n"
    " client: BATCH\n"
    " server: 0 Go ahead. End with dot '.' on its own line.\n"
    " client: command #1\n"
    " client: command #2\n"
    " client: ... and so on\n"
    " client: .\n"
    " server: 2 errors\n"
    " server: 7 message for command #7\n"
    " server: 9 message for command #9\n"
    "\n"
    "For more information, consult the rrdcached(1) documentation.\n"
  },
  {
    ".", /* BATCH terminator */
    batch_done,
    CMD_CONTEXT_BATCH,
    NULL,
    NULL
  },
  {
    "FETCH",
    handle_request_fetch,
    CMD_CONTEXT_CLIENT,
    "FETCH <file> <CF> [<start> [<end>]]\n"
    ,
    "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
  },
  {
    "QUIT",
    handle_request_quit,
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
    "QUIT\n"
    ,
    "Disconnect from rrdcached.\n"
  }
}; /* }}} command_t list_of_commands[] */
/* number of entries in list_of_commands */
static size_t list_of_commands_len = sizeof (list_of_commands)
  / sizeof (list_of_commands[0]);
1862 static command_t *find_command(char *cmd)
1863 {
1864 size_t i;
1866 for (i = 0; i < list_of_commands_len; i++)
1867 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1868 return (&list_of_commands[i]);
1869 return NULL;
1870 }
1872 /* We currently use the index in the `list_of_commands' array as a bit position
1873 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1874 * outside these functions so that switching to a more elegant storage method
1875 * is easily possible. */
1876 static ssize_t find_command_index (const char *cmd) /* {{{ */
1877 {
1878 size_t i;
1880 for (i = 0; i < list_of_commands_len; i++)
1881 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1882 return ((ssize_t) i);
1883 return (-1);
1884 } /* }}} ssize_t find_command_index */
1886 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1887 const char *cmd)
1888 {
1889 ssize_t i;
1891 if (JOURNAL_REPLAY(sock))
1892 return (1);
1894 if (cmd == NULL)
1895 return (-1);
1897 if ((strcasecmp ("QUIT", cmd) == 0)
1898 || (strcasecmp ("HELP", cmd) == 0))
1899 return (1);
1900 else if (strcmp (".", cmd) == 0)
1901 cmd = "BATCH";
1903 i = find_command_index (cmd);
1904 if (i < 0)
1905 return (-1);
1906 assert (i < 32);
1908 if ((sock->permissions & (1 << i)) != 0)
1909 return (1);
1910 return (0);
1911 } /* }}} int socket_permission_check */
1913 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1914 const char *cmd)
1915 {
1916 ssize_t i;
1918 i = find_command_index (cmd);
1919 if (i < 0)
1920 return (-1);
1921 assert (i < 32);
1923 sock->permissions |= (1 << i);
1924 return (0);
1925 } /* }}} int socket_permission_add */
1927 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1928 {
1929 sock->permissions = 0;
1930 } /* }}} socket_permission_clear */
1932 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1933 listen_socket_t *src)
1934 {
1935 dest->permissions = src->permissions;
1936 } /* }}} socket_permission_copy */
1938 /* check whether commands are received in the expected context */
1939 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1940 {
1941 if (JOURNAL_REPLAY(sock))
1942 return (cmd->context & CMD_CONTEXT_JOURNAL);
1943 else if (sock->batch_start)
1944 return (cmd->context & CMD_CONTEXT_BATCH);
1945 else
1946 return (cmd->context & CMD_CONTEXT_CLIENT);
1948 /* NOTREACHED */
1949 assert(1==0);
1950 }
1952 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1953 {
1954 int status;
1955 char *cmd_str;
1956 char *resp_txt;
1957 command_t *help = NULL;
1959 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1960 if (status == 0)
1961 help = find_command(cmd_str);
1963 if (help && (help->syntax || help->help))
1964 {
1965 char tmp[CMD_MAX];
1967 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1968 resp_txt = tmp;
1970 if (help->syntax)
1971 add_response_info(sock, "Usage: %s\n", help->syntax);
1973 if (help->help)
1974 add_response_info(sock, "%s\n", help->help);
1975 }
1976 else
1977 {
1978 size_t i;
1980 resp_txt = "Command overview\n";
1982 for (i = 0; i < list_of_commands_len; i++)
1983 {
1984 if (list_of_commands[i].syntax == NULL)
1985 continue;
1986 add_response_info (sock, "%s", list_of_commands[i].syntax);
1987 }
1988 }
1990 return send_response(sock, RESP_OK, resp_txt);
1991 } /* }}} int handle_request_help */
1993 static int handle_request (DISPATCH_PROTO) /* {{{ */
1994 {
1995 char *buffer_ptr = buffer;
1996 char *cmd_str = NULL;
1997 command_t *cmd = NULL;
1998 int status;
2000 assert (buffer[buffer_size - 1] == '\0');
2002 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2003 if (status != 0)
2004 {
2005 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2006 return (-1);
2007 }
2009 if (sock != NULL && sock->batch_start)
2010 sock->batch_cmd++;
2012 cmd = find_command(cmd_str);
2013 if (!cmd)
2014 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2016 if (!socket_permission_check (sock, cmd->cmd))
2017 return send_response(sock, RESP_ERR, "Permission denied.\n");
2019 if (!command_check_context(sock, cmd))
2020 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2022 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2023 } /* }}} int handle_request */
2025 static void journal_set_free (journal_set *js) /* {{{ */
2026 {
2027 if (js == NULL)
2028 return;
2030 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2032 free(js);
2033 } /* }}} journal_set_free */
2035 static void journal_set_remove (journal_set *js) /* {{{ */
2036 {
2037 if (js == NULL)
2038 return;
2040 for (uint i=0; i < js->files_num; i++)
2041 {
2042 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2043 unlink(js->files[i]);
2044 }
2045 } /* }}} journal_set_remove */
2047 /* close current journal file handle.
2048 * MUST hold journal_lock before calling */
2049 static void journal_close(void) /* {{{ */
2050 {
2051 if (journal_fh != NULL)
2052 {
2053 if (fclose(journal_fh) != 0)
2054 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2055 }
2057 journal_fh = NULL;
2058 journal_size = 0;
2059 } /* }}} journal_close */
2061 /* MUST hold journal_lock before calling */
2062 static void journal_new_file(void) /* {{{ */
2063 {
2064 struct timeval now;
2065 int new_fd;
2066 char new_file[PATH_MAX + 1];
2068 assert(journal_dir != NULL);
2069 assert(journal_cur != NULL);
2071 journal_close();
2073 gettimeofday(&now, NULL);
2074 /* this format assures that the files sort in strcmp() order */
2075 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2076 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2078 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2079 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2080 if (new_fd < 0)
2081 goto error;
2083 journal_fh = fdopen(new_fd, "a");
2084 if (journal_fh == NULL)
2085 goto error;
2087 journal_size = ftell(journal_fh);
2088 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2090 /* record the file in the journal set */
2091 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2093 return;
2095 error:
2096 RRDD_LOG(LOG_CRIT,
2097 "JOURNALING DISABLED: Error while trying to create %s : %s",
2098 new_file, rrd_strerror(errno));
2099 RRDD_LOG(LOG_CRIT,
2100 "JOURNALING DISABLED: All values will be flushed at shutdown");
2102 close(new_fd);
2103 config_flush_at_shutdown = 1;
2105 } /* }}} journal_new_file */
2107 /* MUST NOT hold journal_lock before calling this */
2108 static void journal_rotate(void) /* {{{ */
2109 {
2110 journal_set *old_js = NULL;
2112 if (journal_dir == NULL)
2113 return;
2115 RRDD_LOG(LOG_DEBUG, "rotating journals");
2117 pthread_mutex_lock(&stats_lock);
2118 ++stats_journal_rotate;
2119 pthread_mutex_unlock(&stats_lock);
2121 pthread_mutex_lock(&journal_lock);
2123 journal_close();
2125 /* rotate the journal sets */
2126 old_js = journal_old;
2127 journal_old = journal_cur;
2128 journal_cur = calloc(1, sizeof(journal_set));
2130 if (journal_cur != NULL)
2131 journal_new_file();
2132 else
2133 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2135 pthread_mutex_unlock(&journal_lock);
2137 journal_set_remove(old_js);
2138 journal_set_free (old_js);
2140 } /* }}} static void journal_rotate */
2142 /* MUST hold journal_lock when calling */
2143 static void journal_done(void) /* {{{ */
2144 {
2145 if (journal_cur == NULL)
2146 return;
2148 journal_close();
2150 if (config_flush_at_shutdown)
2151 {
2152 RRDD_LOG(LOG_INFO, "removing journals");
2153 journal_set_remove(journal_old);
2154 journal_set_remove(journal_cur);
2155 }
2156 else
2157 {
2158 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2159 "journals will be used at next startup");
2160 }
2162 journal_set_free(journal_cur);
2163 journal_set_free(journal_old);
2164 free(journal_dir);
2166 } /* }}} static void journal_done */
2168 static int journal_write(char *cmd, char *args) /* {{{ */
2169 {
2170 int chars;
2172 if (journal_fh == NULL)
2173 return 0;
2175 pthread_mutex_lock(&journal_lock);
2176 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2177 journal_size += chars;
2179 if (journal_size > JOURNAL_MAX)
2180 journal_new_file();
2182 pthread_mutex_unlock(&journal_lock);
2184 if (chars > 0)
2185 {
2186 pthread_mutex_lock(&stats_lock);
2187 stats_journal_bytes += chars;
2188 pthread_mutex_unlock(&stats_lock);
2189 }
2191 return chars;
2192 } /* }}} static int journal_write */
2194 static int journal_replay (const char *file) /* {{{ */
2195 {
2196 FILE *fh;
2197 int entry_cnt = 0;
2198 int fail_cnt = 0;
2199 uint64_t line = 0;
2200 char entry[CMD_MAX];
2201 time_t now;
2203 if (file == NULL) return 0;
2205 {
2206 char *reason = "unknown error";
2207 int status = 0;
2208 struct stat statbuf;
2210 memset(&statbuf, 0, sizeof(statbuf));
2211 if (stat(file, &statbuf) != 0)
2212 {
2213 reason = "stat error";
2214 status = errno;
2215 }
2216 else if (!S_ISREG(statbuf.st_mode))
2217 {
2218 reason = "not a regular file";
2219 status = EPERM;
2220 }
2221 if (statbuf.st_uid != daemon_uid)
2222 {
2223 reason = "not owned by daemon user";
2224 status = EACCES;
2225 }
2226 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2227 {
2228 reason = "must not be user/group writable";
2229 status = EACCES;
2230 }
2232 if (status != 0)
2233 {
2234 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2235 file, rrd_strerror(status), reason);
2236 return 0;
2237 }
2238 }
2240 fh = fopen(file, "r");
2241 if (fh == NULL)
2242 {
2243 if (errno != ENOENT)
2244 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2245 file, rrd_strerror(errno));
2246 return 0;
2247 }
2248 else
2249 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2251 now = time(NULL);
2253 while(!feof(fh))
2254 {
2255 size_t entry_len;
2257 ++line;
2258 if (fgets(entry, sizeof(entry), fh) == NULL)
2259 break;
2260 entry_len = strlen(entry);
2262 /* check \n termination in case journal writing crashed mid-line */
2263 if (entry_len == 0)
2264 continue;
2265 else if (entry[entry_len - 1] != '\n')
2266 {
2267 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2268 ++fail_cnt;
2269 continue;
2270 }
2272 entry[entry_len - 1] = '\0';
2274 if (handle_request(NULL, now, entry, entry_len) == 0)
2275 ++entry_cnt;
2276 else
2277 ++fail_cnt;
2278 }
2280 fclose(fh);
2282 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2283 entry_cnt, fail_cnt);
2285 return entry_cnt > 0 ? 1 : 0;
2286 } /* }}} static int journal_replay */
/* qsort(3) comparison function for an array of C strings: both arguments
 * point at array elements (i.e. at `char *'), which are compared with
 * strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *lhs = *(const char *const *) v1;
  const char *rhs = *(const char *const *) v2;

  return strcmp(lhs, rhs);
}
2296 static void journal_init(void) /* {{{ */
2297 {
2298 int had_journal = 0;
2299 DIR *dir;
2300 struct dirent *dent;
2301 char path[PATH_MAX+1];
2303 if (journal_dir == NULL) return;
2305 pthread_mutex_lock(&journal_lock);
2307 journal_cur = calloc(1, sizeof(journal_set));
2308 if (journal_cur == NULL)
2309 {
2310 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2311 return;
2312 }
2314 RRDD_LOG(LOG_INFO, "checking for journal files");
2316 /* Handle old journal files during transition. This gives them the
2317 * correct sort order. TODO: remove after first release
2318 */
2319 {
2320 char old_path[PATH_MAX+1];
2321 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2322 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2323 rename(old_path, path);
2325 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2326 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2327 rename(old_path, path);
2328 }
2330 dir = opendir(journal_dir);
2331 if (!dir) {
2332 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2333 return;
2334 }
2335 while ((dent = readdir(dir)) != NULL)
2336 {
2337 /* looks like a journal file? */
2338 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2339 continue;
2341 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2343 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2344 {
2345 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2346 dent->d_name);
2347 break;
2348 }
2349 }
2350 closedir(dir);
2352 qsort(journal_cur->files, journal_cur->files_num,
2353 sizeof(journal_cur->files[0]), journal_sort);
2355 for (uint i=0; i < journal_cur->files_num; i++)
2356 had_journal += journal_replay(journal_cur->files[i]);
2358 journal_new_file();
2360 /* it must have been a crash. start a flush */
2361 if (had_journal && config_flush_at_shutdown)
2362 flush_old_values(-1);
2364 pthread_mutex_unlock(&journal_lock);
2366 RRDD_LOG(LOG_INFO, "journal processing complete");
2368 } /* }}} static void journal_init */
2370 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2371 {
2372 assert(sock != NULL);
2374 free(sock->rbuf); sock->rbuf = NULL;
2375 free(sock->wbuf); sock->wbuf = NULL;
2376 free(sock);
2377 } /* }}} void free_listen_socket */
2379 static void close_connection(listen_socket_t *sock) /* {{{ */
2380 {
2381 if (sock->fd >= 0)
2382 {
2383 close(sock->fd);
2384 sock->fd = -1;
2385 }
2387 free_listen_socket(sock);
2389 } /* }}} void close_connection */
2391 static void *connection_thread_main (void *args) /* {{{ */
2392 {
2393 listen_socket_t *sock;
2394 int fd;
2396 sock = (listen_socket_t *) args;
2397 fd = sock->fd;
2399 /* init read buffers */
2400 sock->next_read = sock->next_cmd = 0;
2401 sock->rbuf = malloc(RBUF_SIZE);
2402 if (sock->rbuf == NULL)
2403 {
2404 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2405 close_connection(sock);
2406 return NULL;
2407 }
2409 pthread_mutex_lock (&connection_threads_lock);
2410 connection_threads_num++;
2411 pthread_mutex_unlock (&connection_threads_lock);
2413 while (state == RUNNING)
2414 {
2415 char *cmd;
2416 ssize_t cmd_len;
2417 ssize_t rbytes;
2418 time_t now;
2420 struct pollfd pollfd;
2421 int status;
2423 pollfd.fd = fd;
2424 pollfd.events = POLLIN | POLLPRI;
2425 pollfd.revents = 0;
2427 status = poll (&pollfd, 1, /* timeout = */ 500);
2428 if (state != RUNNING)
2429 break;
2430 else if (status == 0) /* timeout */
2431 continue;
2432 else if (status < 0) /* error */
2433 {
2434 status = errno;
2435 if (status != EINTR)
2436 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2437 continue;
2438 }
2440 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2441 break;
2442 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2443 {
2444 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2445 "poll(2) returned something unexpected: %#04hx",
2446 pollfd.revents);
2447 break;
2448 }
2450 rbytes = read(fd, sock->rbuf + sock->next_read,
2451 RBUF_SIZE - sock->next_read);
2452 if (rbytes < 0)
2453 {
2454 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2455 break;
2456 }
2457 else if (rbytes == 0)
2458 break; /* eof */
2460 sock->next_read += rbytes;
2462 if (sock->batch_start)
2463 now = sock->batch_start;
2464 else
2465 now = time(NULL);
2467 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2468 {
2469 status = handle_request (sock, now, cmd, cmd_len+1);
2470 if (status != 0)
2471 goto out_close;
2472 }
2473 }
2475 out_close:
2476 close_connection(sock);
2478 /* Remove this thread from the connection threads list */
2479 pthread_mutex_lock (&connection_threads_lock);
2480 connection_threads_num--;
2481 if (connection_threads_num <= 0)
2482 pthread_cond_broadcast(&connection_threads_done);
2483 pthread_mutex_unlock (&connection_threads_lock);
2485 return (NULL);
2486 } /* }}} void *connection_thread_main */
2488 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2489 {
2490 int fd;
2491 struct sockaddr_un sa;
2492 listen_socket_t *temp;
2493 int status;
2494 const char *path;
2495 char *path_copy, *dir;
2497 path = sock->addr;
2498 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2499 path += strlen("unix:");
2501 /* dirname may modify its argument */
2502 path_copy = strdup(path);
2503 if (path_copy == NULL)
2504 {
2505 fprintf(stderr, "rrdcached: strdup(): %s\n",
2506 rrd_strerror(errno));
2507 return (-1);
2508 }
2510 dir = dirname(path_copy);
2511 if (rrd_mkdir_p(dir, 0777) != 0)
2512 {
2513 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2514 dir, rrd_strerror(errno));
2515 return (-1);
2516 }
2518 free(path_copy);
2520 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2521 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2522 if (temp == NULL)
2523 {
2524 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2525 return (-1);
2526 }
2527 listen_fds = temp;
2528 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2530 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2531 if (fd < 0)
2532 {
2533 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2534 rrd_strerror(errno));
2535 return (-1);
2536 }
2538 memset (&sa, 0, sizeof (sa));
2539 sa.sun_family = AF_UNIX;
2540 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2542 /* if we've gotten this far, we own the pid file. any daemon started
2543 * with the same args must not be alive. therefore, ensure that we can
2544 * create the socket...
2545 */
2546 unlink(path);
2548 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2549 if (status != 0)
2550 {
2551 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2552 path, rrd_strerror(errno));
2553 close (fd);
2554 return (-1);
2555 }
2557 /* tweak the sockets group ownership */
2558 if (sock->socket_group != (gid_t)-1)
2559 {
2560 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2561 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2562 {
2563 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2564 }
2565 }
2567 if (sock->socket_permissions != (mode_t)-1)
2568 {
2569 if (chmod(path, sock->socket_permissions) != 0)
2570 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2571 (unsigned int)sock->socket_permissions, strerror(errno));
2572 }
2574 status = listen (fd, /* backlog = */ 10);
2575 if (status != 0)
2576 {
2577 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2578 path, rrd_strerror(errno));
2579 close (fd);
2580 unlink (path);
2581 return (-1);
2582 }
2584 listen_fds[listen_fds_num].fd = fd;
2585 listen_fds[listen_fds_num].family = PF_UNIX;
2586 strncpy(listen_fds[listen_fds_num].addr, path,
2587 sizeof (listen_fds[listen_fds_num].addr) - 1);
2588 listen_fds_num++;
2590 return (0);
2591 } /* }}} int open_listen_socket_unix */
/*
 * Open TCP listen sockets for the address given in sock->addr and append
 * each one to the global `listen_fds' array.
 *
 * Accepted address forms are "[ipv6-address]", "[ipv6-address]:port",
 * "host" and "host:port"; RRDCACHED_DEFAULT_PORT is used when no port is
 * given.  Every address returned by getaddrinfo() is tried; addresses for
 * which socket()/bind() fail are reported and skipped.
 *
 * Returns -1 if the address is malformed, cannot be resolved, or listen()
 * fails.  Returns 0 otherwise -- including when no address could be bound,
 * so callers must inspect listen_fds_num to learn whether anything was
 * actually opened.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* the last ':' separates host and port */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
      port == NULL ? RRDCACHED_DEFAULT_PORT : port,
      &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
        addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    /* Grow the global array by one slot and seed it with the caller's
     * settings; the slot only counts once listen_fds_num is bumped. */
    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
          "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
          rrd_strerror(errno));
      continue;
    }

    /* best effort; the result is deliberately not checked */
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
          sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
          sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2715 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2716 {
2717 assert(sock != NULL);
2718 assert(sock->addr != NULL);
2720 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2721 || sock->addr[0] == '/')
2722 return (open_listen_socket_unix(sock));
2723 else
2724 return (open_listen_socket_network(sock));
2725 } /* }}} int open_listen_socket */
2727 static int close_listen_sockets (void) /* {{{ */
2728 {
2729 size_t i;
2731 for (i = 0; i < listen_fds_num; i++)
2732 {
2733 close (listen_fds[i].fd);
2735 if (listen_fds[i].family == PF_UNIX)
2736 unlink(listen_fds[i].addr);
2737 }
2739 free (listen_fds);
2740 listen_fds = NULL;
2741 listen_fds_num = 0;
2743 return (0);
2744 } /* }}} int close_listen_sockets */
2746 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2747 {
2748 struct pollfd *pollfds;
2749 int pollfds_num;
2750 int status;
2751 int i;
2753 if (listen_fds_num < 1)
2754 {
2755 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2756 return (NULL);
2757 }
2759 pollfds_num = listen_fds_num;
2760 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2761 if (pollfds == NULL)
2762 {
2763 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2764 return (NULL);
2765 }
2766 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2768 RRDD_LOG(LOG_INFO, "listening for connections");
2770 while (state == RUNNING)
2771 {
2772 for (i = 0; i < pollfds_num; i++)
2773 {
2774 pollfds[i].fd = listen_fds[i].fd;
2775 pollfds[i].events = POLLIN | POLLPRI;
2776 pollfds[i].revents = 0;
2777 }
2779 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2780 if (state != RUNNING)
2781 break;
2782 else if (status == 0) /* timeout */
2783 continue;
2784 else if (status < 0) /* error */
2785 {
2786 status = errno;
2787 if (status != EINTR)
2788 {
2789 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2790 }
2791 continue;
2792 }
2794 for (i = 0; i < pollfds_num; i++)
2795 {
2796 listen_socket_t *client_sock;
2797 struct sockaddr_storage client_sa;
2798 socklen_t client_sa_size;
2799 pthread_t tid;
2800 pthread_attr_t attr;
2802 if (pollfds[i].revents == 0)
2803 continue;
2805 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2806 {
2807 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2808 "poll(2) returned something unexpected for listen FD #%i.",
2809 pollfds[i].fd);
2810 continue;
2811 }
2813 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2814 if (client_sock == NULL)
2815 {
2816 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2817 continue;
2818 }
2819 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2821 client_sa_size = sizeof (client_sa);
2822 client_sock->fd = accept (pollfds[i].fd,
2823 (struct sockaddr *) &client_sa, &client_sa_size);
2824 if (client_sock->fd < 0)
2825 {
2826 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2827 free(client_sock);
2828 continue;
2829 }
2831 pthread_attr_init (&attr);
2832 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2834 status = pthread_create (&tid, &attr, connection_thread_main,
2835 client_sock);
2836 if (status != 0)
2837 {
2838 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2839 close_connection(client_sock);
2840 continue;
2841 }
2842 } /* for (pollfds_num) */
2843 } /* while (state == RUNNING) */
2845 RRDD_LOG(LOG_INFO, "starting shutdown");
2847 close_listen_sockets ();
2849 pthread_mutex_lock (&connection_threads_lock);
2850 while (connection_threads_num > 0)
2851 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2852 pthread_mutex_unlock (&connection_threads_lock);
2854 free(pollfds);
2856 return (NULL);
2857 } /* }}} void *listen_thread_main */
2859 static int daemonize (void) /* {{{ */
2860 {
2861 int pid_fd;
2862 char *base_dir;
2864 daemon_uid = geteuid();
2866 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2867 if (pid_fd < 0)
2868 pid_fd = check_pidfile();
2869 if (pid_fd < 0)
2870 return pid_fd;
2872 /* open all the listen sockets */
2873 if (config_listen_address_list_len > 0)
2874 {
2875 for (size_t i = 0; i < config_listen_address_list_len; i++)
2876 open_listen_socket (config_listen_address_list[i]);
2878 rrd_free_ptrs((void ***) &config_listen_address_list,
2879 &config_listen_address_list_len);
2880 }
2881 else
2882 {
2883 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2884 sizeof(default_socket.addr) - 1);
2885 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2886 open_listen_socket (&default_socket);
2887 }
2889 if (listen_fds_num < 1)
2890 {
2891 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2892 goto error;
2893 }
2895 if (!stay_foreground)
2896 {
2897 pid_t child;
2899 child = fork ();
2900 if (child < 0)
2901 {
2902 fprintf (stderr, "daemonize: fork(2) failed.\n");
2903 goto error;
2904 }
2905 else if (child > 0)
2906 exit(0);
2908 /* Become session leader */
2909 setsid ();
2911 /* Open the first three file descriptors to /dev/null */
2912 close (2);
2913 close (1);
2914 close (0);
2916 open ("/dev/null", O_RDWR);
2917 if (dup(0) == -1 || dup(0) == -1){
2918 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2919 }
2920 } /* if (!stay_foreground) */
2922 /* Change into the /tmp directory. */
2923 base_dir = (config_base_dir != NULL)
2924 ? config_base_dir
2925 : "/tmp";
2927 if (chdir (base_dir) != 0)
2928 {
2929 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2930 goto error;
2931 }
2933 install_signal_handlers();
2935 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2936 RRDD_LOG(LOG_INFO, "starting up");
2938 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2939 (GDestroyNotify) free_cache_item);
2940 if (cache_tree == NULL)
2941 {
2942 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2943 goto error;
2944 }
2946 return write_pidfile (pid_fd);
2948 error:
2949 remove_pidfile();
2950 return -1;
2951 } /* }}} int daemonize */
2953 static int cleanup (void) /* {{{ */
2954 {
2955 pthread_cond_broadcast (&flush_cond);
2956 pthread_join (flush_thread, NULL);
2958 pthread_cond_broadcast (&queue_cond);
2959 for (int i = 0; i < config_queue_threads; i++)
2960 pthread_join (queue_threads[i], NULL);
2962 if (config_flush_at_shutdown)
2963 {
2964 assert(cache_queue_head == NULL);
2965 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2966 }
2968 free(queue_threads);
2969 free(config_base_dir);
2971 pthread_mutex_lock(&cache_lock);
2972 g_tree_destroy(cache_tree);
2974 pthread_mutex_lock(&journal_lock);
2975 journal_done();
2977 RRDD_LOG(LOG_INFO, "goodbye");
2978 closelog ();
2980 remove_pidfile ();
2981 free(config_pid_file);
2983 return (0);
2984 } /* }}} int cleanup */
2986 static int read_options (int argc, char **argv) /* {{{ */
2987 {
2988 int option;
2989 int status = 0;
2991 socket_permission_clear (&default_socket);
2993 default_socket.socket_group = (gid_t)-1;
2994 default_socket.socket_permissions = (mode_t)-1;
2996 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2997 {
2998 switch (option)
2999 {
3000 case 'g':
3001 stay_foreground=1;
3002 break;
3004 case 'l':
3005 {
3006 listen_socket_t *new;
3008 new = malloc(sizeof(listen_socket_t));
3009 if (new == NULL)
3010 {
3011 fprintf(stderr, "read_options: malloc failed.\n");
3012 return(2);
3013 }
3014 memset(new, 0, sizeof(listen_socket_t));
3016 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3018 /* Add permissions to the socket {{{ */
3019 if (default_socket.permissions != 0)
3020 {
3021 socket_permission_copy (new, &default_socket);
3022 }
3023 else /* if (default_socket.permissions == 0) */
3024 {
3025 /* Add permission for ALL commands to the socket. */
3026 size_t i;
3027 for (i = 0; i < list_of_commands_len; i++)
3028 {
3029 status = socket_permission_add (new, list_of_commands[i].cmd);
3030 if (status != 0)
3031 {
3032 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3033 "socket failed. This should never happen, ever! Sorry.\n",
3034 list_of_commands[i].cmd);
3035 status = 4;
3036 }
3037 }
3038 }
3039 /* }}} Done adding permissions. */
3041 new->socket_group = default_socket.socket_group;
3042 new->socket_permissions = default_socket.socket_permissions;
3044 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3045 &config_listen_address_list_len, new))
3046 {
3047 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3048 return (2);
3049 }
3050 }
3051 break;
3053 /* set socket group permissions */
3054 case 's':
3055 {
3056 gid_t group_gid;
3057 struct group *grp;
3059 group_gid = strtoul(optarg, NULL, 10);
3060 if (errno != EINVAL && group_gid>0)
3061 {
3062 /* we were passed a number */
3063 grp = getgrgid(group_gid);
3064 }
3065 else
3066 {
3067 grp = getgrnam(optarg);
3068 }
3070 if (grp)
3071 {
3072 default_socket.socket_group = grp->gr_gid;
3073 }
3074 else
3075 {
3076 /* no idea what the user wanted... */
3077 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3078 return (5);
3079 }
3080 }
3081 break;
3083 /* set socket file permissions */
3084 case 'm':
3085 {
3086 long tmp;
3087 char *endptr = NULL;
3089 tmp = strtol (optarg, &endptr, 8);
3090 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3091 || (tmp > 07777) || (tmp < 0)) {
3092 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3093 optarg);
3094 return (5);
3095 }
3097 default_socket.socket_permissions = (mode_t)tmp;
3098 }
3099 break;
3101 case 'P':
3102 {
3103 char *optcopy;
3104 char *saveptr;
3105 char *dummy;
3106 char *ptr;
3108 socket_permission_clear (&default_socket);
3110 optcopy = strdup (optarg);
3111 dummy = optcopy;
3112 saveptr = NULL;
3113 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3114 {
3115 dummy = NULL;
3116 status = socket_permission_add (&default_socket, ptr);
3117 if (status != 0)
3118 {
3119 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3120 "socket failed. Most likely, this permission doesn't "
3121 "exist. Check your command line.\n", ptr);
3122 status = 4;
3123 }
3124 }
3126 free (optcopy);
3127 }
3128 break;
3130 case 'f':
3131 {
3132 int temp;
3134 temp = atoi (optarg);
3135 if (temp > 0)
3136 config_flush_interval = temp;
3137 else
3138 {
3139 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3140 status = 3;
3141 }
3142 }
3143 break;
3145 case 'w':
3146 {
3147 int temp;
3149 temp = atoi (optarg);
3150 if (temp > 0)
3151 config_write_interval = temp;
3152 else
3153 {
3154 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3155 status = 2;
3156 }
3157 }
3158 break;
3160 case 'z':
3161 {
3162 int temp;
3164 temp = atoi(optarg);
3165 if (temp > 0)
3166 config_write_jitter = temp;
3167 else
3168 {
3169 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3170 status = 2;
3171 }
3173 break;
3174 }
3176 case 't':
3177 {
3178 int threads;
3179 threads = atoi(optarg);
3180 if (threads >= 1)
3181 config_queue_threads = threads;
3182 else
3183 {
3184 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3185 return 1;
3186 }
3187 }
3188 break;
3190 case 'B':
3191 config_write_base_only = 1;
3192 break;
3194 case 'b':
3195 {
3196 size_t len;
3197 char base_realpath[PATH_MAX];
3199 if (config_base_dir != NULL)
3200 free (config_base_dir);
3201 config_base_dir = strdup (optarg);
3202 if (config_base_dir == NULL)
3203 {
3204 fprintf (stderr, "read_options: strdup failed.\n");
3205 return (3);
3206 }
3208 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3209 {
3210 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3211 config_base_dir, rrd_strerror (errno));
3212 return (3);
3213 }
3215 /* make sure that the base directory is not resolved via
3216 * symbolic links. this makes some performance-enhancing
3217 * assumptions possible (we don't have to resolve paths
3218 * that start with a "/")
3219 */
3220 if (realpath(config_base_dir, base_realpath) == NULL)
3221 {
3222 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3223 "%s\n", config_base_dir, rrd_strerror(errno));
3224 return 5;
3225 }
3227 len = strlen (config_base_dir);
3228 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3229 {
3230 config_base_dir[len - 1] = 0;
3231 len--;
3232 }
3234 if (len < 1)
3235 {
3236 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3237 return (4);
3238 }
3240 _config_base_dir_len = len;
3242 len = strlen (base_realpath);
3243 while ((len > 0) && (base_realpath[len - 1] == '/'))
3244 {
3245 base_realpath[len - 1] = '\0';
3246 len--;
3247 }
3249 if (strncmp(config_base_dir,
3250 base_realpath, sizeof(base_realpath)) != 0)
3251 {
3252 fprintf(stderr,
3253 "Base directory (-b) resolved via file system links!\n"
3254 "Please consult rrdcached '-b' documentation!\n"
3255 "Consider specifying the real directory (%s)\n",
3256 base_realpath);
3257 return 5;
3258 }
3259 }
3260 break;
3262 case 'p':
3263 {
3264 if (config_pid_file != NULL)
3265 free (config_pid_file);
3266 config_pid_file = strdup (optarg);
3267 if (config_pid_file == NULL)
3268 {
3269 fprintf (stderr, "read_options: strdup failed.\n");
3270 return (3);
3271 }
3272 }
3273 break;
3275 case 'F':
3276 config_flush_at_shutdown = 1;
3277 break;
3279 case 'j':
3280 {
3281 char journal_dir_actual[PATH_MAX];
3282 const char *dir;
3283 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3285 status = rrd_mkdir_p(dir, 0777);
3286 if (status != 0)
3287 {
3288 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3289 dir, rrd_strerror(errno));
3290 return 6;
3291 }
3293 if (access(dir, R_OK|W_OK|X_OK) != 0)
3294 {
3295 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3296 errno ? rrd_strerror(errno) : "");
3297 return 6;
3298 }
3299 }
3300 break;
3302 case 'a':
3303 {
3304 int temp = atoi(optarg);
3305 if (temp > 0)
3306 config_alloc_chunk = temp;
3307 else
3308 {
3309 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3310 return 10;
3311 }
3312 }
3313 break;
3315 case 'h':
3316 case '?':
3317 printf ("RRDCacheD %s\n"
3318 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3319 "\n"
3320 "Usage: rrdcached [options]\n"
3321 "\n"
3322 "Valid options are:\n"
3323 " -l <address> Socket address to listen to.\n"
3324 " -P <perms> Sets the permissions to assign to all following "
3325 "sockets\n"
3326 " -w <seconds> Interval in which to write data.\n"
3327 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3328 " -t <threads> Number of write threads.\n"
3329 " -f <seconds> Interval in which to flush dead data.\n"
3330 " -p <file> Location of the PID-file.\n"
3331 " -b <dir> Base directory to change to.\n"
3332 " -B Restrict file access to paths within -b <dir>\n"
3333 " -g Do not fork and run in the foreground.\n"
3334 " -j <dir> Directory in which to create the journal files.\n"
3335 " -F Always flush all updates at shutdown\n"
3336 " -s <id|name> Group owner of all following UNIX sockets\n"
3337 " (the socket will also have read/write permissions "
3338 "for that group)\n"
3339 " -m <mode> File permissions (octal) of all following UNIX "
3340 "sockets\n"
3341 " -a <size> Memory allocation chunk size. Default is 1."
3342 "\n"
3343 "For more information and a detailed description of all options "
3344 "please refer\n"
3345 "to the rrdcached(1) manual page.\n",
3346 VERSION);
3347 if (option == 'h')
3348 status = -1;
3349 else
3350 status = 1;
3351 break;
3352 } /* switch (option) */
3353 } /* while (getopt) */
3355 /* advise the user when values are not sane */
3356 if (config_flush_interval < 2 * config_write_interval)
3357 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3358 " 2x write interval (-w) !\n");
3359 if (config_write_jitter > config_write_interval)
3360 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3361 " write interval (-w) !\n");
3363 if (config_write_base_only && config_base_dir == NULL)
3364 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3365 " Consult the rrdcached documentation\n");
3367 if (journal_dir == NULL)
3368 config_flush_at_shutdown = 1;
3370 return (status);
3371 } /* }}} int read_options */
3373 int main (int argc, char **argv)
3374 {
3375 int status;
3377 status = read_options (argc, argv);
3378 if (status != 0)
3379 {
3380 if (status < 0)
3381 status = 0;
3382 return (status);
3383 }
3385 status = daemonize ();
3386 if (status != 0)
3387 {
3388 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3389 return (1);
3390 }
3392 journal_init();
3394 /* start the queue threads */
3395 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3396 if (queue_threads == NULL)
3397 {
3398 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3399 cleanup();
3400 return (1);
3401 }
3402 for (int i = 0; i < config_queue_threads; i++)
3403 {
3404 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3405 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3406 if (status != 0)
3407 {
3408 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3409 cleanup();
3410 return (1);
3411 }
3412 }
3414 /* start the flush thread */
3415 memset(&flush_thread, 0, sizeof(flush_thread));
3416 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3417 if (status != 0)
3418 {
3419 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3420 cleanup();
3421 return (1);
3422 }
3424 listen_thread_main (NULL);
3425 cleanup ();
3427 return (0);
3428 } /* int main */
3430 /*
3431 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3432 */