9b8d9ee30cb59ae59d8d27de6b96f4b0a72cde95
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
111 #include <glib-2.0/glib.h>
112 /* }}} */
114 #define RRDD_LOG(severity, ...) \
115 do { \
116 if (stay_foreground) \
117 fprintf(stderr, __VA_ARGS__); \
118 syslog ((severity), __VA_ARGS__); \
119 } while (0)
121 #ifndef __GNUC__
122 # define __attribute__(x) /**/
123 #endif
125 /*
126 * Types
127 */
128 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
130 struct listen_socket_s
131 {
132 int fd;
133 char addr[PATH_MAX + 1];
134 int family;
136 /* state for BATCH processing */
137 time_t batch_start;
138 int batch_cmd;
140 /* buffered IO */
141 char *rbuf;
142 off_t next_cmd;
143 off_t next_read;
145 char *wbuf;
146 ssize_t wbuf_len;
148 uint32_t permissions;
150 gid_t socket_group;
151 mode_t socket_permissions;
152 };
153 typedef struct listen_socket_s listen_socket_t;
155 struct command_s;
156 typedef struct command_s command_t;
157 /* note: guard against "unused" warnings in the handlers */
158 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
159 time_t now __attribute__((unused)),\
160 char *buffer __attribute__((unused)),\
161 size_t buffer_size __attribute__((unused))
163 #define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
164 DISPATCH_PROTO
166 struct command_s {
167 char *cmd;
168 int (*handler)(HANDLER_PROTO);
170 char context; /* where we expect to see it */
171 #define CMD_CONTEXT_CLIENT (1<<0)
172 #define CMD_CONTEXT_BATCH (1<<1)
173 #define CMD_CONTEXT_JOURNAL (1<<2)
174 #define CMD_CONTEXT_ANY (0x7f)
176 char *syntax;
177 char *help;
178 };
180 struct cache_item_s;
181 typedef struct cache_item_s cache_item_t;
182 struct cache_item_s
183 {
184 char *file;
185 char **values;
186 size_t values_num; /* number of valid pointers */
187 size_t values_alloc; /* number of allocated pointers */
188 time_t last_flush_time;
189 time_t last_update_stamp;
190 #define CI_FLAGS_IN_TREE (1<<0)
191 #define CI_FLAGS_IN_QUEUE (1<<1)
192 int flags;
193 pthread_cond_t flushed;
194 cache_item_t *prev;
195 cache_item_t *next;
196 };
198 struct callback_flush_data_s
199 {
200 time_t now;
201 time_t abs_timeout;
202 char **keys;
203 size_t keys_num;
204 };
205 typedef struct callback_flush_data_s callback_flush_data_t;
207 enum queue_side_e
208 {
209 HEAD,
210 TAIL
211 };
212 typedef enum queue_side_e queue_side_t;
214 /* describe a set of journal files */
215 typedef struct {
216 char **files;
217 size_t files_num;
218 } journal_set;
220 /* max length of socket command or response */
221 #define CMD_MAX 4096
222 #define RBUF_SIZE (CMD_MAX*2)
224 /*
225 * Variables
226 */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
233 enum {
234 RUNNING, /* normal operation */
235 FLUSHING, /* flushing remaining values */
236 SHUTDOWN /* shutting down */
237 } state = RUNNING;
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
250 /* Cache stuff */
251 static GTree *cache_tree = NULL;
252 static cache_item_t *cache_queue_head = NULL;
253 static cache_item_t *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
256 static int config_write_interval = 300;
257 static int config_write_jitter = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
278 /* Journaled updates */
279 #define JOURNAL_REPLAY(s) ((s) == NULL)
280 #define JOURNAL_BASE "rrd.journal"
281 static journal_set *journal_cur = NULL;
282 static journal_set *journal_old = NULL;
283 static char *journal_dir = NULL;
284 static FILE *journal_fh = NULL; /* current journal file handle */
285 static long journal_size = 0; /* current journal size */
286 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
287 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
288 static int journal_write(char *cmd, char *args);
289 static void journal_done(void);
290 static void journal_rotate(void);
292 /* prototypes for forward refernces */
293 static int handle_request_help (HANDLER_PROTO);
295 /*
296 * Functions
297 */
298 static void sig_common (const char *sig) /* {{{ */
299 {
300 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
301 state = FLUSHING;
302 pthread_cond_broadcast(&flush_cond);
303 pthread_cond_broadcast(&queue_cond);
304 } /* }}} void sig_common */
306 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
307 {
308 sig_common("INT");
309 } /* }}} void sig_int_handler */
311 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
312 {
313 sig_common("TERM");
314 } /* }}} void sig_term_handler */
316 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
317 {
318 config_flush_at_shutdown = 1;
319 sig_common("USR1");
320 } /* }}} void sig_usr1_handler */
322 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
323 {
324 config_flush_at_shutdown = 0;
325 sig_common("USR2");
326 } /* }}} void sig_usr2_handler */
328 static void install_signal_handlers(void) /* {{{ */
329 {
330 /* These structures are static, because `sigaction' behaves weird if the are
331 * overwritten.. */
332 static struct sigaction sa_int;
333 static struct sigaction sa_term;
334 static struct sigaction sa_pipe;
335 static struct sigaction sa_usr1;
336 static struct sigaction sa_usr2;
338 /* Install signal handlers */
339 memset (&sa_int, 0, sizeof (sa_int));
340 sa_int.sa_handler = sig_int_handler;
341 sigaction (SIGINT, &sa_int, NULL);
343 memset (&sa_term, 0, sizeof (sa_term));
344 sa_term.sa_handler = sig_term_handler;
345 sigaction (SIGTERM, &sa_term, NULL);
347 memset (&sa_pipe, 0, sizeof (sa_pipe));
348 sa_pipe.sa_handler = SIG_IGN;
349 sigaction (SIGPIPE, &sa_pipe, NULL);
351 memset (&sa_pipe, 0, sizeof (sa_usr1));
352 sa_usr1.sa_handler = sig_usr1_handler;
353 sigaction (SIGUSR1, &sa_usr1, NULL);
355 memset (&sa_usr2, 0, sizeof (sa_usr2));
356 sa_usr2.sa_handler = sig_usr2_handler;
357 sigaction (SIGUSR2, &sa_usr2, NULL);
359 } /* }}} void install_signal_handlers */
361 static int open_pidfile(char *action, int oflag) /* {{{ */
362 {
363 int fd;
364 const char *file;
365 char *file_copy, *dir;
367 file = (config_pid_file != NULL)
368 ? config_pid_file
369 : LOCALSTATEDIR "/run/rrdcached.pid";
371 /* dirname may modify its argument */
372 file_copy = strdup(file);
373 if (file_copy == NULL)
374 {
375 fprintf(stderr, "rrdcached: strdup(): %s\n",
376 rrd_strerror(errno));
377 return -1;
378 }
380 dir = dirname(file_copy);
381 if (rrd_mkdir_p(dir, 0777) != 0)
382 {
383 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
384 dir, rrd_strerror(errno));
385 return -1;
386 }
388 free(file_copy);
390 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
391 if (fd < 0)
392 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
393 action, file, rrd_strerror(errno));
395 return(fd);
396 } /* }}} static int open_pidfile */
398 /* check existing pid file to see whether a daemon is running */
399 static int check_pidfile(void)
400 {
401 int pid_fd;
402 pid_t pid;
403 char pid_str[16];
405 pid_fd = open_pidfile("open", O_RDWR);
406 if (pid_fd < 0)
407 return pid_fd;
409 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
410 return -1;
412 pid = atoi(pid_str);
413 if (pid <= 0)
414 return -1;
416 /* another running process that we can signal COULD be
417 * a competing rrdcached */
418 if (pid != getpid() && kill(pid, 0) == 0)
419 {
420 fprintf(stderr,
421 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
422 close(pid_fd);
423 return -1;
424 }
426 lseek(pid_fd, 0, SEEK_SET);
427 if (ftruncate(pid_fd, 0) == -1)
428 {
429 fprintf(stderr,
430 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
431 close(pid_fd);
432 return -1;
433 }
435 fprintf(stderr,
436 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
437 "rrdcached: starting normally.\n", pid);
439 return pid_fd;
440 } /* }}} static int check_pidfile */
442 static int write_pidfile (int fd) /* {{{ */
443 {
444 pid_t pid;
445 FILE *fh;
447 pid = getpid ();
449 fh = fdopen (fd, "w");
450 if (fh == NULL)
451 {
452 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
453 close(fd);
454 return (-1);
455 }
457 fprintf (fh, "%i\n", (int) pid);
458 fclose (fh);
460 return (0);
461 } /* }}} int write_pidfile */
463 static int remove_pidfile (void) /* {{{ */
464 {
465 char *file;
466 int status;
468 file = (config_pid_file != NULL)
469 ? config_pid_file
470 : LOCALSTATEDIR "/run/rrdcached.pid";
472 status = unlink (file);
473 if (status == 0)
474 return (0);
475 return (errno);
476 } /* }}} int remove_pidfile */
478 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
479 {
480 char *eol;
482 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
483 sock->next_read - sock->next_cmd);
485 if (eol == NULL)
486 {
487 /* no commands left, move remainder back to front of rbuf */
488 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
489 sock->next_read - sock->next_cmd);
490 sock->next_read -= sock->next_cmd;
491 sock->next_cmd = 0;
492 *len = 0;
493 return NULL;
494 }
495 else
496 {
497 char *cmd = sock->rbuf + sock->next_cmd;
498 *eol = '\0';
500 sock->next_cmd = eol - sock->rbuf + 1;
502 if (eol > sock->rbuf && *(eol-1) == '\r')
503 *(--eol) = '\0'; /* handle "\r\n" EOL */
505 *len = eol - cmd;
507 return cmd;
508 }
510 /* NOTREACHED */
511 assert(1==0);
512 } /* }}} char *next_cmd */
514 /* add the characters directly to the write buffer */
515 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
516 {
517 char *new_buf;
519 assert(sock != NULL);
521 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
522 if (new_buf == NULL)
523 {
524 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
525 return -1;
526 }
528 strncpy(new_buf + sock->wbuf_len, str, len + 1);
530 sock->wbuf = new_buf;
531 sock->wbuf_len += len;
533 return 0;
534 } /* }}} static int add_to_wbuf */
536 /* add the text to the "extra" info that's sent after the status line */
537 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
538 {
539 va_list argp;
540 char buffer[CMD_MAX];
541 int len;
543 if (JOURNAL_REPLAY(sock)) return 0;
544 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
546 va_start(argp, fmt);
547 #ifdef HAVE_VSNPRINTF
548 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
549 #else
550 len = vsprintf(buffer, fmt, argp);
551 #endif
552 va_end(argp);
553 if (len < 0)
554 {
555 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
556 return -1;
557 }
559 return add_to_wbuf(sock, buffer, len);
560 } /* }}} static int add_response_info */
562 static int count_lines(char *str) /* {{{ */
563 {
564 int lines = 0;
566 if (str != NULL)
567 {
568 while ((str = strchr(str, '\n')) != NULL)
569 {
570 ++lines;
571 ++str;
572 }
573 }
575 return lines;
576 } /* }}} static int count_lines */
578 /* send the response back to the user.
579 * returns 0 on success, -1 on error
580 * write buffer is always zeroed after this call */
581 static int send_response (listen_socket_t *sock, response_code rc,
582 char *fmt, ...) /* {{{ */
583 {
584 va_list argp;
585 char buffer[CMD_MAX];
586 int lines;
587 ssize_t wrote;
588 int rclen, len;
590 if (JOURNAL_REPLAY(sock)) return rc;
592 if (sock->batch_start)
593 {
594 if (rc == RESP_OK)
595 return rc; /* no response on success during BATCH */
596 lines = sock->batch_cmd;
597 }
598 else if (rc == RESP_OK)
599 lines = count_lines(sock->wbuf);
600 else
601 lines = -1;
603 rclen = sprintf(buffer, "%d ", lines);
604 va_start(argp, fmt);
605 #ifdef HAVE_VSNPRINTF
606 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
607 #else
608 len = vsprintf(buffer+rclen, fmt, argp);
609 #endif
610 va_end(argp);
611 if (len < 0)
612 return -1;
614 len += rclen;
616 /* append the result to the wbuf, don't write to the user */
617 if (sock->batch_start)
618 return add_to_wbuf(sock, buffer, len);
620 /* first write must be complete */
621 if (len != write(sock->fd, buffer, len))
622 {
623 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
624 return -1;
625 }
627 if (sock->wbuf != NULL && rc == RESP_OK)
628 {
629 wrote = 0;
630 while (wrote < sock->wbuf_len)
631 {
632 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
633 if (wb <= 0)
634 {
635 RRDD_LOG(LOG_INFO, "send_response: could not write results");
636 return -1;
637 }
638 wrote += wb;
639 }
640 }
642 free(sock->wbuf); sock->wbuf = NULL;
643 sock->wbuf_len = 0;
645 return 0;
646 } /* }}} */
648 static void wipe_ci_values(cache_item_t *ci, time_t when)
649 {
650 ci->values = NULL;
651 ci->values_num = 0;
652 ci->values_alloc = 0;
654 ci->last_flush_time = when;
655 if (config_write_jitter > 0)
656 ci->last_flush_time += (rrd_random() % config_write_jitter);
657 }
659 /* remove_from_queue
660 * remove a "cache_item_t" item from the queue.
661 * must hold 'cache_lock' when calling this
662 */
663 static void remove_from_queue(cache_item_t *ci) /* {{{ */
664 {
665 if (ci == NULL) return;
666 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
668 if (ci->prev == NULL)
669 cache_queue_head = ci->next; /* reset head */
670 else
671 ci->prev->next = ci->next;
673 if (ci->next == NULL)
674 cache_queue_tail = ci->prev; /* reset the tail */
675 else
676 ci->next->prev = ci->prev;
678 ci->next = ci->prev = NULL;
679 ci->flags &= ~CI_FLAGS_IN_QUEUE;
681 pthread_mutex_lock (&stats_lock);
682 assert (stats_queue_length > 0);
683 stats_queue_length--;
684 pthread_mutex_unlock (&stats_lock);
686 } /* }}} static void remove_from_queue */
688 /* free the resources associated with the cache_item_t
689 * must hold cache_lock when calling this function
690 */
691 static void *free_cache_item(cache_item_t *ci) /* {{{ */
692 {
693 if (ci == NULL) return NULL;
695 remove_from_queue(ci);
697 for (size_t i=0; i < ci->values_num; i++)
698 free(ci->values[i]);
700 free (ci->values);
701 free (ci->file);
703 /* in case anyone is waiting */
704 pthread_cond_broadcast(&ci->flushed);
705 pthread_cond_destroy(&ci->flushed);
707 free (ci);
709 return NULL;
710 } /* }}} static void *free_cache_item */
712 /*
713 * enqueue_cache_item:
714 * `cache_lock' must be acquired before calling this function!
715 */
716 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
717 queue_side_t side)
718 {
719 if (ci == NULL)
720 return (-1);
722 if (ci->values_num == 0)
723 return (0);
725 if (side == HEAD)
726 {
727 if (cache_queue_head == ci)
728 return 0;
730 /* remove if further down in queue */
731 remove_from_queue(ci);
733 ci->prev = NULL;
734 ci->next = cache_queue_head;
735 if (ci->next != NULL)
736 ci->next->prev = ci;
737 cache_queue_head = ci;
739 if (cache_queue_tail == NULL)
740 cache_queue_tail = cache_queue_head;
741 }
742 else /* (side == TAIL) */
743 {
744 /* We don't move values back in the list.. */
745 if (ci->flags & CI_FLAGS_IN_QUEUE)
746 return (0);
748 assert (ci->next == NULL);
749 assert (ci->prev == NULL);
751 ci->prev = cache_queue_tail;
753 if (cache_queue_tail == NULL)
754 cache_queue_head = ci;
755 else
756 cache_queue_tail->next = ci;
758 cache_queue_tail = ci;
759 }
761 ci->flags |= CI_FLAGS_IN_QUEUE;
763 pthread_cond_signal(&queue_cond);
764 pthread_mutex_lock (&stats_lock);
765 stats_queue_length++;
766 pthread_mutex_unlock (&stats_lock);
768 return (0);
769 } /* }}} int enqueue_cache_item */
771 /*
772 * tree_callback_flush:
773 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
774 * while this is in progress.
775 */
776 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
777 gpointer data)
778 {
779 cache_item_t *ci;
780 callback_flush_data_t *cfd;
782 ci = (cache_item_t *) value;
783 cfd = (callback_flush_data_t *) data;
785 if (ci->flags & CI_FLAGS_IN_QUEUE)
786 return FALSE;
788 if (ci->values_num > 0
789 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
790 {
791 enqueue_cache_item (ci, TAIL);
792 }
793 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
794 && (ci->values_num <= 0))
795 {
796 assert ((char *) key == ci->file);
797 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
798 {
799 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
800 return (FALSE);
801 }
802 }
804 return (FALSE);
805 } /* }}} gboolean tree_callback_flush */
807 static int flush_old_values (int max_age)
808 {
809 callback_flush_data_t cfd;
810 size_t k;
812 memset (&cfd, 0, sizeof (cfd));
813 /* Pass the current time as user data so that we don't need to call
814 * `time' for each node. */
815 cfd.now = time (NULL);
816 cfd.keys = NULL;
817 cfd.keys_num = 0;
819 if (max_age > 0)
820 cfd.abs_timeout = cfd.now - max_age;
821 else
822 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
824 /* `tree_callback_flush' will return the keys of all values that haven't
825 * been touched in the last `config_flush_interval' seconds in `cfd'.
826 * The char*'s in this array point to the same memory as ci->file, so we
827 * don't need to free them separately. */
828 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
830 for (k = 0; k < cfd.keys_num; k++)
831 {
832 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
833 /* should never fail, since we have held the cache_lock
834 * the entire time */
835 assert(status == TRUE);
836 }
838 if (cfd.keys != NULL)
839 {
840 free (cfd.keys);
841 cfd.keys = NULL;
842 }
844 return (0);
845 } /* int flush_old_values */
847 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
848 {
849 struct timeval now;
850 struct timespec next_flush;
851 int status;
853 gettimeofday (&now, NULL);
854 next_flush.tv_sec = now.tv_sec + config_flush_interval;
855 next_flush.tv_nsec = 1000 * now.tv_usec;
857 pthread_mutex_lock(&cache_lock);
859 while (state == RUNNING)
860 {
861 gettimeofday (&now, NULL);
862 if ((now.tv_sec > next_flush.tv_sec)
863 || ((now.tv_sec == next_flush.tv_sec)
864 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
865 {
866 RRDD_LOG(LOG_DEBUG, "flushing old values");
868 /* Determine the time of the next cache flush. */
869 next_flush.tv_sec = now.tv_sec + config_flush_interval;
871 /* Flush all values that haven't been written in the last
872 * `config_write_interval' seconds. */
873 flush_old_values (config_write_interval);
875 /* unlock the cache while we rotate so we don't block incoming
876 * updates if the fsync() blocks on disk I/O */
877 pthread_mutex_unlock(&cache_lock);
878 journal_rotate();
879 pthread_mutex_lock(&cache_lock);
880 }
882 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
883 if (status != 0 && status != ETIMEDOUT)
884 {
885 RRDD_LOG (LOG_ERR, "flush_thread_main: "
886 "pthread_cond_timedwait returned %i.", status);
887 }
888 }
890 if (config_flush_at_shutdown)
891 flush_old_values (-1); /* flush everything */
893 state = SHUTDOWN;
895 pthread_mutex_unlock(&cache_lock);
897 return NULL;
898 } /* void *flush_thread_main */
900 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
901 {
902 pthread_mutex_lock (&cache_lock);
904 while (state != SHUTDOWN
905 || (cache_queue_head != NULL && config_flush_at_shutdown))
906 {
907 cache_item_t *ci;
908 char *file;
909 char **values;
910 size_t values_num;
911 int status;
913 /* Now, check if there's something to store away. If not, wait until
914 * something comes in. */
915 if (cache_queue_head == NULL)
916 {
917 status = pthread_cond_wait (&queue_cond, &cache_lock);
918 if ((status != 0) && (status != ETIMEDOUT))
919 {
920 RRDD_LOG (LOG_ERR, "queue_thread_main: "
921 "pthread_cond_wait returned %i.", status);
922 }
923 }
925 /* Check if a value has arrived. This may be NULL if we timed out or there
926 * was an interrupt such as a signal. */
927 if (cache_queue_head == NULL)
928 continue;
930 ci = cache_queue_head;
932 /* copy the relevant parts */
933 file = strdup (ci->file);
934 if (file == NULL)
935 {
936 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
937 continue;
938 }
940 assert(ci->values != NULL);
941 assert(ci->values_num > 0);
943 values = ci->values;
944 values_num = ci->values_num;
946 wipe_ci_values(ci, time(NULL));
947 remove_from_queue(ci);
949 pthread_mutex_unlock (&cache_lock);
951 rrd_clear_error ();
952 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
953 if (status != 0)
954 {
955 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
956 "rrd_update_r (%s) failed with status %i. (%s)",
957 file, status, rrd_get_error());
958 }
960 journal_write("wrote", file);
962 /* Search again in the tree. It's possible someone issued a "FORGET"
963 * while we were writing the update values. */
964 pthread_mutex_lock(&cache_lock);
965 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
966 if (ci)
967 pthread_cond_broadcast(&ci->flushed);
968 pthread_mutex_unlock(&cache_lock);
970 if (status == 0)
971 {
972 pthread_mutex_lock (&stats_lock);
973 stats_updates_written++;
974 stats_data_sets_written += values_num;
975 pthread_mutex_unlock (&stats_lock);
976 }
978 rrd_free_ptrs((void ***) &values, &values_num);
979 free(file);
981 pthread_mutex_lock (&cache_lock);
982 }
983 pthread_mutex_unlock (&cache_lock);
985 return (NULL);
986 } /* }}} void *queue_thread_main */
988 static int buffer_get_field (char **buffer_ret, /* {{{ */
989 size_t *buffer_size_ret, char **field_ret)
990 {
991 char *buffer;
992 size_t buffer_pos;
993 size_t buffer_size;
994 char *field;
995 size_t field_size;
996 int status;
998 buffer = *buffer_ret;
999 buffer_pos = 0;
1000 buffer_size = *buffer_size_ret;
1001 field = *buffer_ret;
1002 field_size = 0;
1004 if (buffer_size <= 0)
1005 return (-1);
1007 /* This is ensured by `handle_request'. */
1008 assert (buffer[buffer_size - 1] == '\0');
1010 status = -1;
1011 while (buffer_pos < buffer_size)
1012 {
1013 /* Check for end-of-field or end-of-buffer */
1014 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1015 {
1016 field[field_size] = 0;
1017 field_size++;
1018 buffer_pos++;
1019 status = 0;
1020 break;
1021 }
1022 /* Handle escaped characters. */
1023 else if (buffer[buffer_pos] == '\\')
1024 {
1025 if (buffer_pos >= (buffer_size - 1))
1026 break;
1027 buffer_pos++;
1028 field[field_size] = buffer[buffer_pos];
1029 field_size++;
1030 buffer_pos++;
1031 }
1032 /* Normal operation */
1033 else
1034 {
1035 field[field_size] = buffer[buffer_pos];
1036 field_size++;
1037 buffer_pos++;
1038 }
1039 } /* while (buffer_pos < buffer_size) */
1041 if (status != 0)
1042 return (status);
1044 *buffer_ret = buffer + buffer_pos;
1045 *buffer_size_ret = buffer_size - buffer_pos;
1046 *field_ret = field;
1048 return (0);
1049 } /* }}} int buffer_get_field */
1051 /* if we're restricting writes to the base directory,
1052 * check whether the file falls within the dir
1053 * returns 1 if OK, otherwise 0
1054 */
1055 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1056 {
1057 assert(file != NULL);
1059 if (!config_write_base_only
1060 || JOURNAL_REPLAY(sock)
1061 || config_base_dir == NULL)
1062 return 1;
1064 if (strstr(file, "../") != NULL) goto err;
1066 /* relative paths without "../" are ok */
1067 if (*file != '/') return 1;
1069 /* file must be of the format base + "/" + <1+ char filename> */
1070 if (strlen(file) < _config_base_dir_len + 2) goto err;
1071 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1072 if (*(file + _config_base_dir_len) != '/') goto err;
1074 return 1;
1076 err:
1077 if (sock != NULL && sock->fd >= 0)
1078 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1080 return 0;
1081 } /* }}} static int check_file_access */
1083 /* when using a base dir, convert relative paths to absolute paths.
1084 * if necessary, modifies the "filename" pointer to point
1085 * to the new path created in "tmp". "tmp" is provided
1086 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1087 *
1088 * this allows us to optimize for the expected case (absolute path)
1089 * with a no-op.
1090 */
1091 static void get_abs_path(char **filename, char *tmp)
1092 {
1093 assert(tmp != NULL);
1094 assert(filename != NULL && *filename != NULL);
1096 if (config_base_dir == NULL || **filename == '/')
1097 return;
1099 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1100 *filename = tmp;
1101 } /* }}} static int get_abs_path */
1103 static int flush_file (const char *filename) /* {{{ */
1104 {
1105 cache_item_t *ci;
1107 pthread_mutex_lock (&cache_lock);
1109 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1110 if (ci == NULL)
1111 {
1112 pthread_mutex_unlock (&cache_lock);
1113 return (ENOENT);
1114 }
1116 if (ci->values_num > 0)
1117 {
1118 /* Enqueue at head */
1119 enqueue_cache_item (ci, HEAD);
1120 pthread_cond_wait(&ci->flushed, &cache_lock);
1121 }
1123 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1124 * may have been purged during our cond_wait() */
1126 pthread_mutex_unlock(&cache_lock);
1128 return (0);
1129 } /* }}} int flush_file */
1131 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1132 {
1133 char *err = "Syntax error.\n";
1135 if (cmd && cmd->syntax)
1136 err = cmd->syntax;
1138 return send_response(sock, RESP_ERR, "Usage: %s", err);
1139 } /* }}} static int syntax_error() */
1141 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1142 {
1143 uint64_t copy_queue_length;
1144 uint64_t copy_updates_received;
1145 uint64_t copy_flush_received;
1146 uint64_t copy_updates_written;
1147 uint64_t copy_data_sets_written;
1148 uint64_t copy_journal_bytes;
1149 uint64_t copy_journal_rotate;
1151 uint64_t tree_nodes_number;
1152 uint64_t tree_depth;
1154 pthread_mutex_lock (&stats_lock);
1155 copy_queue_length = stats_queue_length;
1156 copy_updates_received = stats_updates_received;
1157 copy_flush_received = stats_flush_received;
1158 copy_updates_written = stats_updates_written;
1159 copy_data_sets_written = stats_data_sets_written;
1160 copy_journal_bytes = stats_journal_bytes;
1161 copy_journal_rotate = stats_journal_rotate;
1162 pthread_mutex_unlock (&stats_lock);
1164 pthread_mutex_lock (&cache_lock);
1165 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1166 tree_depth = (uint64_t) g_tree_height (cache_tree);
1167 pthread_mutex_unlock (&cache_lock);
1169 add_response_info(sock,
1170 "QueueLength: %"PRIu64"\n", copy_queue_length);
1171 add_response_info(sock,
1172 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1173 add_response_info(sock,
1174 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1175 add_response_info(sock,
1176 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1177 add_response_info(sock,
1178 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1179 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1180 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1181 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1182 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1184 send_response(sock, RESP_OK, "Statistics follow\n");
1186 return (0);
1187 } /* }}} int handle_request_stats */
1189 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1190 {
1191 char *file, file_tmp[PATH_MAX];
1192 int status;
1194 status = buffer_get_field (&buffer, &buffer_size, &file);
1195 if (status != 0)
1196 {
1197 return syntax_error(sock,cmd);
1198 }
1199 else
1200 {
1201 pthread_mutex_lock(&stats_lock);
1202 stats_flush_received++;
1203 pthread_mutex_unlock(&stats_lock);
1205 get_abs_path(&file, file_tmp);
1206 if (!check_file_access(file, sock)) return 0;
1208 status = flush_file (file);
1209 if (status == 0)
1210 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1211 else if (status == ENOENT)
1212 {
1213 /* no file in our tree; see whether it exists at all */
1214 struct stat statbuf;
1216 memset(&statbuf, 0, sizeof(statbuf));
1217 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1218 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1219 else
1220 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1221 }
1222 else if (status < 0)
1223 return send_response(sock, RESP_ERR, "Internal error.\n");
1224 else
1225 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1226 }
1228 /* NOTREACHED */
1229 assert(1==0);
1230 } /* }}} int handle_request_flush */
1232 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1233 {
1234 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236 pthread_mutex_lock(&cache_lock);
1237 flush_old_values(-1);
1238 pthread_mutex_unlock(&cache_lock);
1240 return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1244 {
1245 int status;
1246 char *file, file_tmp[PATH_MAX];
1247 cache_item_t *ci;
1249 status = buffer_get_field(&buffer, &buffer_size, &file);
1250 if (status != 0)
1251 return syntax_error(sock,cmd);
1253 get_abs_path(&file, file_tmp);
1255 pthread_mutex_lock(&cache_lock);
1256 ci = g_tree_lookup(cache_tree, file);
1257 if (ci == NULL)
1258 {
1259 pthread_mutex_unlock(&cache_lock);
1260 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1261 }
1263 for (size_t i=0; i < ci->values_num; i++)
1264 add_response_info(sock, "%s\n", ci->values[i]);
1266 pthread_mutex_unlock(&cache_lock);
1267 return send_response(sock, RESP_OK, "updates pending\n");
1268 } /* }}} static int handle_request_pending */
1270 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1271 {
1272 int status;
1273 gboolean found;
1274 char *file, file_tmp[PATH_MAX];
1276 status = buffer_get_field(&buffer, &buffer_size, &file);
1277 if (status != 0)
1278 return syntax_error(sock,cmd);
1280 get_abs_path(&file, file_tmp);
1281 if (!check_file_access(file, sock)) return 0;
1283 pthread_mutex_lock(&cache_lock);
1284 found = g_tree_remove(cache_tree, file);
1285 pthread_mutex_unlock(&cache_lock);
1287 if (found == TRUE)
1288 {
1289 if (!JOURNAL_REPLAY(sock))
1290 journal_write("forget", file);
1292 return send_response(sock, RESP_OK, "Gone!\n");
1293 }
1294 else
1295 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1297 /* NOTREACHED */
1298 assert(1==0);
1299 } /* }}} static int handle_request_forget */
1301 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1302 {
1303 cache_item_t *ci;
1305 pthread_mutex_lock(&cache_lock);
1307 ci = cache_queue_head;
1308 while (ci != NULL)
1309 {
1310 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1311 ci = ci->next;
1312 }
1314 pthread_mutex_unlock(&cache_lock);
1316 return send_response(sock, RESP_OK, "in queue.\n");
1317 } /* }}} int handle_request_queue */
1319 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1320 {
1321 char *file, file_tmp[PATH_MAX];
1322 int values_num = 0;
1323 int status;
1324 char orig_buf[CMD_MAX];
1326 cache_item_t *ci;
1328 /* save it for the journal later */
1329 if (!JOURNAL_REPLAY(sock))
1330 strncpy(orig_buf, buffer, buffer_size);
1332 status = buffer_get_field (&buffer, &buffer_size, &file);
1333 if (status != 0)
1334 return syntax_error(sock,cmd);
1336 pthread_mutex_lock(&stats_lock);
1337 stats_updates_received++;
1338 pthread_mutex_unlock(&stats_lock);
1340 get_abs_path(&file, file_tmp);
1341 if (!check_file_access(file, sock)) return 0;
1343 pthread_mutex_lock (&cache_lock);
1344 ci = g_tree_lookup (cache_tree, file);
1346 if (ci == NULL) /* {{{ */
1347 {
1348 struct stat statbuf;
1349 cache_item_t *tmp;
1351 /* don't hold the lock while we setup; stat(2) might block */
1352 pthread_mutex_unlock(&cache_lock);
1354 memset (&statbuf, 0, sizeof (statbuf));
1355 status = stat (file, &statbuf);
1356 if (status != 0)
1357 {
1358 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1360 status = errno;
1361 if (status == ENOENT)
1362 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1363 else
1364 return send_response(sock, RESP_ERR,
1365 "stat failed with error %i.\n", status);
1366 }
1367 if (!S_ISREG (statbuf.st_mode))
1368 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1370 if (access(file, R_OK|W_OK) != 0)
1371 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1372 file, rrd_strerror(errno));
1374 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1375 if (ci == NULL)
1376 {
1377 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1379 return send_response(sock, RESP_ERR, "malloc failed.\n");
1380 }
1381 memset (ci, 0, sizeof (cache_item_t));
1383 ci->file = strdup (file);
1384 if (ci->file == NULL)
1385 {
1386 free (ci);
1387 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1389 return send_response(sock, RESP_ERR, "strdup failed.\n");
1390 }
1392 wipe_ci_values(ci, now);
1393 ci->flags = CI_FLAGS_IN_TREE;
1394 pthread_cond_init(&ci->flushed, NULL);
1396 pthread_mutex_lock(&cache_lock);
1398 /* another UPDATE might have added this entry in the meantime */
1399 tmp = g_tree_lookup (cache_tree, file);
1400 if (tmp == NULL)
1401 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1402 else
1403 {
1404 free_cache_item (ci);
1405 ci = tmp;
1406 }
1408 /* state may have changed while we were unlocked */
1409 if (state == SHUTDOWN)
1410 return -1;
1411 } /* }}} */
1412 assert (ci != NULL);
1414 /* don't re-write updates in replay mode */
1415 if (!JOURNAL_REPLAY(sock))
1416 journal_write("update", orig_buf);
1418 while (buffer_size > 0)
1419 {
1420 char *value;
1421 time_t stamp;
1422 char *eostamp;
1424 status = buffer_get_field (&buffer, &buffer_size, &value);
1425 if (status != 0)
1426 {
1427 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1428 break;
1429 }
1431 /* make sure update time is always moving forward */
1432 stamp = strtol(value, &eostamp, 10);
1433 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434 {
1435 pthread_mutex_unlock(&cache_lock);
1436 return send_response(sock, RESP_ERR,
1437 "Cannot find timestamp in '%s'!\n", value);
1438 }
1439 else if (stamp <= ci->last_update_stamp)
1440 {
1441 pthread_mutex_unlock(&cache_lock);
1442 return send_response(sock, RESP_ERR,
1443 "illegal attempt to update using time %ld when last"
1444 " update time is %ld (minimum one second step)\n",
1445 stamp, ci->last_update_stamp);
1446 }
1447 else
1448 ci->last_update_stamp = stamp;
1450 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451 &ci->values_alloc, config_alloc_chunk))
1452 {
1453 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454 continue;
1455 }
1457 values_num++;
1458 }
1460 if (((now - ci->last_flush_time) >= config_write_interval)
1461 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462 && (ci->values_num > 0))
1463 {
1464 enqueue_cache_item (ci, TAIL);
1465 }
1467 pthread_mutex_unlock (&cache_lock);
1469 if (values_num < 1)
1470 return send_response(sock, RESP_ERR, "No values updated.\n");
1471 else
1472 return send_response(sock, RESP_OK,
1473 "errors, enqueued %i value(s).\n", values_num);
1475 /* NOTREACHED */
1476 assert(1==0);
1478 } /* }}} int handle_request_update */
1480 /* we came across a "WROTE" entry during journal replay.
1481 * throw away any values that we have accumulated for this file
1482 */
1483 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1484 {
1485 cache_item_t *ci;
1486 const char *file = buffer;
1488 pthread_mutex_lock(&cache_lock);
1490 ci = g_tree_lookup(cache_tree, file);
1491 if (ci == NULL)
1492 {
1493 pthread_mutex_unlock(&cache_lock);
1494 return (0);
1495 }
1497 if (ci->values)
1498 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1500 wipe_ci_values(ci, now);
1501 remove_from_queue(ci);
1503 pthread_mutex_unlock(&cache_lock);
1504 return (0);
1505 } /* }}} int handle_request_wrote */
1507 /* start "BATCH" processing */
1508 static int batch_start (HANDLER_PROTO) /* {{{ */
1509 {
1510 int status;
1511 if (sock->batch_start)
1512 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1514 status = send_response(sock, RESP_OK,
1515 "Go ahead. End with dot '.' on its own line.\n");
1516 sock->batch_start = time(NULL);
1517 sock->batch_cmd = 0;
1519 return status;
1520 } /* }}} static int batch_start */
1522 /* finish "BATCH" processing and return results to the client */
1523 static int batch_done (HANDLER_PROTO) /* {{{ */
1524 {
1525 assert(sock->batch_start);
1526 sock->batch_start = 0;
1527 sock->batch_cmd = 0;
1528 return send_response(sock, RESP_OK, "errors\n");
1529 } /* }}} static int batch_done */
1531 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1532 {
1533 return -1;
1534 } /* }}} static int handle_request_quit */
1536 static command_t list_of_commands[] = { /* {{{ */
1537 {
1538 "UPDATE",
1539 handle_request_update,
1540 CMD_CONTEXT_ANY,
1541 "UPDATE <filename> <values> [<values> ...]\n"
1542 ,
1543 "Adds the given file to the internal cache if it is not yet known and\n"
1544 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1545 "for details.\n"
1546 "\n"
1547 "Each <values> has the following form:\n"
1548 " <values> = <time>:<value>[:<value>[...]]\n"
1549 "See the rrdupdate(1) manpage for details.\n"
1550 },
1551 {
1552 "WROTE",
1553 handle_request_wrote,
1554 CMD_CONTEXT_JOURNAL,
1555 NULL,
1556 NULL
1557 },
1558 {
1559 "FLUSH",
1560 handle_request_flush,
1561 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1562 "FLUSH <filename>\n"
1563 ,
1564 "Adds the given filename to the head of the update queue and returns\n"
1565 "after it has been dequeued.\n"
1566 },
1567 {
1568 "FLUSHALL",
1569 handle_request_flushall,
1570 CMD_CONTEXT_CLIENT,
1571 "FLUSHALL\n"
1572 ,
1573 "Triggers writing of all pending updates. Returns immediately.\n"
1574 },
1575 {
1576 "PENDING",
1577 handle_request_pending,
1578 CMD_CONTEXT_CLIENT,
1579 "PENDING <filename>\n"
1580 ,
1581 "Shows any 'pending' updates for a file, in order.\n"
1582 "The updates shown have not yet been written to the underlying RRD file.\n"
1583 },
1584 {
1585 "FORGET",
1586 handle_request_forget,
1587 CMD_CONTEXT_ANY,
1588 "FORGET <filename>\n"
1589 ,
1590 "Removes the file completely from the cache.\n"
1591 "Any pending updates for the file will be lost.\n"
1592 },
1593 {
1594 "QUEUE",
1595 handle_request_queue,
1596 CMD_CONTEXT_CLIENT,
1597 "QUEUE\n"
1598 ,
1599 "Shows all files in the output queue.\n"
1600 "The output is zero or more lines in the following format:\n"
1601 "(where <num_vals> is the number of values to be written)\n"
1602 "\n"
1603 "<num_vals> <filename>\n"
1604 },
1605 {
1606 "STATS",
1607 handle_request_stats,
1608 CMD_CONTEXT_CLIENT,
1609 "STATS\n"
1610 ,
1611 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1612 "a description of the values.\n"
1613 },
1614 {
1615 "HELP",
1616 handle_request_help,
1617 CMD_CONTEXT_CLIENT,
1618 "HELP [<command>]\n",
1619 NULL, /* special! */
1620 },
1621 {
1622 "BATCH",
1623 batch_start,
1624 CMD_CONTEXT_CLIENT,
1625 "BATCH\n"
1626 ,
1627 "The 'BATCH' command permits the client to initiate a bulk load\n"
1628 " of commands to rrdcached.\n"
1629 "\n"
1630 "Usage:\n"
1631 "\n"
1632 " client: BATCH\n"
1633 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1634 " client: command #1\n"
1635 " client: command #2\n"
1636 " client: ... and so on\n"
1637 " client: .\n"
1638 " server: 2 errors\n"
1639 " server: 7 message for command #7\n"
1640 " server: 9 message for command #9\n"
1641 "\n"
1642 "For more information, consult the rrdcached(1) documentation.\n"
1643 },
1644 {
1645 ".", /* BATCH terminator */
1646 batch_done,
1647 CMD_CONTEXT_BATCH,
1648 NULL,
1649 NULL
1650 },
1651 {
1652 "QUIT",
1653 handle_request_quit,
1654 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1655 "QUIT\n"
1656 ,
1657 "Disconnect from rrdcached.\n"
1658 }
1659 }; /* }}} command_t list_of_commands[] */
1660 static size_t list_of_commands_len = sizeof (list_of_commands)
1661 / sizeof (list_of_commands[0]);
1663 static command_t *find_command(char *cmd)
1664 {
1665 size_t i;
1667 for (i = 0; i < list_of_commands_len; i++)
1668 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1669 return (&list_of_commands[i]);
1670 return NULL;
1671 }
1673 /* We currently use the index in the `list_of_commands' array as a bit position
1674 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1675 * outside these functions so that switching to a more elegant storage method
1676 * is easily possible. */
1677 static ssize_t find_command_index (const char *cmd) /* {{{ */
1678 {
1679 size_t i;
1681 for (i = 0; i < list_of_commands_len; i++)
1682 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1683 return ((ssize_t) i);
1684 return (-1);
1685 } /* }}} ssize_t find_command_index */
1687 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1688 const char *cmd)
1689 {
1690 ssize_t i;
1692 if (JOURNAL_REPLAY(sock))
1693 return (1);
1695 if (cmd == NULL)
1696 return (-1);
1698 if ((strcasecmp ("QUIT", cmd) == 0)
1699 || (strcasecmp ("HELP", cmd) == 0))
1700 return (1);
1701 else if (strcmp (".", cmd) == 0)
1702 cmd = "BATCH";
1704 i = find_command_index (cmd);
1705 if (i < 0)
1706 return (-1);
1707 assert (i < 32);
1709 if ((sock->permissions & (1 << i)) != 0)
1710 return (1);
1711 return (0);
1712 } /* }}} int socket_permission_check */
1714 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1715 const char *cmd)
1716 {
1717 ssize_t i;
1719 i = find_command_index (cmd);
1720 if (i < 0)
1721 return (-1);
1722 assert (i < 32);
1724 sock->permissions |= (1 << i);
1725 return (0);
1726 } /* }}} int socket_permission_add */
1728 /* check whether commands are received in the expected context */
1729 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1730 {
1731 if (JOURNAL_REPLAY(sock))
1732 return (cmd->context & CMD_CONTEXT_JOURNAL);
1733 else if (sock->batch_start)
1734 return (cmd->context & CMD_CONTEXT_BATCH);
1735 else
1736 return (cmd->context & CMD_CONTEXT_CLIENT);
1738 /* NOTREACHED */
1739 assert(1==0);
1740 }
1742 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1743 {
1744 int status;
1745 char *cmd_str;
1746 char *resp_txt;
1747 command_t *help = NULL;
1749 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1750 if (status == 0)
1751 help = find_command(cmd_str);
1753 if (help && (help->syntax || help->help))
1754 {
1755 char tmp[CMD_MAX];
1757 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1758 resp_txt = tmp;
1760 if (help->syntax)
1761 add_response_info(sock, "Usage: %s\n", help->syntax);
1763 if (help->help)
1764 add_response_info(sock, "%s\n", help->help);
1765 }
1766 else
1767 {
1768 size_t i;
1770 resp_txt = "Command overview\n";
1772 for (i = 0; i < list_of_commands_len; i++)
1773 {
1774 if (list_of_commands[i].syntax == NULL)
1775 continue;
1776 add_response_info (sock, "%s", list_of_commands[i].syntax);
1777 }
1778 }
1780 return send_response(sock, RESP_OK, resp_txt);
1781 } /* }}} int handle_request_help */
1783 static int handle_request (DISPATCH_PROTO) /* {{{ */
1784 {
1785 char *buffer_ptr = buffer;
1786 char *cmd_str = NULL;
1787 command_t *cmd = NULL;
1788 int status;
1790 assert (buffer[buffer_size - 1] == '\0');
1792 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1793 if (status != 0)
1794 {
1795 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1796 return (-1);
1797 }
1799 if (sock != NULL && sock->batch_start)
1800 sock->batch_cmd++;
1802 cmd = find_command(cmd_str);
1803 if (!cmd)
1804 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1806 if (!socket_permission_check (sock, cmd->cmd))
1807 return send_response(sock, RESP_ERR, "Permission denied.\n");
1809 if (!command_check_context(sock, cmd))
1810 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1812 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1813 } /* }}} int handle_request */
1815 static void journal_set_free (journal_set *js) /* {{{ */
1816 {
1817 if (js == NULL)
1818 return;
1820 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1822 free(js);
1823 } /* }}} journal_set_free */
1825 static void journal_set_remove (journal_set *js) /* {{{ */
1826 {
1827 if (js == NULL)
1828 return;
1830 for (uint i=0; i < js->files_num; i++)
1831 {
1832 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1833 unlink(js->files[i]);
1834 }
1835 } /* }}} journal_set_remove */
1837 /* close current journal file handle.
1838 * MUST hold journal_lock before calling */
1839 static void journal_close(void) /* {{{ */
1840 {
1841 if (journal_fh != NULL)
1842 {
1843 if (fclose(journal_fh) != 0)
1844 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1845 }
1847 journal_fh = NULL;
1848 journal_size = 0;
1849 } /* }}} journal_close */
1851 /* MUST hold journal_lock before calling */
1852 static void journal_new_file(void) /* {{{ */
1853 {
1854 struct timeval now;
1855 int new_fd;
1856 char new_file[PATH_MAX + 1];
1858 assert(journal_dir != NULL);
1859 assert(journal_cur != NULL);
1861 journal_close();
1863 gettimeofday(&now, NULL);
1864 /* this format assures that the files sort in strcmp() order */
1865 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1866 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1868 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1869 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1870 if (new_fd < 0)
1871 goto error;
1873 journal_fh = fdopen(new_fd, "a");
1874 if (journal_fh == NULL)
1875 goto error;
1877 journal_size = ftell(journal_fh);
1878 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1880 /* record the file in the journal set */
1881 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1883 return;
1885 error:
1886 RRDD_LOG(LOG_CRIT,
1887 "JOURNALING DISABLED: Error while trying to create %s : %s",
1888 new_file, rrd_strerror(errno));
1889 RRDD_LOG(LOG_CRIT,
1890 "JOURNALING DISABLED: All values will be flushed at shutdown");
1892 close(new_fd);
1893 config_flush_at_shutdown = 1;
1895 } /* }}} journal_new_file */
1897 /* MUST NOT hold journal_lock before calling this */
1898 static void journal_rotate(void) /* {{{ */
1899 {
1900 journal_set *old_js = NULL;
1902 if (journal_dir == NULL)
1903 return;
1905 RRDD_LOG(LOG_DEBUG, "rotating journals");
1907 pthread_mutex_lock(&stats_lock);
1908 ++stats_journal_rotate;
1909 pthread_mutex_unlock(&stats_lock);
1911 pthread_mutex_lock(&journal_lock);
1913 journal_close();
1915 /* rotate the journal sets */
1916 old_js = journal_old;
1917 journal_old = journal_cur;
1918 journal_cur = calloc(1, sizeof(journal_set));
1920 if (journal_cur != NULL)
1921 journal_new_file();
1922 else
1923 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1925 pthread_mutex_unlock(&journal_lock);
1927 journal_set_remove(old_js);
1928 journal_set_free (old_js);
1930 } /* }}} static void journal_rotate */
1932 /* MUST hold journal_lock when calling */
1933 static void journal_done(void) /* {{{ */
1934 {
1935 if (journal_cur == NULL)
1936 return;
1938 journal_close();
1940 if (config_flush_at_shutdown)
1941 {
1942 RRDD_LOG(LOG_INFO, "removing journals");
1943 journal_set_remove(journal_old);
1944 journal_set_remove(journal_cur);
1945 }
1946 else
1947 {
1948 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1949 "journals will be used at next startup");
1950 }
1952 journal_set_free(journal_cur);
1953 journal_set_free(journal_old);
1954 free(journal_dir);
1956 } /* }}} static void journal_done */
1958 static int journal_write(char *cmd, char *args) /* {{{ */
1959 {
1960 int chars;
1962 if (journal_fh == NULL)
1963 return 0;
1965 pthread_mutex_lock(&journal_lock);
1966 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1967 journal_size += chars;
1969 if (journal_size > JOURNAL_MAX)
1970 journal_new_file();
1972 pthread_mutex_unlock(&journal_lock);
1974 if (chars > 0)
1975 {
1976 pthread_mutex_lock(&stats_lock);
1977 stats_journal_bytes += chars;
1978 pthread_mutex_unlock(&stats_lock);
1979 }
1981 return chars;
1982 } /* }}} static int journal_write */
1984 static int journal_replay (const char *file) /* {{{ */
1985 {
1986 FILE *fh;
1987 int entry_cnt = 0;
1988 int fail_cnt = 0;
1989 uint64_t line = 0;
1990 char entry[CMD_MAX];
1991 time_t now;
1993 if (file == NULL) return 0;
1995 {
1996 char *reason = "unknown error";
1997 int status = 0;
1998 struct stat statbuf;
2000 memset(&statbuf, 0, sizeof(statbuf));
2001 if (stat(file, &statbuf) != 0)
2002 {
2003 reason = "stat error";
2004 status = errno;
2005 }
2006 else if (!S_ISREG(statbuf.st_mode))
2007 {
2008 reason = "not a regular file";
2009 status = EPERM;
2010 }
2011 if (statbuf.st_uid != daemon_uid)
2012 {
2013 reason = "not owned by daemon user";
2014 status = EACCES;
2015 }
2016 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2017 {
2018 reason = "must not be user/group writable";
2019 status = EACCES;
2020 }
2022 if (status != 0)
2023 {
2024 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2025 file, rrd_strerror(status), reason);
2026 return 0;
2027 }
2028 }
2030 fh = fopen(file, "r");
2031 if (fh == NULL)
2032 {
2033 if (errno != ENOENT)
2034 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2035 file, rrd_strerror(errno));
2036 return 0;
2037 }
2038 else
2039 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2041 now = time(NULL);
2043 while(!feof(fh))
2044 {
2045 size_t entry_len;
2047 ++line;
2048 if (fgets(entry, sizeof(entry), fh) == NULL)
2049 break;
2050 entry_len = strlen(entry);
2052 /* check \n termination in case journal writing crashed mid-line */
2053 if (entry_len == 0)
2054 continue;
2055 else if (entry[entry_len - 1] != '\n')
2056 {
2057 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2058 ++fail_cnt;
2059 continue;
2060 }
2062 entry[entry_len - 1] = '\0';
2064 if (handle_request(NULL, now, entry, entry_len) == 0)
2065 ++entry_cnt;
2066 else
2067 ++fail_cnt;
2068 }
2070 fclose(fh);
2072 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2073 entry_cnt, fail_cnt);
2075 return entry_cnt > 0 ? 1 : 0;
2076 } /* }}} static int journal_replay */
2078 static int journal_sort(const void *v1, const void *v2)
2079 {
2080 char **jn1 = (char **) v1;
2081 char **jn2 = (char **) v2;
2083 return strcmp(*jn1,*jn2);
2084 }
2086 static void journal_init(void) /* {{{ */
2087 {
2088 int had_journal = 0;
2089 DIR *dir;
2090 struct dirent *dent;
2091 char path[PATH_MAX+1];
2093 if (journal_dir == NULL) return;
2095 pthread_mutex_lock(&journal_lock);
2097 journal_cur = calloc(1, sizeof(journal_set));
2098 if (journal_cur == NULL)
2099 {
2100 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2101 return;
2102 }
2104 RRDD_LOG(LOG_INFO, "checking for journal files");
2106 /* Handle old journal files during transition. This gives them the
2107 * correct sort order. TODO: remove after first release
2108 */
2109 {
2110 char old_path[PATH_MAX+1];
2111 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2112 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2113 rename(old_path, path);
2115 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2116 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2117 rename(old_path, path);
2118 }
2120 dir = opendir(journal_dir);
2121 while ((dent = readdir(dir)) != NULL)
2122 {
2123 /* looks like a journal file? */
2124 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2125 continue;
2127 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2129 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2130 {
2131 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2132 dent->d_name);
2133 break;
2134 }
2135 }
2136 closedir(dir);
2138 qsort(journal_cur->files, journal_cur->files_num,
2139 sizeof(journal_cur->files[0]), journal_sort);
2141 for (uint i=0; i < journal_cur->files_num; i++)
2142 had_journal += journal_replay(journal_cur->files[i]);
2144 journal_new_file();
2146 /* it must have been a crash. start a flush */
2147 if (had_journal && config_flush_at_shutdown)
2148 flush_old_values(-1);
2150 pthread_mutex_unlock(&journal_lock);
2152 RRDD_LOG(LOG_INFO, "journal processing complete");
2154 } /* }}} static void journal_init */
2156 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2157 {
2158 assert(sock != NULL);
2160 free(sock->rbuf); sock->rbuf = NULL;
2161 free(sock->wbuf); sock->wbuf = NULL;
2162 free(sock);
2163 } /* }}} void free_listen_socket */
2165 static void close_connection(listen_socket_t *sock) /* {{{ */
2166 {
2167 if (sock->fd >= 0)
2168 {
2169 close(sock->fd);
2170 sock->fd = -1;
2171 }
2173 free_listen_socket(sock);
2175 } /* }}} void close_connection */
2177 static void *connection_thread_main (void *args) /* {{{ */
2178 {
2179 listen_socket_t *sock;
2180 int fd;
2182 sock = (listen_socket_t *) args;
2183 fd = sock->fd;
2185 /* init read buffers */
2186 sock->next_read = sock->next_cmd = 0;
2187 sock->rbuf = malloc(RBUF_SIZE);
2188 if (sock->rbuf == NULL)
2189 {
2190 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2191 close_connection(sock);
2192 return NULL;
2193 }
2195 pthread_mutex_lock (&connection_threads_lock);
2196 connection_threads_num++;
2197 pthread_mutex_unlock (&connection_threads_lock);
2199 while (state == RUNNING)
2200 {
2201 char *cmd;
2202 ssize_t cmd_len;
2203 ssize_t rbytes;
2204 time_t now;
2206 struct pollfd pollfd;
2207 int status;
2209 pollfd.fd = fd;
2210 pollfd.events = POLLIN | POLLPRI;
2211 pollfd.revents = 0;
2213 status = poll (&pollfd, 1, /* timeout = */ 500);
2214 if (state != RUNNING)
2215 break;
2216 else if (status == 0) /* timeout */
2217 continue;
2218 else if (status < 0) /* error */
2219 {
2220 status = errno;
2221 if (status != EINTR)
2222 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2223 continue;
2224 }
2226 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2227 break;
2228 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2229 {
2230 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2231 "poll(2) returned something unexpected: %#04hx",
2232 pollfd.revents);
2233 break;
2234 }
2236 rbytes = read(fd, sock->rbuf + sock->next_read,
2237 RBUF_SIZE - sock->next_read);
2238 if (rbytes < 0)
2239 {
2240 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2241 break;
2242 }
2243 else if (rbytes == 0)
2244 break; /* eof */
2246 sock->next_read += rbytes;
2248 if (sock->batch_start)
2249 now = sock->batch_start;
2250 else
2251 now = time(NULL);
2253 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2254 {
2255 status = handle_request (sock, now, cmd, cmd_len+1);
2256 if (status != 0)
2257 goto out_close;
2258 }
2259 }
2261 out_close:
2262 close_connection(sock);
2264 /* Remove this thread from the connection threads list */
2265 pthread_mutex_lock (&connection_threads_lock);
2266 connection_threads_num--;
2267 if (connection_threads_num <= 0)
2268 pthread_cond_broadcast(&connection_threads_done);
2269 pthread_mutex_unlock (&connection_threads_lock);
2271 return (NULL);
2272 } /* }}} void *connection_thread_main */
2274 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2275 {
2276 int fd;
2277 struct sockaddr_un sa;
2278 listen_socket_t *temp;
2279 int status;
2280 const char *path;
2281 char *path_copy, *dir;
2283 path = sock->addr;
2284 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2285 path += strlen("unix:");
2287 /* dirname may modify its argument */
2288 path_copy = strdup(path);
2289 if (path_copy == NULL)
2290 {
2291 fprintf(stderr, "rrdcached: strdup(): %s\n",
2292 rrd_strerror(errno));
2293 return (-1);
2294 }
2296 dir = dirname(path_copy);
2297 if (rrd_mkdir_p(dir, 0777) != 0)
2298 {
2299 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2300 dir, rrd_strerror(errno));
2301 return (-1);
2302 }
2304 free(path_copy);
2306 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2307 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2308 if (temp == NULL)
2309 {
2310 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2311 return (-1);
2312 }
2313 listen_fds = temp;
2314 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2316 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2317 if (fd < 0)
2318 {
2319 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2320 rrd_strerror(errno));
2321 return (-1);
2322 }
2324 memset (&sa, 0, sizeof (sa));
2325 sa.sun_family = AF_UNIX;
2326 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2328 /* if we've gotten this far, we own the pid file. any daemon started
2329 * with the same args must not be alive. therefore, ensure that we can
2330 * create the socket...
2331 */
2332 unlink(path);
2334 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2335 if (status != 0)
2336 {
2337 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2338 path, rrd_strerror(errno));
2339 close (fd);
2340 return (-1);
2341 }
2343 /* tweak the sockets group ownership */
2344 if (sock->socket_group != (gid_t)-1)
2345 {
2346 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2347 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2348 {
2349 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2350 }
2351 }
2353 if (sock->socket_permissions != (mode_t)-1)
2354 {
2355 if (chmod(path, sock->socket_permissions) != 0)
2356 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2357 (unsigned int)sock->socket_permissions, strerror(errno));
2358 }
2360 status = listen (fd, /* backlog = */ 10);
2361 if (status != 0)
2362 {
2363 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2364 path, rrd_strerror(errno));
2365 close (fd);
2366 unlink (path);
2367 return (-1);
2368 }
2370 listen_fds[listen_fds_num].fd = fd;
2371 listen_fds[listen_fds_num].family = PF_UNIX;
2372 strncpy(listen_fds[listen_fds_num].addr, path,
2373 sizeof (listen_fds[listen_fds_num].addr) - 1);
2374 listen_fds_num++;
2376 return (0);
2377 } /* }}} int open_listen_socket_unix */
2379 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2380 {
2381 struct addrinfo ai_hints;
2382 struct addrinfo *ai_res;
2383 struct addrinfo *ai_ptr;
2384 char addr_copy[NI_MAXHOST];
2385 char *addr;
2386 char *port;
2387 int status;
2389 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2390 addr_copy[sizeof (addr_copy) - 1] = 0;
2391 addr = addr_copy;
2393 memset (&ai_hints, 0, sizeof (ai_hints));
2394 ai_hints.ai_flags = 0;
2395 #ifdef AI_ADDRCONFIG
2396 ai_hints.ai_flags |= AI_ADDRCONFIG;
2397 #endif
2398 ai_hints.ai_family = AF_UNSPEC;
2399 ai_hints.ai_socktype = SOCK_STREAM;
2401 port = NULL;
2402 if (*addr == '[') /* IPv6+port format */
2403 {
2404 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2405 addr++;
2407 port = strchr (addr, ']');
2408 if (port == NULL)
2409 {
2410 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2411 return (-1);
2412 }
2413 *port = 0;
2414 port++;
2416 if (*port == ':')
2417 port++;
2418 else if (*port == 0)
2419 port = NULL;
2420 else
2421 {
2422 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2423 return (-1);
2424 }
2425 } /* if (*addr == '[') */
2426 else
2427 {
2428 port = rindex(addr, ':');
2429 if (port != NULL)
2430 {
2431 *port = 0;
2432 port++;
2433 }
2434 }
2435 ai_res = NULL;
2436 status = getaddrinfo (addr,
2437 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2438 &ai_hints, &ai_res);
2439 if (status != 0)
2440 {
2441 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2442 addr, gai_strerror (status));
2443 return (-1);
2444 }
2446 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2447 {
2448 int fd;
2449 listen_socket_t *temp;
2450 int one = 1;
2452 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2453 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2454 if (temp == NULL)
2455 {
2456 fprintf (stderr,
2457 "rrdcached: open_listen_socket_network: realloc failed.\n");
2458 continue;
2459 }
2460 listen_fds = temp;
2461 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2463 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2464 if (fd < 0)
2465 {
2466 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2467 rrd_strerror(errno));
2468 continue;
2469 }
2471 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2473 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2474 if (status != 0)
2475 {
2476 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2477 sock->addr, rrd_strerror(errno));
2478 close (fd);
2479 continue;
2480 }
2482 status = listen (fd, /* backlog = */ 10);
2483 if (status != 0)
2484 {
2485 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2486 sock->addr, rrd_strerror(errno));
2487 close (fd);
2488 freeaddrinfo(ai_res);
2489 return (-1);
2490 }
2492 listen_fds[listen_fds_num].fd = fd;
2493 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2494 listen_fds_num++;
2495 } /* for (ai_ptr) */
2497 freeaddrinfo(ai_res);
2498 return (0);
2499 } /* }}} static int open_listen_socket_network */
2501 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2502 {
2503 assert(sock != NULL);
2504 assert(sock->addr != NULL);
2506 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2507 || sock->addr[0] == '/')
2508 return (open_listen_socket_unix(sock));
2509 else
2510 return (open_listen_socket_network(sock));
2511 } /* }}} int open_listen_socket */
2513 static int close_listen_sockets (void) /* {{{ */
2514 {
2515 size_t i;
2517 for (i = 0; i < listen_fds_num; i++)
2518 {
2519 close (listen_fds[i].fd);
2521 if (listen_fds[i].family == PF_UNIX)
2522 unlink(listen_fds[i].addr);
2523 }
2525 free (listen_fds);
2526 listen_fds = NULL;
2527 listen_fds_num = 0;
2529 return (0);
2530 } /* }}} int close_listen_sockets */
2532 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2533 {
2534 struct pollfd *pollfds;
2535 int pollfds_num;
2536 int status;
2537 int i;
2539 if (listen_fds_num < 1)
2540 {
2541 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2542 return (NULL);
2543 }
2545 pollfds_num = listen_fds_num;
2546 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2547 if (pollfds == NULL)
2548 {
2549 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2550 return (NULL);
2551 }
2552 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2554 RRDD_LOG(LOG_INFO, "listening for connections");
2556 while (state == RUNNING)
2557 {
2558 for (i = 0; i < pollfds_num; i++)
2559 {
2560 pollfds[i].fd = listen_fds[i].fd;
2561 pollfds[i].events = POLLIN | POLLPRI;
2562 pollfds[i].revents = 0;
2563 }
2565 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2566 if (state != RUNNING)
2567 break;
2568 else if (status == 0) /* timeout */
2569 continue;
2570 else if (status < 0) /* error */
2571 {
2572 status = errno;
2573 if (status != EINTR)
2574 {
2575 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2576 }
2577 continue;
2578 }
2580 for (i = 0; i < pollfds_num; i++)
2581 {
2582 listen_socket_t *client_sock;
2583 struct sockaddr_storage client_sa;
2584 socklen_t client_sa_size;
2585 pthread_t tid;
2586 pthread_attr_t attr;
2588 if (pollfds[i].revents == 0)
2589 continue;
2591 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2592 {
2593 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2594 "poll(2) returned something unexpected for listen FD #%i.",
2595 pollfds[i].fd);
2596 continue;
2597 }
2599 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2600 if (client_sock == NULL)
2601 {
2602 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2603 continue;
2604 }
2605 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2607 client_sa_size = sizeof (client_sa);
2608 client_sock->fd = accept (pollfds[i].fd,
2609 (struct sockaddr *) &client_sa, &client_sa_size);
2610 if (client_sock->fd < 0)
2611 {
2612 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2613 free(client_sock);
2614 continue;
2615 }
2617 pthread_attr_init (&attr);
2618 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2620 status = pthread_create (&tid, &attr, connection_thread_main,
2621 client_sock);
2622 if (status != 0)
2623 {
2624 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2625 close_connection(client_sock);
2626 continue;
2627 }
2628 } /* for (pollfds_num) */
2629 } /* while (state == RUNNING) */
2631 RRDD_LOG(LOG_INFO, "starting shutdown");
2633 close_listen_sockets ();
2635 pthread_mutex_lock (&connection_threads_lock);
2636 while (connection_threads_num > 0)
2637 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2638 pthread_mutex_unlock (&connection_threads_lock);
2640 free(pollfds);
2642 return (NULL);
2643 } /* }}} void *listen_thread_main */
2645 static int daemonize (void) /* {{{ */
2646 {
2647 int pid_fd;
2648 char *base_dir;
2650 daemon_uid = geteuid();
2652 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2653 if (pid_fd < 0)
2654 pid_fd = check_pidfile();
2655 if (pid_fd < 0)
2656 return pid_fd;
2658 /* open all the listen sockets */
2659 if (config_listen_address_list_len > 0)
2660 {
2661 for (size_t i = 0; i < config_listen_address_list_len; i++)
2662 open_listen_socket (config_listen_address_list[i]);
2664 rrd_free_ptrs((void ***) &config_listen_address_list,
2665 &config_listen_address_list_len);
2666 }
2667 else
2668 {
2669 listen_socket_t sock;
2670 memset(&sock, 0, sizeof(sock));
2671 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2672 open_listen_socket (&sock);
2673 }
2675 if (listen_fds_num < 1)
2676 {
2677 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2678 goto error;
2679 }
2681 if (!stay_foreground)
2682 {
2683 pid_t child;
2685 child = fork ();
2686 if (child < 0)
2687 {
2688 fprintf (stderr, "daemonize: fork(2) failed.\n");
2689 goto error;
2690 }
2691 else if (child > 0)
2692 exit(0);
2694 /* Become session leader */
2695 setsid ();
2697 /* Open the first three file descriptors to /dev/null */
2698 close (2);
2699 close (1);
2700 close (0);
2702 open ("/dev/null", O_RDWR);
2703 if (dup(0) == -1 || dup(0) == -1){
2704 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2705 }
2706 } /* if (!stay_foreground) */
2708 /* Change into the /tmp directory. */
2709 base_dir = (config_base_dir != NULL)
2710 ? config_base_dir
2711 : "/tmp";
2713 if (chdir (base_dir) != 0)
2714 {
2715 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2716 goto error;
2717 }
2719 install_signal_handlers();
2721 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2722 RRDD_LOG(LOG_INFO, "starting up");
2724 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2725 (GDestroyNotify) free_cache_item);
2726 if (cache_tree == NULL)
2727 {
2728 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2729 goto error;
2730 }
2732 return write_pidfile (pid_fd);
2734 error:
2735 remove_pidfile();
2736 return -1;
2737 } /* }}} int daemonize */
2739 static int cleanup (void) /* {{{ */
2740 {
2741 pthread_cond_broadcast (&flush_cond);
2742 pthread_join (flush_thread, NULL);
2744 pthread_cond_broadcast (&queue_cond);
2745 for (int i = 0; i < config_queue_threads; i++)
2746 pthread_join (queue_threads[i], NULL);
2748 if (config_flush_at_shutdown)
2749 {
2750 assert(cache_queue_head == NULL);
2751 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2752 }
2754 free(queue_threads);
2755 free(config_base_dir);
2757 pthread_mutex_lock(&cache_lock);
2758 g_tree_destroy(cache_tree);
2760 pthread_mutex_lock(&journal_lock);
2761 journal_done();
2763 RRDD_LOG(LOG_INFO, "goodbye");
2764 closelog ();
2766 remove_pidfile ();
2767 free(config_pid_file);
2769 return (0);
2770 } /* }}} int cleanup */
2772 static int read_options (int argc, char **argv) /* {{{ */
2773 {
2774 int option;
2775 int status = 0;
2777 char **permissions = NULL;
2778 size_t permissions_len = 0;
2780 gid_t socket_group = (gid_t)-1;
2781 mode_t socket_permissions = (mode_t)-1;
2783 while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:m:h?")) != -1)
2784 {
2785 switch (option)
2786 {
2787 case 'g':
2788 stay_foreground=1;
2789 break;
2791 case 'l':
2792 {
2793 listen_socket_t *new;
2795 new = malloc(sizeof(listen_socket_t));
2796 if (new == NULL)
2797 {
2798 fprintf(stderr, "read_options: malloc failed.\n");
2799 return(2);
2800 }
2801 memset(new, 0, sizeof(listen_socket_t));
2803 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2805 /* Add permissions to the socket {{{ */
2806 if (permissions_len != 0)
2807 {
2808 size_t i;
2809 for (i = 0; i < permissions_len; i++)
2810 {
2811 status = socket_permission_add (new, permissions[i]);
2812 if (status != 0)
2813 {
2814 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2815 "socket failed. Most likely, this permission doesn't "
2816 "exist. Check your command line.\n", permissions[i]);
2817 status = 4;
2818 }
2819 }
2820 }
2821 else /* if (permissions_len == 0) */
2822 {
2823 /* Add permission for ALL commands to the socket. */
2824 size_t i;
2825 for (i = 0; i < list_of_commands_len; i++)
2826 {
2827 status = socket_permission_add (new, list_of_commands[i].cmd);
2828 if (status != 0)
2829 {
2830 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2831 "socket failed. This should never happen, ever! Sorry.\n",
2832 permissions[i]);
2833 status = 4;
2834 }
2835 }
2836 }
2837 /* }}} Done adding permissions. */
2839 new->socket_group = socket_group;
2840 new->socket_permissions = socket_permissions;
2842 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2843 &config_listen_address_list_len, new))
2844 {
2845 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2846 return (2);
2847 }
2848 }
2849 break;
2851 /* set socket group permissions */
2852 case 's':
2853 {
2854 gid_t group_gid;
2855 struct group *grp;
2857 group_gid = strtoul(optarg, NULL, 10);
2858 if (errno != EINVAL && group_gid>0)
2859 {
2860 /* we were passed a number */
2861 grp = getgrgid(group_gid);
2862 }
2863 else
2864 {
2865 grp = getgrnam(optarg);
2866 }
2868 if (grp)
2869 {
2870 socket_group = grp->gr_gid;
2871 }
2872 else
2873 {
2874 /* no idea what the user wanted... */
2875 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2876 return (5);
2877 }
2878 }
2879 break;
2881 /* set socket file permissions */
2882 case 'm':
2883 {
2884 long tmp;
2885 char *endptr = NULL;
2887 tmp = strtol (optarg, &endptr, 8);
2888 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2889 || (tmp > 07777) || (tmp < 0)) {
2890 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2891 optarg);
2892 return (5);
2893 }
2895 socket_permissions = (mode_t)tmp;
2896 }
2897 break;
2899 case 'P':
2900 {
2901 char *optcopy;
2902 char *saveptr;
2903 char *dummy;
2904 char *ptr;
2906 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2908 optcopy = strdup (optarg);
2909 dummy = optcopy;
2910 saveptr = NULL;
2911 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2912 {
2913 dummy = NULL;
2914 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2915 }
2917 free (optcopy);
2918 }
2919 break;
2921 case 'f':
2922 {
2923 int temp;
2925 temp = atoi (optarg);
2926 if (temp > 0)
2927 config_flush_interval = temp;
2928 else
2929 {
2930 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2931 status = 3;
2932 }
2933 }
2934 break;
2936 case 'w':
2937 {
2938 int temp;
2940 temp = atoi (optarg);
2941 if (temp > 0)
2942 config_write_interval = temp;
2943 else
2944 {
2945 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2946 status = 2;
2947 }
2948 }
2949 break;
2951 case 'z':
2952 {
2953 int temp;
2955 temp = atoi(optarg);
2956 if (temp > 0)
2957 config_write_jitter = temp;
2958 else
2959 {
2960 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2961 status = 2;
2962 }
2964 break;
2965 }
2967 case 't':
2968 {
2969 int threads;
2970 threads = atoi(optarg);
2971 if (threads >= 1)
2972 config_queue_threads = threads;
2973 else
2974 {
2975 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2976 return 1;
2977 }
2978 }
2979 break;
2981 case 'B':
2982 config_write_base_only = 1;
2983 break;
2985 case 'b':
2986 {
2987 size_t len;
2988 char base_realpath[PATH_MAX];
2990 if (config_base_dir != NULL)
2991 free (config_base_dir);
2992 config_base_dir = strdup (optarg);
2993 if (config_base_dir == NULL)
2994 {
2995 fprintf (stderr, "read_options: strdup failed.\n");
2996 return (3);
2997 }
2999 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3000 {
3001 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3002 config_base_dir, rrd_strerror (errno));
3003 return (3);
3004 }
3006 /* make sure that the base directory is not resolved via
3007 * symbolic links. this makes some performance-enhancing
3008 * assumptions possible (we don't have to resolve paths
3009 * that start with a "/")
3010 */
3011 if (realpath(config_base_dir, base_realpath) == NULL)
3012 {
3013 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3014 "%s\n", config_base_dir, rrd_strerror(errno));
3015 return 5;
3016 }
3018 len = strlen (config_base_dir);
3019 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3020 {
3021 config_base_dir[len - 1] = 0;
3022 len--;
3023 }
3025 if (len < 1)
3026 {
3027 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3028 return (4);
3029 }
3031 _config_base_dir_len = len;
3033 len = strlen (base_realpath);
3034 while ((len > 0) && (base_realpath[len - 1] == '/'))
3035 {
3036 base_realpath[len - 1] = '\0';
3037 len--;
3038 }
3040 if (strncmp(config_base_dir,
3041 base_realpath, sizeof(base_realpath)) != 0)
3042 {
3043 fprintf(stderr,
3044 "Base directory (-b) resolved via file system links!\n"
3045 "Please consult rrdcached '-b' documentation!\n"
3046 "Consider specifying the real directory (%s)\n",
3047 base_realpath);
3048 return 5;
3049 }
3050 }
3051 break;
3053 case 'p':
3054 {
3055 if (config_pid_file != NULL)
3056 free (config_pid_file);
3057 config_pid_file = strdup (optarg);
3058 if (config_pid_file == NULL)
3059 {
3060 fprintf (stderr, "read_options: strdup failed.\n");
3061 return (3);
3062 }
3063 }
3064 break;
3066 case 'F':
3067 config_flush_at_shutdown = 1;
3068 break;
3070 case 'j':
3071 {
3072 const char *dir = journal_dir = strdup(optarg);
3074 status = rrd_mkdir_p(dir, 0777);
3075 if (status != 0)
3076 {
3077 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3078 dir, rrd_strerror(errno));
3079 return 6;
3080 }
3082 if (access(dir, R_OK|W_OK|X_OK) != 0)
3083 {
3084 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3085 errno ? rrd_strerror(errno) : "");
3086 return 6;
3087 }
3088 }
3089 break;
3091 case 'm':
3092 {
3093 int temp = atoi(optarg);
3094 if (temp > 0)
3095 config_alloc_chunk = temp;
3096 else
3097 {
3098 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3099 status = 10;
3100 }
3101 }
3102 break;
3104 case 'h':
3105 case '?':
3106 printf ("RRDCacheD %s\n"
3107 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3108 "\n"
3109 "Usage: rrdcached [options]\n"
3110 "\n"
3111 "Valid options are:\n"
3112 " -l <address> Socket address to listen to.\n"
3113 " -P <perms> Sets the permissions to assign to all following "
3114 "sockets\n"
3115 " -w <seconds> Interval in which to write data.\n"
3116 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3117 " -t <threads> Number of write threads.\n"
3118 " -f <seconds> Interval in which to flush dead data.\n"
3119 " -p <file> Location of the PID-file.\n"
3120 " -b <dir> Base directory to change to.\n"
3121 " -B Restrict file access to paths within -b <dir>\n"
3122 " -g Do not fork and run in the foreground.\n"
3123 " -j <dir> Directory in which to create the journal files.\n"
3124 " -F Always flush all updates at shutdown\n"
3125 " -s <id|name> Group owner of all following UNIX sockets\n"
3126 " (the socket will also have read/write permissions "
3127 "for that group)\n"
3128 " -m <mode> File permissions (octal) of all following UNIX "
3129 "sockets\n"
3130 "\n"
3131 "For more information and a detailed description of all options "
3132 "please refer\n"
3133 "to the rrdcached(1) manual page.\n",
3134 VERSION);
3135 if (option == 'h')
3136 status = -1;
3137 else
3138 status = 1;
3139 break;
3140 } /* switch (option) */
3141 } /* while (getopt) */
3143 /* advise the user when values are not sane */
3144 if (config_flush_interval < 2 * config_write_interval)
3145 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3146 " 2x write interval (-w) !\n");
3147 if (config_write_jitter > config_write_interval)
3148 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3149 " write interval (-w) !\n");
3151 if (config_write_base_only && config_base_dir == NULL)
3152 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3153 " Consult the rrdcached documentation\n");
3155 if (journal_dir == NULL)
3156 config_flush_at_shutdown = 1;
3158 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3160 return (status);
3161 } /* }}} int read_options */
3163 int main (int argc, char **argv)
3164 {
3165 int status;
3167 status = read_options (argc, argv);
3168 if (status != 0)
3169 {
3170 if (status < 0)
3171 status = 0;
3172 return (status);
3173 }
3175 status = daemonize ();
3176 if (status != 0)
3177 {
3178 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3179 return (1);
3180 }
3182 journal_init();
3184 /* start the queue threads */
3185 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3186 if (queue_threads == NULL)
3187 {
3188 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3189 cleanup();
3190 return (1);
3191 }
3192 for (int i = 0; i < config_queue_threads; i++)
3193 {
3194 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3195 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3196 if (status != 0)
3197 {
3198 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3199 cleanup();
3200 return (1);
3201 }
3202 }
3204 /* start the flush thread */
3205 memset(&flush_thread, 0, sizeof(flush_thread));
3206 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3207 if (status != 0)
3208 {
3209 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3210 cleanup();
3211 return (1);
3212 }
3214 listen_thread_main (NULL);
3215 cleanup ();
3217 return (0);
3218 } /* int main */
3220 /*
3221 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3222 */