1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
111 #include <glib-2.0/glib.h>
112 /* }}} */
114 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
116 #ifndef __GNUC__
117 # define __attribute__(x) /**/
118 #endif
120 /*
121 * Types
122 */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125 struct listen_socket_s
126 {
127 int fd;
128 char addr[PATH_MAX + 1];
129 int family;
131 /* state for BATCH processing */
132 time_t batch_start;
133 int batch_cmd;
135 /* buffered IO */
136 char *rbuf;
137 off_t next_cmd;
138 off_t next_read;
140 char *wbuf;
141 ssize_t wbuf_len;
143 uint32_t permissions;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command_s;
148 typedef struct command_s command_t;
149 /* note: guard against "unused" warnings in the handlers */
150 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
151 time_t now __attribute__((unused)),\
152 char *buffer __attribute__((unused)),\
153 size_t buffer_size __attribute__((unused))
155 #define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
156 DISPATCH_PROTO
158 struct command_s {
159 char *cmd;
160 int (*handler)(HANDLER_PROTO);
162 char context; /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT (1<<0)
164 #define CMD_CONTEXT_BATCH (1<<1)
165 #define CMD_CONTEXT_JOURNAL (1<<2)
166 #define CMD_CONTEXT_ANY (0x7f)
168 char *syntax;
169 char *help;
170 };
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
176 char *file;
177 char **values;
178 size_t values_num;
179 time_t last_flush_time;
180 time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183 int flags;
184 pthread_cond_t flushed;
185 cache_item_t *prev;
186 cache_item_t *next;
187 };
189 struct callback_flush_data_s
190 {
191 time_t now;
192 time_t abs_timeout;
193 char **keys;
194 size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
198 enum queue_side_e
199 {
200 HEAD,
201 TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
205 /* describe a set of journal files */
206 typedef struct {
207 char **files;
208 size_t files_num;
209 } journal_set;
211 /* max length of socket command or response */
212 #define CMD_MAX 4096
213 #define RBUF_SIZE (CMD_MAX*2)
215 /*
216 * Variables
217 */
218 static int stay_foreground = 0;
219 static uid_t daemon_uid;
221 static listen_socket_t *listen_fds = NULL;
222 static size_t listen_fds_num = 0;
224 static gboolean set_socket_group = FALSE;
225 static gid_t socket_group;
227 enum {
228 RUNNING, /* normal operation */
229 FLUSHING, /* flushing remaining values */
230 SHUTDOWN /* shutting down */
231 } state = RUNNING;
233 static pthread_t *queue_threads;
234 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
235 static int config_queue_threads = 4;
237 static pthread_t flush_thread;
238 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
240 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
241 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
242 static int connection_threads_num = 0;
244 /* Cache stuff */
245 static GTree *cache_tree = NULL;
246 static cache_item_t *cache_queue_head = NULL;
247 static cache_item_t *cache_queue_tail = NULL;
248 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
250 static int config_write_interval = 300;
251 static int config_write_jitter = 0;
252 static int config_flush_interval = 3600;
253 static int config_flush_at_shutdown = 0;
254 static char *config_pid_file = NULL;
255 static char *config_base_dir = NULL;
256 static size_t _config_base_dir_len = 0;
257 static int config_write_base_only = 0;
259 static listen_socket_t **config_listen_address_list = NULL;
260 static size_t config_listen_address_list_len = 0;
262 static uint64_t stats_queue_length = 0;
263 static uint64_t stats_updates_received = 0;
264 static uint64_t stats_flush_received = 0;
265 static uint64_t stats_updates_written = 0;
266 static uint64_t stats_data_sets_written = 0;
267 static uint64_t stats_journal_bytes = 0;
268 static uint64_t stats_journal_rotate = 0;
269 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
271 /* Journaled updates */
272 #define JOURNAL_REPLAY(s) ((s) == NULL)
273 #define JOURNAL_BASE "rrd.journal"
274 static journal_set *journal_cur = NULL;
275 static journal_set *journal_old = NULL;
276 static char *journal_dir = NULL;
277 static FILE *journal_fh = NULL; /* current journal file handle */
278 static long journal_size = 0; /* current journal size */
279 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
280 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
281 static int journal_write(char *cmd, char *args);
282 static void journal_done(void);
283 static void journal_rotate(void);
285 /* prototypes for forward refernces */
286 static int handle_request_help (HANDLER_PROTO);
288 /*
289 * Functions
290 */
291 static void sig_common (const char *sig) /* {{{ */
292 {
293 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
294 state = FLUSHING;
295 pthread_cond_broadcast(&flush_cond);
296 pthread_cond_broadcast(&queue_cond);
297 } /* }}} void sig_common */
299 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
300 {
301 sig_common("INT");
302 } /* }}} void sig_int_handler */
304 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
305 {
306 sig_common("TERM");
307 } /* }}} void sig_term_handler */
309 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
310 {
311 config_flush_at_shutdown = 1;
312 sig_common("USR1");
313 } /* }}} void sig_usr1_handler */
315 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
316 {
317 config_flush_at_shutdown = 0;
318 sig_common("USR2");
319 } /* }}} void sig_usr2_handler */
321 static void install_signal_handlers(void) /* {{{ */
322 {
323 /* These structures are static, because `sigaction' behaves weird if the are
324 * overwritten.. */
325 static struct sigaction sa_int;
326 static struct sigaction sa_term;
327 static struct sigaction sa_pipe;
328 static struct sigaction sa_usr1;
329 static struct sigaction sa_usr2;
331 /* Install signal handlers */
332 memset (&sa_int, 0, sizeof (sa_int));
333 sa_int.sa_handler = sig_int_handler;
334 sigaction (SIGINT, &sa_int, NULL);
336 memset (&sa_term, 0, sizeof (sa_term));
337 sa_term.sa_handler = sig_term_handler;
338 sigaction (SIGTERM, &sa_term, NULL);
340 memset (&sa_pipe, 0, sizeof (sa_pipe));
341 sa_pipe.sa_handler = SIG_IGN;
342 sigaction (SIGPIPE, &sa_pipe, NULL);
344 memset (&sa_pipe, 0, sizeof (sa_usr1));
345 sa_usr1.sa_handler = sig_usr1_handler;
346 sigaction (SIGUSR1, &sa_usr1, NULL);
348 memset (&sa_usr2, 0, sizeof (sa_usr2));
349 sa_usr2.sa_handler = sig_usr2_handler;
350 sigaction (SIGUSR2, &sa_usr2, NULL);
352 } /* }}} void install_signal_handlers */
354 static int open_pidfile(char *action, int oflag) /* {{{ */
355 {
356 int fd;
357 const char *file;
358 char *file_copy, *dir;
360 file = (config_pid_file != NULL)
361 ? config_pid_file
362 : LOCALSTATEDIR "/run/rrdcached.pid";
364 /* dirname may modify its argument */
365 file_copy = strdup(file);
366 if (file_copy == NULL)
367 {
368 fprintf(stderr, "rrdcached: strdup(): %s\n",
369 rrd_strerror(errno));
370 return -1;
371 }
373 dir = dirname(file_copy);
374 if (rrd_mkdir_p(dir, 0777) != 0)
375 {
376 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
377 dir, rrd_strerror(errno));
378 return -1;
379 }
381 free(file_copy);
383 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
384 if (fd < 0)
385 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
386 action, file, rrd_strerror(errno));
388 return(fd);
389 } /* }}} static int open_pidfile */
391 /* check existing pid file to see whether a daemon is running */
392 static int check_pidfile(void)
393 {
394 int pid_fd;
395 pid_t pid;
396 char pid_str[16];
398 pid_fd = open_pidfile("open", O_RDWR);
399 if (pid_fd < 0)
400 return pid_fd;
402 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
403 return -1;
405 pid = atoi(pid_str);
406 if (pid <= 0)
407 return -1;
409 /* another running process that we can signal COULD be
410 * a competing rrdcached */
411 if (pid != getpid() && kill(pid, 0) == 0)
412 {
413 fprintf(stderr,
414 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
415 close(pid_fd);
416 return -1;
417 }
419 lseek(pid_fd, 0, SEEK_SET);
420 if (ftruncate(pid_fd, 0) == -1)
421 {
422 fprintf(stderr,
423 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
424 close(pid_fd);
425 return -1;
426 }
428 fprintf(stderr,
429 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
430 "rrdcached: starting normally.\n", pid);
432 return pid_fd;
433 } /* }}} static int check_pidfile */
435 static int write_pidfile (int fd) /* {{{ */
436 {
437 pid_t pid;
438 FILE *fh;
440 pid = getpid ();
442 fh = fdopen (fd, "w");
443 if (fh == NULL)
444 {
445 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
446 close(fd);
447 return (-1);
448 }
450 fprintf (fh, "%i\n", (int) pid);
451 fclose (fh);
453 return (0);
454 } /* }}} int write_pidfile */
456 static int remove_pidfile (void) /* {{{ */
457 {
458 char *file;
459 int status;
461 file = (config_pid_file != NULL)
462 ? config_pid_file
463 : LOCALSTATEDIR "/run/rrdcached.pid";
465 status = unlink (file);
466 if (status == 0)
467 return (0);
468 return (errno);
469 } /* }}} int remove_pidfile */
471 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
472 {
473 char *eol;
475 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
476 sock->next_read - sock->next_cmd);
478 if (eol == NULL)
479 {
480 /* no commands left, move remainder back to front of rbuf */
481 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
482 sock->next_read - sock->next_cmd);
483 sock->next_read -= sock->next_cmd;
484 sock->next_cmd = 0;
485 *len = 0;
486 return NULL;
487 }
488 else
489 {
490 char *cmd = sock->rbuf + sock->next_cmd;
491 *eol = '\0';
493 sock->next_cmd = eol - sock->rbuf + 1;
495 if (eol > sock->rbuf && *(eol-1) == '\r')
496 *(--eol) = '\0'; /* handle "\r\n" EOL */
498 *len = eol - cmd;
500 return cmd;
501 }
503 /* NOTREACHED */
504 assert(1==0);
505 } /* }}} char *next_cmd */
507 /* add the characters directly to the write buffer */
508 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
509 {
510 char *new_buf;
512 assert(sock != NULL);
514 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
515 if (new_buf == NULL)
516 {
517 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
518 return -1;
519 }
521 strncpy(new_buf + sock->wbuf_len, str, len + 1);
523 sock->wbuf = new_buf;
524 sock->wbuf_len += len;
526 return 0;
527 } /* }}} static int add_to_wbuf */
529 /* add the text to the "extra" info that's sent after the status line */
530 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
531 {
532 va_list argp;
533 char buffer[CMD_MAX];
534 int len;
536 if (JOURNAL_REPLAY(sock)) return 0;
537 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
539 va_start(argp, fmt);
540 #ifdef HAVE_VSNPRINTF
541 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
542 #else
543 len = vsprintf(buffer, fmt, argp);
544 #endif
545 va_end(argp);
546 if (len < 0)
547 {
548 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
549 return -1;
550 }
552 return add_to_wbuf(sock, buffer, len);
553 } /* }}} static int add_response_info */
555 static int count_lines(char *str) /* {{{ */
556 {
557 int lines = 0;
559 if (str != NULL)
560 {
561 while ((str = strchr(str, '\n')) != NULL)
562 {
563 ++lines;
564 ++str;
565 }
566 }
568 return lines;
569 } /* }}} static int count_lines */
571 /* send the response back to the user.
572 * returns 0 on success, -1 on error
573 * write buffer is always zeroed after this call */
574 static int send_response (listen_socket_t *sock, response_code rc,
575 char *fmt, ...) /* {{{ */
576 {
577 va_list argp;
578 char buffer[CMD_MAX];
579 int lines;
580 ssize_t wrote;
581 int rclen, len;
583 if (JOURNAL_REPLAY(sock)) return rc;
585 if (sock->batch_start)
586 {
587 if (rc == RESP_OK)
588 return rc; /* no response on success during BATCH */
589 lines = sock->batch_cmd;
590 }
591 else if (rc == RESP_OK)
592 lines = count_lines(sock->wbuf);
593 else
594 lines = -1;
596 rclen = sprintf(buffer, "%d ", lines);
597 va_start(argp, fmt);
598 #ifdef HAVE_VSNPRINTF
599 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
600 #else
601 len = vsprintf(buffer+rclen, fmt, argp);
602 #endif
603 va_end(argp);
604 if (len < 0)
605 return -1;
607 len += rclen;
609 /* append the result to the wbuf, don't write to the user */
610 if (sock->batch_start)
611 return add_to_wbuf(sock, buffer, len);
613 /* first write must be complete */
614 if (len != write(sock->fd, buffer, len))
615 {
616 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
617 return -1;
618 }
620 if (sock->wbuf != NULL && rc == RESP_OK)
621 {
622 wrote = 0;
623 while (wrote < sock->wbuf_len)
624 {
625 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
626 if (wb <= 0)
627 {
628 RRDD_LOG(LOG_INFO, "send_response: could not write results");
629 return -1;
630 }
631 wrote += wb;
632 }
633 }
635 free(sock->wbuf); sock->wbuf = NULL;
636 sock->wbuf_len = 0;
638 return 0;
639 } /* }}} */
641 static void wipe_ci_values(cache_item_t *ci, time_t when)
642 {
643 ci->values = NULL;
644 ci->values_num = 0;
646 ci->last_flush_time = when;
647 if (config_write_jitter > 0)
648 ci->last_flush_time += (rrd_random() % config_write_jitter);
649 }
651 /* remove_from_queue
652 * remove a "cache_item_t" item from the queue.
653 * must hold 'cache_lock' when calling this
654 */
655 static void remove_from_queue(cache_item_t *ci) /* {{{ */
656 {
657 if (ci == NULL) return;
658 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
660 if (ci->prev == NULL)
661 cache_queue_head = ci->next; /* reset head */
662 else
663 ci->prev->next = ci->next;
665 if (ci->next == NULL)
666 cache_queue_tail = ci->prev; /* reset the tail */
667 else
668 ci->next->prev = ci->prev;
670 ci->next = ci->prev = NULL;
671 ci->flags &= ~CI_FLAGS_IN_QUEUE;
673 pthread_mutex_lock (&stats_lock);
674 assert (stats_queue_length > 0);
675 stats_queue_length--;
676 pthread_mutex_unlock (&stats_lock);
678 } /* }}} static void remove_from_queue */
680 /* free the resources associated with the cache_item_t
681 * must hold cache_lock when calling this function
682 */
683 static void *free_cache_item(cache_item_t *ci) /* {{{ */
684 {
685 if (ci == NULL) return NULL;
687 remove_from_queue(ci);
689 for (size_t i=0; i < ci->values_num; i++)
690 free(ci->values[i]);
692 free (ci->values);
693 free (ci->file);
695 /* in case anyone is waiting */
696 pthread_cond_broadcast(&ci->flushed);
697 pthread_cond_destroy(&ci->flushed);
699 free (ci);
701 return NULL;
702 } /* }}} static void *free_cache_item */
704 /*
705 * enqueue_cache_item:
706 * `cache_lock' must be acquired before calling this function!
707 */
708 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
709 queue_side_t side)
710 {
711 if (ci == NULL)
712 return (-1);
714 if (ci->values_num == 0)
715 return (0);
717 if (side == HEAD)
718 {
719 if (cache_queue_head == ci)
720 return 0;
722 /* remove if further down in queue */
723 remove_from_queue(ci);
725 ci->prev = NULL;
726 ci->next = cache_queue_head;
727 if (ci->next != NULL)
728 ci->next->prev = ci;
729 cache_queue_head = ci;
731 if (cache_queue_tail == NULL)
732 cache_queue_tail = cache_queue_head;
733 }
734 else /* (side == TAIL) */
735 {
736 /* We don't move values back in the list.. */
737 if (ci->flags & CI_FLAGS_IN_QUEUE)
738 return (0);
740 assert (ci->next == NULL);
741 assert (ci->prev == NULL);
743 ci->prev = cache_queue_tail;
745 if (cache_queue_tail == NULL)
746 cache_queue_head = ci;
747 else
748 cache_queue_tail->next = ci;
750 cache_queue_tail = ci;
751 }
753 ci->flags |= CI_FLAGS_IN_QUEUE;
755 pthread_cond_signal(&queue_cond);
756 pthread_mutex_lock (&stats_lock);
757 stats_queue_length++;
758 pthread_mutex_unlock (&stats_lock);
760 return (0);
761 } /* }}} int enqueue_cache_item */
763 /*
764 * tree_callback_flush:
765 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
766 * while this is in progress.
767 */
768 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
769 gpointer data)
770 {
771 cache_item_t *ci;
772 callback_flush_data_t *cfd;
774 ci = (cache_item_t *) value;
775 cfd = (callback_flush_data_t *) data;
777 if (ci->flags & CI_FLAGS_IN_QUEUE)
778 return FALSE;
780 if (ci->values_num > 0
781 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
782 {
783 enqueue_cache_item (ci, TAIL);
784 }
785 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
786 && (ci->values_num <= 0))
787 {
788 assert ((char *) key == ci->file);
789 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
790 {
791 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
792 return (FALSE);
793 }
794 }
796 return (FALSE);
797 } /* }}} gboolean tree_callback_flush */
799 static int flush_old_values (int max_age)
800 {
801 callback_flush_data_t cfd;
802 size_t k;
804 memset (&cfd, 0, sizeof (cfd));
805 /* Pass the current time as user data so that we don't need to call
806 * `time' for each node. */
807 cfd.now = time (NULL);
808 cfd.keys = NULL;
809 cfd.keys_num = 0;
811 if (max_age > 0)
812 cfd.abs_timeout = cfd.now - max_age;
813 else
814 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
816 /* `tree_callback_flush' will return the keys of all values that haven't
817 * been touched in the last `config_flush_interval' seconds in `cfd'.
818 * The char*'s in this array point to the same memory as ci->file, so we
819 * don't need to free them separately. */
820 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
822 for (k = 0; k < cfd.keys_num; k++)
823 {
824 /* should never fail, since we have held the cache_lock
825 * the entire time */
826 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
827 }
829 if (cfd.keys != NULL)
830 {
831 free (cfd.keys);
832 cfd.keys = NULL;
833 }
835 return (0);
836 } /* int flush_old_values */
838 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
839 {
840 struct timeval now;
841 struct timespec next_flush;
842 int status;
844 gettimeofday (&now, NULL);
845 next_flush.tv_sec = now.tv_sec + config_flush_interval;
846 next_flush.tv_nsec = 1000 * now.tv_usec;
848 pthread_mutex_lock(&cache_lock);
850 while (state == RUNNING)
851 {
852 gettimeofday (&now, NULL);
853 if ((now.tv_sec > next_flush.tv_sec)
854 || ((now.tv_sec == next_flush.tv_sec)
855 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
856 {
857 RRDD_LOG(LOG_DEBUG, "flushing old values");
859 /* Determine the time of the next cache flush. */
860 next_flush.tv_sec = now.tv_sec + config_flush_interval;
862 /* Flush all values that haven't been written in the last
863 * `config_write_interval' seconds. */
864 flush_old_values (config_write_interval);
866 /* unlock the cache while we rotate so we don't block incoming
867 * updates if the fsync() blocks on disk I/O */
868 pthread_mutex_unlock(&cache_lock);
869 journal_rotate();
870 pthread_mutex_lock(&cache_lock);
871 }
873 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
874 if (status != 0 && status != ETIMEDOUT)
875 {
876 RRDD_LOG (LOG_ERR, "flush_thread_main: "
877 "pthread_cond_timedwait returned %i.", status);
878 }
879 }
881 if (config_flush_at_shutdown)
882 flush_old_values (-1); /* flush everything */
884 state = SHUTDOWN;
886 pthread_mutex_unlock(&cache_lock);
888 return NULL;
889 } /* void *flush_thread_main */
891 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
892 {
893 pthread_mutex_lock (&cache_lock);
895 while (state != SHUTDOWN
896 || (cache_queue_head != NULL && config_flush_at_shutdown))
897 {
898 cache_item_t *ci;
899 char *file;
900 char **values;
901 size_t values_num;
902 int status;
904 /* Now, check if there's something to store away. If not, wait until
905 * something comes in. */
906 if (cache_queue_head == NULL)
907 {
908 status = pthread_cond_wait (&queue_cond, &cache_lock);
909 if ((status != 0) && (status != ETIMEDOUT))
910 {
911 RRDD_LOG (LOG_ERR, "queue_thread_main: "
912 "pthread_cond_wait returned %i.", status);
913 }
914 }
916 /* Check if a value has arrived. This may be NULL if we timed out or there
917 * was an interrupt such as a signal. */
918 if (cache_queue_head == NULL)
919 continue;
921 ci = cache_queue_head;
923 /* copy the relevant parts */
924 file = strdup (ci->file);
925 if (file == NULL)
926 {
927 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
928 continue;
929 }
931 assert(ci->values != NULL);
932 assert(ci->values_num > 0);
934 values = ci->values;
935 values_num = ci->values_num;
937 wipe_ci_values(ci, time(NULL));
938 remove_from_queue(ci);
940 pthread_mutex_unlock (&cache_lock);
942 rrd_clear_error ();
943 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
944 if (status != 0)
945 {
946 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
947 "rrd_update_r (%s) failed with status %i. (%s)",
948 file, status, rrd_get_error());
949 }
951 journal_write("wrote", file);
953 /* Search again in the tree. It's possible someone issued a "FORGET"
954 * while we were writing the update values. */
955 pthread_mutex_lock(&cache_lock);
956 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
957 if (ci)
958 pthread_cond_broadcast(&ci->flushed);
959 pthread_mutex_unlock(&cache_lock);
961 if (status == 0)
962 {
963 pthread_mutex_lock (&stats_lock);
964 stats_updates_written++;
965 stats_data_sets_written += values_num;
966 pthread_mutex_unlock (&stats_lock);
967 }
969 rrd_free_ptrs((void ***) &values, &values_num);
970 free(file);
972 pthread_mutex_lock (&cache_lock);
973 }
974 pthread_mutex_unlock (&cache_lock);
976 return (NULL);
977 } /* }}} void *queue_thread_main */
979 static int buffer_get_field (char **buffer_ret, /* {{{ */
980 size_t *buffer_size_ret, char **field_ret)
981 {
982 char *buffer;
983 size_t buffer_pos;
984 size_t buffer_size;
985 char *field;
986 size_t field_size;
987 int status;
989 buffer = *buffer_ret;
990 buffer_pos = 0;
991 buffer_size = *buffer_size_ret;
992 field = *buffer_ret;
993 field_size = 0;
995 if (buffer_size <= 0)
996 return (-1);
998 /* This is ensured by `handle_request'. */
999 assert (buffer[buffer_size - 1] == '\0');
1001 status = -1;
1002 while (buffer_pos < buffer_size)
1003 {
1004 /* Check for end-of-field or end-of-buffer */
1005 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1006 {
1007 field[field_size] = 0;
1008 field_size++;
1009 buffer_pos++;
1010 status = 0;
1011 break;
1012 }
1013 /* Handle escaped characters. */
1014 else if (buffer[buffer_pos] == '\\')
1015 {
1016 if (buffer_pos >= (buffer_size - 1))
1017 break;
1018 buffer_pos++;
1019 field[field_size] = buffer[buffer_pos];
1020 field_size++;
1021 buffer_pos++;
1022 }
1023 /* Normal operation */
1024 else
1025 {
1026 field[field_size] = buffer[buffer_pos];
1027 field_size++;
1028 buffer_pos++;
1029 }
1030 } /* while (buffer_pos < buffer_size) */
1032 if (status != 0)
1033 return (status);
1035 *buffer_ret = buffer + buffer_pos;
1036 *buffer_size_ret = buffer_size - buffer_pos;
1037 *field_ret = field;
1039 return (0);
1040 } /* }}} int buffer_get_field */
1042 /* if we're restricting writes to the base directory,
1043 * check whether the file falls within the dir
1044 * returns 1 if OK, otherwise 0
1045 */
1046 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1047 {
1048 assert(file != NULL);
1050 if (!config_write_base_only
1051 || JOURNAL_REPLAY(sock)
1052 || config_base_dir == NULL)
1053 return 1;
1055 if (strstr(file, "../") != NULL) goto err;
1057 /* relative paths without "../" are ok */
1058 if (*file != '/') return 1;
1060 /* file must be of the format base + "/" + <1+ char filename> */
1061 if (strlen(file) < _config_base_dir_len + 2) goto err;
1062 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1063 if (*(file + _config_base_dir_len) != '/') goto err;
1065 return 1;
1067 err:
1068 if (sock != NULL && sock->fd >= 0)
1069 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1071 return 0;
1072 } /* }}} static int check_file_access */
1074 /* when using a base dir, convert relative paths to absolute paths.
1075 * if necessary, modifies the "filename" pointer to point
1076 * to the new path created in "tmp". "tmp" is provided
1077 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1078 *
1079 * this allows us to optimize for the expected case (absolute path)
1080 * with a no-op.
1081 */
1082 static void get_abs_path(char **filename, char *tmp)
1083 {
1084 assert(tmp != NULL);
1085 assert(filename != NULL && *filename != NULL);
1087 if (config_base_dir == NULL || **filename == '/')
1088 return;
1090 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1091 *filename = tmp;
1092 } /* }}} static int get_abs_path */
1094 static int flush_file (const char *filename) /* {{{ */
1095 {
1096 cache_item_t *ci;
1098 pthread_mutex_lock (&cache_lock);
1100 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1101 if (ci == NULL)
1102 {
1103 pthread_mutex_unlock (&cache_lock);
1104 return (ENOENT);
1105 }
1107 if (ci->values_num > 0)
1108 {
1109 /* Enqueue at head */
1110 enqueue_cache_item (ci, HEAD);
1111 pthread_cond_wait(&ci->flushed, &cache_lock);
1112 }
1114 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1115 * may have been purged during our cond_wait() */
1117 pthread_mutex_unlock(&cache_lock);
1119 return (0);
1120 } /* }}} int flush_file */
1122 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1123 {
1124 char *err = "Syntax error.\n";
1126 if (cmd && cmd->syntax)
1127 err = cmd->syntax;
1129 return send_response(sock, RESP_ERR, "Usage: %s", err);
1130 } /* }}} static int syntax_error() */
1132 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1133 {
1134 uint64_t copy_queue_length;
1135 uint64_t copy_updates_received;
1136 uint64_t copy_flush_received;
1137 uint64_t copy_updates_written;
1138 uint64_t copy_data_sets_written;
1139 uint64_t copy_journal_bytes;
1140 uint64_t copy_journal_rotate;
1142 uint64_t tree_nodes_number;
1143 uint64_t tree_depth;
1145 pthread_mutex_lock (&stats_lock);
1146 copy_queue_length = stats_queue_length;
1147 copy_updates_received = stats_updates_received;
1148 copy_flush_received = stats_flush_received;
1149 copy_updates_written = stats_updates_written;
1150 copy_data_sets_written = stats_data_sets_written;
1151 copy_journal_bytes = stats_journal_bytes;
1152 copy_journal_rotate = stats_journal_rotate;
1153 pthread_mutex_unlock (&stats_lock);
1155 pthread_mutex_lock (&cache_lock);
1156 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1157 tree_depth = (uint64_t) g_tree_height (cache_tree);
1158 pthread_mutex_unlock (&cache_lock);
1160 add_response_info(sock,
1161 "QueueLength: %"PRIu64"\n", copy_queue_length);
1162 add_response_info(sock,
1163 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1164 add_response_info(sock,
1165 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1166 add_response_info(sock,
1167 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1168 add_response_info(sock,
1169 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1170 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1171 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1172 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1173 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1175 send_response(sock, RESP_OK, "Statistics follow\n");
1177 return (0);
1178 } /* }}} int handle_request_stats */
1180 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1181 {
1182 char *file, file_tmp[PATH_MAX];
1183 int status;
1185 status = buffer_get_field (&buffer, &buffer_size, &file);
1186 if (status != 0)
1187 {
1188 return syntax_error(sock,cmd);
1189 }
1190 else
1191 {
1192 pthread_mutex_lock(&stats_lock);
1193 stats_flush_received++;
1194 pthread_mutex_unlock(&stats_lock);
1196 get_abs_path(&file, file_tmp);
1197 if (!check_file_access(file, sock)) return 0;
1199 status = flush_file (file);
1200 if (status == 0)
1201 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1202 else if (status == ENOENT)
1203 {
1204 /* no file in our tree; see whether it exists at all */
1205 struct stat statbuf;
1207 memset(&statbuf, 0, sizeof(statbuf));
1208 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1209 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1210 else
1211 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1212 }
1213 else if (status < 0)
1214 return send_response(sock, RESP_ERR, "Internal error.\n");
1215 else
1216 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1217 }
1219 /* NOTREACHED */
1220 assert(1==0);
1221 } /* }}} int handle_request_flush */
1223 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1224 {
1225 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1227 pthread_mutex_lock(&cache_lock);
1228 flush_old_values(-1);
1229 pthread_mutex_unlock(&cache_lock);
1231 return send_response(sock, RESP_OK, "Started flush.\n");
1232 } /* }}} static int handle_request_flushall */
1234 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1235 {
1236 int status;
1237 char *file, file_tmp[PATH_MAX];
1238 cache_item_t *ci;
1240 status = buffer_get_field(&buffer, &buffer_size, &file);
1241 if (status != 0)
1242 return syntax_error(sock,cmd);
1244 get_abs_path(&file, file_tmp);
1246 pthread_mutex_lock(&cache_lock);
1247 ci = g_tree_lookup(cache_tree, file);
1248 if (ci == NULL)
1249 {
1250 pthread_mutex_unlock(&cache_lock);
1251 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1252 }
1254 for (size_t i=0; i < ci->values_num; i++)
1255 add_response_info(sock, "%s\n", ci->values[i]);
1257 pthread_mutex_unlock(&cache_lock);
1258 return send_response(sock, RESP_OK, "updates pending\n");
1259 } /* }}} static int handle_request_pending */
1261 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1262 {
1263 int status;
1264 gboolean found;
1265 char *file, file_tmp[PATH_MAX];
1267 status = buffer_get_field(&buffer, &buffer_size, &file);
1268 if (status != 0)
1269 return syntax_error(sock,cmd);
1271 get_abs_path(&file, file_tmp);
1272 if (!check_file_access(file, sock)) return 0;
1274 pthread_mutex_lock(&cache_lock);
1275 found = g_tree_remove(cache_tree, file);
1276 pthread_mutex_unlock(&cache_lock);
1278 if (found == TRUE)
1279 {
1280 if (!JOURNAL_REPLAY(sock))
1281 journal_write("forget", file);
1283 return send_response(sock, RESP_OK, "Gone!\n");
1284 }
1285 else
1286 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1288 /* NOTREACHED */
1289 assert(1==0);
1290 } /* }}} static int handle_request_forget */
1292 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1293 {
1294 cache_item_t *ci;
1296 pthread_mutex_lock(&cache_lock);
1298 ci = cache_queue_head;
1299 while (ci != NULL)
1300 {
1301 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1302 ci = ci->next;
1303 }
1305 pthread_mutex_unlock(&cache_lock);
1307 return send_response(sock, RESP_OK, "in queue.\n");
1308 } /* }}} int handle_request_queue */
1310 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1311 {
1312 char *file, file_tmp[PATH_MAX];
1313 int values_num = 0;
1314 int status;
1315 char orig_buf[CMD_MAX];
1317 cache_item_t *ci;
1319 /* save it for the journal later */
1320 if (!JOURNAL_REPLAY(sock))
1321 strncpy(orig_buf, buffer, buffer_size);
1323 status = buffer_get_field (&buffer, &buffer_size, &file);
1324 if (status != 0)
1325 return syntax_error(sock,cmd);
1327 pthread_mutex_lock(&stats_lock);
1328 stats_updates_received++;
1329 pthread_mutex_unlock(&stats_lock);
1331 get_abs_path(&file, file_tmp);
1332 if (!check_file_access(file, sock)) return 0;
1334 pthread_mutex_lock (&cache_lock);
1335 ci = g_tree_lookup (cache_tree, file);
1337 if (ci == NULL) /* {{{ */
1338 {
1339 struct stat statbuf;
1340 cache_item_t *tmp;
1342 /* don't hold the lock while we setup; stat(2) might block */
1343 pthread_mutex_unlock(&cache_lock);
1345 memset (&statbuf, 0, sizeof (statbuf));
1346 status = stat (file, &statbuf);
1347 if (status != 0)
1348 {
1349 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1351 status = errno;
1352 if (status == ENOENT)
1353 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1354 else
1355 return send_response(sock, RESP_ERR,
1356 "stat failed with error %i.\n", status);
1357 }
1358 if (!S_ISREG (statbuf.st_mode))
1359 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1361 if (access(file, R_OK|W_OK) != 0)
1362 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1363 file, rrd_strerror(errno));
1365 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1366 if (ci == NULL)
1367 {
1368 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1370 return send_response(sock, RESP_ERR, "malloc failed.\n");
1371 }
1372 memset (ci, 0, sizeof (cache_item_t));
1374 ci->file = strdup (file);
1375 if (ci->file == NULL)
1376 {
1377 free (ci);
1378 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1380 return send_response(sock, RESP_ERR, "strdup failed.\n");
1381 }
1383 wipe_ci_values(ci, now);
1384 ci->flags = CI_FLAGS_IN_TREE;
1385 pthread_cond_init(&ci->flushed, NULL);
1387 pthread_mutex_lock(&cache_lock);
1389 /* another UPDATE might have added this entry in the meantime */
1390 tmp = g_tree_lookup (cache_tree, file);
1391 if (tmp == NULL)
1392 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1393 else
1394 {
1395 free_cache_item (ci);
1396 ci = tmp;
1397 }
1399 /* state may have changed while we were unlocked */
1400 if (state == SHUTDOWN)
1401 return -1;
1402 } /* }}} */
1403 assert (ci != NULL);
1405 /* don't re-write updates in replay mode */
1406 if (!JOURNAL_REPLAY(sock))
1407 journal_write("update", orig_buf);
1409 while (buffer_size > 0)
1410 {
1411 char *value;
1412 time_t stamp;
1413 char *eostamp;
1415 status = buffer_get_field (&buffer, &buffer_size, &value);
1416 if (status != 0)
1417 {
1418 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1419 break;
1420 }
1422 /* make sure update time is always moving forward */
1423 stamp = strtol(value, &eostamp, 10);
1424 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1425 {
1426 pthread_mutex_unlock(&cache_lock);
1427 return send_response(sock, RESP_ERR,
1428 "Cannot find timestamp in '%s'!\n", value);
1429 }
1430 else if (stamp <= ci->last_update_stamp)
1431 {
1432 pthread_mutex_unlock(&cache_lock);
1433 return send_response(sock, RESP_ERR,
1434 "illegal attempt to update using time %ld when last"
1435 " update time is %ld (minimum one second step)\n",
1436 stamp, ci->last_update_stamp);
1437 }
1438 else
1439 ci->last_update_stamp = stamp;
1441 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1442 {
1443 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1444 continue;
1445 }
1447 values_num++;
1448 }
1450 if (((now - ci->last_flush_time) >= config_write_interval)
1451 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1452 && (ci->values_num > 0))
1453 {
1454 enqueue_cache_item (ci, TAIL);
1455 }
1457 pthread_mutex_unlock (&cache_lock);
1459 if (values_num < 1)
1460 return send_response(sock, RESP_ERR, "No values updated.\n");
1461 else
1462 return send_response(sock, RESP_OK,
1463 "errors, enqueued %i value(s).\n", values_num);
1465 /* NOTREACHED */
1466 assert(1==0);
1468 } /* }}} int handle_request_update */
1470 /* we came across a "WROTE" entry during journal replay.
1471 * throw away any values that we have accumulated for this file
1472 */
1473 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1474 {
1475 cache_item_t *ci;
1476 const char *file = buffer;
1478 pthread_mutex_lock(&cache_lock);
1480 ci = g_tree_lookup(cache_tree, file);
1481 if (ci == NULL)
1482 {
1483 pthread_mutex_unlock(&cache_lock);
1484 return (0);
1485 }
1487 if (ci->values)
1488 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1490 wipe_ci_values(ci, now);
1491 remove_from_queue(ci);
1493 pthread_mutex_unlock(&cache_lock);
1494 return (0);
1495 } /* }}} int handle_request_wrote */
1497 /* start "BATCH" processing */
1498 static int batch_start (HANDLER_PROTO) /* {{{ */
1499 {
1500 int status;
1501 if (sock->batch_start)
1502 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1504 status = send_response(sock, RESP_OK,
1505 "Go ahead. End with dot '.' on its own line.\n");
1506 sock->batch_start = time(NULL);
1507 sock->batch_cmd = 0;
1509 return status;
1510 } /* }}} static int batch_start */
1512 /* finish "BATCH" processing and return results to the client */
1513 static int batch_done (HANDLER_PROTO) /* {{{ */
1514 {
1515 assert(sock->batch_start);
1516 sock->batch_start = 0;
1517 sock->batch_cmd = 0;
1518 return send_response(sock, RESP_OK, "errors\n");
1519 } /* }}} static int batch_done */
1521 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1522 {
1523 return -1;
1524 } /* }}} static int handle_request_quit */
1526 static command_t list_of_commands[] = { /* {{{ */
1527 {
1528 "UPDATE",
1529 handle_request_update,
1530 CMD_CONTEXT_ANY,
1531 "UPDATE <filename> <values> [<values> ...]\n"
1532 ,
1533 "Adds the given file to the internal cache if it is not yet known and\n"
1534 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1535 "for details.\n"
1536 "\n"
1537 "Each <values> has the following form:\n"
1538 " <values> = <time>:<value>[:<value>[...]]\n"
1539 "See the rrdupdate(1) manpage for details.\n"
1540 },
1541 {
1542 "WROTE",
1543 handle_request_wrote,
1544 CMD_CONTEXT_JOURNAL,
1545 NULL,
1546 NULL
1547 },
1548 {
1549 "FLUSH",
1550 handle_request_flush,
1551 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1552 "FLUSH <filename>\n"
1553 ,
1554 "Adds the given filename to the head of the update queue and returns\n"
1555 "after it has been dequeued.\n"
1556 },
1557 {
1558 "FLUSHALL",
1559 handle_request_flushall,
1560 CMD_CONTEXT_CLIENT,
1561 "FLUSHALL\n"
1562 ,
1563 "Triggers writing of all pending updates. Returns immediately.\n"
1564 },
1565 {
1566 "PENDING",
1567 handle_request_pending,
1568 CMD_CONTEXT_CLIENT,
1569 "PENDING <filename>\n"
1570 ,
1571 "Shows any 'pending' updates for a file, in order.\n"
1572 "The updates shown have not yet been written to the underlying RRD file.\n"
1573 },
1574 {
1575 "FORGET",
1576 handle_request_forget,
1577 CMD_CONTEXT_ANY,
1578 "FORGET <filename>\n"
1579 ,
1580 "Removes the file completely from the cache.\n"
1581 "Any pending updates for the file will be lost.\n"
1582 },
1583 {
1584 "QUEUE",
1585 handle_request_queue,
1586 CMD_CONTEXT_CLIENT,
1587 "QUEUE\n"
1588 ,
1589 "Shows all files in the output queue.\n"
1590 "The output is zero or more lines in the following format:\n"
1591 "(where <num_vals> is the number of values to be written)\n"
1592 "\n"
1593 "<num_vals> <filename>\n"
1594 },
1595 {
1596 "STATS",
1597 handle_request_stats,
1598 CMD_CONTEXT_CLIENT,
1599 "STATS\n"
1600 ,
1601 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1602 "a description of the values.\n"
1603 },
1604 {
1605 "HELP",
1606 handle_request_help,
1607 CMD_CONTEXT_CLIENT,
1608 "HELP [<command>]\n",
1609 NULL, /* special! */
1610 },
1611 {
1612 "BATCH",
1613 batch_start,
1614 CMD_CONTEXT_CLIENT,
1615 "BATCH\n"
1616 ,
1617 "The 'BATCH' command permits the client to initiate a bulk load\n"
1618 " of commands to rrdcached.\n"
1619 "\n"
1620 "Usage:\n"
1621 "\n"
1622 " client: BATCH\n"
1623 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1624 " client: command #1\n"
1625 " client: command #2\n"
1626 " client: ... and so on\n"
1627 " client: .\n"
1628 " server: 2 errors\n"
1629 " server: 7 message for command #7\n"
1630 " server: 9 message for command #9\n"
1631 "\n"
1632 "For more information, consult the rrdcached(1) documentation.\n"
1633 },
1634 {
1635 ".", /* BATCH terminator */
1636 batch_done,
1637 CMD_CONTEXT_BATCH,
1638 NULL,
1639 NULL
1640 },
1641 {
1642 "QUIT",
1643 handle_request_quit,
1644 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1645 "QUIT\n"
1646 ,
1647 "Disconnect from rrdcached.\n"
1648 }
1649 }; /* }}} command_t list_of_commands[] */
1650 static size_t list_of_commands_len = sizeof (list_of_commands)
1651 / sizeof (list_of_commands[0]);
1653 static command_t *find_command(char *cmd)
1654 {
1655 size_t i;
1657 for (i = 0; i < list_of_commands_len; i++)
1658 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1659 return (&list_of_commands[i]);
1660 return NULL;
1661 }
1663 /* We currently use the index in the `list_of_commands' array as a bit position
1664 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1665 * outside these functions so that switching to a more elegant storage method
1666 * is easily possible. */
1667 static ssize_t find_command_index (const char *cmd) /* {{{ */
1668 {
1669 size_t i;
1671 for (i = 0; i < list_of_commands_len; i++)
1672 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1673 return ((ssize_t) i);
1674 return (-1);
1675 } /* }}} ssize_t find_command_index */
1677 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1678 const char *cmd)
1679 {
1680 ssize_t i;
1682 if (JOURNAL_REPLAY(sock))
1683 return (1);
1685 if (cmd == NULL)
1686 return (-1);
1688 if ((strcasecmp ("QUIT", cmd) == 0)
1689 || (strcasecmp ("HELP", cmd) == 0))
1690 return (1);
1691 else if (strcmp (".", cmd) == 0)
1692 cmd = "BATCH";
1694 i = find_command_index (cmd);
1695 if (i < 0)
1696 return (-1);
1697 assert (i < 32);
1699 if ((sock->permissions & (1 << i)) != 0)
1700 return (1);
1701 return (0);
1702 } /* }}} int socket_permission_check */
1704 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1705 const char *cmd)
1706 {
1707 ssize_t i;
1709 i = find_command_index (cmd);
1710 if (i < 0)
1711 return (-1);
1712 assert (i < 32);
1714 sock->permissions |= (1 << i);
1715 return (0);
1716 } /* }}} int socket_permission_add */
1718 /* check whether commands are received in the expected context */
1719 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1720 {
1721 if (JOURNAL_REPLAY(sock))
1722 return (cmd->context & CMD_CONTEXT_JOURNAL);
1723 else if (sock->batch_start)
1724 return (cmd->context & CMD_CONTEXT_BATCH);
1725 else
1726 return (cmd->context & CMD_CONTEXT_CLIENT);
1728 /* NOTREACHED */
1729 assert(1==0);
1730 }
1732 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1733 {
1734 int status;
1735 char *cmd_str;
1736 char *resp_txt;
1737 command_t *help = NULL;
1739 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1740 if (status == 0)
1741 help = find_command(cmd_str);
1743 if (help && (help->syntax || help->help))
1744 {
1745 char tmp[CMD_MAX];
1747 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1748 resp_txt = tmp;
1750 if (help->syntax)
1751 add_response_info(sock, "Usage: %s\n", help->syntax);
1753 if (help->help)
1754 add_response_info(sock, "%s\n", help->help);
1755 }
1756 else
1757 {
1758 size_t i;
1760 resp_txt = "Command overview\n";
1762 for (i = 0; i < list_of_commands_len; i++)
1763 {
1764 if (list_of_commands[i].syntax == NULL)
1765 continue;
1766 add_response_info (sock, "%s", list_of_commands[i].syntax);
1767 }
1768 }
1770 return send_response(sock, RESP_OK, resp_txt);
1771 } /* }}} int handle_request_help */
1773 static int handle_request (DISPATCH_PROTO) /* {{{ */
1774 {
1775 char *buffer_ptr = buffer;
1776 char *cmd_str = NULL;
1777 command_t *cmd = NULL;
1778 int status;
1780 assert (buffer[buffer_size - 1] == '\0');
1782 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1783 if (status != 0)
1784 {
1785 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1786 return (-1);
1787 }
1789 if (sock != NULL && sock->batch_start)
1790 sock->batch_cmd++;
1792 cmd = find_command(cmd_str);
1793 if (!cmd)
1794 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1796 if (!socket_permission_check (sock, cmd->cmd))
1797 return send_response(sock, RESP_ERR, "Permission denied.\n");
1799 if (!command_check_context(sock, cmd))
1800 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1802 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1803 } /* }}} int handle_request */
1805 static void journal_set_free (journal_set *js) /* {{{ */
1806 {
1807 if (js == NULL)
1808 return;
1810 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1812 free(js);
1813 } /* }}} journal_set_free */
1815 static void journal_set_remove (journal_set *js) /* {{{ */
1816 {
1817 if (js == NULL)
1818 return;
1820 for (uint i=0; i < js->files_num; i++)
1821 {
1822 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1823 unlink(js->files[i]);
1824 }
1825 } /* }}} journal_set_remove */
1827 /* close current journal file handle.
1828 * MUST hold journal_lock before calling */
1829 static void journal_close(void) /* {{{ */
1830 {
1831 if (journal_fh != NULL)
1832 {
1833 if (fclose(journal_fh) != 0)
1834 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1835 }
1837 journal_fh = NULL;
1838 journal_size = 0;
1839 } /* }}} journal_close */
1841 /* MUST hold journal_lock before calling */
1842 static void journal_new_file(void) /* {{{ */
1843 {
1844 struct timeval now;
1845 int new_fd;
1846 char new_file[PATH_MAX + 1];
1848 assert(journal_dir != NULL);
1849 assert(journal_cur != NULL);
1851 journal_close();
1853 gettimeofday(&now, NULL);
1854 /* this format assures that the files sort in strcmp() order */
1855 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1856 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1858 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1859 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1860 if (new_fd < 0)
1861 goto error;
1863 journal_fh = fdopen(new_fd, "a");
1864 if (journal_fh == NULL)
1865 goto error;
1867 journal_size = ftell(journal_fh);
1868 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1870 /* record the file in the journal set */
1871 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1873 return;
1875 error:
1876 RRDD_LOG(LOG_CRIT,
1877 "JOURNALING DISABLED: Error while trying to create %s : %s",
1878 new_file, rrd_strerror(errno));
1879 RRDD_LOG(LOG_CRIT,
1880 "JOURNALING DISABLED: All values will be flushed at shutdown");
1882 close(new_fd);
1883 config_flush_at_shutdown = 1;
1885 } /* }}} journal_new_file */
1887 /* MUST NOT hold journal_lock before calling this */
1888 static void journal_rotate(void) /* {{{ */
1889 {
1890 journal_set *old_js = NULL;
1892 if (journal_dir == NULL)
1893 return;
1895 RRDD_LOG(LOG_DEBUG, "rotating journals");
1897 pthread_mutex_lock(&stats_lock);
1898 ++stats_journal_rotate;
1899 pthread_mutex_unlock(&stats_lock);
1901 pthread_mutex_lock(&journal_lock);
1903 journal_close();
1905 /* rotate the journal sets */
1906 old_js = journal_old;
1907 journal_old = journal_cur;
1908 journal_cur = calloc(1, sizeof(journal_set));
1910 if (journal_cur != NULL)
1911 journal_new_file();
1912 else
1913 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1915 pthread_mutex_unlock(&journal_lock);
1917 journal_set_remove(old_js);
1918 journal_set_free (old_js);
1920 } /* }}} static void journal_rotate */
1922 /* MUST hold journal_lock when calling */
1923 static void journal_done(void) /* {{{ */
1924 {
1925 if (journal_cur == NULL)
1926 return;
1928 journal_close();
1930 if (config_flush_at_shutdown)
1931 {
1932 RRDD_LOG(LOG_INFO, "removing journals");
1933 journal_set_remove(journal_old);
1934 journal_set_remove(journal_cur);
1935 }
1936 else
1937 {
1938 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1939 "journals will be used at next startup");
1940 }
1942 journal_set_free(journal_cur);
1943 journal_set_free(journal_old);
1944 free(journal_dir);
1946 } /* }}} static void journal_done */
1948 static int journal_write(char *cmd, char *args) /* {{{ */
1949 {
1950 int chars;
1952 if (journal_fh == NULL)
1953 return 0;
1955 pthread_mutex_lock(&journal_lock);
1956 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1957 journal_size += chars;
1959 if (journal_size > JOURNAL_MAX)
1960 journal_new_file();
1962 pthread_mutex_unlock(&journal_lock);
1964 if (chars > 0)
1965 {
1966 pthread_mutex_lock(&stats_lock);
1967 stats_journal_bytes += chars;
1968 pthread_mutex_unlock(&stats_lock);
1969 }
1971 return chars;
1972 } /* }}} static int journal_write */
1974 static int journal_replay (const char *file) /* {{{ */
1975 {
1976 FILE *fh;
1977 int entry_cnt = 0;
1978 int fail_cnt = 0;
1979 uint64_t line = 0;
1980 char entry[CMD_MAX];
1981 time_t now;
1983 if (file == NULL) return 0;
1985 {
1986 char *reason = "unknown error";
1987 int status = 0;
1988 struct stat statbuf;
1990 memset(&statbuf, 0, sizeof(statbuf));
1991 if (stat(file, &statbuf) != 0)
1992 {
1993 reason = "stat error";
1994 status = errno;
1995 }
1996 else if (!S_ISREG(statbuf.st_mode))
1997 {
1998 reason = "not a regular file";
1999 status = EPERM;
2000 }
2001 if (statbuf.st_uid != daemon_uid)
2002 {
2003 reason = "not owned by daemon user";
2004 status = EACCES;
2005 }
2006 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2007 {
2008 reason = "must not be user/group writable";
2009 status = EACCES;
2010 }
2012 if (status != 0)
2013 {
2014 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2015 file, rrd_strerror(status), reason);
2016 return 0;
2017 }
2018 }
2020 fh = fopen(file, "r");
2021 if (fh == NULL)
2022 {
2023 if (errno != ENOENT)
2024 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2025 file, rrd_strerror(errno));
2026 return 0;
2027 }
2028 else
2029 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2031 now = time(NULL);
2033 while(!feof(fh))
2034 {
2035 size_t entry_len;
2037 ++line;
2038 if (fgets(entry, sizeof(entry), fh) == NULL)
2039 break;
2040 entry_len = strlen(entry);
2042 /* check \n termination in case journal writing crashed mid-line */
2043 if (entry_len == 0)
2044 continue;
2045 else if (entry[entry_len - 1] != '\n')
2046 {
2047 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2048 ++fail_cnt;
2049 continue;
2050 }
2052 entry[entry_len - 1] = '\0';
2054 if (handle_request(NULL, now, entry, entry_len) == 0)
2055 ++entry_cnt;
2056 else
2057 ++fail_cnt;
2058 }
2060 fclose(fh);
2062 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2063 entry_cnt, fail_cnt);
2065 return entry_cnt > 0 ? 1 : 0;
2066 } /* }}} static int journal_replay */
2068 static int journal_sort(const void *v1, const void *v2)
2069 {
2070 char **jn1 = (char **) v1;
2071 char **jn2 = (char **) v2;
2073 return strcmp(*jn1,*jn2);
2074 }
2076 static void journal_init(void) /* {{{ */
2077 {
2078 int had_journal = 0;
2079 DIR *dir;
2080 struct dirent *dent;
2081 char path[PATH_MAX+1];
2083 if (journal_dir == NULL) return;
2085 pthread_mutex_lock(&journal_lock);
2087 journal_cur = calloc(1, sizeof(journal_set));
2088 if (journal_cur == NULL)
2089 {
2090 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2091 return;
2092 }
2094 RRDD_LOG(LOG_INFO, "checking for journal files");
2096 /* Handle old journal files during transition. This gives them the
2097 * correct sort order. TODO: remove after first release
2098 */
2099 {
2100 char old_path[PATH_MAX+1];
2101 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2102 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2103 rename(old_path, path);
2105 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2106 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2107 rename(old_path, path);
2108 }
2110 dir = opendir(journal_dir);
2111 while ((dent = readdir(dir)) != NULL)
2112 {
2113 /* looks like a journal file? */
2114 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2115 continue;
2117 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2119 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2120 {
2121 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2122 dent->d_name);
2123 break;
2124 }
2125 }
2126 closedir(dir);
2128 qsort(journal_cur->files, journal_cur->files_num,
2129 sizeof(journal_cur->files[0]), journal_sort);
2131 for (uint i=0; i < journal_cur->files_num; i++)
2132 had_journal += journal_replay(journal_cur->files[i]);
2134 journal_new_file();
2136 /* it must have been a crash. start a flush */
2137 if (had_journal && config_flush_at_shutdown)
2138 flush_old_values(-1);
2140 pthread_mutex_unlock(&journal_lock);
2142 RRDD_LOG(LOG_INFO, "journal processing complete");
2144 } /* }}} static void journal_init */
2146 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2147 {
2148 assert(sock != NULL);
2150 free(sock->rbuf); sock->rbuf = NULL;
2151 free(sock->wbuf); sock->wbuf = NULL;
2152 free(sock);
2153 } /* }}} void free_listen_socket */
2155 static void close_connection(listen_socket_t *sock) /* {{{ */
2156 {
2157 if (sock->fd >= 0)
2158 {
2159 close(sock->fd);
2160 sock->fd = -1;
2161 }
2163 free_listen_socket(sock);
2165 } /* }}} void close_connection */
2167 static void *connection_thread_main (void *args) /* {{{ */
2168 {
2169 listen_socket_t *sock;
2170 int fd;
2172 sock = (listen_socket_t *) args;
2173 fd = sock->fd;
2175 /* init read buffers */
2176 sock->next_read = sock->next_cmd = 0;
2177 sock->rbuf = malloc(RBUF_SIZE);
2178 if (sock->rbuf == NULL)
2179 {
2180 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2181 close_connection(sock);
2182 return NULL;
2183 }
2185 pthread_mutex_lock (&connection_threads_lock);
2186 connection_threads_num++;
2187 pthread_mutex_unlock (&connection_threads_lock);
2189 while (state == RUNNING)
2190 {
2191 char *cmd;
2192 ssize_t cmd_len;
2193 ssize_t rbytes;
2194 time_t now;
2196 struct pollfd pollfd;
2197 int status;
2199 pollfd.fd = fd;
2200 pollfd.events = POLLIN | POLLPRI;
2201 pollfd.revents = 0;
2203 status = poll (&pollfd, 1, /* timeout = */ 500);
2204 if (state != RUNNING)
2205 break;
2206 else if (status == 0) /* timeout */
2207 continue;
2208 else if (status < 0) /* error */
2209 {
2210 status = errno;
2211 if (status != EINTR)
2212 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2213 continue;
2214 }
2216 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2217 break;
2218 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2219 {
2220 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2221 "poll(2) returned something unexpected: %#04hx",
2222 pollfd.revents);
2223 break;
2224 }
2226 rbytes = read(fd, sock->rbuf + sock->next_read,
2227 RBUF_SIZE - sock->next_read);
2228 if (rbytes < 0)
2229 {
2230 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2231 break;
2232 }
2233 else if (rbytes == 0)
2234 break; /* eof */
2236 sock->next_read += rbytes;
2238 if (sock->batch_start)
2239 now = sock->batch_start;
2240 else
2241 now = time(NULL);
2243 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2244 {
2245 status = handle_request (sock, now, cmd, cmd_len+1);
2246 if (status != 0)
2247 goto out_close;
2248 }
2249 }
2251 out_close:
2252 close_connection(sock);
2254 /* Remove this thread from the connection threads list */
2255 pthread_mutex_lock (&connection_threads_lock);
2256 connection_threads_num--;
2257 if (connection_threads_num <= 0)
2258 pthread_cond_broadcast(&connection_threads_done);
2259 pthread_mutex_unlock (&connection_threads_lock);
2261 return (NULL);
2262 } /* }}} void *connection_thread_main */
2264 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2265 {
2266 int fd;
2267 struct sockaddr_un sa;
2268 listen_socket_t *temp;
2269 int status;
2270 const char *path;
2271 char *path_copy, *dir;
2273 path = sock->addr;
2274 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2275 path += strlen("unix:");
2277 /* dirname may modify its argument */
2278 path_copy = strdup(path);
2279 if (path_copy == NULL)
2280 {
2281 fprintf(stderr, "rrdcached: strdup(): %s\n",
2282 rrd_strerror(errno));
2283 return (-1);
2284 }
2286 dir = dirname(path_copy);
2287 if (rrd_mkdir_p(dir, 0777) != 0)
2288 {
2289 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2290 dir, rrd_strerror(errno));
2291 return (-1);
2292 }
2294 free(path_copy);
2296 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2297 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2298 if (temp == NULL)
2299 {
2300 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2301 return (-1);
2302 }
2303 listen_fds = temp;
2304 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2306 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2307 if (fd < 0)
2308 {
2309 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2310 rrd_strerror(errno));
2311 return (-1);
2312 }
2314 memset (&sa, 0, sizeof (sa));
2315 sa.sun_family = AF_UNIX;
2316 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2318 /* if we've gotten this far, we own the pid file. any daemon started
2319 * with the same args must not be alive. therefore, ensure that we can
2320 * create the socket...
2321 */
2322 unlink(path);
2324 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2325 if (status != 0)
2326 {
2327 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2328 path, rrd_strerror(errno));
2329 close (fd);
2330 return (-1);
2331 }
2333 /* tweak the sockets group ownership */
2334 if (set_socket_group)
2335 {
2336 if ( (chown(path, getuid(), socket_group) != 0) ||
2337 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2338 {
2339 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2340 }
2341 }
2343 status = listen (fd, /* backlog = */ 10);
2344 if (status != 0)
2345 {
2346 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2347 path, rrd_strerror(errno));
2348 close (fd);
2349 unlink (path);
2350 return (-1);
2351 }
2353 listen_fds[listen_fds_num].fd = fd;
2354 listen_fds[listen_fds_num].family = PF_UNIX;
2355 strncpy(listen_fds[listen_fds_num].addr, path,
2356 sizeof (listen_fds[listen_fds_num].addr) - 1);
2357 listen_fds_num++;
2359 return (0);
2360 } /* }}} int open_listen_socket_unix */
2362 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2363 {
2364 struct addrinfo ai_hints;
2365 struct addrinfo *ai_res;
2366 struct addrinfo *ai_ptr;
2367 char addr_copy[NI_MAXHOST];
2368 char *addr;
2369 char *port;
2370 int status;
2372 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2373 addr_copy[sizeof (addr_copy) - 1] = 0;
2374 addr = addr_copy;
2376 memset (&ai_hints, 0, sizeof (ai_hints));
2377 ai_hints.ai_flags = 0;
2378 #ifdef AI_ADDRCONFIG
2379 ai_hints.ai_flags |= AI_ADDRCONFIG;
2380 #endif
2381 ai_hints.ai_family = AF_UNSPEC;
2382 ai_hints.ai_socktype = SOCK_STREAM;
2384 port = NULL;
2385 if (*addr == '[') /* IPv6+port format */
2386 {
2387 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2388 addr++;
2390 port = strchr (addr, ']');
2391 if (port == NULL)
2392 {
2393 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2394 return (-1);
2395 }
2396 *port = 0;
2397 port++;
2399 if (*port == ':')
2400 port++;
2401 else if (*port == 0)
2402 port = NULL;
2403 else
2404 {
2405 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2406 return (-1);
2407 }
2408 } /* if (*addr == '[') */
2409 else
2410 {
2411 port = rindex(addr, ':');
2412 if (port != NULL)
2413 {
2414 *port = 0;
2415 port++;
2416 }
2417 }
2418 ai_res = NULL;
2419 status = getaddrinfo (addr,
2420 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2421 &ai_hints, &ai_res);
2422 if (status != 0)
2423 {
2424 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2425 addr, gai_strerror (status));
2426 return (-1);
2427 }
2429 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2430 {
2431 int fd;
2432 listen_socket_t *temp;
2433 int one = 1;
2435 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2436 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2437 if (temp == NULL)
2438 {
2439 fprintf (stderr,
2440 "rrdcached: open_listen_socket_network: realloc failed.\n");
2441 continue;
2442 }
2443 listen_fds = temp;
2444 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2446 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2447 if (fd < 0)
2448 {
2449 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2450 rrd_strerror(errno));
2451 continue;
2452 }
2454 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2456 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2457 if (status != 0)
2458 {
2459 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2460 sock->addr, rrd_strerror(errno));
2461 close (fd);
2462 continue;
2463 }
2465 status = listen (fd, /* backlog = */ 10);
2466 if (status != 0)
2467 {
2468 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2469 sock->addr, rrd_strerror(errno));
2470 close (fd);
2471 freeaddrinfo(ai_res);
2472 return (-1);
2473 }
2475 listen_fds[listen_fds_num].fd = fd;
2476 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2477 listen_fds_num++;
2478 } /* for (ai_ptr) */
2480 freeaddrinfo(ai_res);
2481 return (0);
2482 } /* }}} static int open_listen_socket_network */
2484 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2485 {
2486 assert(sock != NULL);
2487 assert(sock->addr != NULL);
2489 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2490 || sock->addr[0] == '/')
2491 return (open_listen_socket_unix(sock));
2492 else
2493 return (open_listen_socket_network(sock));
2494 } /* }}} int open_listen_socket */
2496 static int close_listen_sockets (void) /* {{{ */
2497 {
2498 size_t i;
2500 for (i = 0; i < listen_fds_num; i++)
2501 {
2502 close (listen_fds[i].fd);
2504 if (listen_fds[i].family == PF_UNIX)
2505 unlink(listen_fds[i].addr);
2506 }
2508 free (listen_fds);
2509 listen_fds = NULL;
2510 listen_fds_num = 0;
2512 return (0);
2513 } /* }}} int close_listen_sockets */
2515 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2516 {
2517 struct pollfd *pollfds;
2518 int pollfds_num;
2519 int status;
2520 int i;
2522 if (listen_fds_num < 1)
2523 {
2524 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2525 return (NULL);
2526 }
2528 pollfds_num = listen_fds_num;
2529 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2530 if (pollfds == NULL)
2531 {
2532 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2533 return (NULL);
2534 }
2535 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2537 RRDD_LOG(LOG_INFO, "listening for connections");
2539 while (state == RUNNING)
2540 {
2541 for (i = 0; i < pollfds_num; i++)
2542 {
2543 pollfds[i].fd = listen_fds[i].fd;
2544 pollfds[i].events = POLLIN | POLLPRI;
2545 pollfds[i].revents = 0;
2546 }
2548 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2549 if (state != RUNNING)
2550 break;
2551 else if (status == 0) /* timeout */
2552 continue;
2553 else if (status < 0) /* error */
2554 {
2555 status = errno;
2556 if (status != EINTR)
2557 {
2558 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2559 }
2560 continue;
2561 }
2563 for (i = 0; i < pollfds_num; i++)
2564 {
2565 listen_socket_t *client_sock;
2566 struct sockaddr_storage client_sa;
2567 socklen_t client_sa_size;
2568 pthread_t tid;
2569 pthread_attr_t attr;
2571 if (pollfds[i].revents == 0)
2572 continue;
2574 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2575 {
2576 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2577 "poll(2) returned something unexpected for listen FD #%i.",
2578 pollfds[i].fd);
2579 continue;
2580 }
2582 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2583 if (client_sock == NULL)
2584 {
2585 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2586 continue;
2587 }
2588 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2590 client_sa_size = sizeof (client_sa);
2591 client_sock->fd = accept (pollfds[i].fd,
2592 (struct sockaddr *) &client_sa, &client_sa_size);
2593 if (client_sock->fd < 0)
2594 {
2595 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2596 free(client_sock);
2597 continue;
2598 }
2600 pthread_attr_init (&attr);
2601 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2603 status = pthread_create (&tid, &attr, connection_thread_main,
2604 client_sock);
2605 if (status != 0)
2606 {
2607 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2608 close_connection(client_sock);
2609 continue;
2610 }
2611 } /* for (pollfds_num) */
2612 } /* while (state == RUNNING) */
2614 RRDD_LOG(LOG_INFO, "starting shutdown");
2616 close_listen_sockets ();
2618 pthread_mutex_lock (&connection_threads_lock);
2619 while (connection_threads_num > 0)
2620 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2621 pthread_mutex_unlock (&connection_threads_lock);
2623 free(pollfds);
2625 return (NULL);
2626 } /* }}} void *listen_thread_main */
2628 static int daemonize (void) /* {{{ */
2629 {
2630 int pid_fd;
2631 char *base_dir;
2633 daemon_uid = geteuid();
2635 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2636 if (pid_fd < 0)
2637 pid_fd = check_pidfile();
2638 if (pid_fd < 0)
2639 return pid_fd;
2641 /* open all the listen sockets */
2642 if (config_listen_address_list_len > 0)
2643 {
2644 for (size_t i = 0; i < config_listen_address_list_len; i++)
2645 open_listen_socket (config_listen_address_list[i]);
2647 rrd_free_ptrs((void ***) &config_listen_address_list,
2648 &config_listen_address_list_len);
2649 }
2650 else
2651 {
2652 listen_socket_t sock;
2653 memset(&sock, 0, sizeof(sock));
2654 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2655 open_listen_socket (&sock);
2656 }
2658 if (listen_fds_num < 1)
2659 {
2660 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2661 goto error;
2662 }
2664 if (!stay_foreground)
2665 {
2666 pid_t child;
2668 child = fork ();
2669 if (child < 0)
2670 {
2671 fprintf (stderr, "daemonize: fork(2) failed.\n");
2672 goto error;
2673 }
2674 else if (child > 0)
2675 exit(0);
2677 /* Become session leader */
2678 setsid ();
2680 /* Open the first three file descriptors to /dev/null */
2681 close (2);
2682 close (1);
2683 close (0);
2685 open ("/dev/null", O_RDWR);
2686 if (dup(0) == -1 || dup(0) == -1){
2687 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2688 }
2689 } /* if (!stay_foreground) */
2691 /* Change into the /tmp directory. */
2692 base_dir = (config_base_dir != NULL)
2693 ? config_base_dir
2694 : "/tmp";
2696 if (chdir (base_dir) != 0)
2697 {
2698 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2699 goto error;
2700 }
2702 install_signal_handlers();
2704 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2705 RRDD_LOG(LOG_INFO, "starting up");
2707 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2708 (GDestroyNotify) free_cache_item);
2709 if (cache_tree == NULL)
2710 {
2711 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2712 goto error;
2713 }
2715 return write_pidfile (pid_fd);
2717 error:
2718 remove_pidfile();
2719 return -1;
2720 } /* }}} int daemonize */
2722 static int cleanup (void) /* {{{ */
2723 {
2724 pthread_cond_broadcast (&flush_cond);
2725 pthread_join (flush_thread, NULL);
2727 pthread_cond_broadcast (&queue_cond);
2728 for (int i = 0; i < config_queue_threads; i++)
2729 pthread_join (queue_threads[i], NULL);
2731 if (config_flush_at_shutdown)
2732 {
2733 assert(cache_queue_head == NULL);
2734 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2735 }
2737 free(queue_threads);
2738 free(config_base_dir);
2740 pthread_mutex_lock(&cache_lock);
2741 g_tree_destroy(cache_tree);
2743 pthread_mutex_lock(&journal_lock);
2744 journal_done();
2746 RRDD_LOG(LOG_INFO, "goodbye");
2747 closelog ();
2749 remove_pidfile ();
2750 free(config_pid_file);
2752 return (0);
2753 } /* }}} int cleanup */
2755 static int read_options (int argc, char **argv) /* {{{ */
2756 {
2757 int option;
2758 int status = 0;
2760 char **permissions = NULL;
2761 size_t permissions_len = 0;
2763 while ((option = getopt(argc, argv, "gl:s:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2764 {
2765 switch (option)
2766 {
2767 case 'g':
2768 stay_foreground=1;
2769 break;
2771 case 'l':
2772 {
2773 listen_socket_t *new;
2775 new = malloc(sizeof(listen_socket_t));
2776 if (new == NULL)
2777 {
2778 fprintf(stderr, "read_options: malloc failed.\n");
2779 return(2);
2780 }
2781 memset(new, 0, sizeof(listen_socket_t));
2783 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2785 /* Add permissions to the socket {{{ */
2786 if (permissions_len != 0)
2787 {
2788 size_t i;
2789 for (i = 0; i < permissions_len; i++)
2790 {
2791 status = socket_permission_add (new, permissions[i]);
2792 if (status != 0)
2793 {
2794 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2795 "socket failed. Most likely, this permission doesn't "
2796 "exist. Check your command line.\n", permissions[i]);
2797 status = 4;
2798 }
2799 }
2800 }
2801 else /* if (permissions_len == 0) */
2802 {
2803 /* Add permission for ALL commands to the socket. */
2804 size_t i;
2805 for (i = 0; i < list_of_commands_len; i++)
2806 {
2807 status = socket_permission_add (new, list_of_commands[i].cmd);
2808 if (status != 0)
2809 {
2810 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2811 "socket failed. This should never happen, ever! Sorry.\n",
2812 permissions[i]);
2813 status = 4;
2814 }
2815 }
2816 }
2817 /* }}} Done adding permissions. */
2819 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2820 &config_listen_address_list_len, new))
2821 {
2822 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2823 return (2);
2824 }
2825 }
2826 break;
2828 /* set socket group permissions */
2829 case 's':
2830 {
2831 gid_t group_gid;
2832 struct group *grp;
2834 group_gid = strtoul(optarg, NULL, 10);
2835 if (errno != EINVAL && group_gid>0)
2836 {
2837 /* we were passed a number */
2838 grp = getgrgid(group_gid);
2839 }
2840 else
2841 {
2842 grp = getgrnam(optarg);
2843 }
2845 if (grp)
2846 {
2847 socket_group = grp->gr_gid;
2848 set_socket_group = TRUE;
2849 }
2850 else
2851 {
2852 /* no idea what the user wanted... */
2853 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2854 return (5);
2855 }
2856 }
2857 break;
2859 case 'P':
2860 {
2861 char *optcopy;
2862 char *saveptr;
2863 char *dummy;
2864 char *ptr;
2866 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2868 optcopy = strdup (optarg);
2869 dummy = optcopy;
2870 saveptr = NULL;
2871 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2872 {
2873 dummy = NULL;
2874 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2875 }
2877 free (optcopy);
2878 }
2879 break;
2881 case 'f':
2882 {
2883 int temp;
2885 temp = atoi (optarg);
2886 if (temp > 0)
2887 config_flush_interval = temp;
2888 else
2889 {
2890 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2891 status = 3;
2892 }
2893 }
2894 break;
2896 case 'w':
2897 {
2898 int temp;
2900 temp = atoi (optarg);
2901 if (temp > 0)
2902 config_write_interval = temp;
2903 else
2904 {
2905 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2906 status = 2;
2907 }
2908 }
2909 break;
2911 case 'z':
2912 {
2913 int temp;
2915 temp = atoi(optarg);
2916 if (temp > 0)
2917 config_write_jitter = temp;
2918 else
2919 {
2920 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2921 status = 2;
2922 }
2924 break;
2925 }
2927 case 't':
2928 {
2929 int threads;
2930 threads = atoi(optarg);
2931 if (threads >= 1)
2932 config_queue_threads = threads;
2933 else
2934 {
2935 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2936 return 1;
2937 }
2938 }
2939 break;
2941 case 'B':
2942 config_write_base_only = 1;
2943 break;
2945 case 'b':
2946 {
2947 size_t len;
2948 char base_realpath[PATH_MAX];
2950 if (config_base_dir != NULL)
2951 free (config_base_dir);
2952 config_base_dir = strdup (optarg);
2953 if (config_base_dir == NULL)
2954 {
2955 fprintf (stderr, "read_options: strdup failed.\n");
2956 return (3);
2957 }
2959 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2960 {
2961 fprintf (stderr, "Failed to create base directory '%s': %s\n",
2962 config_base_dir, rrd_strerror (errno));
2963 return (3);
2964 }
2966 /* make sure that the base directory is not resolved via
2967 * symbolic links. this makes some performance-enhancing
2968 * assumptions possible (we don't have to resolve paths
2969 * that start with a "/")
2970 */
2971 if (realpath(config_base_dir, base_realpath) == NULL)
2972 {
2973 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2974 "%s\n", config_base_dir, rrd_strerror(errno));
2975 return 5;
2976 }
2978 len = strlen (config_base_dir);
2979 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2980 {
2981 config_base_dir[len - 1] = 0;
2982 len--;
2983 }
2985 if (len < 1)
2986 {
2987 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2988 return (4);
2989 }
2991 _config_base_dir_len = len;
2993 len = strlen (base_realpath);
2994 while ((len > 0) && (base_realpath[len - 1] == '/'))
2995 {
2996 base_realpath[len - 1] = '\0';
2997 len--;
2998 }
3000 if (strncmp(config_base_dir,
3001 base_realpath, sizeof(base_realpath)) != 0)
3002 {
3003 fprintf(stderr,
3004 "Base directory (-b) resolved via file system links!\n"
3005 "Please consult rrdcached '-b' documentation!\n"
3006 "Consider specifying the real directory (%s)\n",
3007 base_realpath);
3008 return 5;
3009 }
3010 }
3011 break;
3013 case 'p':
3014 {
3015 if (config_pid_file != NULL)
3016 free (config_pid_file);
3017 config_pid_file = strdup (optarg);
3018 if (config_pid_file == NULL)
3019 {
3020 fprintf (stderr, "read_options: strdup failed.\n");
3021 return (3);
3022 }
3023 }
3024 break;
3026 case 'F':
3027 config_flush_at_shutdown = 1;
3028 break;
3030 case 'j':
3031 {
3032 const char *dir = journal_dir = strdup(optarg);
3034 status = rrd_mkdir_p(dir, 0777);
3035 if (status != 0)
3036 {
3037 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3038 dir, rrd_strerror(errno));
3039 return 6;
3040 }
3042 if (access(dir, R_OK|W_OK|X_OK) != 0)
3043 {
3044 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3045 errno ? rrd_strerror(errno) : "");
3046 return 6;
3047 }
3048 }
3049 break;
3051 case 'h':
3052 case '?':
3053 printf ("RRDCacheD %s\n"
3054 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3055 "\n"
3056 "Usage: rrdcached [options]\n"
3057 "\n"
3058 "Valid options are:\n"
3059 " -l <address> Socket address to listen to.\n"
3060 " -P <perms> Sets the permissions to assign to all following "
3061 "sockets\n"
3062 " -w <seconds> Interval in which to write data.\n"
3063 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3064 " -t <threads> Number of write threads.\n"
3065 " -f <seconds> Interval in which to flush dead data.\n"
3066 " -p <file> Location of the PID-file.\n"
3067 " -b <dir> Base directory to change to.\n"
3068 " -B Restrict file access to paths within -b <dir>\n"
3069 " -g Do not fork and run in the foreground.\n"
3070 " -j <dir> Directory in which to create the journal files.\n"
3071 " -F Always flush all updates at shutdown\n"
3072 " -s <id|name> Make socket g+rw to named group\n"
3073 "\n"
3074 "For more information and a detailed description of all options "
3075 "please refer\n"
3076 "to the rrdcached(1) manual page.\n",
3077 VERSION);
3078 status = -1;
3079 break;
3080 } /* switch (option) */
3081 } /* while (getopt) */
3083 /* advise the user when values are not sane */
3084 if (config_flush_interval < 2 * config_write_interval)
3085 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3086 " 2x write interval (-w) !\n");
3087 if (config_write_jitter > config_write_interval)
3088 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3089 " write interval (-w) !\n");
3091 if (config_write_base_only && config_base_dir == NULL)
3092 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3093 " Consult the rrdcached documentation\n");
3095 if (journal_dir == NULL)
3096 config_flush_at_shutdown = 1;
3098 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3100 return (status);
3101 } /* }}} int read_options */
3103 int main (int argc, char **argv)
3104 {
3105 int status;
3107 status = read_options (argc, argv);
3108 if (status != 0)
3109 {
3110 if (status < 0)
3111 status = 0;
3112 return (status);
3113 }
3115 status = daemonize ();
3116 if (status != 0)
3117 {
3118 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3119 return (1);
3120 }
3122 journal_init();
3124 /* start the queue threads */
3125 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3126 if (queue_threads == NULL)
3127 {
3128 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3129 cleanup();
3130 return (1);
3131 }
3132 for (int i = 0; i < config_queue_threads; i++)
3133 {
3134 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3135 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3136 if (status != 0)
3137 {
3138 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3139 cleanup();
3140 return (1);
3141 }
3142 }
3144 /* start the flush thread */
3145 memset(&flush_thread, 0, sizeof(flush_thread));
3146 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3147 if (status != 0)
3148 {
3149 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3150 cleanup();
3151 return (1);
3152 }
3154 listen_thread_main (NULL);
3155 cleanup ();
3157 return (0);
3158 } /* int main */
3160 /*
3161 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3162 */