1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
/*
 * First tell the compiler to stick to the C99 and POSIX standards as closely
 * as possible.
 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #include "rrd_tool.h"
67 #include "rrd_client.h"
68 #include "unused.h"
70 #include <stdlib.h>
72 #ifndef WIN32
73 #ifdef HAVE_STDINT_H
74 # include <stdint.h>
75 #endif
76 #include <unistd.h>
77 #include <strings.h>
78 #include <inttypes.h>
79 #include <sys/socket.h>
81 #else
83 #endif
84 #include <stdio.h>
85 #include <string.h>
87 #include <sys/types.h>
88 #include <sys/stat.h>
89 #include <dirent.h>
90 #include <fcntl.h>
91 #include <signal.h>
92 #include <sys/un.h>
93 #include <netdb.h>
94 #include <poll.h>
95 #include <syslog.h>
96 #include <pthread.h>
97 #include <errno.h>
98 #include <assert.h>
99 #include <sys/time.h>
100 #include <time.h>
101 #include <libgen.h>
102 #include <grp.h>
104 #ifdef HAVE_LIBWRAP
105 #include <tcpd.h>
106 #endif /* HAVE_LIBWRAP */
108 #include <glib-2.0/glib.h>
109 /* }}} */
/*
 * RRDD_LOG: report a printf-style message at the given syslog severity.
 * When `stay_foreground' is set, the same message is first printed to
 * stderr followed by a newline.  The variadic arguments are expanded once
 * per sink, so they must not carry side effects.
 */
#define RRDD_LOG(severity, ...) \
  do { \
    if (stay_foreground) \
    { \
      fprintf (stderr, __VA_ARGS__); \
      fprintf (stderr, "\n"); \
    } \
    syslog ((severity), __VA_ARGS__); \
  } while (0)
119 /*
120 * Types
121 */
/* Status of a handled command, as passed to send_response(). */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK  = 0
} response_code;
124 struct listen_socket_s
125 {
126 int fd;
127 char addr[PATH_MAX + 1];
128 int family;
130 /* state for BATCH processing */
131 time_t batch_start;
132 int batch_cmd;
134 /* buffered IO */
135 char *rbuf;
136 off_t next_cmd;
137 off_t next_read;
139 char *wbuf;
140 ssize_t wbuf_len;
142 uint32_t permissions;
144 gid_t socket_group;
145 mode_t socket_permissions;
146 };
147 typedef struct listen_socket_s listen_socket_t;
149 struct command_s;
150 typedef struct command_s command_t;
151 /* note: guard against "unused" warnings in the handlers */
152 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
153 time_t UNUSED(now),\
154 char UNUSED(*buffer),\
155 size_t UNUSED(buffer_size)
157 #define HANDLER_PROTO command_t UNUSED(*cmd),\
158 DISPATCH_PROTO
160 struct command_s {
161 char *cmd;
162 int (*handler)(HANDLER_PROTO);
164 char context; /* where we expect to see it */
165 #define CMD_CONTEXT_CLIENT (1<<0)
166 #define CMD_CONTEXT_BATCH (1<<1)
167 #define CMD_CONTEXT_JOURNAL (1<<2)
168 #define CMD_CONTEXT_ANY (0x7f)
170 char *syntax;
171 char *help;
172 };
/* Bits for cache_item_s.flags. */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/* One cached RRD file together with its not-yet-written update values. */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char           *file;              /* file name; also the key in cache_tree */
  char          **values;            /* pending update strings */
  size_t          values_num;        /* number of entries in `values' */
  time_t          last_flush_time;   /* set by wipe_ci_values() */
  double          last_update_stamp;
  int             flags;             /* CI_FLAGS_* */
  pthread_cond_t  flushed;           /* broadcast once the item was written */
  cache_item_t   *prev;              /* write-queue links */
  cache_item_t   *next;
};
/* User data handed to tree_callback_flush() by flush_old_values(). */
typedef struct callback_flush_data_s
{
  time_t   now;           /* time at which the tree walk started */
  time_t   abs_timeout;   /* items last flushed at or before this get queued */
  char   **keys;          /* keys of tree entries selected for removal */
  size_t   keys_num;      /* number of entries in `keys' */
} callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() inserts at. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* A set of journal files: `files_num' entries stored in `files'. */
typedef struct
{
  char   **files;
  size_t   files_num;
} journal_set;
/* Maximum length of a socket command or response; RBUF_SIZE is twice that. */
#define CMD_MAX   4096
#define RBUF_SIZE (2 * CMD_MAX)
217 /*
218 * Variables
219 */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 static listen_socket_t default_socket;
228 enum {
229 RUNNING, /* normal operation */
230 FLUSHING, /* flushing remaining values */
231 SHUTDOWN /* shutting down */
232 } state = RUNNING;
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
245 /* Cache stuff */
246 static GTree *cache_tree = NULL;
247 static cache_item_t *cache_queue_head = NULL;
248 static cache_item_t *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
251 static int config_write_interval = 300;
252 static int config_write_jitter = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
260 static listen_socket_t **config_listen_address_list = NULL;
261 static size_t config_listen_address_list_len = 0;
263 static uint64_t stats_queue_length = 0;
264 static uint64_t stats_updates_received = 0;
265 static uint64_t stats_flush_received = 0;
266 static uint64_t stats_updates_written = 0;
267 static uint64_t stats_data_sets_written = 0;
268 static uint64_t stats_journal_bytes = 0;
269 static uint64_t stats_journal_rotate = 0;
270 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
272 /* Journaled updates */
273 #define JOURNAL_REPLAY(s) ((s) == NULL)
274 #define JOURNAL_BASE "rrd.journal"
275 static journal_set *journal_cur = NULL;
276 static journal_set *journal_old = NULL;
277 static char *journal_dir = NULL;
278 static FILE *journal_fh = NULL; /* current journal file handle */
279 static long journal_size = 0; /* current journal size */
280 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
281 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
282 static int journal_write(char *cmd, char *args);
283 static void journal_done(void);
284 static void journal_rotate(void);
286 /* prototypes for forward refernces */
287 static int handle_request_help (HANDLER_PROTO);
289 /*
290 * Functions
291 */
292 static void sig_common (const char *sig) /* {{{ */
293 {
294 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
295 state = FLUSHING;
296 pthread_cond_broadcast(&flush_cond);
297 pthread_cond_broadcast(&queue_cond);
298 } /* }}} void sig_common */
/* SIGINT: start shutting down via sig_common(). */
static void sig_int_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: start shutting down via sig_common(). */
static void sig_term_handler (int UNUSED(s)) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */
310 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
311 {
312 config_flush_at_shutdown = 1;
313 sig_common("USR1");
314 } /* }}} void sig_usr1_handler */
316 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
317 {
318 config_flush_at_shutdown = 0;
319 sig_common("USR2");
320 } /* }}} void sig_usr2_handler */
322 static void install_signal_handlers(void) /* {{{ */
323 {
324 /* These structures are static, because `sigaction' behaves weird if the are
325 * overwritten.. */
326 static struct sigaction sa_int;
327 static struct sigaction sa_term;
328 static struct sigaction sa_pipe;
329 static struct sigaction sa_usr1;
330 static struct sigaction sa_usr2;
332 /* Install signal handlers */
333 memset (&sa_int, 0, sizeof (sa_int));
334 sa_int.sa_handler = sig_int_handler;
335 sigaction (SIGINT, &sa_int, NULL);
337 memset (&sa_term, 0, sizeof (sa_term));
338 sa_term.sa_handler = sig_term_handler;
339 sigaction (SIGTERM, &sa_term, NULL);
341 memset (&sa_pipe, 0, sizeof (sa_pipe));
342 sa_pipe.sa_handler = SIG_IGN;
343 sigaction (SIGPIPE, &sa_pipe, NULL);
345 memset (&sa_pipe, 0, sizeof (sa_usr1));
346 sa_usr1.sa_handler = sig_usr1_handler;
347 sigaction (SIGUSR1, &sa_usr1, NULL);
349 memset (&sa_usr2, 0, sizeof (sa_usr2));
350 sa_usr2.sa_handler = sig_usr2_handler;
351 sigaction (SIGUSR2, &sa_usr2, NULL);
353 } /* }}} void install_signal_handlers */
355 static int open_pidfile(char *action, int oflag) /* {{{ */
356 {
357 int fd;
358 const char *file;
359 char *file_copy, *dir;
361 file = (config_pid_file != NULL)
362 ? config_pid_file
363 : LOCALSTATEDIR "/run/rrdcached.pid";
365 /* dirname may modify its argument */
366 file_copy = strdup(file);
367 if (file_copy == NULL)
368 {
369 fprintf(stderr, "rrdcached: strdup(): %s\n",
370 rrd_strerror(errno));
371 return -1;
372 }
374 dir = dirname(file_copy);
375 if (rrd_mkdir_p(dir, 0777) != 0)
376 {
377 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
378 dir, rrd_strerror(errno));
379 return -1;
380 }
382 free(file_copy);
384 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
385 if (fd < 0)
386 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
387 action, file, rrd_strerror(errno));
389 return(fd);
390 } /* }}} static int open_pidfile */
/*
 * check_pidfile: check an existing pid file to see whether a daemon is
 * running.
 *
 * Opens the pid file read/write and parses the pid stored in it.  If that
 * pid belongs to another process we are able to signal, a competing
 * rrdcached is assumed and -1 is returned.  Otherwise the file is treated
 * as stale, truncated, and its descriptor is returned for re-use.
 *
 * Returns the open descriptor on success and a negative value on any
 * failure; the descriptor is closed on every failure path.
 */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not NUL-terminate */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
        "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/*
 * write_pidfile: write our own pid, followed by a newline, to the already
 * opened pid file descriptor `fd'.  The descriptor is closed in every case.
 * Returns 0 on success, -1 if no stream could be attached to `fd'.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
457 static int remove_pidfile (void) /* {{{ */
458 {
459 char *file;
460 int status;
462 file = (config_pid_file != NULL)
463 ? config_pid_file
464 : LOCALSTATEDIR "/run/rrdcached.pid";
466 status = unlink (file);
467 if (status == 0)
468 return (0);
469 return (errno);
470 } /* }}} int remove_pidfile */
472 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
473 {
474 char *eol;
476 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
477 sock->next_read - sock->next_cmd);
479 if (eol == NULL)
480 {
481 /* no commands left, move remainder back to front of rbuf */
482 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
483 sock->next_read - sock->next_cmd);
484 sock->next_read -= sock->next_cmd;
485 sock->next_cmd = 0;
486 *len = 0;
487 return NULL;
488 }
489 else
490 {
491 char *cmd = sock->rbuf + sock->next_cmd;
492 *eol = '\0';
494 sock->next_cmd = eol - sock->rbuf + 1;
496 if (eol > sock->rbuf && *(eol-1) == '\r')
497 *(--eol) = '\0'; /* handle "\r\n" EOL */
499 *len = eol - cmd;
501 return cmd;
502 }
504 /* NOTREACHED */
505 assert(1==0);
506 } /* }}} char *next_cmd */
508 /* add the characters directly to the write buffer */
509 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
510 {
511 char *new_buf;
513 assert(sock != NULL);
515 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
516 if (new_buf == NULL)
517 {
518 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
519 return -1;
520 }
522 strncpy(new_buf + sock->wbuf_len, str, len + 1);
524 sock->wbuf = new_buf;
525 sock->wbuf_len += len;
527 return 0;
528 } /* }}} static int add_to_wbuf */
530 /* add the text to the "extra" info that's sent after the status line */
531 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
532 {
533 va_list argp;
534 char buffer[CMD_MAX];
535 int len;
537 if (JOURNAL_REPLAY(sock)) return 0;
538 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
540 va_start(argp, fmt);
541 #ifdef HAVE_VSNPRINTF
542 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
543 #else
544 len = vsprintf(buffer, fmt, argp);
545 #endif
546 va_end(argp);
547 if (len < 0)
548 {
549 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
550 return -1;
551 }
553 return add_to_wbuf(sock, buffer, len);
554 } /* }}} static int add_response_info */
/*
 * count_lines: number of '\n' characters in the NUL-terminated string
 * `str'; a NULL pointer counts as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *hit;

  if (str == NULL)
    return 0;

  for (hit = strchr(str, '\n'); hit != NULL; hit = strchr(hit + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
572 /* send the response back to the user.
573 * returns 0 on success, -1 on error
574 * write buffer is always zeroed after this call */
575 static int send_response (listen_socket_t *sock, response_code rc,
576 char *fmt, ...) /* {{{ */
577 {
578 va_list argp;
579 char buffer[CMD_MAX];
580 int lines;
581 ssize_t wrote;
582 int rclen, len;
584 if (JOURNAL_REPLAY(sock)) return rc;
586 if (sock->batch_start)
587 {
588 if (rc == RESP_OK)
589 return rc; /* no response on success during BATCH */
590 lines = sock->batch_cmd;
591 }
592 else if (rc == RESP_OK)
593 lines = count_lines(sock->wbuf);
594 else
595 lines = -1;
597 rclen = sprintf(buffer, "%d ", lines);
598 va_start(argp, fmt);
599 #ifdef HAVE_VSNPRINTF
600 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
601 #else
602 len = vsprintf(buffer+rclen, fmt, argp);
603 #endif
604 va_end(argp);
605 if (len < 0)
606 return -1;
608 len += rclen;
610 /* append the result to the wbuf, don't write to the user */
611 if (sock->batch_start)
612 return add_to_wbuf(sock, buffer, len);
614 /* first write must be complete */
615 if (len != write(sock->fd, buffer, len))
616 {
617 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
618 return -1;
619 }
621 if (sock->wbuf != NULL && rc == RESP_OK)
622 {
623 wrote = 0;
624 while (wrote < sock->wbuf_len)
625 {
626 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
627 if (wb <= 0)
628 {
629 RRDD_LOG(LOG_INFO, "send_response: could not write results");
630 return -1;
631 }
632 wrote += wb;
633 }
634 }
636 free(sock->wbuf); sock->wbuf = NULL;
637 sock->wbuf_len = 0;
639 return 0;
640 } /* }}} */
642 static void wipe_ci_values(cache_item_t *ci, time_t when)
643 {
644 ci->values = NULL;
645 ci->values_num = 0;
647 ci->last_flush_time = when;
648 if (config_write_jitter > 0)
649 ci->last_flush_time += (rrd_random() % config_write_jitter);
650 }
652 /* remove_from_queue
653 * remove a "cache_item_t" item from the queue.
654 * must hold 'cache_lock' when calling this
655 */
656 static void remove_from_queue(cache_item_t *ci) /* {{{ */
657 {
658 if (ci == NULL) return;
659 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
661 if (ci->prev == NULL)
662 cache_queue_head = ci->next; /* reset head */
663 else
664 ci->prev->next = ci->next;
666 if (ci->next == NULL)
667 cache_queue_tail = ci->prev; /* reset the tail */
668 else
669 ci->next->prev = ci->prev;
671 ci->next = ci->prev = NULL;
672 ci->flags &= ~CI_FLAGS_IN_QUEUE;
674 pthread_mutex_lock (&stats_lock);
675 assert (stats_queue_length > 0);
676 stats_queue_length--;
677 pthread_mutex_unlock (&stats_lock);
679 } /* }}} static void remove_from_queue */
681 /* free the resources associated with the cache_item_t
682 * must hold cache_lock when calling this function
683 */
684 static void *free_cache_item(cache_item_t *ci) /* {{{ */
685 {
686 if (ci == NULL) return NULL;
688 remove_from_queue(ci);
690 for (size_t i=0; i < ci->values_num; i++)
691 free(ci->values[i]);
693 free (ci->values);
694 free (ci->file);
696 /* in case anyone is waiting */
697 pthread_cond_broadcast(&ci->flushed);
698 pthread_cond_destroy(&ci->flushed);
700 free (ci);
702 return NULL;
703 } /* }}} static void *free_cache_item */
705 /*
706 * enqueue_cache_item:
707 * `cache_lock' must be acquired before calling this function!
708 */
709 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
710 queue_side_t side)
711 {
712 if (ci == NULL)
713 return (-1);
715 if (ci->values_num == 0)
716 return (0);
718 if (side == HEAD)
719 {
720 if (cache_queue_head == ci)
721 return 0;
723 /* remove if further down in queue */
724 remove_from_queue(ci);
726 ci->prev = NULL;
727 ci->next = cache_queue_head;
728 if (ci->next != NULL)
729 ci->next->prev = ci;
730 cache_queue_head = ci;
732 if (cache_queue_tail == NULL)
733 cache_queue_tail = cache_queue_head;
734 }
735 else /* (side == TAIL) */
736 {
737 /* We don't move values back in the list.. */
738 if (ci->flags & CI_FLAGS_IN_QUEUE)
739 return (0);
741 assert (ci->next == NULL);
742 assert (ci->prev == NULL);
744 ci->prev = cache_queue_tail;
746 if (cache_queue_tail == NULL)
747 cache_queue_head = ci;
748 else
749 cache_queue_tail->next = ci;
751 cache_queue_tail = ci;
752 }
754 ci->flags |= CI_FLAGS_IN_QUEUE;
756 pthread_cond_signal(&queue_cond);
757 pthread_mutex_lock (&stats_lock);
758 stats_queue_length++;
759 pthread_mutex_unlock (&stats_lock);
761 return (0);
762 } /* }}} int enqueue_cache_item */
764 /*
765 * tree_callback_flush:
766 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
767 * while this is in progress.
768 */
769 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
770 gpointer data)
771 {
772 cache_item_t *ci;
773 callback_flush_data_t *cfd;
775 ci = (cache_item_t *) value;
776 cfd = (callback_flush_data_t *) data;
778 if (ci->flags & CI_FLAGS_IN_QUEUE)
779 return FALSE;
781 if (ci->values_num > 0
782 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
783 {
784 enqueue_cache_item (ci, TAIL);
785 }
786 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
787 && (ci->values_num <= 0))
788 {
789 assert ((char *) key == ci->file);
790 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
791 {
792 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
793 return (FALSE);
794 }
795 }
797 return (FALSE);
798 } /* }}} gboolean tree_callback_flush */
800 static int flush_old_values (int max_age)
801 {
802 callback_flush_data_t cfd;
803 size_t k;
805 memset (&cfd, 0, sizeof (cfd));
806 /* Pass the current time as user data so that we don't need to call
807 * `time' for each node. */
808 cfd.now = time (NULL);
809 cfd.keys = NULL;
810 cfd.keys_num = 0;
812 if (max_age > 0)
813 cfd.abs_timeout = cfd.now - max_age;
814 else
815 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
817 /* `tree_callback_flush' will return the keys of all values that haven't
818 * been touched in the last `config_flush_interval' seconds in `cfd'.
819 * The char*'s in this array point to the same memory as ci->file, so we
820 * don't need to free them separately. */
821 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
823 for (k = 0; k < cfd.keys_num; k++)
824 {
825 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
826 /* should never fail, since we have held the cache_lock
827 * the entire time */
828 assert(status == TRUE);
829 }
831 if (cfd.keys != NULL)
832 {
833 free (cfd.keys);
834 cfd.keys = NULL;
835 }
837 return (0);
838 } /* int flush_old_values */
840 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
841 {
842 struct timeval now;
843 struct timespec next_flush;
844 int status;
846 gettimeofday (&now, NULL);
847 next_flush.tv_sec = now.tv_sec + config_flush_interval;
848 next_flush.tv_nsec = 1000 * now.tv_usec;
850 pthread_mutex_lock(&cache_lock);
852 while (state == RUNNING)
853 {
854 gettimeofday (&now, NULL);
855 if ((now.tv_sec > next_flush.tv_sec)
856 || ((now.tv_sec == next_flush.tv_sec)
857 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
858 {
859 RRDD_LOG(LOG_DEBUG, "flushing old values");
861 /* Determine the time of the next cache flush. */
862 next_flush.tv_sec = now.tv_sec + config_flush_interval;
864 /* Flush all values that haven't been written in the last
865 * `config_write_interval' seconds. */
866 flush_old_values (config_write_interval);
868 /* unlock the cache while we rotate so we don't block incoming
869 * updates if the fsync() blocks on disk I/O */
870 pthread_mutex_unlock(&cache_lock);
871 journal_rotate();
872 pthread_mutex_lock(&cache_lock);
873 }
875 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
876 if (status != 0 && status != ETIMEDOUT)
877 {
878 RRDD_LOG (LOG_ERR, "flush_thread_main: "
879 "pthread_cond_timedwait returned %i.", status);
880 }
881 }
883 if (config_flush_at_shutdown)
884 flush_old_values (-1); /* flush everything */
886 state = SHUTDOWN;
888 pthread_mutex_unlock(&cache_lock);
890 return NULL;
891 } /* void *flush_thread_main */
893 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
894 {
895 pthread_mutex_lock (&cache_lock);
897 while (state != SHUTDOWN
898 || (cache_queue_head != NULL && config_flush_at_shutdown))
899 {
900 cache_item_t *ci;
901 char *file;
902 char **values;
903 size_t values_num;
904 int status;
906 /* Now, check if there's something to store away. If not, wait until
907 * something comes in. */
908 if (cache_queue_head == NULL)
909 {
910 status = pthread_cond_wait (&queue_cond, &cache_lock);
911 if ((status != 0) && (status != ETIMEDOUT))
912 {
913 RRDD_LOG (LOG_ERR, "queue_thread_main: "
914 "pthread_cond_wait returned %i.", status);
915 }
916 }
918 /* Check if a value has arrived. This may be NULL if we timed out or there
919 * was an interrupt such as a signal. */
920 if (cache_queue_head == NULL)
921 continue;
923 ci = cache_queue_head;
925 /* copy the relevant parts */
926 file = strdup (ci->file);
927 if (file == NULL)
928 {
929 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
930 continue;
931 }
933 assert(ci->values != NULL);
934 assert(ci->values_num > 0);
936 values = ci->values;
937 values_num = ci->values_num;
939 wipe_ci_values(ci, time(NULL));
940 remove_from_queue(ci);
942 pthread_mutex_unlock (&cache_lock);
944 rrd_clear_error ();
945 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
946 if (status != 0)
947 {
948 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
949 "rrd_update_r (%s) failed with status %i. (%s)",
950 file, status, rrd_get_error());
951 }
953 journal_write("wrote", file);
955 /* Search again in the tree. It's possible someone issued a "FORGET"
956 * while we were writing the update values. */
957 pthread_mutex_lock(&cache_lock);
958 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
959 if (ci)
960 pthread_cond_broadcast(&ci->flushed);
961 pthread_mutex_unlock(&cache_lock);
963 if (status == 0)
964 {
965 pthread_mutex_lock (&stats_lock);
966 stats_updates_written++;
967 stats_data_sets_written += values_num;
968 pthread_mutex_unlock (&stats_lock);
969 }
971 rrd_free_ptrs((void ***) &values, &values_num);
972 free(file);
974 pthread_mutex_lock (&cache_lock);
975 }
976 pthread_mutex_unlock (&cache_lock);
978 return (NULL);
979 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Split the next space-delimited field off the front of a NUL-terminated
 * buffer.  A backslash escapes the character that follows it; the field is
 * unescaped and NUL-terminated in place.
 *
 * buffer_ret:      in: start of the buffer; out: first byte after the field
 * buffer_size_ret: in: buffer size including the final NUL; out: bytes left
 * field_ret:       out: start of the extracted field
 *
 * Returns 0 on success.  Returns -1, leaving the three outputs untouched,
 * if the buffer is empty or ends before the field is terminated.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  char *dst = *buffer_ret; /* the field is rewritten in place */
  size_t total = *buffer_size_ret;
  size_t rd = 0;
  size_t wr = 0;
  int terminated = 0;

  if (total <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[total - 1] == '\0');

  while (rd < total && !terminated)
  {
    char c = src[rd];

    if (c == ' ' || c == '\0')
    {
      /* end-of-field or end-of-buffer */
      dst[wr++] = 0;
      rd++;
      terminated = 1;
    }
    else if (c == '\\')
    {
      /* escaped character: needs a successor inside the buffer */
      if (rd >= (total - 1))
        break;
      rd++;
      dst[wr++] = src[rd];
      rd++;
    }
    else
    {
      dst[wr++] = c;
      rd++;
    }
  }

  if (!terminated)
    return (-1);

  *buffer_ret = src + rd;
  *buffer_size_ret = total - rd;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
1044 /* if we're restricting writes to the base directory,
1045 * check whether the file falls within the dir
1046 * returns 1 if OK, otherwise 0
1047 */
1048 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1049 {
1050 assert(file != NULL);
1052 if (!config_write_base_only
1053 || JOURNAL_REPLAY(sock)
1054 || config_base_dir == NULL)
1055 return 1;
1057 if (strstr(file, "../") != NULL) goto err;
1059 /* relative paths without "../" are ok */
1060 if (*file != '/') return 1;
1062 /* file must be of the format base + "/" + <1+ char filename> */
1063 if (strlen(file) < _config_base_dir_len + 2) goto err;
1064 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1065 if (*(file + _config_base_dir_len) != '/') goto err;
1067 return 1;
1069 err:
1070 if (sock != NULL && sock->fd >= 0)
1071 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1073 return 0;
1074 } /* }}} static int check_file_access */
1076 /* when using a base dir, convert relative paths to absolute paths.
1077 * if necessary, modifies the "filename" pointer to point
1078 * to the new path created in "tmp". "tmp" is provided
1079 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1080 *
1081 * this allows us to optimize for the expected case (absolute path)
1082 * with a no-op.
1083 */
1084 static void get_abs_path(char **filename, char *tmp)
1085 {
1086 assert(tmp != NULL);
1087 assert(filename != NULL && *filename != NULL);
1089 if (config_base_dir == NULL || **filename == '/')
1090 return;
1092 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1093 *filename = tmp;
1094 } /* }}} static int get_abs_path */
1096 static int flush_file (const char *filename) /* {{{ */
1097 {
1098 cache_item_t *ci;
1100 pthread_mutex_lock (&cache_lock);
1102 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1103 if (ci == NULL)
1104 {
1105 pthread_mutex_unlock (&cache_lock);
1106 return (ENOENT);
1107 }
1109 if (ci->values_num > 0)
1110 {
1111 /* Enqueue at head */
1112 enqueue_cache_item (ci, HEAD);
1113 pthread_cond_wait(&ci->flushed, &cache_lock);
1114 }
1116 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1117 * may have been purged during our cond_wait() */
1119 pthread_mutex_unlock(&cache_lock);
1121 return (0);
1122 } /* }}} int flush_file */
1124 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1125 {
1126 char *err = "Syntax error.\n";
1128 if (cmd && cmd->syntax)
1129 err = cmd->syntax;
1131 return send_response(sock, RESP_ERR, "Usage: %s", err);
1132 } /* }}} static int syntax_error() */
1134 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1135 {
1136 uint64_t copy_queue_length;
1137 uint64_t copy_updates_received;
1138 uint64_t copy_flush_received;
1139 uint64_t copy_updates_written;
1140 uint64_t copy_data_sets_written;
1141 uint64_t copy_journal_bytes;
1142 uint64_t copy_journal_rotate;
1144 uint64_t tree_nodes_number;
1145 uint64_t tree_depth;
1147 pthread_mutex_lock (&stats_lock);
1148 copy_queue_length = stats_queue_length;
1149 copy_updates_received = stats_updates_received;
1150 copy_flush_received = stats_flush_received;
1151 copy_updates_written = stats_updates_written;
1152 copy_data_sets_written = stats_data_sets_written;
1153 copy_journal_bytes = stats_journal_bytes;
1154 copy_journal_rotate = stats_journal_rotate;
1155 pthread_mutex_unlock (&stats_lock);
1157 pthread_mutex_lock (&cache_lock);
1158 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1159 tree_depth = (uint64_t) g_tree_height (cache_tree);
1160 pthread_mutex_unlock (&cache_lock);
1162 add_response_info(sock,
1163 "QueueLength: %"PRIu64"\n", copy_queue_length);
1164 add_response_info(sock,
1165 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1166 add_response_info(sock,
1167 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1168 add_response_info(sock,
1169 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1170 add_response_info(sock,
1171 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1172 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1173 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1174 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1175 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1177 send_response(sock, RESP_OK, "Statistics follow\n");
1179 return (0);
1180 } /* }}} int handle_request_stats */
1182 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1183 {
1184 char *file, file_tmp[PATH_MAX];
1185 int status;
1187 status = buffer_get_field (&buffer, &buffer_size, &file);
1188 if (status != 0)
1189 {
1190 return syntax_error(sock,cmd);
1191 }
1192 else
1193 {
1194 pthread_mutex_lock(&stats_lock);
1195 stats_flush_received++;
1196 pthread_mutex_unlock(&stats_lock);
1198 get_abs_path(&file, file_tmp);
1199 if (!check_file_access(file, sock)) return 0;
1201 status = flush_file (file);
1202 if (status == 0)
1203 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1204 else if (status == ENOENT)
1205 {
1206 /* no file in our tree; see whether it exists at all */
1207 struct stat statbuf;
1209 memset(&statbuf, 0, sizeof(statbuf));
1210 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1211 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1212 else
1213 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1214 }
1215 else if (status < 0)
1216 return send_response(sock, RESP_ERR, "Internal error.\n");
1217 else
1218 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1219 }
1221 /* NOTREACHED */
1222 assert(1==0);
1223 } /* }}} int handle_request_flush */
1225 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1226 {
1227 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1229 pthread_mutex_lock(&cache_lock);
1230 flush_old_values(-1);
1231 pthread_mutex_unlock(&cache_lock);
1233 return send_response(sock, RESP_OK, "Started flush.\n");
1234 } /* }}} static int handle_request_flushall */
1236 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1237 {
1238 int status;
1239 char *file, file_tmp[PATH_MAX];
1240 cache_item_t *ci;
1242 status = buffer_get_field(&buffer, &buffer_size, &file);
1243 if (status != 0)
1244 return syntax_error(sock,cmd);
1246 get_abs_path(&file, file_tmp);
1248 pthread_mutex_lock(&cache_lock);
1249 ci = g_tree_lookup(cache_tree, file);
1250 if (ci == NULL)
1251 {
1252 pthread_mutex_unlock(&cache_lock);
1253 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1254 }
1256 for (size_t i=0; i < ci->values_num; i++)
1257 add_response_info(sock, "%s\n", ci->values[i]);
1259 pthread_mutex_unlock(&cache_lock);
1260 return send_response(sock, RESP_OK, "updates pending\n");
1261 } /* }}} static int handle_request_pending */
1263 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1264 {
1265 int status;
1266 gboolean found;
1267 char *file, file_tmp[PATH_MAX];
1269 status = buffer_get_field(&buffer, &buffer_size, &file);
1270 if (status != 0)
1271 return syntax_error(sock,cmd);
1273 get_abs_path(&file, file_tmp);
1274 if (!check_file_access(file, sock)) return 0;
1276 pthread_mutex_lock(&cache_lock);
1277 found = g_tree_remove(cache_tree, file);
1278 pthread_mutex_unlock(&cache_lock);
1280 if (found == TRUE)
1281 {
1282 if (!JOURNAL_REPLAY(sock))
1283 journal_write("forget", file);
1285 return send_response(sock, RESP_OK, "Gone!\n");
1286 }
1287 else
1288 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1290 /* NOTREACHED */
1291 assert(1==0);
1292 } /* }}} static int handle_request_forget */
1294 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1295 {
1296 cache_item_t *ci;
1298 pthread_mutex_lock(&cache_lock);
1300 ci = cache_queue_head;
1301 while (ci != NULL)
1302 {
1303 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1304 ci = ci->next;
1305 }
1307 pthread_mutex_unlock(&cache_lock);
1309 return send_response(sock, RESP_OK, "in queue.\n");
1310 } /* }}} int handle_request_queue */
1312 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1313 {
1314 char *file, file_tmp[PATH_MAX];
1315 int values_num = 0;
1316 int status;
1317 char orig_buf[CMD_MAX];
1319 cache_item_t *ci;
1321 /* save it for the journal later */
1322 if (!JOURNAL_REPLAY(sock))
1323 strncpy(orig_buf, buffer, min(CMD_MAX,buffer_size));
1325 status = buffer_get_field (&buffer, &buffer_size, &file);
1326 if (status != 0)
1327 return syntax_error(sock,cmd);
1329 pthread_mutex_lock(&stats_lock);
1330 stats_updates_received++;
1331 pthread_mutex_unlock(&stats_lock);
1333 get_abs_path(&file, file_tmp);
1334 if (!check_file_access(file, sock)) return 0;
1336 pthread_mutex_lock (&cache_lock);
1337 ci = g_tree_lookup (cache_tree, file);
1339 if (ci == NULL) /* {{{ */
1340 {
1341 struct stat statbuf;
1342 cache_item_t *tmp;
1344 /* don't hold the lock while we setup; stat(2) might block */
1345 pthread_mutex_unlock(&cache_lock);
1347 memset (&statbuf, 0, sizeof (statbuf));
1348 status = stat (file, &statbuf);
1349 if (status != 0)
1350 {
1351 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1353 status = errno;
1354 if (status == ENOENT)
1355 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1356 else
1357 return send_response(sock, RESP_ERR,
1358 "stat failed with error %i.\n", status);
1359 }
1360 if (!S_ISREG (statbuf.st_mode))
1361 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1363 if (access(file, R_OK|W_OK) != 0)
1364 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1365 file, rrd_strerror(errno));
1367 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1368 if (ci == NULL)
1369 {
1370 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1372 return send_response(sock, RESP_ERR, "malloc failed.\n");
1373 }
1374 memset (ci, 0, sizeof (cache_item_t));
1376 ci->file = strdup (file);
1377 if (ci->file == NULL)
1378 {
1379 free (ci);
1380 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1382 return send_response(sock, RESP_ERR, "strdup failed.\n");
1383 }
1385 wipe_ci_values(ci, now);
1386 ci->flags = CI_FLAGS_IN_TREE;
1387 pthread_cond_init(&ci->flushed, NULL);
1389 pthread_mutex_lock(&cache_lock);
1391 /* another UPDATE might have added this entry in the meantime */
1392 tmp = g_tree_lookup (cache_tree, file);
1393 if (tmp == NULL)
1394 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1395 else
1396 {
1397 free_cache_item (ci);
1398 ci = tmp;
1399 }
1401 /* state may have changed while we were unlocked */
1402 if (state == SHUTDOWN)
1403 return -1;
1404 } /* }}} */
1405 assert (ci != NULL);
1407 /* don't re-write updates in replay mode */
1408 if (!JOURNAL_REPLAY(sock))
1409 journal_write("update", orig_buf);
1411 while (buffer_size > 0)
1412 {
1413 char *value;
1414 double stamp;
1415 char *eostamp;
1417 status = buffer_get_field (&buffer, &buffer_size, &value);
1418 if (status != 0)
1419 {
1420 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1421 break;
1422 }
1424 /* make sure update time is always moving forward. We use double here since
1425 update does support subsecond precision for timestamps ... */
1426 stamp = strtod(value, &eostamp);
1427 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1428 {
1429 pthread_mutex_unlock(&cache_lock);
1430 return send_response(sock, RESP_ERR,
1431 "Cannot find timestamp in '%s'!\n", value);
1432 }
1433 else if (stamp <= ci->last_update_stamp)
1434 {
1435 pthread_mutex_unlock(&cache_lock);
1436 return send_response(sock, RESP_ERR,
1437 "illegal attempt to update using time %lf when last"
1438 " update time is %lf (minimum one second step)\n",
1439 stamp, ci->last_update_stamp);
1440 }
1441 else
1442 ci->last_update_stamp = stamp;
1444 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1445 {
1446 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1447 continue;
1448 }
1450 values_num++;
1451 }
1453 if (((now - ci->last_flush_time) >= config_write_interval)
1454 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1455 && (ci->values_num > 0))
1456 {
1457 enqueue_cache_item (ci, TAIL);
1458 }
1460 pthread_mutex_unlock (&cache_lock);
1462 if (values_num < 1)
1463 return send_response(sock, RESP_ERR, "No values updated.\n");
1464 else
1465 return send_response(sock, RESP_OK,
1466 "errors, enqueued %i value(s).\n", values_num);
1468 /* NOTREACHED */
1469 assert(1==0);
1471 } /* }}} int handle_request_update */
1473 /* we came across a "WROTE" entry during journal replay.
1474 * throw away any values that we have accumulated for this file
1475 */
1476 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1477 {
1478 cache_item_t *ci;
1479 const char *file = buffer;
1481 pthread_mutex_lock(&cache_lock);
1483 ci = g_tree_lookup(cache_tree, file);
1484 if (ci == NULL)
1485 {
1486 pthread_mutex_unlock(&cache_lock);
1487 return (0);
1488 }
1490 if (ci->values)
1491 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1493 wipe_ci_values(ci, now);
1494 remove_from_queue(ci);
1496 pthread_mutex_unlock(&cache_lock);
1497 return (0);
1498 } /* }}} int handle_request_wrote */
1500 /* start "BATCH" processing */
1501 static int batch_start (HANDLER_PROTO) /* {{{ */
1502 {
1503 int status;
1504 if (sock->batch_start)
1505 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1507 status = send_response(sock, RESP_OK,
1508 "Go ahead. End with dot '.' on its own line.\n");
1509 sock->batch_start = time(NULL);
1510 sock->batch_cmd = 0;
1512 return status;
1513 } /* }}} static int batch_start */
1515 /* finish "BATCH" processing and return results to the client */
1516 static int batch_done (HANDLER_PROTO) /* {{{ */
1517 {
1518 assert(sock->batch_start);
1519 sock->batch_start = 0;
1520 sock->batch_cmd = 0;
1521 return send_response(sock, RESP_OK, "errors\n");
1522 } /* }}} static int batch_done */
/*
 * QUIT handler: does no work of its own and just returns -1. The connection
 * loop in connection_thread_main() closes the connection whenever
 * handle_request() returns non-zero.
 */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int disconnect_status = -1;

  return disconnect_status;
} /* }}} static int handle_request_quit */
1529 static command_t list_of_commands[] = { /* {{{ */
1530 {
1531 "UPDATE",
1532 handle_request_update,
1533 CMD_CONTEXT_ANY,
1534 "UPDATE <filename> <values> [<values> ...]\n"
1535 ,
1536 "Adds the given file to the internal cache if it is not yet known and\n"
1537 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1538 "for details.\n"
1539 "\n"
1540 "Each <values> has the following form:\n"
1541 " <values> = <time>:<value>[:<value>[...]]\n"
1542 "See the rrdupdate(1) manpage for details.\n"
1543 },
1544 {
1545 "WROTE",
1546 handle_request_wrote,
1547 CMD_CONTEXT_JOURNAL,
1548 NULL,
1549 NULL
1550 },
1551 {
1552 "FLUSH",
1553 handle_request_flush,
1554 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1555 "FLUSH <filename>\n"
1556 ,
1557 "Adds the given filename to the head of the update queue and returns\n"
1558 "after it has been dequeued.\n"
1559 },
1560 {
1561 "FLUSHALL",
1562 handle_request_flushall,
1563 CMD_CONTEXT_CLIENT,
1564 "FLUSHALL\n"
1565 ,
1566 "Triggers writing of all pending updates. Returns immediately.\n"
1567 },
1568 {
1569 "PENDING",
1570 handle_request_pending,
1571 CMD_CONTEXT_CLIENT,
1572 "PENDING <filename>\n"
1573 ,
1574 "Shows any 'pending' updates for a file, in order.\n"
1575 "The updates shown have not yet been written to the underlying RRD file.\n"
1576 },
1577 {
1578 "FORGET",
1579 handle_request_forget,
1580 CMD_CONTEXT_ANY,
1581 "FORGET <filename>\n"
1582 ,
1583 "Removes the file completely from the cache.\n"
1584 "Any pending updates for the file will be lost.\n"
1585 },
1586 {
1587 "QUEUE",
1588 handle_request_queue,
1589 CMD_CONTEXT_CLIENT,
1590 "QUEUE\n"
1591 ,
1592 "Shows all files in the output queue.\n"
1593 "The output is zero or more lines in the following format:\n"
1594 "(where <num_vals> is the number of values to be written)\n"
1595 "\n"
1596 "<num_vals> <filename>\n"
1597 },
1598 {
1599 "STATS",
1600 handle_request_stats,
1601 CMD_CONTEXT_CLIENT,
1602 "STATS\n"
1603 ,
1604 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1605 "a description of the values.\n"
1606 },
1607 {
1608 "HELP",
1609 handle_request_help,
1610 CMD_CONTEXT_CLIENT,
1611 "HELP [<command>]\n",
1612 NULL, /* special! */
1613 },
1614 {
1615 "BATCH",
1616 batch_start,
1617 CMD_CONTEXT_CLIENT,
1618 "BATCH\n"
1619 ,
1620 "The 'BATCH' command permits the client to initiate a bulk load\n"
1621 " of commands to rrdcached.\n"
1622 "\n"
1623 "Usage:\n"
1624 "\n"
1625 " client: BATCH\n"
1626 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1627 " client: command #1\n"
1628 " client: command #2\n"
1629 " client: ... and so on\n"
1630 " client: .\n"
1631 " server: 2 errors\n"
1632 " server: 7 message for command #7\n"
1633 " server: 9 message for command #9\n"
1634 "\n"
1635 "For more information, consult the rrdcached(1) documentation.\n"
1636 },
1637 {
1638 ".", /* BATCH terminator */
1639 batch_done,
1640 CMD_CONTEXT_BATCH,
1641 NULL,
1642 NULL
1643 },
1644 {
1645 "QUIT",
1646 handle_request_quit,
1647 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1648 "QUIT\n"
1649 ,
1650 "Disconnect from rrdcached.\n"
1651 }
1652 }; /* }}} command_t list_of_commands[] */
1653 static size_t list_of_commands_len = sizeof (list_of_commands)
1654 / sizeof (list_of_commands[0]);
1656 static command_t *find_command(char *cmd)
1657 {
1658 size_t i;
1660 for (i = 0; i < list_of_commands_len; i++)
1661 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1662 return (&list_of_commands[i]);
1663 return NULL;
1664 }
1666 /* We currently use the index in the `list_of_commands' array as a bit position
1667 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1668 * outside these functions so that switching to a more elegant storage method
1669 * is easily possible. */
1670 static ssize_t find_command_index (const char *cmd) /* {{{ */
1671 {
1672 size_t i;
1674 for (i = 0; i < list_of_commands_len; i++)
1675 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1676 return ((ssize_t) i);
1677 return (-1);
1678 } /* }}} ssize_t find_command_index */
1680 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1681 const char *cmd)
1682 {
1683 ssize_t i;
1685 if (JOURNAL_REPLAY(sock))
1686 return (1);
1688 if (cmd == NULL)
1689 return (-1);
1691 if ((strcasecmp ("QUIT", cmd) == 0)
1692 || (strcasecmp ("HELP", cmd) == 0))
1693 return (1);
1694 else if (strcmp (".", cmd) == 0)
1695 cmd = "BATCH";
1697 i = find_command_index (cmd);
1698 if (i < 0)
1699 return (-1);
1700 assert (i < 32);
1702 if ((sock->permissions & (1 << i)) != 0)
1703 return (1);
1704 return (0);
1705 } /* }}} int socket_permission_check */
1707 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1708 const char *cmd)
1709 {
1710 ssize_t i;
1712 i = find_command_index (cmd);
1713 if (i < 0)
1714 return (-1);
1715 assert (i < 32);
1717 sock->permissions |= (1 << i);
1718 return (0);
1719 } /* }}} int socket_permission_add */
1721 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1722 {
1723 sock->permissions = 0;
1724 } /* }}} socket_permission_clear */
1726 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1727 listen_socket_t *src)
1728 {
1729 dest->permissions = src->permissions;
1730 } /* }}} socket_permission_copy */
1732 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
1733 {
1734 size_t i;
1736 sock->permissions = 0;
1737 for (i = 0; i < list_of_commands_len; i++)
1738 sock->permissions |= (1 << i);
1739 } /* }}} void socket_permission_set_all */
1741 /* check whether commands are received in the expected context */
1742 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1743 {
1744 if (JOURNAL_REPLAY(sock))
1745 return (cmd->context & CMD_CONTEXT_JOURNAL);
1746 else if (sock->batch_start)
1747 return (cmd->context & CMD_CONTEXT_BATCH);
1748 else
1749 return (cmd->context & CMD_CONTEXT_CLIENT);
1751 /* NOTREACHED */
1752 assert(1==0);
1753 }
1755 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1756 {
1757 int status;
1758 char *cmd_str;
1759 char *resp_txt;
1760 command_t *help = NULL;
1762 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1763 if (status == 0)
1764 help = find_command(cmd_str);
1766 if (help && (help->syntax || help->help))
1767 {
1768 char tmp[CMD_MAX];
1770 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1771 resp_txt = tmp;
1773 if (help->syntax)
1774 add_response_info(sock, "Usage: %s\n", help->syntax);
1776 if (help->help)
1777 add_response_info(sock, "%s\n", help->help);
1778 }
1779 else
1780 {
1781 size_t i;
1783 resp_txt = "Command overview\n";
1785 for (i = 0; i < list_of_commands_len; i++)
1786 {
1787 if (list_of_commands[i].syntax == NULL)
1788 continue;
1789 add_response_info (sock, "%s", list_of_commands[i].syntax);
1790 }
1791 }
1793 return send_response(sock, RESP_OK, resp_txt);
1794 } /* }}} int handle_request_help */
1796 static int handle_request (DISPATCH_PROTO) /* {{{ */
1797 {
1798 char *buffer_ptr = buffer;
1799 char *cmd_str = NULL;
1800 command_t *cmd = NULL;
1801 int status;
1803 assert (buffer[buffer_size - 1] == '\0');
1805 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1806 if (status != 0)
1807 {
1808 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1809 return (-1);
1810 }
1812 if (sock != NULL && sock->batch_start)
1813 sock->batch_cmd++;
1815 cmd = find_command(cmd_str);
1816 if (!cmd)
1817 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1819 if (!socket_permission_check (sock, cmd->cmd))
1820 return send_response(sock, RESP_ERR, "Permission denied.\n");
1822 if (!command_check_context(sock, cmd))
1823 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1825 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1826 } /* }}} int handle_request */
1828 static void journal_set_free (journal_set *js) /* {{{ */
1829 {
1830 if (js == NULL)
1831 return;
1833 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1835 free(js);
1836 } /* }}} journal_set_free */
1838 static void journal_set_remove (journal_set *js) /* {{{ */
1839 {
1840 if (js == NULL)
1841 return;
1843 for (uint i=0; i < js->files_num; i++)
1844 {
1845 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1846 unlink(js->files[i]);
1847 }
1848 } /* }}} journal_set_remove */
1850 /* close current journal file handle.
1851 * MUST hold journal_lock before calling */
1852 static void journal_close(void) /* {{{ */
1853 {
1854 if (journal_fh != NULL)
1855 {
1856 if (fclose(journal_fh) != 0)
1857 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1858 }
1860 journal_fh = NULL;
1861 journal_size = 0;
1862 } /* }}} journal_close */
1864 /* MUST hold journal_lock before calling */
1865 static void journal_new_file(void) /* {{{ */
1866 {
1867 struct timeval now;
1868 int new_fd;
1869 char new_file[PATH_MAX + 1];
1871 assert(journal_dir != NULL);
1872 assert(journal_cur != NULL);
1874 journal_close();
1876 gettimeofday(&now, NULL);
1877 /* this format assures that the files sort in strcmp() order */
1878 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1879 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1881 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1882 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1883 if (new_fd < 0)
1884 goto error;
1886 journal_fh = fdopen(new_fd, "a");
1887 if (journal_fh == NULL)
1888 goto error;
1890 journal_size = ftell(journal_fh);
1891 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1893 /* record the file in the journal set */
1894 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1896 return;
1898 error:
1899 RRDD_LOG(LOG_CRIT,
1900 "JOURNALING DISABLED: Error while trying to create %s : %s",
1901 new_file, rrd_strerror(errno));
1902 RRDD_LOG(LOG_CRIT,
1903 "JOURNALING DISABLED: All values will be flushed at shutdown");
1905 close(new_fd);
1906 config_flush_at_shutdown = 1;
1908 } /* }}} journal_new_file */
1910 /* MUST NOT hold journal_lock before calling this */
1911 static void journal_rotate(void) /* {{{ */
1912 {
1913 journal_set *old_js = NULL;
1915 if (journal_dir == NULL)
1916 return;
1918 RRDD_LOG(LOG_DEBUG, "rotating journals");
1920 pthread_mutex_lock(&stats_lock);
1921 ++stats_journal_rotate;
1922 pthread_mutex_unlock(&stats_lock);
1924 pthread_mutex_lock(&journal_lock);
1926 journal_close();
1928 /* rotate the journal sets */
1929 old_js = journal_old;
1930 journal_old = journal_cur;
1931 journal_cur = calloc(1, sizeof(journal_set));
1933 if (journal_cur != NULL)
1934 journal_new_file();
1935 else
1936 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1938 pthread_mutex_unlock(&journal_lock);
1940 journal_set_remove(old_js);
1941 journal_set_free (old_js);
1943 } /* }}} static void journal_rotate */
1945 /* MUST hold journal_lock when calling */
1946 static void journal_done(void) /* {{{ */
1947 {
1948 if (journal_cur == NULL)
1949 return;
1951 journal_close();
1953 if (config_flush_at_shutdown)
1954 {
1955 RRDD_LOG(LOG_INFO, "removing journals");
1956 journal_set_remove(journal_old);
1957 journal_set_remove(journal_cur);
1958 }
1959 else
1960 {
1961 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1962 "journals will be used at next startup");
1963 }
1965 journal_set_free(journal_cur);
1966 journal_set_free(journal_old);
1967 free(journal_dir);
1969 } /* }}} static void journal_done */
1971 static int journal_write(char *cmd, char *args) /* {{{ */
1972 {
1973 int chars;
1975 if (journal_fh == NULL)
1976 return 0;
1978 pthread_mutex_lock(&journal_lock);
1979 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1980 journal_size += chars;
1982 if (journal_size > JOURNAL_MAX)
1983 journal_new_file();
1985 pthread_mutex_unlock(&journal_lock);
1987 if (chars > 0)
1988 {
1989 pthread_mutex_lock(&stats_lock);
1990 stats_journal_bytes += chars;
1991 pthread_mutex_unlock(&stats_lock);
1992 }
1994 return chars;
1995 } /* }}} static int journal_write */
1997 static int journal_replay (const char *file) /* {{{ */
1998 {
1999 FILE *fh;
2000 int entry_cnt = 0;
2001 int fail_cnt = 0;
2002 uint64_t line = 0;
2003 char entry[CMD_MAX];
2004 time_t now;
2006 if (file == NULL) return 0;
2008 {
2009 char *reason = "unknown error";
2010 int status = 0;
2011 struct stat statbuf;
2013 memset(&statbuf, 0, sizeof(statbuf));
2014 if (stat(file, &statbuf) != 0)
2015 {
2016 reason = "stat error";
2017 status = errno;
2018 }
2019 else if (!S_ISREG(statbuf.st_mode))
2020 {
2021 reason = "not a regular file";
2022 status = EPERM;
2023 }
2024 if (statbuf.st_uid != daemon_uid)
2025 {
2026 reason = "not owned by daemon user";
2027 status = EACCES;
2028 }
2029 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2030 {
2031 reason = "must not be user/group writable";
2032 status = EACCES;
2033 }
2035 if (status != 0)
2036 {
2037 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2038 file, rrd_strerror(status), reason);
2039 return 0;
2040 }
2041 }
2043 fh = fopen(file, "r");
2044 if (fh == NULL)
2045 {
2046 if (errno != ENOENT)
2047 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2048 file, rrd_strerror(errno));
2049 return 0;
2050 }
2051 else
2052 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2054 now = time(NULL);
2056 while(!feof(fh))
2057 {
2058 size_t entry_len;
2060 ++line;
2061 if (fgets(entry, sizeof(entry), fh) == NULL)
2062 break;
2063 entry_len = strlen(entry);
2065 /* check \n termination in case journal writing crashed mid-line */
2066 if (entry_len == 0)
2067 continue;
2068 else if (entry[entry_len - 1] != '\n')
2069 {
2070 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2071 ++fail_cnt;
2072 continue;
2073 }
2075 entry[entry_len - 1] = '\0';
2077 if (handle_request(NULL, now, entry, entry_len) == 0)
2078 ++entry_cnt;
2079 else
2080 ++fail_cnt;
2081 }
2083 fclose(fh);
2085 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2086 entry_cnt, fail_cnt);
2088 return entry_cnt > 0 ? 1 : 0;
2089 } /* }}} static int journal_replay */
/*
 * qsort(3) comparison callback for an array of C strings: both arguments
 * point at `char *' elements, which are compared with strcmp().
 */
static int journal_sort(const void *v1, const void *v2)
{
  const char *const *lhs = v1;
  const char *const *rhs = v2;

  return strcmp(*lhs, *rhs);
}
2099 static void journal_init(void) /* {{{ */
2100 {
2101 int had_journal = 0;
2102 DIR *dir;
2103 struct dirent *dent;
2104 char path[PATH_MAX+1];
2106 if (journal_dir == NULL) return;
2108 pthread_mutex_lock(&journal_lock);
2110 journal_cur = calloc(1, sizeof(journal_set));
2111 if (journal_cur == NULL)
2112 {
2113 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2114 return;
2115 }
2117 RRDD_LOG(LOG_INFO, "checking for journal files");
2119 /* Handle old journal files during transition. This gives them the
2120 * correct sort order. TODO: remove after first release
2121 */
2122 {
2123 char old_path[PATH_MAX+1];
2124 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2125 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2126 rename(old_path, path);
2128 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2129 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2130 rename(old_path, path);
2131 }
2133 dir = opendir(journal_dir);
2134 if (!dir) {
2135 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2136 return;
2137 }
2138 while ((dent = readdir(dir)) != NULL)
2139 {
2140 /* looks like a journal file? */
2141 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2142 continue;
2144 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2146 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2147 {
2148 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2149 dent->d_name);
2150 break;
2151 }
2152 }
2153 closedir(dir);
2155 qsort(journal_cur->files, journal_cur->files_num,
2156 sizeof(journal_cur->files[0]), journal_sort);
2158 for (uint i=0; i < journal_cur->files_num; i++)
2159 had_journal += journal_replay(journal_cur->files[i]);
2161 journal_new_file();
2163 /* it must have been a crash. start a flush */
2164 if (had_journal && config_flush_at_shutdown)
2165 flush_old_values(-1);
2167 pthread_mutex_unlock(&journal_lock);
2169 RRDD_LOG(LOG_INFO, "journal processing complete");
2171 } /* }}} static void journal_init */
2173 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2174 {
2175 assert(sock != NULL);
2177 free(sock->rbuf); sock->rbuf = NULL;
2178 free(sock->wbuf); sock->wbuf = NULL;
2179 free(sock);
2180 } /* }}} void free_listen_socket */
2182 static void close_connection(listen_socket_t *sock) /* {{{ */
2183 {
2184 if (sock->fd >= 0)
2185 {
2186 close(sock->fd);
2187 sock->fd = -1;
2188 }
2190 free_listen_socket(sock);
2192 } /* }}} void close_connection */
2194 static void *connection_thread_main (void *args) /* {{{ */
2195 {
2196 listen_socket_t *sock;
2197 int fd;
2199 sock = (listen_socket_t *) args;
2200 fd = sock->fd;
2202 /* init read buffers */
2203 sock->next_read = sock->next_cmd = 0;
2204 sock->rbuf = malloc(RBUF_SIZE);
2205 if (sock->rbuf == NULL)
2206 {
2207 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2208 close_connection(sock);
2209 return NULL;
2210 }
2212 pthread_mutex_lock (&connection_threads_lock);
2213 #ifdef HAVE_LIBWRAP
2214 /* LIBWRAP does not support multiple threads! By putting this code
2215 inside pthread_mutex_lock we do not have to worry about request_info
2216 getting overwritten by another thread.
2217 */
2218 struct request_info req;
2219 request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2220 fromhost(&req);
2221 if(!hosts_access(&req)) {
2222 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2223 pthread_mutex_unlock (&connection_threads_lock);
2224 close_connection(sock);
2225 return NULL;
2226 }
2227 #endif /* HAVE_LIBWRAP */
2228 connection_threads_num++;
2229 pthread_mutex_unlock (&connection_threads_lock);
2231 while (state == RUNNING)
2232 {
2233 char *cmd;
2234 ssize_t cmd_len;
2235 ssize_t rbytes;
2236 time_t now;
2238 struct pollfd pollfd;
2239 int status;
2241 pollfd.fd = fd;
2242 pollfd.events = POLLIN | POLLPRI;
2243 pollfd.revents = 0;
2245 status = poll (&pollfd, 1, /* timeout = */ 500);
2246 if (state != RUNNING)
2247 break;
2248 else if (status == 0) /* timeout */
2249 continue;
2250 else if (status < 0) /* error */
2251 {
2252 status = errno;
2253 if (status != EINTR)
2254 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2255 continue;
2256 }
2258 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2259 break;
2260 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2261 {
2262 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2263 "poll(2) returned something unexpected: %#04hx",
2264 pollfd.revents);
2265 break;
2266 }
2268 rbytes = read(fd, sock->rbuf + sock->next_read,
2269 RBUF_SIZE - sock->next_read);
2270 if (rbytes < 0)
2271 {
2272 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2273 break;
2274 }
2275 else if (rbytes == 0)
2276 break; /* eof */
2278 sock->next_read += rbytes;
2280 if (sock->batch_start)
2281 now = sock->batch_start;
2282 else
2283 now = time(NULL);
2285 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2286 {
2287 status = handle_request (sock, now, cmd, cmd_len+1);
2288 if (status != 0)
2289 goto out_close;
2290 }
2291 }
2293 out_close:
2294 close_connection(sock);
2296 /* Remove this thread from the connection threads list */
2297 pthread_mutex_lock (&connection_threads_lock);
2298 connection_threads_num--;
2299 if (connection_threads_num <= 0)
2300 pthread_cond_broadcast(&connection_threads_done);
2301 pthread_mutex_unlock (&connection_threads_lock);
2303 return (NULL);
2304 } /* }}} void *connection_thread_main */
2306 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2307 {
2308 int fd;
2309 struct sockaddr_un sa;
2310 listen_socket_t *temp;
2311 int status;
2312 const char *path;
2313 char *path_copy, *dir;
2315 path = sock->addr;
2316 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2317 path += strlen("unix:");
2319 /* dirname may modify its argument */
2320 path_copy = strdup(path);
2321 if (path_copy == NULL)
2322 {
2323 fprintf(stderr, "rrdcached: strdup(): %s\n",
2324 rrd_strerror(errno));
2325 return (-1);
2326 }
2328 dir = dirname(path_copy);
2329 if (rrd_mkdir_p(dir, 0777) != 0)
2330 {
2331 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2332 dir, rrd_strerror(errno));
2333 return (-1);
2334 }
2336 free(path_copy);
2338 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2339 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2340 if (temp == NULL)
2341 {
2342 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2343 return (-1);
2344 }
2345 listen_fds = temp;
2346 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2348 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2349 if (fd < 0)
2350 {
2351 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2352 rrd_strerror(errno));
2353 return (-1);
2354 }
2356 memset (&sa, 0, sizeof (sa));
2357 sa.sun_family = AF_UNIX;
2358 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2360 /* if we've gotten this far, we own the pid file. any daemon started
2361 * with the same args must not be alive. therefore, ensure that we can
2362 * create the socket...
2363 */
2364 unlink(path);
2366 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2367 if (status != 0)
2368 {
2369 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2370 path, rrd_strerror(errno));
2371 close (fd);
2372 return (-1);
2373 }
2375 /* tweak the sockets group ownership */
2376 if (sock->socket_group != (gid_t)-1)
2377 {
2378 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2379 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2380 {
2381 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2382 }
2383 }
2385 if (sock->socket_permissions != (mode_t)-1)
2386 {
2387 if (chmod(path, sock->socket_permissions) != 0)
2388 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2389 (unsigned int)sock->socket_permissions, strerror(errno));
2390 }
2392 status = listen (fd, /* backlog = */ 10);
2393 if (status != 0)
2394 {
2395 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2396 path, rrd_strerror(errno));
2397 close (fd);
2398 unlink (path);
2399 return (-1);
2400 }
2402 listen_fds[listen_fds_num].fd = fd;
2403 listen_fds[listen_fds_num].family = PF_UNIX;
2404 strncpy(listen_fds[listen_fds_num].addr, path,
2405 sizeof (listen_fds[listen_fds_num].addr) - 1);
2406 listen_fds_num++;
2408 return (0);
2409 } /* }}} int open_listen_socket_unix */
/*
 * Creates listening TCP sockets for the address in `sock->addr' and appends
 * them to the `listen_fds' array, one per address returned by
 * getaddrinfo().
 *
 * Accepted address forms are "[<ipv6>]", "[<ipv6>]:<port>", "<host>" and
 * "<host>:<port>" (the part after the last ':' is taken as the port).
 * Without a port, RRDCACHED_DEFAULT_PORT is used.
 *
 * Addresses for which realloc, socket() or bind() fail are skipped with a
 * message on stderr; a failing listen() aborts the whole call.
 *
 * Returns 0 when the address list was processed, -1 on a malformed
 * address, getaddrinfo() failure or listen() failure.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* strrchr() is the standard equivalent of the legacy rindex() */
    port = strrchr(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* start the new slot as a copy of the template socket */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2533 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2534 {
2535 assert(sock != NULL);
2536 assert(sock->addr != NULL);
2538 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2539 || sock->addr[0] == '/')
2540 return (open_listen_socket_unix(sock));
2541 else
2542 return (open_listen_socket_network(sock));
2543 } /* }}} int open_listen_socket */
2545 static int close_listen_sockets (void) /* {{{ */
2546 {
2547 size_t i;
2549 for (i = 0; i < listen_fds_num; i++)
2550 {
2551 close (listen_fds[i].fd);
2553 if (listen_fds[i].family == PF_UNIX)
2554 unlink(listen_fds[i].addr);
2555 }
2557 free (listen_fds);
2558 listen_fds = NULL;
2559 listen_fds_num = 0;
2561 return (0);
2562 } /* }}} int close_listen_sockets */
2564 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2565 {
2566 struct pollfd *pollfds;
2567 int pollfds_num;
2568 int status;
2569 int i;
2571 if (listen_fds_num < 1)
2572 {
2573 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2574 return (NULL);
2575 }
2577 pollfds_num = listen_fds_num;
2578 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2579 if (pollfds == NULL)
2580 {
2581 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2582 return (NULL);
2583 }
2584 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2586 RRDD_LOG(LOG_INFO, "listening for connections");
2588 while (state == RUNNING)
2589 {
2590 for (i = 0; i < pollfds_num; i++)
2591 {
2592 pollfds[i].fd = listen_fds[i].fd;
2593 pollfds[i].events = POLLIN | POLLPRI;
2594 pollfds[i].revents = 0;
2595 }
2597 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2598 if (state != RUNNING)
2599 break;
2600 else if (status == 0) /* timeout */
2601 continue;
2602 else if (status < 0) /* error */
2603 {
2604 status = errno;
2605 if (status != EINTR)
2606 {
2607 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2608 }
2609 continue;
2610 }
2612 for (i = 0; i < pollfds_num; i++)
2613 {
2614 listen_socket_t *client_sock;
2615 struct sockaddr_storage client_sa;
2616 socklen_t client_sa_size;
2617 pthread_t tid;
2618 pthread_attr_t attr;
2620 if (pollfds[i].revents == 0)
2621 continue;
2623 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2624 {
2625 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2626 "poll(2) returned something unexpected for listen FD #%i.",
2627 pollfds[i].fd);
2628 continue;
2629 }
2631 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2632 if (client_sock == NULL)
2633 {
2634 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2635 continue;
2636 }
2637 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2639 client_sa_size = sizeof (client_sa);
2640 client_sock->fd = accept (pollfds[i].fd,
2641 (struct sockaddr *) &client_sa, &client_sa_size);
2642 if (client_sock->fd < 0)
2643 {
2644 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2645 free(client_sock);
2646 continue;
2647 }
2649 pthread_attr_init (&attr);
2650 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2652 status = pthread_create (&tid, &attr, connection_thread_main,
2653 client_sock);
2654 if (status != 0)
2655 {
2656 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2657 close_connection(client_sock);
2658 continue;
2659 }
2660 } /* for (pollfds_num) */
2661 } /* while (state == RUNNING) */
2663 RRDD_LOG(LOG_INFO, "starting shutdown");
2665 close_listen_sockets ();
2667 pthread_mutex_lock (&connection_threads_lock);
2668 while (connection_threads_num > 0)
2669 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2670 pthread_mutex_unlock (&connection_threads_lock);
2672 free(pollfds);
2674 return (NULL);
2675 } /* }}} void *listen_thread_main */
2677 static int daemonize (void) /* {{{ */
2678 {
2679 int pid_fd;
2680 char *base_dir;
2682 daemon_uid = geteuid();
2684 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2685 if (pid_fd < 0)
2686 pid_fd = check_pidfile();
2687 if (pid_fd < 0)
2688 return pid_fd;
2690 /* open all the listen sockets */
2691 if (config_listen_address_list_len > 0)
2692 {
2693 for (size_t i = 0; i < config_listen_address_list_len; i++)
2694 open_listen_socket (config_listen_address_list[i]);
2696 rrd_free_ptrs((void ***) &config_listen_address_list,
2697 &config_listen_address_list_len);
2698 }
2699 else
2700 {
2701 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2702 sizeof(default_socket.addr) - 1);
2703 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2705 if (default_socket.permissions == 0)
2706 socket_permission_set_all (&default_socket);
2708 open_listen_socket (&default_socket);
2709 }
2711 if (listen_fds_num < 1)
2712 {
2713 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2714 goto error;
2715 }
2717 if (!stay_foreground)
2718 {
2719 pid_t child;
2721 child = fork ();
2722 if (child < 0)
2723 {
2724 fprintf (stderr, "daemonize: fork(2) failed.\n");
2725 goto error;
2726 }
2727 else if (child > 0)
2728 exit(0);
2730 /* Become session leader */
2731 setsid ();
2733 /* Open the first three file descriptors to /dev/null */
2734 close (2);
2735 close (1);
2736 close (0);
2738 open ("/dev/null", O_RDWR);
2739 if (dup(0) == -1 || dup(0) == -1){
2740 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2741 }
2742 } /* if (!stay_foreground) */
2744 /* Change into the /tmp directory. */
2745 base_dir = (config_base_dir != NULL)
2746 ? config_base_dir
2747 : "/tmp";
2749 if (chdir (base_dir) != 0)
2750 {
2751 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2752 goto error;
2753 }
2755 install_signal_handlers();
2757 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2758 RRDD_LOG(LOG_INFO, "starting up");
2760 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2761 (GDestroyNotify) free_cache_item);
2762 if (cache_tree == NULL)
2763 {
2764 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2765 goto error;
2766 }
2768 return write_pidfile (pid_fd);
2770 error:
2771 remove_pidfile();
2772 return -1;
2773 } /* }}} int daemonize */
2775 static int cleanup (void) /* {{{ */
2776 {
2777 pthread_cond_broadcast (&flush_cond);
2778 pthread_join (flush_thread, NULL);
2780 pthread_cond_broadcast (&queue_cond);
2781 for (int i = 0; i < config_queue_threads; i++)
2782 pthread_join (queue_threads[i], NULL);
2784 if (config_flush_at_shutdown)
2785 {
2786 assert(cache_queue_head == NULL);
2787 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2788 }
2790 free(queue_threads);
2791 free(config_base_dir);
2793 pthread_mutex_lock(&cache_lock);
2794 g_tree_destroy(cache_tree);
2796 pthread_mutex_lock(&journal_lock);
2797 journal_done();
2799 RRDD_LOG(LOG_INFO, "goodbye");
2800 closelog ();
2802 remove_pidfile ();
2803 free(config_pid_file);
2805 return (0);
2806 } /* }}} int cleanup */
2808 static int read_options (int argc, char **argv) /* {{{ */
2809 {
2810 int option;
2811 int status = 0;
2813 socket_permission_clear (&default_socket);
2815 default_socket.socket_group = (gid_t)-1;
2816 default_socket.socket_permissions = (mode_t)-1;
2818 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2819 {
2820 switch (option)
2821 {
2822 case 'g':
2823 stay_foreground=1;
2824 break;
2826 case 'l':
2827 {
2828 listen_socket_t *new;
2830 new = malloc(sizeof(listen_socket_t));
2831 if (new == NULL)
2832 {
2833 fprintf(stderr, "read_options: malloc failed.\n");
2834 return(2);
2835 }
2836 memset(new, 0, sizeof(listen_socket_t));
2838 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2840 /* Add permissions to the socket {{{ */
2841 if (default_socket.permissions != 0)
2842 {
2843 socket_permission_copy (new, &default_socket);
2844 }
2845 else /* if (default_socket.permissions == 0) */
2846 {
2847 /* Add permission for ALL commands to the socket. */
2848 socket_permission_set_all (new);
2849 }
2850 /* }}} Done adding permissions. */
2852 new->socket_group = default_socket.socket_group;
2853 new->socket_permissions = default_socket.socket_permissions;
2855 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2856 &config_listen_address_list_len, new))
2857 {
2858 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2859 return (2);
2860 }
2861 }
2862 break;
2864 /* set socket group permissions */
2865 case 's':
2866 {
2867 gid_t group_gid;
2868 struct group *grp;
2870 group_gid = strtoul(optarg, NULL, 10);
2871 if (errno != EINVAL && group_gid>0)
2872 {
2873 /* we were passed a number */
2874 grp = getgrgid(group_gid);
2875 }
2876 else
2877 {
2878 grp = getgrnam(optarg);
2879 }
2881 if (grp)
2882 {
2883 default_socket.socket_group = grp->gr_gid;
2884 }
2885 else
2886 {
2887 /* no idea what the user wanted... */
2888 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2889 return (5);
2890 }
2891 }
2892 break;
2894 /* set socket file permissions */
2895 case 'm':
2896 {
2897 long tmp;
2898 char *endptr = NULL;
2900 tmp = strtol (optarg, &endptr, 8);
2901 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2902 || (tmp > 07777) || (tmp < 0)) {
2903 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2904 optarg);
2905 return (5);
2906 }
2908 default_socket.socket_permissions = (mode_t)tmp;
2909 }
2910 break;
2912 case 'P':
2913 {
2914 char *optcopy;
2915 char *saveptr;
2916 char *dummy;
2917 char *ptr;
2919 socket_permission_clear (&default_socket);
2921 optcopy = strdup (optarg);
2922 dummy = optcopy;
2923 saveptr = NULL;
2924 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2925 {
2926 dummy = NULL;
2927 status = socket_permission_add (&default_socket, ptr);
2928 if (status != 0)
2929 {
2930 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2931 "socket failed. Most likely, this permission doesn't "
2932 "exist. Check your command line.\n", ptr);
2933 status = 4;
2934 }
2935 }
2937 free (optcopy);
2938 }
2939 break;
2941 case 'f':
2942 {
2943 int temp;
2945 temp = atoi (optarg);
2946 if (temp > 0)
2947 config_flush_interval = temp;
2948 else
2949 {
2950 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2951 status = 3;
2952 }
2953 }
2954 break;
2956 case 'w':
2957 {
2958 int temp;
2960 temp = atoi (optarg);
2961 if (temp > 0)
2962 config_write_interval = temp;
2963 else
2964 {
2965 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2966 status = 2;
2967 }
2968 }
2969 break;
2971 case 'z':
2972 {
2973 int temp;
2975 temp = atoi(optarg);
2976 if (temp > 0)
2977 config_write_jitter = temp;
2978 else
2979 {
2980 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2981 status = 2;
2982 }
2984 break;
2985 }
2987 case 't':
2988 {
2989 int threads;
2990 threads = atoi(optarg);
2991 if (threads >= 1)
2992 config_queue_threads = threads;
2993 else
2994 {
2995 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2996 return 1;
2997 }
2998 }
2999 break;
3001 case 'B':
3002 config_write_base_only = 1;
3003 break;
3005 case 'b':
3006 {
3007 size_t len;
3008 char base_realpath[PATH_MAX];
3010 if (config_base_dir != NULL)
3011 free (config_base_dir);
3012 config_base_dir = strdup (optarg);
3013 if (config_base_dir == NULL)
3014 {
3015 fprintf (stderr, "read_options: strdup failed.\n");
3016 return (3);
3017 }
3019 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3020 {
3021 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3022 config_base_dir, rrd_strerror (errno));
3023 return (3);
3024 }
3026 /* make sure that the base directory is not resolved via
3027 * symbolic links. this makes some performance-enhancing
3028 * assumptions possible (we don't have to resolve paths
3029 * that start with a "/")
3030 */
3031 if (realpath(config_base_dir, base_realpath) == NULL)
3032 {
3033 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3034 "%s\n", config_base_dir, rrd_strerror(errno));
3035 return 5;
3036 }
3038 len = strlen (config_base_dir);
3039 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3040 {
3041 config_base_dir[len - 1] = 0;
3042 len--;
3043 }
3045 if (len < 1)
3046 {
3047 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3048 return (4);
3049 }
3051 _config_base_dir_len = len;
3053 len = strlen (base_realpath);
3054 while ((len > 0) && (base_realpath[len - 1] == '/'))
3055 {
3056 base_realpath[len - 1] = '\0';
3057 len--;
3058 }
3060 if (strncmp(config_base_dir,
3061 base_realpath, sizeof(base_realpath)) != 0)
3062 {
3063 fprintf(stderr,
3064 "Base directory (-b) resolved via file system links!\n"
3065 "Please consult rrdcached '-b' documentation!\n"
3066 "Consider specifying the real directory (%s)\n",
3067 base_realpath);
3068 return 5;
3069 }
3070 }
3071 break;
3073 case 'p':
3074 {
3075 if (config_pid_file != NULL)
3076 free (config_pid_file);
3077 config_pid_file = strdup (optarg);
3078 if (config_pid_file == NULL)
3079 {
3080 fprintf (stderr, "read_options: strdup failed.\n");
3081 return (3);
3082 }
3083 }
3084 break;
3086 case 'F':
3087 config_flush_at_shutdown = 1;
3088 break;
3090 case 'j':
3091 {
3092 char journal_dir_actual[PATH_MAX];
3093 const char *dir;
3094 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3096 status = rrd_mkdir_p(dir, 0777);
3097 if (status != 0)
3098 {
3099 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3100 dir, rrd_strerror(errno));
3101 return 6;
3102 }
3104 if (access(dir, R_OK|W_OK|X_OK) != 0)
3105 {
3106 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3107 errno ? rrd_strerror(errno) : "");
3108 return 6;
3109 }
3110 }
3111 break;
3113 case 'h':
3114 case '?':
3115 printf ("RRDCacheD %s\n"
3116 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3117 "\n"
3118 "Usage: rrdcached [options]\n"
3119 "\n"
3120 "Valid options are:\n"
3121 " -l <address> Socket address to listen to.\n"
3122 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3123 " -P <perms> Sets the permissions to assign to all following "
3124 "sockets\n"
3125 " -w <seconds> Interval in which to write data.\n"
3126 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3127 " -t <threads> Number of write threads.\n"
3128 " -f <seconds> Interval in which to flush dead data.\n"
3129 " -p <file> Location of the PID-file.\n"
3130 " -b <dir> Base directory to change to.\n"
3131 " -B Restrict file access to paths within -b <dir>\n"
3132 " -g Do not fork and run in the foreground.\n"
3133 " -j <dir> Directory in which to create the journal files.\n"
3134 " -F Always flush all updates at shutdown\n"
3135 " -s <id|name> Group owner of all following UNIX sockets\n"
3136 " (the socket will also have read/write permissions "
3137 "for that group)\n"
3138 " -m <mode> File permissions (octal) of all following UNIX "
3139 "sockets\n"
3140 "\n"
3141 "For more information and a detailed description of all options "
3142 "please refer\n"
3143 "to the rrdcached(1) manual page.\n",
3144 VERSION);
3145 if (option == 'h')
3146 status = -1;
3147 else
3148 status = 1;
3149 break;
3150 } /* switch (option) */
3151 } /* while (getopt) */
3153 /* advise the user when values are not sane */
3154 if (config_flush_interval < 2 * config_write_interval)
3155 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3156 " 2x write interval (-w) !\n");
3157 if (config_write_jitter > config_write_interval)
3158 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3159 " write interval (-w) !\n");
3161 if (config_write_base_only && config_base_dir == NULL)
3162 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3163 " Consult the rrdcached documentation\n");
3165 if (journal_dir == NULL)
3166 config_flush_at_shutdown = 1;
3168 return (status);
3169 } /* }}} int read_options */
3171 int main (int argc, char **argv)
3172 {
3173 int status;
3175 status = read_options (argc, argv);
3176 if (status != 0)
3177 {
3178 if (status < 0)
3179 status = 0;
3180 return (status);
3181 }
3183 status = daemonize ();
3184 if (status != 0)
3185 {
3186 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3187 return (1);
3188 }
3190 journal_init();
3192 /* start the queue threads */
3193 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3194 if (queue_threads == NULL)
3195 {
3196 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3197 cleanup();
3198 return (1);
3199 }
3200 for (int i = 0; i < config_queue_threads; i++)
3201 {
3202 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3203 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3204 if (status != 0)
3205 {
3206 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3207 cleanup();
3208 return (1);
3209 }
3210 }
3212 /* start the flush thread */
3213 memset(&flush_thread, 0, sizeof(flush_thread));
3214 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3215 if (status != 0)
3216 {
3217 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3218 cleanup();
3219 return (1);
3220 }
3222 listen_thread_main (NULL);
3223 cleanup ();
3225 return (0);
3226 } /* int main */
3228 /*
3229 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3230 */