1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #include "rrd_tool.h"
67 #include "rrd_client.h"
68 #include "unused.h"
70 #include <stdlib.h>
72 #ifndef WIN32
73 #ifdef HAVE_STDINT_H
74 # include <stdint.h>
75 #endif
76 #include <unistd.h>
77 #include <strings.h>
78 #include <inttypes.h>
79 #include <sys/socket.h>
81 #else
83 #endif
84 #include <stdio.h>
85 #include <string.h>
87 #include <sys/types.h>
88 #include <sys/stat.h>
89 #include <dirent.h>
90 #include <fcntl.h>
91 #include <signal.h>
92 #include <sys/un.h>
93 #include <netdb.h>
94 #include <poll.h>
95 #include <syslog.h>
96 #include <pthread.h>
97 #include <errno.h>
98 #include <assert.h>
99 #include <sys/time.h>
100 #include <time.h>
101 #include <libgen.h>
102 #include <grp.h>
104 #ifdef HAVE_LIBWRAP
105 #include <tcpd.h>
106 #endif /* HAVE_LIBWRAP */
108 #include <glib.h>
109 /* }}} */
111 #define RRDD_LOG(severity, ...) \
112 do { \
113 if (stay_foreground) { \
114 fprintf(stderr, __VA_ARGS__); \
115 fprintf(stderr, "\n"); } \
116 syslog ((severity), __VA_ARGS__); \
117 } while (0)
119 /*
120 * Types
121 */
122 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124 struct listen_socket_s
125 {
126 int fd;
127 char addr[PATH_MAX + 1];
128 int family;
130 /* state for BATCH processing */
131 time_t batch_start;
132 int batch_cmd;
134 /* buffered IO */
135 char *rbuf;
136 off_t next_cmd;
137 off_t next_read;
139 char *wbuf;
140 ssize_t wbuf_len;
142 uint32_t permissions;
144 gid_t socket_group;
145 mode_t socket_permissions;
146 };
147 typedef struct listen_socket_s listen_socket_t;
149 struct command_s;
150 typedef struct command_s command_t;
151 /* note: guard against "unused" warnings in the handlers */
152 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
153 time_t UNUSED(now),\
154 char UNUSED(*buffer),\
155 size_t UNUSED(buffer_size)
157 #define HANDLER_PROTO command_t UNUSED(*cmd),\
158 DISPATCH_PROTO
160 struct command_s {
161 char *cmd;
162 int (*handler)(HANDLER_PROTO);
164 char context; /* where we expect to see it */
165 #define CMD_CONTEXT_CLIENT (1<<0)
166 #define CMD_CONTEXT_BATCH (1<<1)
167 #define CMD_CONTEXT_JOURNAL (1<<2)
168 #define CMD_CONTEXT_ANY (0x7f)
170 char *syntax;
171 char *help;
172 };
174 struct cache_item_s;
175 typedef struct cache_item_s cache_item_t;
176 struct cache_item_s
177 {
178 char *file;
179 char **values;
180 size_t values_num;
181 time_t last_flush_time;
182 double last_update_stamp;
183 #define CI_FLAGS_IN_TREE (1<<0)
184 #define CI_FLAGS_IN_QUEUE (1<<1)
185 int flags;
186 pthread_cond_t flushed;
187 cache_item_t *prev;
188 cache_item_t *next;
189 };
191 struct callback_flush_data_s
192 {
193 time_t now;
194 time_t abs_timeout;
195 char **keys;
196 size_t keys_num;
197 };
198 typedef struct callback_flush_data_s callback_flush_data_t;
200 enum queue_side_e
201 {
202 HEAD,
203 TAIL
204 };
205 typedef enum queue_side_e queue_side_t;
207 /* describe a set of journal files */
208 typedef struct {
209 char **files;
210 size_t files_num;
211 } journal_set;
213 /* max length of socket command or response */
214 #define CMD_MAX 4096
215 #define RBUF_SIZE (CMD_MAX*2)
217 /*
218 * Variables
219 */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 static listen_socket_t default_socket;
228 enum {
229 RUNNING, /* normal operation */
230 FLUSHING, /* flushing remaining values */
231 SHUTDOWN /* shutting down */
232 } state = RUNNING;
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
245 /* Cache stuff */
246 static GTree *cache_tree = NULL;
247 static cache_item_t *cache_queue_head = NULL;
248 static cache_item_t *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
251 static int config_write_interval = 300;
252 static int config_write_jitter = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
260 static listen_socket_t **config_listen_address_list = NULL;
261 static size_t config_listen_address_list_len = 0;
263 static uint64_t stats_queue_length = 0;
264 static uint64_t stats_updates_received = 0;
265 static uint64_t stats_flush_received = 0;
266 static uint64_t stats_updates_written = 0;
267 static uint64_t stats_data_sets_written = 0;
268 static uint64_t stats_journal_bytes = 0;
269 static uint64_t stats_journal_rotate = 0;
270 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
272 /* Journaled updates */
273 #define JOURNAL_REPLAY(s) ((s) == NULL)
274 #define JOURNAL_BASE "rrd.journal"
275 static journal_set *journal_cur = NULL;
276 static journal_set *journal_old = NULL;
277 static char *journal_dir = NULL;
278 static FILE *journal_fh = NULL; /* current journal file handle */
279 static long journal_size = 0; /* current journal size */
280 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
281 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
282 static int journal_write(char *cmd, char *args);
283 static void journal_done(void);
284 static void journal_rotate(void);
286 /* prototypes for forward refernces */
287 static int handle_request_help (HANDLER_PROTO);
289 /*
290 * Functions
291 */
292 static void sig_common (const char *sig) /* {{{ */
293 {
294 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
295 if (state == RUNNING) {
296 state = FLUSHING;
297 }
298 pthread_cond_broadcast(&flush_cond);
299 pthread_cond_broadcast(&queue_cond);
300 } /* }}} void sig_common */
302 static void sig_int_handler (int UNUSED(s)) /* {{{ */
303 {
304 sig_common("INT");
305 } /* }}} void sig_int_handler */
307 static void sig_term_handler (int UNUSED(s)) /* {{{ */
308 {
309 sig_common("TERM");
310 } /* }}} void sig_term_handler */
312 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
313 {
314 config_flush_at_shutdown = 1;
315 sig_common("USR1");
316 } /* }}} void sig_usr1_handler */
318 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
319 {
320 config_flush_at_shutdown = 0;
321 sig_common("USR2");
322 } /* }}} void sig_usr2_handler */
324 static void install_signal_handlers(void) /* {{{ */
325 {
326 /* These structures are static, because `sigaction' behaves weird if the are
327 * overwritten.. */
328 static struct sigaction sa_int;
329 static struct sigaction sa_term;
330 static struct sigaction sa_pipe;
331 static struct sigaction sa_usr1;
332 static struct sigaction sa_usr2;
334 /* Install signal handlers */
335 memset (&sa_int, 0, sizeof (sa_int));
336 sa_int.sa_handler = sig_int_handler;
337 sigaction (SIGINT, &sa_int, NULL);
339 memset (&sa_term, 0, sizeof (sa_term));
340 sa_term.sa_handler = sig_term_handler;
341 sigaction (SIGTERM, &sa_term, NULL);
343 memset (&sa_pipe, 0, sizeof (sa_pipe));
344 sa_pipe.sa_handler = SIG_IGN;
345 sigaction (SIGPIPE, &sa_pipe, NULL);
347 memset (&sa_pipe, 0, sizeof (sa_usr1));
348 sa_usr1.sa_handler = sig_usr1_handler;
349 sigaction (SIGUSR1, &sa_usr1, NULL);
351 memset (&sa_usr2, 0, sizeof (sa_usr2));
352 sa_usr2.sa_handler = sig_usr2_handler;
353 sigaction (SIGUSR2, &sa_usr2, NULL);
355 } /* }}} void install_signal_handlers */
357 static int open_pidfile(char *action, int oflag) /* {{{ */
358 {
359 int fd;
360 const char *file;
361 char *file_copy, *dir;
363 file = (config_pid_file != NULL)
364 ? config_pid_file
365 : LOCALSTATEDIR "/run/rrdcached.pid";
367 /* dirname may modify its argument */
368 file_copy = strdup(file);
369 if (file_copy == NULL)
370 {
371 fprintf(stderr, "rrdcached: strdup(): %s\n",
372 rrd_strerror(errno));
373 return -1;
374 }
376 dir = dirname(file_copy);
377 if (rrd_mkdir_p(dir, 0777) != 0)
378 {
379 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
380 dir, rrd_strerror(errno));
381 return -1;
382 }
384 free(file_copy);
386 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
387 if (fd < 0)
388 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
389 action, file, rrd_strerror(errno));
391 return(fd);
392 } /* }}} static int open_pidfile */
394 /* check existing pid file to see whether a daemon is running */
395 static int check_pidfile(void)
396 {
397 int pid_fd;
398 pid_t pid;
399 char pid_str[16];
401 pid_fd = open_pidfile("open", O_RDWR);
402 if (pid_fd < 0)
403 return pid_fd;
405 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
406 return -1;
408 pid = atoi(pid_str);
409 if (pid <= 0)
410 return -1;
412 /* another running process that we can signal COULD be
413 * a competing rrdcached */
414 if (pid != getpid() && kill(pid, 0) == 0)
415 {
416 fprintf(stderr,
417 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
418 close(pid_fd);
419 return -1;
420 }
422 lseek(pid_fd, 0, SEEK_SET);
423 if (ftruncate(pid_fd, 0) == -1)
424 {
425 fprintf(stderr,
426 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
427 close(pid_fd);
428 return -1;
429 }
431 fprintf(stderr,
432 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
433 "rrdcached: starting normally.\n", pid);
435 return pid_fd;
436 } /* }}} static int check_pidfile */
438 static int write_pidfile (int fd) /* {{{ */
439 {
440 pid_t pid;
441 FILE *fh;
443 pid = getpid ();
445 fh = fdopen (fd, "w");
446 if (fh == NULL)
447 {
448 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
449 close(fd);
450 return (-1);
451 }
453 fprintf (fh, "%i\n", (int) pid);
454 fclose (fh);
456 return (0);
457 } /* }}} int write_pidfile */
459 static int remove_pidfile (void) /* {{{ */
460 {
461 char *file;
462 int status;
464 file = (config_pid_file != NULL)
465 ? config_pid_file
466 : LOCALSTATEDIR "/run/rrdcached.pid";
468 status = unlink (file);
469 if (status == 0)
470 return (0);
471 return (errno);
472 } /* }}} int remove_pidfile */
474 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
475 {
476 char *eol;
478 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
479 sock->next_read - sock->next_cmd);
481 if (eol == NULL)
482 {
483 /* no commands left, move remainder back to front of rbuf */
484 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
485 sock->next_read - sock->next_cmd);
486 sock->next_read -= sock->next_cmd;
487 sock->next_cmd = 0;
488 *len = 0;
489 return NULL;
490 }
491 else
492 {
493 char *cmd = sock->rbuf + sock->next_cmd;
494 *eol = '\0';
496 sock->next_cmd = eol - sock->rbuf + 1;
498 if (eol > sock->rbuf && *(eol-1) == '\r')
499 *(--eol) = '\0'; /* handle "\r\n" EOL */
501 *len = eol - cmd;
503 return cmd;
504 }
506 /* NOTREACHED */
507 assert(1==0);
508 } /* }}} char *next_cmd */
510 /* add the characters directly to the write buffer */
511 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
512 {
513 char *new_buf;
515 assert(sock != NULL);
517 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
518 if (new_buf == NULL)
519 {
520 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
521 return -1;
522 }
524 strncpy(new_buf + sock->wbuf_len, str, len + 1);
526 sock->wbuf = new_buf;
527 sock->wbuf_len += len;
529 return 0;
530 } /* }}} static int add_to_wbuf */
532 /* add the text to the "extra" info that's sent after the status line */
533 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
534 {
535 va_list argp;
536 char buffer[CMD_MAX];
537 int len;
539 if (JOURNAL_REPLAY(sock)) return 0;
540 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
542 va_start(argp, fmt);
543 #ifdef HAVE_VSNPRINTF
544 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
545 #else
546 len = vsprintf(buffer, fmt, argp);
547 #endif
548 va_end(argp);
549 if (len < 0)
550 {
551 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
552 return -1;
553 }
555 return add_to_wbuf(sock, buffer, len);
556 } /* }}} static int add_response_info */
558 static int count_lines(char *str) /* {{{ */
559 {
560 int lines = 0;
562 if (str != NULL)
563 {
564 while ((str = strchr(str, '\n')) != NULL)
565 {
566 ++lines;
567 ++str;
568 }
569 }
571 return lines;
572 } /* }}} static int count_lines */
574 /* send the response back to the user.
575 * returns 0 on success, -1 on error
576 * write buffer is always zeroed after this call */
577 static int send_response (listen_socket_t *sock, response_code rc,
578 char *fmt, ...) /* {{{ */
579 {
580 va_list argp;
581 char buffer[CMD_MAX];
582 int lines;
583 ssize_t wrote;
584 int rclen, len;
586 if (JOURNAL_REPLAY(sock)) return rc;
588 if (sock->batch_start)
589 {
590 if (rc == RESP_OK)
591 return rc; /* no response on success during BATCH */
592 lines = sock->batch_cmd;
593 }
594 else if (rc == RESP_OK)
595 lines = count_lines(sock->wbuf);
596 else
597 lines = -1;
599 rclen = sprintf(buffer, "%d ", lines);
600 va_start(argp, fmt);
601 #ifdef HAVE_VSNPRINTF
602 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
603 #else
604 len = vsprintf(buffer+rclen, fmt, argp);
605 #endif
606 va_end(argp);
607 if (len < 0)
608 return -1;
610 len += rclen;
612 /* append the result to the wbuf, don't write to the user */
613 if (sock->batch_start)
614 return add_to_wbuf(sock, buffer, len);
616 /* first write must be complete */
617 if (len != write(sock->fd, buffer, len))
618 {
619 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
620 return -1;
621 }
623 if (sock->wbuf != NULL && rc == RESP_OK)
624 {
625 wrote = 0;
626 while (wrote < sock->wbuf_len)
627 {
628 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
629 if (wb <= 0)
630 {
631 RRDD_LOG(LOG_INFO, "send_response: could not write results");
632 return -1;
633 }
634 wrote += wb;
635 }
636 }
638 free(sock->wbuf); sock->wbuf = NULL;
639 sock->wbuf_len = 0;
641 return 0;
642 } /* }}} */
644 static void wipe_ci_values(cache_item_t *ci, time_t when)
645 {
646 ci->values = NULL;
647 ci->values_num = 0;
649 ci->last_flush_time = when;
650 if (config_write_jitter > 0)
651 ci->last_flush_time += (rrd_random() % config_write_jitter);
652 }
654 /* remove_from_queue
655 * remove a "cache_item_t" item from the queue.
656 * must hold 'cache_lock' when calling this
657 */
658 static void remove_from_queue(cache_item_t *ci) /* {{{ */
659 {
660 if (ci == NULL) return;
661 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
663 if (ci->prev == NULL)
664 cache_queue_head = ci->next; /* reset head */
665 else
666 ci->prev->next = ci->next;
668 if (ci->next == NULL)
669 cache_queue_tail = ci->prev; /* reset the tail */
670 else
671 ci->next->prev = ci->prev;
673 ci->next = ci->prev = NULL;
674 ci->flags &= ~CI_FLAGS_IN_QUEUE;
676 pthread_mutex_lock (&stats_lock);
677 assert (stats_queue_length > 0);
678 stats_queue_length--;
679 pthread_mutex_unlock (&stats_lock);
681 } /* }}} static void remove_from_queue */
683 /* free the resources associated with the cache_item_t
684 * must hold cache_lock when calling this function
685 */
686 static void *free_cache_item(cache_item_t *ci) /* {{{ */
687 {
688 if (ci == NULL) return NULL;
690 remove_from_queue(ci);
692 for (size_t i=0; i < ci->values_num; i++)
693 free(ci->values[i]);
695 free (ci->values);
696 free (ci->file);
698 /* in case anyone is waiting */
699 pthread_cond_broadcast(&ci->flushed);
700 pthread_cond_destroy(&ci->flushed);
702 free (ci);
704 return NULL;
705 } /* }}} static void *free_cache_item */
707 /*
708 * enqueue_cache_item:
709 * `cache_lock' must be acquired before calling this function!
710 */
711 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
712 queue_side_t side)
713 {
714 if (ci == NULL)
715 return (-1);
717 if (ci->values_num == 0)
718 return (0);
720 if (side == HEAD)
721 {
722 if (cache_queue_head == ci)
723 return 0;
725 /* remove if further down in queue */
726 remove_from_queue(ci);
728 ci->prev = NULL;
729 ci->next = cache_queue_head;
730 if (ci->next != NULL)
731 ci->next->prev = ci;
732 cache_queue_head = ci;
734 if (cache_queue_tail == NULL)
735 cache_queue_tail = cache_queue_head;
736 }
737 else /* (side == TAIL) */
738 {
739 /* We don't move values back in the list.. */
740 if (ci->flags & CI_FLAGS_IN_QUEUE)
741 return (0);
743 assert (ci->next == NULL);
744 assert (ci->prev == NULL);
746 ci->prev = cache_queue_tail;
748 if (cache_queue_tail == NULL)
749 cache_queue_head = ci;
750 else
751 cache_queue_tail->next = ci;
753 cache_queue_tail = ci;
754 }
756 ci->flags |= CI_FLAGS_IN_QUEUE;
758 pthread_cond_signal(&queue_cond);
759 pthread_mutex_lock (&stats_lock);
760 stats_queue_length++;
761 pthread_mutex_unlock (&stats_lock);
763 return (0);
764 } /* }}} int enqueue_cache_item */
766 /*
767 * tree_callback_flush:
768 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
769 * while this is in progress.
770 */
771 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
772 gpointer data)
773 {
774 cache_item_t *ci;
775 callback_flush_data_t *cfd;
777 ci = (cache_item_t *) value;
778 cfd = (callback_flush_data_t *) data;
780 if (ci->flags & CI_FLAGS_IN_QUEUE)
781 return FALSE;
783 if (ci->values_num > 0
784 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
785 {
786 enqueue_cache_item (ci, TAIL);
787 }
788 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
789 && (ci->values_num <= 0))
790 {
791 assert ((char *) key == ci->file);
792 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
793 {
794 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
795 return (FALSE);
796 }
797 }
799 return (FALSE);
800 } /* }}} gboolean tree_callback_flush */
802 static int flush_old_values (int max_age)
803 {
804 callback_flush_data_t cfd;
805 size_t k;
807 memset (&cfd, 0, sizeof (cfd));
808 /* Pass the current time as user data so that we don't need to call
809 * `time' for each node. */
810 cfd.now = time (NULL);
811 cfd.keys = NULL;
812 cfd.keys_num = 0;
814 if (max_age > 0)
815 cfd.abs_timeout = cfd.now - max_age;
816 else
817 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
819 /* `tree_callback_flush' will return the keys of all values that haven't
820 * been touched in the last `config_flush_interval' seconds in `cfd'.
821 * The char*'s in this array point to the same memory as ci->file, so we
822 * don't need to free them separately. */
823 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
825 for (k = 0; k < cfd.keys_num; k++)
826 {
827 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
828 /* should never fail, since we have held the cache_lock
829 * the entire time */
830 assert(status == TRUE);
831 }
833 if (cfd.keys != NULL)
834 {
835 free (cfd.keys);
836 cfd.keys = NULL;
837 }
839 return (0);
840 } /* int flush_old_values */
842 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
843 {
844 struct timeval now;
845 struct timespec next_flush;
846 int status;
848 gettimeofday (&now, NULL);
849 next_flush.tv_sec = now.tv_sec + config_flush_interval;
850 next_flush.tv_nsec = 1000 * now.tv_usec;
852 pthread_mutex_lock(&cache_lock);
854 while (state == RUNNING)
855 {
856 gettimeofday (&now, NULL);
857 if ((now.tv_sec > next_flush.tv_sec)
858 || ((now.tv_sec == next_flush.tv_sec)
859 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
860 {
861 RRDD_LOG(LOG_DEBUG, "flushing old values");
863 /* Determine the time of the next cache flush. */
864 next_flush.tv_sec = now.tv_sec + config_flush_interval;
866 /* Flush all values that haven't been written in the last
867 * `config_write_interval' seconds. */
868 flush_old_values (config_write_interval);
870 /* unlock the cache while we rotate so we don't block incoming
871 * updates if the fsync() blocks on disk I/O */
872 pthread_mutex_unlock(&cache_lock);
873 journal_rotate();
874 pthread_mutex_lock(&cache_lock);
875 }
877 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
878 if (status != 0 && status != ETIMEDOUT)
879 {
880 RRDD_LOG (LOG_ERR, "flush_thread_main: "
881 "pthread_cond_timedwait returned %i.", status);
882 }
883 }
885 if (config_flush_at_shutdown)
886 flush_old_values (-1); /* flush everything */
888 state = SHUTDOWN;
890 pthread_mutex_unlock(&cache_lock);
892 return NULL;
893 } /* void *flush_thread_main */
895 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
896 {
897 pthread_mutex_lock (&cache_lock);
899 while (state != SHUTDOWN
900 || (cache_queue_head != NULL && config_flush_at_shutdown))
901 {
902 cache_item_t *ci;
903 char *file;
904 char **values;
905 size_t values_num;
906 int status;
908 /* Now, check if there's something to store away. If not, wait until
909 * something comes in. */
910 if (cache_queue_head == NULL)
911 {
912 status = pthread_cond_wait (&queue_cond, &cache_lock);
913 if ((status != 0) && (status != ETIMEDOUT))
914 {
915 RRDD_LOG (LOG_ERR, "queue_thread_main: "
916 "pthread_cond_wait returned %i.", status);
917 }
918 }
920 /* Check if a value has arrived. This may be NULL if we timed out or there
921 * was an interrupt such as a signal. */
922 if (cache_queue_head == NULL)
923 continue;
925 ci = cache_queue_head;
927 /* copy the relevant parts */
928 file = strdup (ci->file);
929 if (file == NULL)
930 {
931 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
932 continue;
933 }
935 assert(ci->values != NULL);
936 assert(ci->values_num > 0);
938 values = ci->values;
939 values_num = ci->values_num;
941 wipe_ci_values(ci, time(NULL));
942 remove_from_queue(ci);
944 pthread_mutex_unlock (&cache_lock);
946 rrd_clear_error ();
947 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
948 if (status != 0)
949 {
950 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
951 "rrd_update_r (%s) failed with status %i. (%s)",
952 file, status, rrd_get_error());
953 }
955 journal_write("wrote", file);
957 /* Search again in the tree. It's possible someone issued a "FORGET"
958 * while we were writing the update values. */
959 pthread_mutex_lock(&cache_lock);
960 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
961 if (ci)
962 pthread_cond_broadcast(&ci->flushed);
963 pthread_mutex_unlock(&cache_lock);
965 if (status == 0)
966 {
967 pthread_mutex_lock (&stats_lock);
968 stats_updates_written++;
969 stats_data_sets_written += values_num;
970 pthread_mutex_unlock (&stats_lock);
971 }
973 rrd_free_ptrs((void ***) &values, &values_num);
974 free(file);
976 pthread_mutex_lock (&cache_lock);
977 }
978 pthread_mutex_unlock (&cache_lock);
980 return (NULL);
981 } /* }}} void *queue_thread_main */
983 static int buffer_get_field (char **buffer_ret, /* {{{ */
984 size_t *buffer_size_ret, char **field_ret)
985 {
986 char *buffer;
987 size_t buffer_pos;
988 size_t buffer_size;
989 char *field;
990 size_t field_size;
991 int status;
993 buffer = *buffer_ret;
994 buffer_pos = 0;
995 buffer_size = *buffer_size_ret;
996 field = *buffer_ret;
997 field_size = 0;
999 if (buffer_size <= 0)
1000 return (-1);
1002 /* This is ensured by `handle_request'. */
1003 assert (buffer[buffer_size - 1] == '\0');
1005 status = -1;
1006 while (buffer_pos < buffer_size)
1007 {
1008 /* Check for end-of-field or end-of-buffer */
1009 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1010 {
1011 field[field_size] = 0;
1012 field_size++;
1013 buffer_pos++;
1014 status = 0;
1015 break;
1016 }
1017 /* Handle escaped characters. */
1018 else if (buffer[buffer_pos] == '\\')
1019 {
1020 if (buffer_pos >= (buffer_size - 1))
1021 break;
1022 buffer_pos++;
1023 field[field_size] = buffer[buffer_pos];
1024 field_size++;
1025 buffer_pos++;
1026 }
1027 /* Normal operation */
1028 else
1029 {
1030 field[field_size] = buffer[buffer_pos];
1031 field_size++;
1032 buffer_pos++;
1033 }
1034 } /* while (buffer_pos < buffer_size) */
1036 if (status != 0)
1037 return (status);
1039 *buffer_ret = buffer + buffer_pos;
1040 *buffer_size_ret = buffer_size - buffer_pos;
1041 *field_ret = field;
1043 return (0);
1044 } /* }}} int buffer_get_field */
1046 /* if we're restricting writes to the base directory,
1047 * check whether the file falls within the dir
1048 * returns 1 if OK, otherwise 0
1049 */
1050 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1051 {
1052 assert(file != NULL);
1054 if (!config_write_base_only
1055 || JOURNAL_REPLAY(sock)
1056 || config_base_dir == NULL)
1057 return 1;
1059 if (strstr(file, "../") != NULL) goto err;
1061 /* relative paths without "../" are ok */
1062 if (*file != '/') return 1;
1064 /* file must be of the format base + "/" + <1+ char filename> */
1065 if (strlen(file) < _config_base_dir_len + 2) goto err;
1066 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1067 if (*(file + _config_base_dir_len) != '/') goto err;
1069 return 1;
1071 err:
1072 if (sock != NULL && sock->fd >= 0)
1073 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1075 return 0;
1076 } /* }}} static int check_file_access */
1078 /* when using a base dir, convert relative paths to absolute paths.
1079 * if necessary, modifies the "filename" pointer to point
1080 * to the new path created in "tmp". "tmp" is provided
1081 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1082 *
1083 * this allows us to optimize for the expected case (absolute path)
1084 * with a no-op.
1085 */
1086 static void get_abs_path(char **filename, char *tmp)
1087 {
1088 assert(tmp != NULL);
1089 assert(filename != NULL && *filename != NULL);
1091 if (config_base_dir == NULL || **filename == '/')
1092 return;
1094 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1095 *filename = tmp;
1096 } /* }}} static int get_abs_path */
1098 static int flush_file (const char *filename) /* {{{ */
1099 {
1100 cache_item_t *ci;
1102 pthread_mutex_lock (&cache_lock);
1104 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1105 if (ci == NULL)
1106 {
1107 pthread_mutex_unlock (&cache_lock);
1108 return (ENOENT);
1109 }
1111 if (ci->values_num > 0)
1112 {
1113 /* Enqueue at head */
1114 enqueue_cache_item (ci, HEAD);
1115 pthread_cond_wait(&ci->flushed, &cache_lock);
1116 }
1118 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1119 * may have been purged during our cond_wait() */
1121 pthread_mutex_unlock(&cache_lock);
1123 return (0);
1124 } /* }}} int flush_file */
1126 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1127 {
1128 char *err = "Syntax error.\n";
1130 if (cmd && cmd->syntax)
1131 err = cmd->syntax;
1133 return send_response(sock, RESP_ERR, "Usage: %s", err);
1134 } /* }}} static int syntax_error() */
1136 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1137 {
1138 uint64_t copy_queue_length;
1139 uint64_t copy_updates_received;
1140 uint64_t copy_flush_received;
1141 uint64_t copy_updates_written;
1142 uint64_t copy_data_sets_written;
1143 uint64_t copy_journal_bytes;
1144 uint64_t copy_journal_rotate;
1146 uint64_t tree_nodes_number;
1147 uint64_t tree_depth;
1149 pthread_mutex_lock (&stats_lock);
1150 copy_queue_length = stats_queue_length;
1151 copy_updates_received = stats_updates_received;
1152 copy_flush_received = stats_flush_received;
1153 copy_updates_written = stats_updates_written;
1154 copy_data_sets_written = stats_data_sets_written;
1155 copy_journal_bytes = stats_journal_bytes;
1156 copy_journal_rotate = stats_journal_rotate;
1157 pthread_mutex_unlock (&stats_lock);
1159 pthread_mutex_lock (&cache_lock);
1160 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1161 tree_depth = (uint64_t) g_tree_height (cache_tree);
1162 pthread_mutex_unlock (&cache_lock);
1164 add_response_info(sock,
1165 "QueueLength: %"PRIu64"\n", copy_queue_length);
1166 add_response_info(sock,
1167 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1168 add_response_info(sock,
1169 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1170 add_response_info(sock,
1171 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1172 add_response_info(sock,
1173 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1174 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1175 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1176 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1177 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1179 send_response(sock, RESP_OK, "Statistics follow\n");
1181 return (0);
1182 } /* }}} int handle_request_stats */
1184 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1185 {
1186 char *file, file_tmp[PATH_MAX];
1187 int status;
1189 status = buffer_get_field (&buffer, &buffer_size, &file);
1190 if (status != 0)
1191 {
1192 return syntax_error(sock,cmd);
1193 }
1194 else
1195 {
1196 pthread_mutex_lock(&stats_lock);
1197 stats_flush_received++;
1198 pthread_mutex_unlock(&stats_lock);
1200 get_abs_path(&file, file_tmp);
1201 if (!check_file_access(file, sock)) return 0;
1203 status = flush_file (file);
1204 if (status == 0)
1205 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1206 else if (status == ENOENT)
1207 {
1208 /* no file in our tree; see whether it exists at all */
1209 struct stat statbuf;
1211 memset(&statbuf, 0, sizeof(statbuf));
1212 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1213 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1214 else
1215 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1216 }
1217 else if (status < 0)
1218 return send_response(sock, RESP_ERR, "Internal error.\n");
1219 else
1220 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1221 }
1223 /* NOTREACHED */
1224 assert(1==0);
1225 } /* }}} int handle_request_flush */
1227 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1228 {
1229 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1231 pthread_mutex_lock(&cache_lock);
1232 flush_old_values(-1);
1233 pthread_mutex_unlock(&cache_lock);
1235 return send_response(sock, RESP_OK, "Started flush.\n");
1236 } /* }}} static int handle_request_flushall */
1238 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1239 {
1240 int status;
1241 char *file, file_tmp[PATH_MAX];
1242 cache_item_t *ci;
1244 status = buffer_get_field(&buffer, &buffer_size, &file);
1245 if (status != 0)
1246 return syntax_error(sock,cmd);
1248 get_abs_path(&file, file_tmp);
1250 pthread_mutex_lock(&cache_lock);
1251 ci = g_tree_lookup(cache_tree, file);
1252 if (ci == NULL)
1253 {
1254 pthread_mutex_unlock(&cache_lock);
1255 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1256 }
1258 for (size_t i=0; i < ci->values_num; i++)
1259 add_response_info(sock, "%s\n", ci->values[i]);
1261 pthread_mutex_unlock(&cache_lock);
1262 return send_response(sock, RESP_OK, "updates pending\n");
1263 } /* }}} static int handle_request_pending */
1265 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1266 {
1267 int status;
1268 gboolean found;
1269 char *file, file_tmp[PATH_MAX];
1271 status = buffer_get_field(&buffer, &buffer_size, &file);
1272 if (status != 0)
1273 return syntax_error(sock,cmd);
1275 get_abs_path(&file, file_tmp);
1276 if (!check_file_access(file, sock)) return 0;
1278 pthread_mutex_lock(&cache_lock);
1279 found = g_tree_remove(cache_tree, file);
1280 pthread_mutex_unlock(&cache_lock);
1282 if (found == TRUE)
1283 {
1284 if (!JOURNAL_REPLAY(sock))
1285 journal_write("forget", file);
1287 return send_response(sock, RESP_OK, "Gone!\n");
1288 }
1289 else
1290 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1292 /* NOTREACHED */
1293 assert(1==0);
1294 } /* }}} static int handle_request_forget */
1296 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1297 {
1298 cache_item_t *ci;
1300 pthread_mutex_lock(&cache_lock);
1302 ci = cache_queue_head;
1303 while (ci != NULL)
1304 {
1305 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1306 ci = ci->next;
1307 }
1309 pthread_mutex_unlock(&cache_lock);
1311 return send_response(sock, RESP_OK, "in queue.\n");
1312 } /* }}} int handle_request_queue */
1314 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1315 {
1316 char *file, file_tmp[PATH_MAX];
1317 int values_num = 0;
1318 int status;
1319 char orig_buf[CMD_MAX];
1321 cache_item_t *ci;
1323 /* save it for the journal later */
1324 if (!JOURNAL_REPLAY(sock))
1325 strncpy(orig_buf, buffer, min(CMD_MAX,buffer_size));
1327 status = buffer_get_field (&buffer, &buffer_size, &file);
1328 if (status != 0)
1329 return syntax_error(sock,cmd);
1331 pthread_mutex_lock(&stats_lock);
1332 stats_updates_received++;
1333 pthread_mutex_unlock(&stats_lock);
1335 get_abs_path(&file, file_tmp);
1336 if (!check_file_access(file, sock)) return 0;
1338 pthread_mutex_lock (&cache_lock);
1339 ci = g_tree_lookup (cache_tree, file);
1341 if (ci == NULL) /* {{{ */
1342 {
1343 struct stat statbuf;
1344 cache_item_t *tmp;
1346 /* don't hold the lock while we setup; stat(2) might block */
1347 pthread_mutex_unlock(&cache_lock);
1349 memset (&statbuf, 0, sizeof (statbuf));
1350 status = stat (file, &statbuf);
1351 if (status != 0)
1352 {
1353 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1355 status = errno;
1356 if (status == ENOENT)
1357 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1358 else
1359 return send_response(sock, RESP_ERR,
1360 "stat failed with error %i.\n", status);
1361 }
1362 if (!S_ISREG (statbuf.st_mode))
1363 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1365 if (access(file, R_OK|W_OK) != 0)
1366 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1367 file, rrd_strerror(errno));
1369 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1370 if (ci == NULL)
1371 {
1372 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1374 return send_response(sock, RESP_ERR, "malloc failed.\n");
1375 }
1376 memset (ci, 0, sizeof (cache_item_t));
1378 ci->file = strdup (file);
1379 if (ci->file == NULL)
1380 {
1381 free (ci);
1382 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1384 return send_response(sock, RESP_ERR, "strdup failed.\n");
1385 }
1387 wipe_ci_values(ci, now);
1388 ci->flags = CI_FLAGS_IN_TREE;
1389 pthread_cond_init(&ci->flushed, NULL);
1391 pthread_mutex_lock(&cache_lock);
1393 /* another UPDATE might have added this entry in the meantime */
1394 tmp = g_tree_lookup (cache_tree, file);
1395 if (tmp == NULL)
1396 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1397 else
1398 {
1399 free_cache_item (ci);
1400 ci = tmp;
1401 }
1403 /* state may have changed while we were unlocked */
1404 if (state == SHUTDOWN)
1405 return -1;
1406 } /* }}} */
1407 assert (ci != NULL);
1409 /* don't re-write updates in replay mode */
1410 if (!JOURNAL_REPLAY(sock))
1411 journal_write("update", orig_buf);
1413 while (buffer_size > 0)
1414 {
1415 char *value;
1416 double stamp;
1417 char *eostamp;
1419 status = buffer_get_field (&buffer, &buffer_size, &value);
1420 if (status != 0)
1421 {
1422 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1423 break;
1424 }
1426 /* make sure update time is always moving forward. We use double here since
1427 update does support subsecond precision for timestamps ... */
1428 stamp = strtod(value, &eostamp);
1429 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1430 {
1431 pthread_mutex_unlock(&cache_lock);
1432 return send_response(sock, RESP_ERR,
1433 "Cannot find timestamp in '%s'!\n", value);
1434 }
1435 else if (stamp <= ci->last_update_stamp)
1436 {
1437 pthread_mutex_unlock(&cache_lock);
1438 return send_response(sock, RESP_ERR,
1439 "illegal attempt to update using time %lf when last"
1440 " update time is %lf (minimum one second step)\n",
1441 stamp, ci->last_update_stamp);
1442 }
1443 else
1444 ci->last_update_stamp = stamp;
1446 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1447 {
1448 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1449 continue;
1450 }
1452 values_num++;
1453 }
1455 if (((now - ci->last_flush_time) >= config_write_interval)
1456 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1457 && (ci->values_num > 0))
1458 {
1459 enqueue_cache_item (ci, TAIL);
1460 }
1462 pthread_mutex_unlock (&cache_lock);
1464 if (values_num < 1)
1465 return send_response(sock, RESP_ERR, "No values updated.\n");
1466 else
1467 return send_response(sock, RESP_OK,
1468 "errors, enqueued %i value(s).\n", values_num);
1470 /* NOTREACHED */
1471 assert(1==0);
1473 } /* }}} int handle_request_update */
1475 /* we came across a "WROTE" entry during journal replay.
1476 * throw away any values that we have accumulated for this file
1477 */
1478 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1479 {
1480 cache_item_t *ci;
1481 const char *file = buffer;
1483 pthread_mutex_lock(&cache_lock);
1485 ci = g_tree_lookup(cache_tree, file);
1486 if (ci == NULL)
1487 {
1488 pthread_mutex_unlock(&cache_lock);
1489 return (0);
1490 }
1492 if (ci->values)
1493 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1495 wipe_ci_values(ci, now);
1496 remove_from_queue(ci);
1498 pthread_mutex_unlock(&cache_lock);
1499 return (0);
1500 } /* }}} int handle_request_wrote */
1502 /* start "BATCH" processing */
1503 static int batch_start (HANDLER_PROTO) /* {{{ */
1504 {
1505 int status;
1506 if (sock->batch_start)
1507 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1509 status = send_response(sock, RESP_OK,
1510 "Go ahead. End with dot '.' on its own line.\n");
1511 sock->batch_start = time(NULL);
1512 sock->batch_cmd = 0;
1514 return status;
1515 } /* }}} static int batch_start */
1517 /* finish "BATCH" processing and return results to the client */
1518 static int batch_done (HANDLER_PROTO) /* {{{ */
1519 {
1520 assert(sock->batch_start);
1521 sock->batch_start = 0;
1522 sock->batch_cmd = 0;
1523 return send_response(sock, RESP_OK, "errors\n");
1524 } /* }}} static int batch_done */
1526 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1527 {
1528 return -1;
1529 } /* }}} static int handle_request_quit */
1531 static command_t list_of_commands[] = { /* {{{ */
1532 {
1533 "UPDATE",
1534 handle_request_update,
1535 CMD_CONTEXT_ANY,
1536 "UPDATE <filename> <values> [<values> ...]\n"
1537 ,
1538 "Adds the given file to the internal cache if it is not yet known and\n"
1539 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1540 "for details.\n"
1541 "\n"
1542 "Each <values> has the following form:\n"
1543 " <values> = <time>:<value>[:<value>[...]]\n"
1544 "See the rrdupdate(1) manpage for details.\n"
1545 },
1546 {
1547 "WROTE",
1548 handle_request_wrote,
1549 CMD_CONTEXT_JOURNAL,
1550 NULL,
1551 NULL
1552 },
1553 {
1554 "FLUSH",
1555 handle_request_flush,
1556 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1557 "FLUSH <filename>\n"
1558 ,
1559 "Adds the given filename to the head of the update queue and returns\n"
1560 "after it has been dequeued.\n"
1561 },
1562 {
1563 "FLUSHALL",
1564 handle_request_flushall,
1565 CMD_CONTEXT_CLIENT,
1566 "FLUSHALL\n"
1567 ,
1568 "Triggers writing of all pending updates. Returns immediately.\n"
1569 },
1570 {
1571 "PENDING",
1572 handle_request_pending,
1573 CMD_CONTEXT_CLIENT,
1574 "PENDING <filename>\n"
1575 ,
1576 "Shows any 'pending' updates for a file, in order.\n"
1577 "The updates shown have not yet been written to the underlying RRD file.\n"
1578 },
1579 {
1580 "FORGET",
1581 handle_request_forget,
1582 CMD_CONTEXT_ANY,
1583 "FORGET <filename>\n"
1584 ,
1585 "Removes the file completely from the cache.\n"
1586 "Any pending updates for the file will be lost.\n"
1587 },
1588 {
1589 "QUEUE",
1590 handle_request_queue,
1591 CMD_CONTEXT_CLIENT,
1592 "QUEUE\n"
1593 ,
1594 "Shows all files in the output queue.\n"
1595 "The output is zero or more lines in the following format:\n"
1596 "(where <num_vals> is the number of values to be written)\n"
1597 "\n"
1598 "<num_vals> <filename>\n"
1599 },
1600 {
1601 "STATS",
1602 handle_request_stats,
1603 CMD_CONTEXT_CLIENT,
1604 "STATS\n"
1605 ,
1606 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1607 "a description of the values.\n"
1608 },
1609 {
1610 "HELP",
1611 handle_request_help,
1612 CMD_CONTEXT_CLIENT,
1613 "HELP [<command>]\n",
1614 NULL, /* special! */
1615 },
1616 {
1617 "BATCH",
1618 batch_start,
1619 CMD_CONTEXT_CLIENT,
1620 "BATCH\n"
1621 ,
1622 "The 'BATCH' command permits the client to initiate a bulk load\n"
1623 " of commands to rrdcached.\n"
1624 "\n"
1625 "Usage:\n"
1626 "\n"
1627 " client: BATCH\n"
1628 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1629 " client: command #1\n"
1630 " client: command #2\n"
1631 " client: ... and so on\n"
1632 " client: .\n"
1633 " server: 2 errors\n"
1634 " server: 7 message for command #7\n"
1635 " server: 9 message for command #9\n"
1636 "\n"
1637 "For more information, consult the rrdcached(1) documentation.\n"
1638 },
1639 {
1640 ".", /* BATCH terminator */
1641 batch_done,
1642 CMD_CONTEXT_BATCH,
1643 NULL,
1644 NULL
1645 },
1646 {
1647 "QUIT",
1648 handle_request_quit,
1649 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1650 "QUIT\n"
1651 ,
1652 "Disconnect from rrdcached.\n"
1653 }
1654 }; /* }}} command_t list_of_commands[] */
1655 static size_t list_of_commands_len = sizeof (list_of_commands)
1656 / sizeof (list_of_commands[0]);
1658 static command_t *find_command(char *cmd)
1659 {
1660 size_t i;
1662 for (i = 0; i < list_of_commands_len; i++)
1663 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1664 return (&list_of_commands[i]);
1665 return NULL;
1666 }
1668 /* We currently use the index in the `list_of_commands' array as a bit position
1669 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1670 * outside these functions so that switching to a more elegant storage method
1671 * is easily possible. */
1672 static ssize_t find_command_index (const char *cmd) /* {{{ */
1673 {
1674 size_t i;
1676 for (i = 0; i < list_of_commands_len; i++)
1677 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1678 return ((ssize_t) i);
1679 return (-1);
1680 } /* }}} ssize_t find_command_index */
1682 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1683 const char *cmd)
1684 {
1685 ssize_t i;
1687 if (JOURNAL_REPLAY(sock))
1688 return (1);
1690 if (cmd == NULL)
1691 return (-1);
1693 if ((strcasecmp ("QUIT", cmd) == 0)
1694 || (strcasecmp ("HELP", cmd) == 0))
1695 return (1);
1696 else if (strcmp (".", cmd) == 0)
1697 cmd = "BATCH";
1699 i = find_command_index (cmd);
1700 if (i < 0)
1701 return (-1);
1702 assert (i < 32);
1704 if ((sock->permissions & (1 << i)) != 0)
1705 return (1);
1706 return (0);
1707 } /* }}} int socket_permission_check */
1709 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1710 const char *cmd)
1711 {
1712 ssize_t i;
1714 i = find_command_index (cmd);
1715 if (i < 0)
1716 return (-1);
1717 assert (i < 32);
1719 sock->permissions |= (1 << i);
1720 return (0);
1721 } /* }}} int socket_permission_add */
1723 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1724 {
1725 sock->permissions = 0;
1726 } /* }}} socket_permission_clear */
1728 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1729 listen_socket_t *src)
1730 {
1731 dest->permissions = src->permissions;
1732 } /* }}} socket_permission_copy */
1734 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
1735 {
1736 size_t i;
1738 sock->permissions = 0;
1739 for (i = 0; i < list_of_commands_len; i++)
1740 sock->permissions |= (1 << i);
1741 } /* }}} void socket_permission_set_all */
1743 /* check whether commands are received in the expected context */
1744 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1745 {
1746 if (JOURNAL_REPLAY(sock))
1747 return (cmd->context & CMD_CONTEXT_JOURNAL);
1748 else if (sock->batch_start)
1749 return (cmd->context & CMD_CONTEXT_BATCH);
1750 else
1751 return (cmd->context & CMD_CONTEXT_CLIENT);
1753 /* NOTREACHED */
1754 assert(1==0);
1755 }
1757 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1758 {
1759 int status;
1760 char *cmd_str;
1761 char *resp_txt;
1762 command_t *help = NULL;
1764 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1765 if (status == 0)
1766 help = find_command(cmd_str);
1768 if (help && (help->syntax || help->help))
1769 {
1770 char tmp[CMD_MAX];
1772 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1773 resp_txt = tmp;
1775 if (help->syntax)
1776 add_response_info(sock, "Usage: %s\n", help->syntax);
1778 if (help->help)
1779 add_response_info(sock, "%s\n", help->help);
1780 }
1781 else
1782 {
1783 size_t i;
1785 resp_txt = "Command overview\n";
1787 for (i = 0; i < list_of_commands_len; i++)
1788 {
1789 if (list_of_commands[i].syntax == NULL)
1790 continue;
1791 add_response_info (sock, "%s", list_of_commands[i].syntax);
1792 }
1793 }
1795 return send_response(sock, RESP_OK, resp_txt);
1796 } /* }}} int handle_request_help */
1798 static int handle_request (DISPATCH_PROTO) /* {{{ */
1799 {
1800 char *buffer_ptr = buffer;
1801 char *cmd_str = NULL;
1802 command_t *cmd = NULL;
1803 int status;
1805 assert (buffer[buffer_size - 1] == '\0');
1807 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1808 if (status != 0)
1809 {
1810 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1811 return (-1);
1812 }
1814 if (sock != NULL && sock->batch_start)
1815 sock->batch_cmd++;
1817 cmd = find_command(cmd_str);
1818 if (!cmd)
1819 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1821 if (!socket_permission_check (sock, cmd->cmd))
1822 return send_response(sock, RESP_ERR, "Permission denied.\n");
1824 if (!command_check_context(sock, cmd))
1825 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1827 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1828 } /* }}} int handle_request */
1830 static void journal_set_free (journal_set *js) /* {{{ */
1831 {
1832 if (js == NULL)
1833 return;
1835 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1837 free(js);
1838 } /* }}} journal_set_free */
1840 static void journal_set_remove (journal_set *js) /* {{{ */
1841 {
1842 if (js == NULL)
1843 return;
1845 for (uint i=0; i < js->files_num; i++)
1846 {
1847 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1848 unlink(js->files[i]);
1849 }
1850 } /* }}} journal_set_remove */
1852 /* close current journal file handle.
1853 * MUST hold journal_lock before calling */
1854 static void journal_close(void) /* {{{ */
1855 {
1856 if (journal_fh != NULL)
1857 {
1858 if (fclose(journal_fh) != 0)
1859 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1860 }
1862 journal_fh = NULL;
1863 journal_size = 0;
1864 } /* }}} journal_close */
1866 /* MUST hold journal_lock before calling */
1867 static void journal_new_file(void) /* {{{ */
1868 {
1869 struct timeval now;
1870 int new_fd;
1871 char new_file[PATH_MAX + 1];
1873 assert(journal_dir != NULL);
1874 assert(journal_cur != NULL);
1876 journal_close();
1878 gettimeofday(&now, NULL);
1879 /* this format assures that the files sort in strcmp() order */
1880 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1881 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1883 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1884 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1885 if (new_fd < 0)
1886 goto error;
1888 journal_fh = fdopen(new_fd, "a");
1889 if (journal_fh == NULL)
1890 goto error;
1892 journal_size = ftell(journal_fh);
1893 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1895 /* record the file in the journal set */
1896 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1898 return;
1900 error:
1901 RRDD_LOG(LOG_CRIT,
1902 "JOURNALING DISABLED: Error while trying to create %s : %s",
1903 new_file, rrd_strerror(errno));
1904 RRDD_LOG(LOG_CRIT,
1905 "JOURNALING DISABLED: All values will be flushed at shutdown");
1907 close(new_fd);
1908 config_flush_at_shutdown = 1;
1910 } /* }}} journal_new_file */
1912 /* MUST NOT hold journal_lock before calling this */
1913 static void journal_rotate(void) /* {{{ */
1914 {
1915 journal_set *old_js = NULL;
1917 if (journal_dir == NULL)
1918 return;
1920 RRDD_LOG(LOG_DEBUG, "rotating journals");
1922 pthread_mutex_lock(&stats_lock);
1923 ++stats_journal_rotate;
1924 pthread_mutex_unlock(&stats_lock);
1926 pthread_mutex_lock(&journal_lock);
1928 journal_close();
1930 /* rotate the journal sets */
1931 old_js = journal_old;
1932 journal_old = journal_cur;
1933 journal_cur = calloc(1, sizeof(journal_set));
1935 if (journal_cur != NULL)
1936 journal_new_file();
1937 else
1938 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1940 pthread_mutex_unlock(&journal_lock);
1942 journal_set_remove(old_js);
1943 journal_set_free (old_js);
1945 } /* }}} static void journal_rotate */
1947 /* MUST hold journal_lock when calling */
1948 static void journal_done(void) /* {{{ */
1949 {
1950 if (journal_cur == NULL)
1951 return;
1953 journal_close();
1955 if (config_flush_at_shutdown)
1956 {
1957 RRDD_LOG(LOG_INFO, "removing journals");
1958 journal_set_remove(journal_old);
1959 journal_set_remove(journal_cur);
1960 }
1961 else
1962 {
1963 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1964 "journals will be used at next startup");
1965 }
1967 journal_set_free(journal_cur);
1968 journal_set_free(journal_old);
1969 free(journal_dir);
1971 } /* }}} static void journal_done */
1973 static int journal_write(char *cmd, char *args) /* {{{ */
1974 {
1975 int chars;
1977 if (journal_fh == NULL)
1978 return 0;
1980 pthread_mutex_lock(&journal_lock);
1981 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1982 journal_size += chars;
1984 if (journal_size > JOURNAL_MAX)
1985 journal_new_file();
1987 pthread_mutex_unlock(&journal_lock);
1989 if (chars > 0)
1990 {
1991 pthread_mutex_lock(&stats_lock);
1992 stats_journal_bytes += chars;
1993 pthread_mutex_unlock(&stats_lock);
1994 }
1996 return chars;
1997 } /* }}} static int journal_write */
1999 static int journal_replay (const char *file) /* {{{ */
2000 {
2001 FILE *fh;
2002 int entry_cnt = 0;
2003 int fail_cnt = 0;
2004 uint64_t line = 0;
2005 char entry[CMD_MAX];
2006 time_t now;
2008 if (file == NULL) return 0;
2010 {
2011 char *reason = "unknown error";
2012 int status = 0;
2013 struct stat statbuf;
2015 memset(&statbuf, 0, sizeof(statbuf));
2016 if (stat(file, &statbuf) != 0)
2017 {
2018 reason = "stat error";
2019 status = errno;
2020 }
2021 else if (!S_ISREG(statbuf.st_mode))
2022 {
2023 reason = "not a regular file";
2024 status = EPERM;
2025 }
2026 if (statbuf.st_uid != daemon_uid)
2027 {
2028 reason = "not owned by daemon user";
2029 status = EACCES;
2030 }
2031 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2032 {
2033 reason = "must not be user/group writable";
2034 status = EACCES;
2035 }
2037 if (status != 0)
2038 {
2039 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2040 file, rrd_strerror(status), reason);
2041 return 0;
2042 }
2043 }
2045 fh = fopen(file, "r");
2046 if (fh == NULL)
2047 {
2048 if (errno != ENOENT)
2049 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2050 file, rrd_strerror(errno));
2051 return 0;
2052 }
2053 else
2054 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2056 now = time(NULL);
2058 while(!feof(fh))
2059 {
2060 size_t entry_len;
2062 ++line;
2063 if (fgets(entry, sizeof(entry), fh) == NULL)
2064 break;
2065 entry_len = strlen(entry);
2067 /* check \n termination in case journal writing crashed mid-line */
2068 if (entry_len == 0)
2069 continue;
2070 else if (entry[entry_len - 1] != '\n')
2071 {
2072 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2073 ++fail_cnt;
2074 continue;
2075 }
2077 entry[entry_len - 1] = '\0';
2079 if (handle_request(NULL, now, entry, entry_len) == 0)
2080 ++entry_cnt;
2081 else
2082 ++fail_cnt;
2083 }
2085 fclose(fh);
2087 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2088 entry_cnt, fail_cnt);
2090 return entry_cnt > 0 ? 1 : 0;
2091 } /* }}} static int journal_replay */
2093 static int journal_sort(const void *v1, const void *v2)
2094 {
2095 char **jn1 = (char **) v1;
2096 char **jn2 = (char **) v2;
2098 return strcmp(*jn1,*jn2);
2099 }
2101 static void journal_init(void) /* {{{ */
2102 {
2103 int had_journal = 0;
2104 DIR *dir;
2105 struct dirent *dent;
2106 char path[PATH_MAX+1];
2108 if (journal_dir == NULL) return;
2110 pthread_mutex_lock(&journal_lock);
2112 journal_cur = calloc(1, sizeof(journal_set));
2113 if (journal_cur == NULL)
2114 {
2115 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2116 return;
2117 }
2119 RRDD_LOG(LOG_INFO, "checking for journal files");
2121 /* Handle old journal files during transition. This gives them the
2122 * correct sort order. TODO: remove after first release
2123 */
2124 {
2125 char old_path[PATH_MAX+1];
2126 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2127 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2128 rename(old_path, path);
2130 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2131 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2132 rename(old_path, path);
2133 }
2135 dir = opendir(journal_dir);
2136 if (!dir) {
2137 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2138 return;
2139 }
2140 while ((dent = readdir(dir)) != NULL)
2141 {
2142 /* looks like a journal file? */
2143 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2144 continue;
2146 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2148 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2149 {
2150 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2151 dent->d_name);
2152 break;
2153 }
2154 }
2155 closedir(dir);
2157 qsort(journal_cur->files, journal_cur->files_num,
2158 sizeof(journal_cur->files[0]), journal_sort);
2160 for (uint i=0; i < journal_cur->files_num; i++)
2161 had_journal += journal_replay(journal_cur->files[i]);
2163 journal_new_file();
2165 /* it must have been a crash. start a flush */
2166 if (had_journal && config_flush_at_shutdown)
2167 flush_old_values(-1);
2169 pthread_mutex_unlock(&journal_lock);
2171 RRDD_LOG(LOG_INFO, "journal processing complete");
2173 } /* }}} static void journal_init */
2175 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2176 {
2177 assert(sock != NULL);
2179 free(sock->rbuf); sock->rbuf = NULL;
2180 free(sock->wbuf); sock->wbuf = NULL;
2181 free(sock);
2182 } /* }}} void free_listen_socket */
2184 static void close_connection(listen_socket_t *sock) /* {{{ */
2185 {
2186 if (sock->fd >= 0)
2187 {
2188 close(sock->fd);
2189 sock->fd = -1;
2190 }
2192 free_listen_socket(sock);
2194 } /* }}} void close_connection */
2196 static void *connection_thread_main (void *args) /* {{{ */
2197 {
2198 listen_socket_t *sock;
2199 int fd;
2201 sock = (listen_socket_t *) args;
2202 fd = sock->fd;
2204 /* init read buffers */
2205 sock->next_read = sock->next_cmd = 0;
2206 sock->rbuf = malloc(RBUF_SIZE);
2207 if (sock->rbuf == NULL)
2208 {
2209 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2210 close_connection(sock);
2211 return NULL;
2212 }
2214 pthread_mutex_lock (&connection_threads_lock);
2215 #ifdef HAVE_LIBWRAP
2216 /* LIBWRAP does not support multiple threads! By putting this code
2217 inside pthread_mutex_lock we do not have to worry about request_info
2218 getting overwritten by another thread.
2219 */
2220 struct request_info req;
2221 request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2222 fromhost(&req);
2223 if(!hosts_access(&req)) {
2224 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2225 pthread_mutex_unlock (&connection_threads_lock);
2226 close_connection(sock);
2227 return NULL;
2228 }
2229 #endif /* HAVE_LIBWRAP */
2230 connection_threads_num++;
2231 pthread_mutex_unlock (&connection_threads_lock);
2233 while (state == RUNNING)
2234 {
2235 char *cmd;
2236 ssize_t cmd_len;
2237 ssize_t rbytes;
2238 time_t now;
2240 struct pollfd pollfd;
2241 int status;
2243 pollfd.fd = fd;
2244 pollfd.events = POLLIN | POLLPRI;
2245 pollfd.revents = 0;
2247 status = poll (&pollfd, 1, /* timeout = */ 500);
2248 if (state != RUNNING)
2249 break;
2250 else if (status == 0) /* timeout */
2251 continue;
2252 else if (status < 0) /* error */
2253 {
2254 status = errno;
2255 if (status != EINTR)
2256 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2257 continue;
2258 }
2260 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2261 break;
2262 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2263 {
2264 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2265 "poll(2) returned something unexpected: %#04hx",
2266 pollfd.revents);
2267 break;
2268 }
2270 rbytes = read(fd, sock->rbuf + sock->next_read,
2271 RBUF_SIZE - sock->next_read);
2272 if (rbytes < 0)
2273 {
2274 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2275 break;
2276 }
2277 else if (rbytes == 0)
2278 break; /* eof */
2280 sock->next_read += rbytes;
2282 if (sock->batch_start)
2283 now = sock->batch_start;
2284 else
2285 now = time(NULL);
2287 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2288 {
2289 status = handle_request (sock, now, cmd, cmd_len+1);
2290 if (status != 0)
2291 goto out_close;
2292 }
2293 }
2295 out_close:
2296 close_connection(sock);
2298 /* Remove this thread from the connection threads list */
2299 pthread_mutex_lock (&connection_threads_lock);
2300 connection_threads_num--;
2301 if (connection_threads_num <= 0)
2302 pthread_cond_broadcast(&connection_threads_done);
2303 pthread_mutex_unlock (&connection_threads_lock);
2305 return (NULL);
2306 } /* }}} void *connection_thread_main */
2308 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2309 {
2310 int fd;
2311 struct sockaddr_un sa;
2312 listen_socket_t *temp;
2313 int status;
2314 const char *path;
2315 char *path_copy, *dir;
2317 path = sock->addr;
2318 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2319 path += strlen("unix:");
2321 /* dirname may modify its argument */
2322 path_copy = strdup(path);
2323 if (path_copy == NULL)
2324 {
2325 fprintf(stderr, "rrdcached: strdup(): %s\n",
2326 rrd_strerror(errno));
2327 return (-1);
2328 }
2330 dir = dirname(path_copy);
2331 if (rrd_mkdir_p(dir, 0777) != 0)
2332 {
2333 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2334 dir, rrd_strerror(errno));
2335 return (-1);
2336 }
2338 free(path_copy);
2340 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2341 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2342 if (temp == NULL)
2343 {
2344 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2345 return (-1);
2346 }
2347 listen_fds = temp;
2348 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2350 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2351 if (fd < 0)
2352 {
2353 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2354 rrd_strerror(errno));
2355 return (-1);
2356 }
2358 memset (&sa, 0, sizeof (sa));
2359 sa.sun_family = AF_UNIX;
2360 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2362 /* if we've gotten this far, we own the pid file. any daemon started
2363 * with the same args must not be alive. therefore, ensure that we can
2364 * create the socket...
2365 */
2366 unlink(path);
2368 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2369 if (status != 0)
2370 {
2371 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2372 path, rrd_strerror(errno));
2373 close (fd);
2374 return (-1);
2375 }
2377 /* tweak the sockets group ownership */
2378 if (sock->socket_group != (gid_t)-1)
2379 {
2380 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2381 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2382 {
2383 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2384 }
2385 }
2387 if (sock->socket_permissions != (mode_t)-1)
2388 {
2389 if (chmod(path, sock->socket_permissions) != 0)
2390 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2391 (unsigned int)sock->socket_permissions, strerror(errno));
2392 }
2394 status = listen (fd, /* backlog = */ 10);
2395 if (status != 0)
2396 {
2397 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2398 path, rrd_strerror(errno));
2399 close (fd);
2400 unlink (path);
2401 return (-1);
2402 }
2404 listen_fds[listen_fds_num].fd = fd;
2405 listen_fds[listen_fds_num].family = PF_UNIX;
2406 strncpy(listen_fds[listen_fds_num].addr, path,
2407 sizeof (listen_fds[listen_fds_num].addr) - 1);
2408 listen_fds_num++;
2410 return (0);
2411 } /* }}} int open_listen_socket_unix */
2413 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2414 {
2415 struct addrinfo ai_hints;
2416 struct addrinfo *ai_res;
2417 struct addrinfo *ai_ptr;
2418 char addr_copy[NI_MAXHOST];
2419 char *addr;
2420 char *port;
2421 int status;
2423 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2424 addr_copy[sizeof (addr_copy) - 1] = 0;
2425 addr = addr_copy;
2427 memset (&ai_hints, 0, sizeof (ai_hints));
2428 ai_hints.ai_flags = 0;
2429 #ifdef AI_ADDRCONFIG
2430 ai_hints.ai_flags |= AI_ADDRCONFIG;
2431 #endif
2432 ai_hints.ai_family = AF_UNSPEC;
2433 ai_hints.ai_socktype = SOCK_STREAM;
2435 port = NULL;
2436 if (*addr == '[') /* IPv6+port format */
2437 {
2438 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2439 addr++;
2441 port = strchr (addr, ']');
2442 if (port == NULL)
2443 {
2444 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2445 return (-1);
2446 }
2447 *port = 0;
2448 port++;
2450 if (*port == ':')
2451 port++;
2452 else if (*port == 0)
2453 port = NULL;
2454 else
2455 {
2456 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2457 return (-1);
2458 }
2459 } /* if (*addr == '[') */
2460 else
2461 {
2462 port = rindex(addr, ':');
2463 if (port != NULL)
2464 {
2465 *port = 0;
2466 port++;
2467 }
2468 }
2469 ai_res = NULL;
2470 status = getaddrinfo (addr,
2471 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2472 &ai_hints, &ai_res);
2473 if (status != 0)
2474 {
2475 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2476 addr, gai_strerror (status));
2477 return (-1);
2478 }
2480 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2481 {
2482 int fd;
2483 listen_socket_t *temp;
2484 int one = 1;
2486 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2487 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2488 if (temp == NULL)
2489 {
2490 fprintf (stderr,
2491 "rrdcached: open_listen_socket_network: realloc failed.\n");
2492 continue;
2493 }
2494 listen_fds = temp;
2495 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2497 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2498 if (fd < 0)
2499 {
2500 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2501 rrd_strerror(errno));
2502 continue;
2503 }
2505 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2507 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2508 if (status != 0)
2509 {
2510 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2511 sock->addr, rrd_strerror(errno));
2512 close (fd);
2513 continue;
2514 }
2516 status = listen (fd, /* backlog = */ 10);
2517 if (status != 0)
2518 {
2519 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2520 sock->addr, rrd_strerror(errno));
2521 close (fd);
2522 freeaddrinfo(ai_res);
2523 return (-1);
2524 }
2526 listen_fds[listen_fds_num].fd = fd;
2527 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2528 listen_fds_num++;
2529 } /* for (ai_ptr) */
2531 freeaddrinfo(ai_res);
2532 return (0);
2533 } /* }}} static int open_listen_socket_network */
2535 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2536 {
2537 assert(sock != NULL);
2538 assert(sock->addr != NULL);
2540 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2541 || sock->addr[0] == '/')
2542 return (open_listen_socket_unix(sock));
2543 else
2544 return (open_listen_socket_network(sock));
2545 } /* }}} int open_listen_socket */
2547 static int close_listen_sockets (void) /* {{{ */
2548 {
2549 size_t i;
2551 for (i = 0; i < listen_fds_num; i++)
2552 {
2553 close (listen_fds[i].fd);
2555 if (listen_fds[i].family == PF_UNIX)
2556 unlink(listen_fds[i].addr);
2557 }
2559 free (listen_fds);
2560 listen_fds = NULL;
2561 listen_fds_num = 0;
2563 return (0);
2564 } /* }}} int close_listen_sockets */
2566 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2567 {
2568 struct pollfd *pollfds;
2569 int pollfds_num;
2570 int status;
2571 int i;
2573 if (listen_fds_num < 1)
2574 {
2575 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2576 return (NULL);
2577 }
2579 pollfds_num = listen_fds_num;
2580 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2581 if (pollfds == NULL)
2582 {
2583 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2584 return (NULL);
2585 }
2586 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2588 RRDD_LOG(LOG_INFO, "listening for connections");
2590 while (state == RUNNING)
2591 {
2592 for (i = 0; i < pollfds_num; i++)
2593 {
2594 pollfds[i].fd = listen_fds[i].fd;
2595 pollfds[i].events = POLLIN | POLLPRI;
2596 pollfds[i].revents = 0;
2597 }
2599 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2600 if (state != RUNNING)
2601 break;
2602 else if (status == 0) /* timeout */
2603 continue;
2604 else if (status < 0) /* error */
2605 {
2606 status = errno;
2607 if (status != EINTR)
2608 {
2609 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2610 }
2611 continue;
2612 }
2614 for (i = 0; i < pollfds_num; i++)
2615 {
2616 listen_socket_t *client_sock;
2617 struct sockaddr_storage client_sa;
2618 socklen_t client_sa_size;
2619 pthread_t tid;
2620 pthread_attr_t attr;
2622 if (pollfds[i].revents == 0)
2623 continue;
2625 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2626 {
2627 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2628 "poll(2) returned something unexpected for listen FD #%i.",
2629 pollfds[i].fd);
2630 continue;
2631 }
2633 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2634 if (client_sock == NULL)
2635 {
2636 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2637 continue;
2638 }
2639 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2641 client_sa_size = sizeof (client_sa);
2642 client_sock->fd = accept (pollfds[i].fd,
2643 (struct sockaddr *) &client_sa, &client_sa_size);
2644 if (client_sock->fd < 0)
2645 {
2646 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2647 free(client_sock);
2648 continue;
2649 }
2651 pthread_attr_init (&attr);
2652 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2654 status = pthread_create (&tid, &attr, connection_thread_main,
2655 client_sock);
2656 if (status != 0)
2657 {
2658 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2659 close_connection(client_sock);
2660 continue;
2661 }
2662 } /* for (pollfds_num) */
2663 } /* while (state == RUNNING) */
2665 RRDD_LOG(LOG_INFO, "starting shutdown");
2667 close_listen_sockets ();
2669 pthread_mutex_lock (&connection_threads_lock);
2670 while (connection_threads_num > 0)
2671 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2672 pthread_mutex_unlock (&connection_threads_lock);
2674 free(pollfds);
2676 return (NULL);
2677 } /* }}} void *listen_thread_main */
2679 static int daemonize (void) /* {{{ */
2680 {
2681 int pid_fd;
2682 char *base_dir;
2684 daemon_uid = geteuid();
2686 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2687 if (pid_fd < 0)
2688 pid_fd = check_pidfile();
2689 if (pid_fd < 0)
2690 return pid_fd;
2692 /* open all the listen sockets */
2693 if (config_listen_address_list_len > 0)
2694 {
2695 for (size_t i = 0; i < config_listen_address_list_len; i++)
2696 open_listen_socket (config_listen_address_list[i]);
2698 rrd_free_ptrs((void ***) &config_listen_address_list,
2699 &config_listen_address_list_len);
2700 }
2701 else
2702 {
2703 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2704 sizeof(default_socket.addr) - 1);
2705 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2707 if (default_socket.permissions == 0)
2708 socket_permission_set_all (&default_socket);
2710 open_listen_socket (&default_socket);
2711 }
2713 if (listen_fds_num < 1)
2714 {
2715 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2716 goto error;
2717 }
2719 if (!stay_foreground)
2720 {
2721 pid_t child;
2723 child = fork ();
2724 if (child < 0)
2725 {
2726 fprintf (stderr, "daemonize: fork(2) failed.\n");
2727 goto error;
2728 }
2729 else if (child > 0)
2730 exit(0);
2732 /* Become session leader */
2733 setsid ();
2735 /* Open the first three file descriptors to /dev/null */
2736 close (2);
2737 close (1);
2738 close (0);
2740 open ("/dev/null", O_RDWR);
2741 if (dup(0) == -1 || dup(0) == -1){
2742 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2743 }
2744 } /* if (!stay_foreground) */
2746 /* Change into the /tmp directory. */
2747 base_dir = (config_base_dir != NULL)
2748 ? config_base_dir
2749 : "/tmp";
2751 if (chdir (base_dir) != 0)
2752 {
2753 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2754 goto error;
2755 }
2757 install_signal_handlers();
2759 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2760 RRDD_LOG(LOG_INFO, "starting up");
2762 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2763 (GDestroyNotify) free_cache_item);
2764 if (cache_tree == NULL)
2765 {
2766 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2767 goto error;
2768 }
2770 return write_pidfile (pid_fd);
2772 error:
2773 remove_pidfile();
2774 return -1;
2775 } /* }}} int daemonize */
2777 static int cleanup (void) /* {{{ */
2778 {
2779 pthread_cond_broadcast (&flush_cond);
2780 pthread_join (flush_thread, NULL);
2782 pthread_cond_broadcast (&queue_cond);
2783 for (int i = 0; i < config_queue_threads; i++)
2784 pthread_join (queue_threads[i], NULL);
2786 if (config_flush_at_shutdown)
2787 {
2788 assert(cache_queue_head == NULL);
2789 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2790 }
2792 free(queue_threads);
2793 free(config_base_dir);
2795 pthread_mutex_lock(&cache_lock);
2796 g_tree_destroy(cache_tree);
2798 pthread_mutex_lock(&journal_lock);
2799 journal_done();
2801 RRDD_LOG(LOG_INFO, "goodbye");
2802 closelog ();
2804 remove_pidfile ();
2805 free(config_pid_file);
2807 return (0);
2808 } /* }}} int cleanup */
2810 static int read_options (int argc, char **argv) /* {{{ */
2811 {
2812 int option;
2813 int status = 0;
2815 socket_permission_clear (&default_socket);
2817 default_socket.socket_group = (gid_t)-1;
2818 default_socket.socket_permissions = (mode_t)-1;
2820 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2821 {
2822 switch (option)
2823 {
2824 case 'g':
2825 stay_foreground=1;
2826 break;
2828 case 'l':
2829 {
2830 listen_socket_t *new;
2832 new = malloc(sizeof(listen_socket_t));
2833 if (new == NULL)
2834 {
2835 fprintf(stderr, "read_options: malloc failed.\n");
2836 return(2);
2837 }
2838 memset(new, 0, sizeof(listen_socket_t));
2840 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2842 /* Add permissions to the socket {{{ */
2843 if (default_socket.permissions != 0)
2844 {
2845 socket_permission_copy (new, &default_socket);
2846 }
2847 else /* if (default_socket.permissions == 0) */
2848 {
2849 /* Add permission for ALL commands to the socket. */
2850 socket_permission_set_all (new);
2851 }
2852 /* }}} Done adding permissions. */
2854 new->socket_group = default_socket.socket_group;
2855 new->socket_permissions = default_socket.socket_permissions;
2857 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2858 &config_listen_address_list_len, new))
2859 {
2860 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2861 return (2);
2862 }
2863 }
2864 break;
2866 /* set socket group permissions */
2867 case 's':
2868 {
2869 gid_t group_gid;
2870 struct group *grp;
2872 group_gid = strtoul(optarg, NULL, 10);
2873 if (errno != EINVAL && group_gid>0)
2874 {
2875 /* we were passed a number */
2876 grp = getgrgid(group_gid);
2877 }
2878 else
2879 {
2880 grp = getgrnam(optarg);
2881 }
2883 if (grp)
2884 {
2885 default_socket.socket_group = grp->gr_gid;
2886 }
2887 else
2888 {
2889 /* no idea what the user wanted... */
2890 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2891 return (5);
2892 }
2893 }
2894 break;
2896 /* set socket file permissions */
2897 case 'm':
2898 {
2899 long tmp;
2900 char *endptr = NULL;
2902 tmp = strtol (optarg, &endptr, 8);
2903 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2904 || (tmp > 07777) || (tmp < 0)) {
2905 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2906 optarg);
2907 return (5);
2908 }
2910 default_socket.socket_permissions = (mode_t)tmp;
2911 }
2912 break;
2914 case 'P':
2915 {
2916 char *optcopy;
2917 char *saveptr;
2918 char *dummy;
2919 char *ptr;
2921 socket_permission_clear (&default_socket);
2923 optcopy = strdup (optarg);
2924 dummy = optcopy;
2925 saveptr = NULL;
2926 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2927 {
2928 dummy = NULL;
2929 status = socket_permission_add (&default_socket, ptr);
2930 if (status != 0)
2931 {
2932 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2933 "socket failed. Most likely, this permission doesn't "
2934 "exist. Check your command line.\n", ptr);
2935 status = 4;
2936 }
2937 }
2939 free (optcopy);
2940 }
2941 break;
2943 case 'f':
2944 {
2945 int temp;
2947 temp = atoi (optarg);
2948 if (temp > 0)
2949 config_flush_interval = temp;
2950 else
2951 {
2952 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2953 status = 3;
2954 }
2955 }
2956 break;
2958 case 'w':
2959 {
2960 int temp;
2962 temp = atoi (optarg);
2963 if (temp > 0)
2964 config_write_interval = temp;
2965 else
2966 {
2967 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2968 status = 2;
2969 }
2970 }
2971 break;
2973 case 'z':
2974 {
2975 int temp;
2977 temp = atoi(optarg);
2978 if (temp > 0)
2979 config_write_jitter = temp;
2980 else
2981 {
2982 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2983 status = 2;
2984 }
2986 break;
2987 }
2989 case 't':
2990 {
2991 int threads;
2992 threads = atoi(optarg);
2993 if (threads >= 1)
2994 config_queue_threads = threads;
2995 else
2996 {
2997 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2998 return 1;
2999 }
3000 }
3001 break;
3003 case 'B':
3004 config_write_base_only = 1;
3005 break;
3007 case 'b':
3008 {
3009 size_t len;
3010 char base_realpath[PATH_MAX];
3012 if (config_base_dir != NULL)
3013 free (config_base_dir);
3014 config_base_dir = strdup (optarg);
3015 if (config_base_dir == NULL)
3016 {
3017 fprintf (stderr, "read_options: strdup failed.\n");
3018 return (3);
3019 }
3021 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3022 {
3023 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3024 config_base_dir, rrd_strerror (errno));
3025 return (3);
3026 }
3028 /* make sure that the base directory is not resolved via
3029 * symbolic links. this makes some performance-enhancing
3030 * assumptions possible (we don't have to resolve paths
3031 * that start with a "/")
3032 */
3033 if (realpath(config_base_dir, base_realpath) == NULL)
3034 {
3035 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3036 "%s\n", config_base_dir, rrd_strerror(errno));
3037 return 5;
3038 }
3040 len = strlen (config_base_dir);
3041 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3042 {
3043 config_base_dir[len - 1] = 0;
3044 len--;
3045 }
3047 if (len < 1)
3048 {
3049 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3050 return (4);
3051 }
3053 _config_base_dir_len = len;
3055 len = strlen (base_realpath);
3056 while ((len > 0) && (base_realpath[len - 1] == '/'))
3057 {
3058 base_realpath[len - 1] = '\0';
3059 len--;
3060 }
3062 if (strncmp(config_base_dir,
3063 base_realpath, sizeof(base_realpath)) != 0)
3064 {
3065 fprintf(stderr,
3066 "Base directory (-b) resolved via file system links!\n"
3067 "Please consult rrdcached '-b' documentation!\n"
3068 "Consider specifying the real directory (%s)\n",
3069 base_realpath);
3070 return 5;
3071 }
3072 }
3073 break;
3075 case 'p':
3076 {
3077 if (config_pid_file != NULL)
3078 free (config_pid_file);
3079 config_pid_file = strdup (optarg);
3080 if (config_pid_file == NULL)
3081 {
3082 fprintf (stderr, "read_options: strdup failed.\n");
3083 return (3);
3084 }
3085 }
3086 break;
3088 case 'F':
3089 config_flush_at_shutdown = 1;
3090 break;
3092 case 'j':
3093 {
3094 char journal_dir_actual[PATH_MAX];
3095 const char *dir;
3096 if (realpath((const char *)optarg, journal_dir_actual) == NULL)
3097 {
3098 fprintf(stderr, "Failed to canonicalize the journal directory '%s': %s\n",
3099 optarg, rrd_strerror(errno));
3100 return 7;
3101 }
3102 dir = journal_dir = strdup(journal_dir_actual);
3103 if (dir == NULL) {
3104 fprintf (stderr, "read_options: strdup failed.\n");
3105 return (3);
3106 }
3108 status = rrd_mkdir_p(dir, 0777);
3109 if (status != 0)
3110 {
3111 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3112 dir, rrd_strerror(errno));
3113 return 6;
3114 }
3116 if (access(dir, R_OK|W_OK|X_OK) != 0)
3117 {
3118 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3119 errno ? rrd_strerror(errno) : "");
3120 return 6;
3121 }
3122 }
3123 break;
3125 case 'h':
3126 case '?':
3127 printf ("RRDCacheD %s\n"
3128 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3129 "\n"
3130 "Usage: rrdcached [options]\n"
3131 "\n"
3132 "Valid options are:\n"
3133 " -l <address> Socket address to listen to.\n"
3134 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3135 " -P <perms> Sets the permissions to assign to all following "
3136 "sockets\n"
3137 " -w <seconds> Interval in which to write data.\n"
3138 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3139 " -t <threads> Number of write threads.\n"
3140 " -f <seconds> Interval in which to flush dead data.\n"
3141 " -p <file> Location of the PID-file.\n"
3142 " -b <dir> Base directory to change to.\n"
3143 " -B Restrict file access to paths within -b <dir>\n"
3144 " -g Do not fork and run in the foreground.\n"
3145 " -j <dir> Directory in which to create the journal files.\n"
3146 " -F Always flush all updates at shutdown\n"
3147 " -s <id|name> Group owner of all following UNIX sockets\n"
3148 " (the socket will also have read/write permissions "
3149 "for that group)\n"
3150 " -m <mode> File permissions (octal) of all following UNIX "
3151 "sockets\n"
3152 "\n"
3153 "For more information and a detailed description of all options "
3154 "please refer\n"
3155 "to the rrdcached(1) manual page.\n",
3156 VERSION);
3157 if (option == 'h')
3158 status = -1;
3159 else
3160 status = 1;
3161 break;
3162 } /* switch (option) */
3163 } /* while (getopt) */
3165 /* advise the user when values are not sane */
3166 if (config_flush_interval < 2 * config_write_interval)
3167 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3168 " 2x write interval (-w) !\n");
3169 if (config_write_jitter > config_write_interval)
3170 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3171 " write interval (-w) !\n");
3173 if (config_write_base_only && config_base_dir == NULL)
3174 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3175 " Consult the rrdcached documentation\n");
3177 if (journal_dir == NULL)
3178 config_flush_at_shutdown = 1;
3180 return (status);
3181 } /* }}} int read_options */
3183 int main (int argc, char **argv)
3184 {
3185 int status;
3187 status = read_options (argc, argv);
3188 if (status != 0)
3189 {
3190 if (status < 0)
3191 status = 0;
3192 return (status);
3193 }
3195 status = daemonize ();
3196 if (status != 0)
3197 {
3198 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3199 return (1);
3200 }
3202 journal_init();
3204 /* start the queue threads */
3205 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3206 if (queue_threads == NULL)
3207 {
3208 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3209 cleanup();
3210 return (1);
3211 }
3212 for (int i = 0; i < config_queue_threads; i++)
3213 {
3214 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3215 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3216 if (status != 0)
3217 {
3218 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3219 cleanup();
3220 return (1);
3221 }
3222 }
3224 /* start the flush thread */
3225 memset(&flush_thread, 0, sizeof(flush_thread));
3226 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3227 if (status != 0)
3228 {
3229 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3230 cleanup();
3231 return (1);
3232 }
3234 listen_thread_main (NULL);
3235 cleanup ();
3237 return (0);
3238 } /* int main */
3240 /*
3241 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3242 */