372adee9deea8e24f41160b794fbb34370fcd6b3
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008,2009 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 # include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
111 #include <glib-2.0/glib.h>
112 /* }}} */
114 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
116 #ifndef __GNUC__
117 # define __attribute__(x) /**/
118 #endif
120 /*
121 * Types
122 */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125 struct listen_socket_s
126 {
127 int fd;
128 char addr[PATH_MAX + 1];
129 int family;
131 /* state for BATCH processing */
132 time_t batch_start;
133 int batch_cmd;
135 /* buffered IO */
136 char *rbuf;
137 off_t next_cmd;
138 off_t next_read;
140 char *wbuf;
141 ssize_t wbuf_len;
143 uint32_t permissions;
145 gid_t socket_group;
146 };
147 typedef struct listen_socket_s listen_socket_t;
149 struct command_s;
150 typedef struct command_s command_t;
151 /* note: guard against "unused" warnings in the handlers */
152 #define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\
153 time_t now __attribute__((unused)),\
154 char *buffer __attribute__((unused)),\
155 size_t buffer_size __attribute__((unused))
157 #define HANDLER_PROTO command_t *cmd __attribute__((unused)),\
158 DISPATCH_PROTO
160 struct command_s {
161 char *cmd;
162 int (*handler)(HANDLER_PROTO);
164 char context; /* where we expect to see it */
165 #define CMD_CONTEXT_CLIENT (1<<0)
166 #define CMD_CONTEXT_BATCH (1<<1)
167 #define CMD_CONTEXT_JOURNAL (1<<2)
168 #define CMD_CONTEXT_ANY (0x7f)
170 char *syntax;
171 char *help;
172 };
174 struct cache_item_s;
175 typedef struct cache_item_s cache_item_t;
176 struct cache_item_s
177 {
178 char *file;
179 char **values;
180 size_t values_num;
181 time_t last_flush_time;
182 time_t last_update_stamp;
183 #define CI_FLAGS_IN_TREE (1<<0)
184 #define CI_FLAGS_IN_QUEUE (1<<1)
185 int flags;
186 pthread_cond_t flushed;
187 cache_item_t *prev;
188 cache_item_t *next;
189 };
191 struct callback_flush_data_s
192 {
193 time_t now;
194 time_t abs_timeout;
195 char **keys;
196 size_t keys_num;
197 };
198 typedef struct callback_flush_data_s callback_flush_data_t;
200 enum queue_side_e
201 {
202 HEAD,
203 TAIL
204 };
205 typedef enum queue_side_e queue_side_t;
207 /* describe a set of journal files */
208 typedef struct {
209 char **files;
210 size_t files_num;
211 } journal_set;
213 /* max length of socket command or response */
214 #define CMD_MAX 4096
215 #define RBUF_SIZE (CMD_MAX*2)
217 /*
218 * Variables
219 */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 enum {
227 RUNNING, /* normal operation */
228 FLUSHING, /* flushing remaining values */
229 SHUTDOWN /* shutting down */
230 } state = RUNNING;
232 static pthread_t *queue_threads;
233 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
234 static int config_queue_threads = 4;
236 static pthread_t flush_thread;
237 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
239 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
240 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
241 static int connection_threads_num = 0;
243 /* Cache stuff */
244 static GTree *cache_tree = NULL;
245 static cache_item_t *cache_queue_head = NULL;
246 static cache_item_t *cache_queue_tail = NULL;
247 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
249 static int config_write_interval = 300;
250 static int config_write_jitter = 0;
251 static int config_flush_interval = 3600;
252 static int config_flush_at_shutdown = 0;
253 static char *config_pid_file = NULL;
254 static char *config_base_dir = NULL;
255 static size_t _config_base_dir_len = 0;
256 static int config_write_base_only = 0;
258 static listen_socket_t **config_listen_address_list = NULL;
259 static size_t config_listen_address_list_len = 0;
261 static uint64_t stats_queue_length = 0;
262 static uint64_t stats_updates_received = 0;
263 static uint64_t stats_flush_received = 0;
264 static uint64_t stats_updates_written = 0;
265 static uint64_t stats_data_sets_written = 0;
266 static uint64_t stats_journal_bytes = 0;
267 static uint64_t stats_journal_rotate = 0;
268 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
270 /* Journaled updates */
271 #define JOURNAL_REPLAY(s) ((s) == NULL)
272 #define JOURNAL_BASE "rrd.journal"
273 static journal_set *journal_cur = NULL;
274 static journal_set *journal_old = NULL;
275 static char *journal_dir = NULL;
276 static FILE *journal_fh = NULL; /* current journal file handle */
277 static long journal_size = 0; /* current journal size */
278 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
279 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
280 static int journal_write(char *cmd, char *args);
281 static void journal_done(void);
282 static void journal_rotate(void);
284 /* prototypes for forward refernces */
285 static int handle_request_help (HANDLER_PROTO);
287 /*
288 * Functions
289 */
290 static void sig_common (const char *sig) /* {{{ */
291 {
292 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
293 state = FLUSHING;
294 pthread_cond_broadcast(&flush_cond);
295 pthread_cond_broadcast(&queue_cond);
296 } /* }}} void sig_common */
298 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
299 {
300 sig_common("INT");
301 } /* }}} void sig_int_handler */
303 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
304 {
305 sig_common("TERM");
306 } /* }}} void sig_term_handler */
308 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
309 {
310 config_flush_at_shutdown = 1;
311 sig_common("USR1");
312 } /* }}} void sig_usr1_handler */
314 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
315 {
316 config_flush_at_shutdown = 0;
317 sig_common("USR2");
318 } /* }}} void sig_usr2_handler */
320 static void install_signal_handlers(void) /* {{{ */
321 {
322 /* These structures are static, because `sigaction' behaves weird if the are
323 * overwritten.. */
324 static struct sigaction sa_int;
325 static struct sigaction sa_term;
326 static struct sigaction sa_pipe;
327 static struct sigaction sa_usr1;
328 static struct sigaction sa_usr2;
330 /* Install signal handlers */
331 memset (&sa_int, 0, sizeof (sa_int));
332 sa_int.sa_handler = sig_int_handler;
333 sigaction (SIGINT, &sa_int, NULL);
335 memset (&sa_term, 0, sizeof (sa_term));
336 sa_term.sa_handler = sig_term_handler;
337 sigaction (SIGTERM, &sa_term, NULL);
339 memset (&sa_pipe, 0, sizeof (sa_pipe));
340 sa_pipe.sa_handler = SIG_IGN;
341 sigaction (SIGPIPE, &sa_pipe, NULL);
343 memset (&sa_pipe, 0, sizeof (sa_usr1));
344 sa_usr1.sa_handler = sig_usr1_handler;
345 sigaction (SIGUSR1, &sa_usr1, NULL);
347 memset (&sa_usr2, 0, sizeof (sa_usr2));
348 sa_usr2.sa_handler = sig_usr2_handler;
349 sigaction (SIGUSR2, &sa_usr2, NULL);
351 } /* }}} void install_signal_handlers */
353 static int open_pidfile(char *action, int oflag) /* {{{ */
354 {
355 int fd;
356 const char *file;
357 char *file_copy, *dir;
359 file = (config_pid_file != NULL)
360 ? config_pid_file
361 : LOCALSTATEDIR "/run/rrdcached.pid";
363 /* dirname may modify its argument */
364 file_copy = strdup(file);
365 if (file_copy == NULL)
366 {
367 fprintf(stderr, "rrdcached: strdup(): %s\n",
368 rrd_strerror(errno));
369 return -1;
370 }
372 dir = dirname(file_copy);
373 if (rrd_mkdir_p(dir, 0777) != 0)
374 {
375 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
376 dir, rrd_strerror(errno));
377 return -1;
378 }
380 free(file_copy);
382 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
383 if (fd < 0)
384 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
385 action, file, rrd_strerror(errno));
387 return(fd);
388 } /* }}} static int open_pidfile */
390 /* check existing pid file to see whether a daemon is running */
391 static int check_pidfile(void)
392 {
393 int pid_fd;
394 pid_t pid;
395 char pid_str[16];
397 pid_fd = open_pidfile("open", O_RDWR);
398 if (pid_fd < 0)
399 return pid_fd;
401 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
402 return -1;
404 pid = atoi(pid_str);
405 if (pid <= 0)
406 return -1;
408 /* another running process that we can signal COULD be
409 * a competing rrdcached */
410 if (pid != getpid() && kill(pid, 0) == 0)
411 {
412 fprintf(stderr,
413 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
414 close(pid_fd);
415 return -1;
416 }
418 lseek(pid_fd, 0, SEEK_SET);
419 if (ftruncate(pid_fd, 0) == -1)
420 {
421 fprintf(stderr,
422 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
423 close(pid_fd);
424 return -1;
425 }
427 fprintf(stderr,
428 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
429 "rrdcached: starting normally.\n", pid);
431 return pid_fd;
432 } /* }}} static int check_pidfile */
434 static int write_pidfile (int fd) /* {{{ */
435 {
436 pid_t pid;
437 FILE *fh;
439 pid = getpid ();
441 fh = fdopen (fd, "w");
442 if (fh == NULL)
443 {
444 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
445 close(fd);
446 return (-1);
447 }
449 fprintf (fh, "%i\n", (int) pid);
450 fclose (fh);
452 return (0);
453 } /* }}} int write_pidfile */
455 static int remove_pidfile (void) /* {{{ */
456 {
457 char *file;
458 int status;
460 file = (config_pid_file != NULL)
461 ? config_pid_file
462 : LOCALSTATEDIR "/run/rrdcached.pid";
464 status = unlink (file);
465 if (status == 0)
466 return (0);
467 return (errno);
468 } /* }}} int remove_pidfile */
470 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
471 {
472 char *eol;
474 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
475 sock->next_read - sock->next_cmd);
477 if (eol == NULL)
478 {
479 /* no commands left, move remainder back to front of rbuf */
480 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
481 sock->next_read - sock->next_cmd);
482 sock->next_read -= sock->next_cmd;
483 sock->next_cmd = 0;
484 *len = 0;
485 return NULL;
486 }
487 else
488 {
489 char *cmd = sock->rbuf + sock->next_cmd;
490 *eol = '\0';
492 sock->next_cmd = eol - sock->rbuf + 1;
494 if (eol > sock->rbuf && *(eol-1) == '\r')
495 *(--eol) = '\0'; /* handle "\r\n" EOL */
497 *len = eol - cmd;
499 return cmd;
500 }
502 /* NOTREACHED */
503 assert(1==0);
504 } /* }}} char *next_cmd */
506 /* add the characters directly to the write buffer */
507 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
508 {
509 char *new_buf;
511 assert(sock != NULL);
513 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
514 if (new_buf == NULL)
515 {
516 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
517 return -1;
518 }
520 strncpy(new_buf + sock->wbuf_len, str, len + 1);
522 sock->wbuf = new_buf;
523 sock->wbuf_len += len;
525 return 0;
526 } /* }}} static int add_to_wbuf */
528 /* add the text to the "extra" info that's sent after the status line */
529 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
530 {
531 va_list argp;
532 char buffer[CMD_MAX];
533 int len;
535 if (JOURNAL_REPLAY(sock)) return 0;
536 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
538 va_start(argp, fmt);
539 #ifdef HAVE_VSNPRINTF
540 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
541 #else
542 len = vsprintf(buffer, fmt, argp);
543 #endif
544 va_end(argp);
545 if (len < 0)
546 {
547 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
548 return -1;
549 }
551 return add_to_wbuf(sock, buffer, len);
552 } /* }}} static int add_response_info */
554 static int count_lines(char *str) /* {{{ */
555 {
556 int lines = 0;
558 if (str != NULL)
559 {
560 while ((str = strchr(str, '\n')) != NULL)
561 {
562 ++lines;
563 ++str;
564 }
565 }
567 return lines;
568 } /* }}} static int count_lines */
570 /* send the response back to the user.
571 * returns 0 on success, -1 on error
572 * write buffer is always zeroed after this call */
573 static int send_response (listen_socket_t *sock, response_code rc,
574 char *fmt, ...) /* {{{ */
575 {
576 va_list argp;
577 char buffer[CMD_MAX];
578 int lines;
579 ssize_t wrote;
580 int rclen, len;
582 if (JOURNAL_REPLAY(sock)) return rc;
584 if (sock->batch_start)
585 {
586 if (rc == RESP_OK)
587 return rc; /* no response on success during BATCH */
588 lines = sock->batch_cmd;
589 }
590 else if (rc == RESP_OK)
591 lines = count_lines(sock->wbuf);
592 else
593 lines = -1;
595 rclen = sprintf(buffer, "%d ", lines);
596 va_start(argp, fmt);
597 #ifdef HAVE_VSNPRINTF
598 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
599 #else
600 len = vsprintf(buffer+rclen, fmt, argp);
601 #endif
602 va_end(argp);
603 if (len < 0)
604 return -1;
606 len += rclen;
608 /* append the result to the wbuf, don't write to the user */
609 if (sock->batch_start)
610 return add_to_wbuf(sock, buffer, len);
612 /* first write must be complete */
613 if (len != write(sock->fd, buffer, len))
614 {
615 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
616 return -1;
617 }
619 if (sock->wbuf != NULL && rc == RESP_OK)
620 {
621 wrote = 0;
622 while (wrote < sock->wbuf_len)
623 {
624 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
625 if (wb <= 0)
626 {
627 RRDD_LOG(LOG_INFO, "send_response: could not write results");
628 return -1;
629 }
630 wrote += wb;
631 }
632 }
634 free(sock->wbuf); sock->wbuf = NULL;
635 sock->wbuf_len = 0;
637 return 0;
638 } /* }}} */
640 static void wipe_ci_values(cache_item_t *ci, time_t when)
641 {
642 ci->values = NULL;
643 ci->values_num = 0;
645 ci->last_flush_time = when;
646 if (config_write_jitter > 0)
647 ci->last_flush_time += (rrd_random() % config_write_jitter);
648 }
650 /* remove_from_queue
651 * remove a "cache_item_t" item from the queue.
652 * must hold 'cache_lock' when calling this
653 */
654 static void remove_from_queue(cache_item_t *ci) /* {{{ */
655 {
656 if (ci == NULL) return;
657 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
659 if (ci->prev == NULL)
660 cache_queue_head = ci->next; /* reset head */
661 else
662 ci->prev->next = ci->next;
664 if (ci->next == NULL)
665 cache_queue_tail = ci->prev; /* reset the tail */
666 else
667 ci->next->prev = ci->prev;
669 ci->next = ci->prev = NULL;
670 ci->flags &= ~CI_FLAGS_IN_QUEUE;
672 pthread_mutex_lock (&stats_lock);
673 assert (stats_queue_length > 0);
674 stats_queue_length--;
675 pthread_mutex_unlock (&stats_lock);
677 } /* }}} static void remove_from_queue */
679 /* free the resources associated with the cache_item_t
680 * must hold cache_lock when calling this function
681 */
682 static void *free_cache_item(cache_item_t *ci) /* {{{ */
683 {
684 if (ci == NULL) return NULL;
686 remove_from_queue(ci);
688 for (size_t i=0; i < ci->values_num; i++)
689 free(ci->values[i]);
691 free (ci->values);
692 free (ci->file);
694 /* in case anyone is waiting */
695 pthread_cond_broadcast(&ci->flushed);
696 pthread_cond_destroy(&ci->flushed);
698 free (ci);
700 return NULL;
701 } /* }}} static void *free_cache_item */
703 /*
704 * enqueue_cache_item:
705 * `cache_lock' must be acquired before calling this function!
706 */
707 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
708 queue_side_t side)
709 {
710 if (ci == NULL)
711 return (-1);
713 if (ci->values_num == 0)
714 return (0);
716 if (side == HEAD)
717 {
718 if (cache_queue_head == ci)
719 return 0;
721 /* remove if further down in queue */
722 remove_from_queue(ci);
724 ci->prev = NULL;
725 ci->next = cache_queue_head;
726 if (ci->next != NULL)
727 ci->next->prev = ci;
728 cache_queue_head = ci;
730 if (cache_queue_tail == NULL)
731 cache_queue_tail = cache_queue_head;
732 }
733 else /* (side == TAIL) */
734 {
735 /* We don't move values back in the list.. */
736 if (ci->flags & CI_FLAGS_IN_QUEUE)
737 return (0);
739 assert (ci->next == NULL);
740 assert (ci->prev == NULL);
742 ci->prev = cache_queue_tail;
744 if (cache_queue_tail == NULL)
745 cache_queue_head = ci;
746 else
747 cache_queue_tail->next = ci;
749 cache_queue_tail = ci;
750 }
752 ci->flags |= CI_FLAGS_IN_QUEUE;
754 pthread_cond_signal(&queue_cond);
755 pthread_mutex_lock (&stats_lock);
756 stats_queue_length++;
757 pthread_mutex_unlock (&stats_lock);
759 return (0);
760 } /* }}} int enqueue_cache_item */
762 /*
763 * tree_callback_flush:
764 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
765 * while this is in progress.
766 */
767 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
768 gpointer data)
769 {
770 cache_item_t *ci;
771 callback_flush_data_t *cfd;
773 ci = (cache_item_t *) value;
774 cfd = (callback_flush_data_t *) data;
776 if (ci->flags & CI_FLAGS_IN_QUEUE)
777 return FALSE;
779 if (ci->values_num > 0
780 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
781 {
782 enqueue_cache_item (ci, TAIL);
783 }
784 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
785 && (ci->values_num <= 0))
786 {
787 assert ((char *) key == ci->file);
788 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
789 {
790 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
791 return (FALSE);
792 }
793 }
795 return (FALSE);
796 } /* }}} gboolean tree_callback_flush */
798 static int flush_old_values (int max_age)
799 {
800 callback_flush_data_t cfd;
801 size_t k;
803 memset (&cfd, 0, sizeof (cfd));
804 /* Pass the current time as user data so that we don't need to call
805 * `time' for each node. */
806 cfd.now = time (NULL);
807 cfd.keys = NULL;
808 cfd.keys_num = 0;
810 if (max_age > 0)
811 cfd.abs_timeout = cfd.now - max_age;
812 else
813 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
815 /* `tree_callback_flush' will return the keys of all values that haven't
816 * been touched in the last `config_flush_interval' seconds in `cfd'.
817 * The char*'s in this array point to the same memory as ci->file, so we
818 * don't need to free them separately. */
819 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
821 for (k = 0; k < cfd.keys_num; k++)
822 {
823 /* should never fail, since we have held the cache_lock
824 * the entire time */
825 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
826 }
828 if (cfd.keys != NULL)
829 {
830 free (cfd.keys);
831 cfd.keys = NULL;
832 }
834 return (0);
835 } /* int flush_old_values */
837 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
838 {
839 struct timeval now;
840 struct timespec next_flush;
841 int status;
843 gettimeofday (&now, NULL);
844 next_flush.tv_sec = now.tv_sec + config_flush_interval;
845 next_flush.tv_nsec = 1000 * now.tv_usec;
847 pthread_mutex_lock(&cache_lock);
849 while (state == RUNNING)
850 {
851 gettimeofday (&now, NULL);
852 if ((now.tv_sec > next_flush.tv_sec)
853 || ((now.tv_sec == next_flush.tv_sec)
854 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
855 {
856 RRDD_LOG(LOG_DEBUG, "flushing old values");
858 /* Determine the time of the next cache flush. */
859 next_flush.tv_sec = now.tv_sec + config_flush_interval;
861 /* Flush all values that haven't been written in the last
862 * `config_write_interval' seconds. */
863 flush_old_values (config_write_interval);
865 /* unlock the cache while we rotate so we don't block incoming
866 * updates if the fsync() blocks on disk I/O */
867 pthread_mutex_unlock(&cache_lock);
868 journal_rotate();
869 pthread_mutex_lock(&cache_lock);
870 }
872 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
873 if (status != 0 && status != ETIMEDOUT)
874 {
875 RRDD_LOG (LOG_ERR, "flush_thread_main: "
876 "pthread_cond_timedwait returned %i.", status);
877 }
878 }
880 if (config_flush_at_shutdown)
881 flush_old_values (-1); /* flush everything */
883 state = SHUTDOWN;
885 pthread_mutex_unlock(&cache_lock);
887 return NULL;
888 } /* void *flush_thread_main */
890 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
891 {
892 pthread_mutex_lock (&cache_lock);
894 while (state != SHUTDOWN
895 || (cache_queue_head != NULL && config_flush_at_shutdown))
896 {
897 cache_item_t *ci;
898 char *file;
899 char **values;
900 size_t values_num;
901 int status;
903 /* Now, check if there's something to store away. If not, wait until
904 * something comes in. */
905 if (cache_queue_head == NULL)
906 {
907 status = pthread_cond_wait (&queue_cond, &cache_lock);
908 if ((status != 0) && (status != ETIMEDOUT))
909 {
910 RRDD_LOG (LOG_ERR, "queue_thread_main: "
911 "pthread_cond_wait returned %i.", status);
912 }
913 }
915 /* Check if a value has arrived. This may be NULL if we timed out or there
916 * was an interrupt such as a signal. */
917 if (cache_queue_head == NULL)
918 continue;
920 ci = cache_queue_head;
922 /* copy the relevant parts */
923 file = strdup (ci->file);
924 if (file == NULL)
925 {
926 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
927 continue;
928 }
930 assert(ci->values != NULL);
931 assert(ci->values_num > 0);
933 values = ci->values;
934 values_num = ci->values_num;
936 wipe_ci_values(ci, time(NULL));
937 remove_from_queue(ci);
939 pthread_mutex_unlock (&cache_lock);
941 rrd_clear_error ();
942 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
943 if (status != 0)
944 {
945 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
946 "rrd_update_r (%s) failed with status %i. (%s)",
947 file, status, rrd_get_error());
948 }
950 journal_write("wrote", file);
952 /* Search again in the tree. It's possible someone issued a "FORGET"
953 * while we were writing the update values. */
954 pthread_mutex_lock(&cache_lock);
955 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
956 if (ci)
957 pthread_cond_broadcast(&ci->flushed);
958 pthread_mutex_unlock(&cache_lock);
960 if (status == 0)
961 {
962 pthread_mutex_lock (&stats_lock);
963 stats_updates_written++;
964 stats_data_sets_written += values_num;
965 pthread_mutex_unlock (&stats_lock);
966 }
968 rrd_free_ptrs((void ***) &values, &values_num);
969 free(file);
971 pthread_mutex_lock (&cache_lock);
972 }
973 pthread_mutex_unlock (&cache_lock);
975 return (NULL);
976 } /* }}} void *queue_thread_main */
978 static int buffer_get_field (char **buffer_ret, /* {{{ */
979 size_t *buffer_size_ret, char **field_ret)
980 {
981 char *buffer;
982 size_t buffer_pos;
983 size_t buffer_size;
984 char *field;
985 size_t field_size;
986 int status;
988 buffer = *buffer_ret;
989 buffer_pos = 0;
990 buffer_size = *buffer_size_ret;
991 field = *buffer_ret;
992 field_size = 0;
994 if (buffer_size <= 0)
995 return (-1);
997 /* This is ensured by `handle_request'. */
998 assert (buffer[buffer_size - 1] == '\0');
1000 status = -1;
1001 while (buffer_pos < buffer_size)
1002 {
1003 /* Check for end-of-field or end-of-buffer */
1004 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1005 {
1006 field[field_size] = 0;
1007 field_size++;
1008 buffer_pos++;
1009 status = 0;
1010 break;
1011 }
1012 /* Handle escaped characters. */
1013 else if (buffer[buffer_pos] == '\\')
1014 {
1015 if (buffer_pos >= (buffer_size - 1))
1016 break;
1017 buffer_pos++;
1018 field[field_size] = buffer[buffer_pos];
1019 field_size++;
1020 buffer_pos++;
1021 }
1022 /* Normal operation */
1023 else
1024 {
1025 field[field_size] = buffer[buffer_pos];
1026 field_size++;
1027 buffer_pos++;
1028 }
1029 } /* while (buffer_pos < buffer_size) */
1031 if (status != 0)
1032 return (status);
1034 *buffer_ret = buffer + buffer_pos;
1035 *buffer_size_ret = buffer_size - buffer_pos;
1036 *field_ret = field;
1038 return (0);
1039 } /* }}} int buffer_get_field */
1041 /* if we're restricting writes to the base directory,
1042 * check whether the file falls within the dir
1043 * returns 1 if OK, otherwise 0
1044 */
1045 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1046 {
1047 assert(file != NULL);
1049 if (!config_write_base_only
1050 || JOURNAL_REPLAY(sock)
1051 || config_base_dir == NULL)
1052 return 1;
1054 if (strstr(file, "../") != NULL) goto err;
1056 /* relative paths without "../" are ok */
1057 if (*file != '/') return 1;
1059 /* file must be of the format base + "/" + <1+ char filename> */
1060 if (strlen(file) < _config_base_dir_len + 2) goto err;
1061 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1062 if (*(file + _config_base_dir_len) != '/') goto err;
1064 return 1;
1066 err:
1067 if (sock != NULL && sock->fd >= 0)
1068 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1070 return 0;
1071 } /* }}} static int check_file_access */
1073 /* when using a base dir, convert relative paths to absolute paths.
1074 * if necessary, modifies the "filename" pointer to point
1075 * to the new path created in "tmp". "tmp" is provided
1076 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1077 *
1078 * this allows us to optimize for the expected case (absolute path)
1079 * with a no-op.
1080 */
1081 static void get_abs_path(char **filename, char *tmp)
1082 {
1083 assert(tmp != NULL);
1084 assert(filename != NULL && *filename != NULL);
1086 if (config_base_dir == NULL || **filename == '/')
1087 return;
1089 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1090 *filename = tmp;
1091 } /* }}} static int get_abs_path */
1093 static int flush_file (const char *filename) /* {{{ */
1094 {
1095 cache_item_t *ci;
1097 pthread_mutex_lock (&cache_lock);
1099 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1100 if (ci == NULL)
1101 {
1102 pthread_mutex_unlock (&cache_lock);
1103 return (ENOENT);
1104 }
1106 if (ci->values_num > 0)
1107 {
1108 /* Enqueue at head */
1109 enqueue_cache_item (ci, HEAD);
1110 pthread_cond_wait(&ci->flushed, &cache_lock);
1111 }
1113 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1114 * may have been purged during our cond_wait() */
1116 pthread_mutex_unlock(&cache_lock);
1118 return (0);
1119 } /* }}} int flush_file */
1121 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1122 {
1123 char *err = "Syntax error.\n";
1125 if (cmd && cmd->syntax)
1126 err = cmd->syntax;
1128 return send_response(sock, RESP_ERR, "Usage: %s", err);
1129 } /* }}} static int syntax_error() */
1131 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1132 {
1133 uint64_t copy_queue_length;
1134 uint64_t copy_updates_received;
1135 uint64_t copy_flush_received;
1136 uint64_t copy_updates_written;
1137 uint64_t copy_data_sets_written;
1138 uint64_t copy_journal_bytes;
1139 uint64_t copy_journal_rotate;
1141 uint64_t tree_nodes_number;
1142 uint64_t tree_depth;
1144 pthread_mutex_lock (&stats_lock);
1145 copy_queue_length = stats_queue_length;
1146 copy_updates_received = stats_updates_received;
1147 copy_flush_received = stats_flush_received;
1148 copy_updates_written = stats_updates_written;
1149 copy_data_sets_written = stats_data_sets_written;
1150 copy_journal_bytes = stats_journal_bytes;
1151 copy_journal_rotate = stats_journal_rotate;
1152 pthread_mutex_unlock (&stats_lock);
1154 pthread_mutex_lock (&cache_lock);
1155 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1156 tree_depth = (uint64_t) g_tree_height (cache_tree);
1157 pthread_mutex_unlock (&cache_lock);
1159 add_response_info(sock,
1160 "QueueLength: %"PRIu64"\n", copy_queue_length);
1161 add_response_info(sock,
1162 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1163 add_response_info(sock,
1164 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1165 add_response_info(sock,
1166 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1167 add_response_info(sock,
1168 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1169 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1170 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1171 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1172 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1174 send_response(sock, RESP_OK, "Statistics follow\n");
1176 return (0);
1177 } /* }}} int handle_request_stats */
1179 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1180 {
1181 char *file, file_tmp[PATH_MAX];
1182 int status;
1184 status = buffer_get_field (&buffer, &buffer_size, &file);
1185 if (status != 0)
1186 {
1187 return syntax_error(sock,cmd);
1188 }
1189 else
1190 {
1191 pthread_mutex_lock(&stats_lock);
1192 stats_flush_received++;
1193 pthread_mutex_unlock(&stats_lock);
1195 get_abs_path(&file, file_tmp);
1196 if (!check_file_access(file, sock)) return 0;
1198 status = flush_file (file);
1199 if (status == 0)
1200 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1201 else if (status == ENOENT)
1202 {
1203 /* no file in our tree; see whether it exists at all */
1204 struct stat statbuf;
1206 memset(&statbuf, 0, sizeof(statbuf));
1207 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1208 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1209 else
1210 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1211 }
1212 else if (status < 0)
1213 return send_response(sock, RESP_ERR, "Internal error.\n");
1214 else
1215 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1216 }
1218 /* NOTREACHED */
1219 assert(1==0);
1220 } /* }}} int handle_request_flush */
1222 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1223 {
1224 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1226 pthread_mutex_lock(&cache_lock);
1227 flush_old_values(-1);
1228 pthread_mutex_unlock(&cache_lock);
1230 return send_response(sock, RESP_OK, "Started flush.\n");
1231 } /* }}} static int handle_request_flushall */
1233 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1234 {
1235 int status;
1236 char *file, file_tmp[PATH_MAX];
1237 cache_item_t *ci;
1239 status = buffer_get_field(&buffer, &buffer_size, &file);
1240 if (status != 0)
1241 return syntax_error(sock,cmd);
1243 get_abs_path(&file, file_tmp);
1245 pthread_mutex_lock(&cache_lock);
1246 ci = g_tree_lookup(cache_tree, file);
1247 if (ci == NULL)
1248 {
1249 pthread_mutex_unlock(&cache_lock);
1250 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1251 }
1253 for (size_t i=0; i < ci->values_num; i++)
1254 add_response_info(sock, "%s\n", ci->values[i]);
1256 pthread_mutex_unlock(&cache_lock);
1257 return send_response(sock, RESP_OK, "updates pending\n");
1258 } /* }}} static int handle_request_pending */
1260 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1261 {
1262 int status;
1263 gboolean found;
1264 char *file, file_tmp[PATH_MAX];
1266 status = buffer_get_field(&buffer, &buffer_size, &file);
1267 if (status != 0)
1268 return syntax_error(sock,cmd);
1270 get_abs_path(&file, file_tmp);
1271 if (!check_file_access(file, sock)) return 0;
1273 pthread_mutex_lock(&cache_lock);
1274 found = g_tree_remove(cache_tree, file);
1275 pthread_mutex_unlock(&cache_lock);
1277 if (found == TRUE)
1278 {
1279 if (!JOURNAL_REPLAY(sock))
1280 journal_write("forget", file);
1282 return send_response(sock, RESP_OK, "Gone!\n");
1283 }
1284 else
1285 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1287 /* NOTREACHED */
1288 assert(1==0);
1289 } /* }}} static int handle_request_forget */
1291 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1292 {
1293 cache_item_t *ci;
1295 pthread_mutex_lock(&cache_lock);
1297 ci = cache_queue_head;
1298 while (ci != NULL)
1299 {
1300 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1301 ci = ci->next;
1302 }
1304 pthread_mutex_unlock(&cache_lock);
1306 return send_response(sock, RESP_OK, "in queue.\n");
1307 } /* }}} int handle_request_queue */
1309 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1310 {
1311 char *file, file_tmp[PATH_MAX];
1312 int values_num = 0;
1313 int status;
1314 char orig_buf[CMD_MAX];
1316 cache_item_t *ci;
1318 /* save it for the journal later */
1319 if (!JOURNAL_REPLAY(sock))
1320 strncpy(orig_buf, buffer, buffer_size);
1322 status = buffer_get_field (&buffer, &buffer_size, &file);
1323 if (status != 0)
1324 return syntax_error(sock,cmd);
1326 pthread_mutex_lock(&stats_lock);
1327 stats_updates_received++;
1328 pthread_mutex_unlock(&stats_lock);
1330 get_abs_path(&file, file_tmp);
1331 if (!check_file_access(file, sock)) return 0;
1333 pthread_mutex_lock (&cache_lock);
1334 ci = g_tree_lookup (cache_tree, file);
1336 if (ci == NULL) /* {{{ */
1337 {
1338 struct stat statbuf;
1339 cache_item_t *tmp;
1341 /* don't hold the lock while we setup; stat(2) might block */
1342 pthread_mutex_unlock(&cache_lock);
1344 memset (&statbuf, 0, sizeof (statbuf));
1345 status = stat (file, &statbuf);
1346 if (status != 0)
1347 {
1348 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1350 status = errno;
1351 if (status == ENOENT)
1352 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1353 else
1354 return send_response(sock, RESP_ERR,
1355 "stat failed with error %i.\n", status);
1356 }
1357 if (!S_ISREG (statbuf.st_mode))
1358 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1360 if (access(file, R_OK|W_OK) != 0)
1361 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1362 file, rrd_strerror(errno));
1364 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1365 if (ci == NULL)
1366 {
1367 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1369 return send_response(sock, RESP_ERR, "malloc failed.\n");
1370 }
1371 memset (ci, 0, sizeof (cache_item_t));
1373 ci->file = strdup (file);
1374 if (ci->file == NULL)
1375 {
1376 free (ci);
1377 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1379 return send_response(sock, RESP_ERR, "strdup failed.\n");
1380 }
1382 wipe_ci_values(ci, now);
1383 ci->flags = CI_FLAGS_IN_TREE;
1384 pthread_cond_init(&ci->flushed, NULL);
1386 pthread_mutex_lock(&cache_lock);
1388 /* another UPDATE might have added this entry in the meantime */
1389 tmp = g_tree_lookup (cache_tree, file);
1390 if (tmp == NULL)
1391 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1392 else
1393 {
1394 free_cache_item (ci);
1395 ci = tmp;
1396 }
1398 /* state may have changed while we were unlocked */
1399 if (state == SHUTDOWN)
1400 return -1;
1401 } /* }}} */
1402 assert (ci != NULL);
1404 /* don't re-write updates in replay mode */
1405 if (!JOURNAL_REPLAY(sock))
1406 journal_write("update", orig_buf);
1408 while (buffer_size > 0)
1409 {
1410 char *value;
1411 time_t stamp;
1412 char *eostamp;
1414 status = buffer_get_field (&buffer, &buffer_size, &value);
1415 if (status != 0)
1416 {
1417 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1418 break;
1419 }
1421 /* make sure update time is always moving forward */
1422 stamp = strtol(value, &eostamp, 10);
1423 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1424 {
1425 pthread_mutex_unlock(&cache_lock);
1426 return send_response(sock, RESP_ERR,
1427 "Cannot find timestamp in '%s'!\n", value);
1428 }
1429 else if (stamp <= ci->last_update_stamp)
1430 {
1431 pthread_mutex_unlock(&cache_lock);
1432 return send_response(sock, RESP_ERR,
1433 "illegal attempt to update using time %ld when last"
1434 " update time is %ld (minimum one second step)\n",
1435 stamp, ci->last_update_stamp);
1436 }
1437 else
1438 ci->last_update_stamp = stamp;
1440 if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1441 {
1442 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1443 continue;
1444 }
1446 values_num++;
1447 }
1449 if (((now - ci->last_flush_time) >= config_write_interval)
1450 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1451 && (ci->values_num > 0))
1452 {
1453 enqueue_cache_item (ci, TAIL);
1454 }
1456 pthread_mutex_unlock (&cache_lock);
1458 if (values_num < 1)
1459 return send_response(sock, RESP_ERR, "No values updated.\n");
1460 else
1461 return send_response(sock, RESP_OK,
1462 "errors, enqueued %i value(s).\n", values_num);
1464 /* NOTREACHED */
1465 assert(1==0);
1467 } /* }}} int handle_request_update */
1469 /* we came across a "WROTE" entry during journal replay.
1470 * throw away any values that we have accumulated for this file
1471 */
1472 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1473 {
1474 cache_item_t *ci;
1475 const char *file = buffer;
1477 pthread_mutex_lock(&cache_lock);
1479 ci = g_tree_lookup(cache_tree, file);
1480 if (ci == NULL)
1481 {
1482 pthread_mutex_unlock(&cache_lock);
1483 return (0);
1484 }
1486 if (ci->values)
1487 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1489 wipe_ci_values(ci, now);
1490 remove_from_queue(ci);
1492 pthread_mutex_unlock(&cache_lock);
1493 return (0);
1494 } /* }}} int handle_request_wrote */
1496 /* start "BATCH" processing */
1497 static int batch_start (HANDLER_PROTO) /* {{{ */
1498 {
1499 int status;
1500 if (sock->batch_start)
1501 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1503 status = send_response(sock, RESP_OK,
1504 "Go ahead. End with dot '.' on its own line.\n");
1505 sock->batch_start = time(NULL);
1506 sock->batch_cmd = 0;
1508 return status;
1509 } /* }}} static int batch_start */
1511 /* finish "BATCH" processing and return results to the client */
1512 static int batch_done (HANDLER_PROTO) /* {{{ */
1513 {
1514 assert(sock->batch_start);
1515 sock->batch_start = 0;
1516 sock->batch_cmd = 0;
1517 return send_response(sock, RESP_OK, "errors\n");
1518 } /* }}} static int batch_done */
1520 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1521 {
1522 return -1;
1523 } /* }}} static int handle_request_quit */
1525 static command_t list_of_commands[] = { /* {{{ */
1526 {
1527 "UPDATE",
1528 handle_request_update,
1529 CMD_CONTEXT_ANY,
1530 "UPDATE <filename> <values> [<values> ...]\n"
1531 ,
1532 "Adds the given file to the internal cache if it is not yet known and\n"
1533 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1534 "for details.\n"
1535 "\n"
1536 "Each <values> has the following form:\n"
1537 " <values> = <time>:<value>[:<value>[...]]\n"
1538 "See the rrdupdate(1) manpage for details.\n"
1539 },
1540 {
1541 "WROTE",
1542 handle_request_wrote,
1543 CMD_CONTEXT_JOURNAL,
1544 NULL,
1545 NULL
1546 },
1547 {
1548 "FLUSH",
1549 handle_request_flush,
1550 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1551 "FLUSH <filename>\n"
1552 ,
1553 "Adds the given filename to the head of the update queue and returns\n"
1554 "after it has been dequeued.\n"
1555 },
1556 {
1557 "FLUSHALL",
1558 handle_request_flushall,
1559 CMD_CONTEXT_CLIENT,
1560 "FLUSHALL\n"
1561 ,
1562 "Triggers writing of all pending updates. Returns immediately.\n"
1563 },
1564 {
1565 "PENDING",
1566 handle_request_pending,
1567 CMD_CONTEXT_CLIENT,
1568 "PENDING <filename>\n"
1569 ,
1570 "Shows any 'pending' updates for a file, in order.\n"
1571 "The updates shown have not yet been written to the underlying RRD file.\n"
1572 },
1573 {
1574 "FORGET",
1575 handle_request_forget,
1576 CMD_CONTEXT_ANY,
1577 "FORGET <filename>\n"
1578 ,
1579 "Removes the file completely from the cache.\n"
1580 "Any pending updates for the file will be lost.\n"
1581 },
1582 {
1583 "QUEUE",
1584 handle_request_queue,
1585 CMD_CONTEXT_CLIENT,
1586 "QUEUE\n"
1587 ,
1588 "Shows all files in the output queue.\n"
1589 "The output is zero or more lines in the following format:\n"
1590 "(where <num_vals> is the number of values to be written)\n"
1591 "\n"
1592 "<num_vals> <filename>\n"
1593 },
1594 {
1595 "STATS",
1596 handle_request_stats,
1597 CMD_CONTEXT_CLIENT,
1598 "STATS\n"
1599 ,
1600 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1601 "a description of the values.\n"
1602 },
1603 {
1604 "HELP",
1605 handle_request_help,
1606 CMD_CONTEXT_CLIENT,
1607 "HELP [<command>]\n",
1608 NULL, /* special! */
1609 },
1610 {
1611 "BATCH",
1612 batch_start,
1613 CMD_CONTEXT_CLIENT,
1614 "BATCH\n"
1615 ,
1616 "The 'BATCH' command permits the client to initiate a bulk load\n"
1617 " of commands to rrdcached.\n"
1618 "\n"
1619 "Usage:\n"
1620 "\n"
1621 " client: BATCH\n"
1622 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1623 " client: command #1\n"
1624 " client: command #2\n"
1625 " client: ... and so on\n"
1626 " client: .\n"
1627 " server: 2 errors\n"
1628 " server: 7 message for command #7\n"
1629 " server: 9 message for command #9\n"
1630 "\n"
1631 "For more information, consult the rrdcached(1) documentation.\n"
1632 },
1633 {
1634 ".", /* BATCH terminator */
1635 batch_done,
1636 CMD_CONTEXT_BATCH,
1637 NULL,
1638 NULL
1639 },
1640 {
1641 "QUIT",
1642 handle_request_quit,
1643 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1644 "QUIT\n"
1645 ,
1646 "Disconnect from rrdcached.\n"
1647 }
1648 }; /* }}} command_t list_of_commands[] */
1649 static size_t list_of_commands_len = sizeof (list_of_commands)
1650 / sizeof (list_of_commands[0]);
1652 static command_t *find_command(char *cmd)
1653 {
1654 size_t i;
1656 for (i = 0; i < list_of_commands_len; i++)
1657 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1658 return (&list_of_commands[i]);
1659 return NULL;
1660 }
1662 /* We currently use the index in the `list_of_commands' array as a bit position
1663 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1664 * outside these functions so that switching to a more elegant storage method
1665 * is easily possible. */
1666 static ssize_t find_command_index (const char *cmd) /* {{{ */
1667 {
1668 size_t i;
1670 for (i = 0; i < list_of_commands_len; i++)
1671 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1672 return ((ssize_t) i);
1673 return (-1);
1674 } /* }}} ssize_t find_command_index */
1676 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1677 const char *cmd)
1678 {
1679 ssize_t i;
1681 if (JOURNAL_REPLAY(sock))
1682 return (1);
1684 if (cmd == NULL)
1685 return (-1);
1687 if ((strcasecmp ("QUIT", cmd) == 0)
1688 || (strcasecmp ("HELP", cmd) == 0))
1689 return (1);
1690 else if (strcmp (".", cmd) == 0)
1691 cmd = "BATCH";
1693 i = find_command_index (cmd);
1694 if (i < 0)
1695 return (-1);
1696 assert (i < 32);
1698 if ((sock->permissions & (1 << i)) != 0)
1699 return (1);
1700 return (0);
1701 } /* }}} int socket_permission_check */
1703 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1704 const char *cmd)
1705 {
1706 ssize_t i;
1708 i = find_command_index (cmd);
1709 if (i < 0)
1710 return (-1);
1711 assert (i < 32);
1713 sock->permissions |= (1 << i);
1714 return (0);
1715 } /* }}} int socket_permission_add */
1717 /* check whether commands are received in the expected context */
1718 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1719 {
1720 if (JOURNAL_REPLAY(sock))
1721 return (cmd->context & CMD_CONTEXT_JOURNAL);
1722 else if (sock->batch_start)
1723 return (cmd->context & CMD_CONTEXT_BATCH);
1724 else
1725 return (cmd->context & CMD_CONTEXT_CLIENT);
1727 /* NOTREACHED */
1728 assert(1==0);
1729 }
1731 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1732 {
1733 int status;
1734 char *cmd_str;
1735 char *resp_txt;
1736 command_t *help = NULL;
1738 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1739 if (status == 0)
1740 help = find_command(cmd_str);
1742 if (help && (help->syntax || help->help))
1743 {
1744 char tmp[CMD_MAX];
1746 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1747 resp_txt = tmp;
1749 if (help->syntax)
1750 add_response_info(sock, "Usage: %s\n", help->syntax);
1752 if (help->help)
1753 add_response_info(sock, "%s\n", help->help);
1754 }
1755 else
1756 {
1757 size_t i;
1759 resp_txt = "Command overview\n";
1761 for (i = 0; i < list_of_commands_len; i++)
1762 {
1763 if (list_of_commands[i].syntax == NULL)
1764 continue;
1765 add_response_info (sock, "%s", list_of_commands[i].syntax);
1766 }
1767 }
1769 return send_response(sock, RESP_OK, resp_txt);
1770 } /* }}} int handle_request_help */
1772 static int handle_request (DISPATCH_PROTO) /* {{{ */
1773 {
1774 char *buffer_ptr = buffer;
1775 char *cmd_str = NULL;
1776 command_t *cmd = NULL;
1777 int status;
1779 assert (buffer[buffer_size - 1] == '\0');
1781 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1782 if (status != 0)
1783 {
1784 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1785 return (-1);
1786 }
1788 if (sock != NULL && sock->batch_start)
1789 sock->batch_cmd++;
1791 cmd = find_command(cmd_str);
1792 if (!cmd)
1793 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1795 if (!socket_permission_check (sock, cmd->cmd))
1796 return send_response(sock, RESP_ERR, "Permission denied.\n");
1798 if (!command_check_context(sock, cmd))
1799 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1801 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1802 } /* }}} int handle_request */
1804 static void journal_set_free (journal_set *js) /* {{{ */
1805 {
1806 if (js == NULL)
1807 return;
1809 rrd_free_ptrs((void ***) &js->files, &js->files_num);
1811 free(js);
1812 } /* }}} journal_set_free */
1814 static void journal_set_remove (journal_set *js) /* {{{ */
1815 {
1816 if (js == NULL)
1817 return;
1819 for (uint i=0; i < js->files_num; i++)
1820 {
1821 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1822 unlink(js->files[i]);
1823 }
1824 } /* }}} journal_set_remove */
1826 /* close current journal file handle.
1827 * MUST hold journal_lock before calling */
1828 static void journal_close(void) /* {{{ */
1829 {
1830 if (journal_fh != NULL)
1831 {
1832 if (fclose(journal_fh) != 0)
1833 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1834 }
1836 journal_fh = NULL;
1837 journal_size = 0;
1838 } /* }}} journal_close */
1840 /* MUST hold journal_lock before calling */
1841 static void journal_new_file(void) /* {{{ */
1842 {
1843 struct timeval now;
1844 int new_fd;
1845 char new_file[PATH_MAX + 1];
1847 assert(journal_dir != NULL);
1848 assert(journal_cur != NULL);
1850 journal_close();
1852 gettimeofday(&now, NULL);
1853 /* this format assures that the files sort in strcmp() order */
1854 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1855 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1857 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1858 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1859 if (new_fd < 0)
1860 goto error;
1862 journal_fh = fdopen(new_fd, "a");
1863 if (journal_fh == NULL)
1864 goto error;
1866 journal_size = ftell(journal_fh);
1867 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1869 /* record the file in the journal set */
1870 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1872 return;
1874 error:
1875 RRDD_LOG(LOG_CRIT,
1876 "JOURNALING DISABLED: Error while trying to create %s : %s",
1877 new_file, rrd_strerror(errno));
1878 RRDD_LOG(LOG_CRIT,
1879 "JOURNALING DISABLED: All values will be flushed at shutdown");
1881 close(new_fd);
1882 config_flush_at_shutdown = 1;
1884 } /* }}} journal_new_file */
1886 /* MUST NOT hold journal_lock before calling this */
1887 static void journal_rotate(void) /* {{{ */
1888 {
1889 journal_set *old_js = NULL;
1891 if (journal_dir == NULL)
1892 return;
1894 RRDD_LOG(LOG_DEBUG, "rotating journals");
1896 pthread_mutex_lock(&stats_lock);
1897 ++stats_journal_rotate;
1898 pthread_mutex_unlock(&stats_lock);
1900 pthread_mutex_lock(&journal_lock);
1902 journal_close();
1904 /* rotate the journal sets */
1905 old_js = journal_old;
1906 journal_old = journal_cur;
1907 journal_cur = calloc(1, sizeof(journal_set));
1909 if (journal_cur != NULL)
1910 journal_new_file();
1911 else
1912 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1914 pthread_mutex_unlock(&journal_lock);
1916 journal_set_remove(old_js);
1917 journal_set_free (old_js);
1919 } /* }}} static void journal_rotate */
1921 /* MUST hold journal_lock when calling */
1922 static void journal_done(void) /* {{{ */
1923 {
1924 if (journal_cur == NULL)
1925 return;
1927 journal_close();
1929 if (config_flush_at_shutdown)
1930 {
1931 RRDD_LOG(LOG_INFO, "removing journals");
1932 journal_set_remove(journal_old);
1933 journal_set_remove(journal_cur);
1934 }
1935 else
1936 {
1937 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1938 "journals will be used at next startup");
1939 }
1941 journal_set_free(journal_cur);
1942 journal_set_free(journal_old);
1943 free(journal_dir);
1945 } /* }}} static void journal_done */
1947 static int journal_write(char *cmd, char *args) /* {{{ */
1948 {
1949 int chars;
1951 if (journal_fh == NULL)
1952 return 0;
1954 pthread_mutex_lock(&journal_lock);
1955 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1956 journal_size += chars;
1958 if (journal_size > JOURNAL_MAX)
1959 journal_new_file();
1961 pthread_mutex_unlock(&journal_lock);
1963 if (chars > 0)
1964 {
1965 pthread_mutex_lock(&stats_lock);
1966 stats_journal_bytes += chars;
1967 pthread_mutex_unlock(&stats_lock);
1968 }
1970 return chars;
1971 } /* }}} static int journal_write */
1973 static int journal_replay (const char *file) /* {{{ */
1974 {
1975 FILE *fh;
1976 int entry_cnt = 0;
1977 int fail_cnt = 0;
1978 uint64_t line = 0;
1979 char entry[CMD_MAX];
1980 time_t now;
1982 if (file == NULL) return 0;
1984 {
1985 char *reason = "unknown error";
1986 int status = 0;
1987 struct stat statbuf;
1989 memset(&statbuf, 0, sizeof(statbuf));
1990 if (stat(file, &statbuf) != 0)
1991 {
1992 reason = "stat error";
1993 status = errno;
1994 }
1995 else if (!S_ISREG(statbuf.st_mode))
1996 {
1997 reason = "not a regular file";
1998 status = EPERM;
1999 }
2000 if (statbuf.st_uid != daemon_uid)
2001 {
2002 reason = "not owned by daemon user";
2003 status = EACCES;
2004 }
2005 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2006 {
2007 reason = "must not be user/group writable";
2008 status = EACCES;
2009 }
2011 if (status != 0)
2012 {
2013 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2014 file, rrd_strerror(status), reason);
2015 return 0;
2016 }
2017 }
2019 fh = fopen(file, "r");
2020 if (fh == NULL)
2021 {
2022 if (errno != ENOENT)
2023 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2024 file, rrd_strerror(errno));
2025 return 0;
2026 }
2027 else
2028 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2030 now = time(NULL);
2032 while(!feof(fh))
2033 {
2034 size_t entry_len;
2036 ++line;
2037 if (fgets(entry, sizeof(entry), fh) == NULL)
2038 break;
2039 entry_len = strlen(entry);
2041 /* check \n termination in case journal writing crashed mid-line */
2042 if (entry_len == 0)
2043 continue;
2044 else if (entry[entry_len - 1] != '\n')
2045 {
2046 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2047 ++fail_cnt;
2048 continue;
2049 }
2051 entry[entry_len - 1] = '\0';
2053 if (handle_request(NULL, now, entry, entry_len) == 0)
2054 ++entry_cnt;
2055 else
2056 ++fail_cnt;
2057 }
2059 fclose(fh);
2061 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2062 entry_cnt, fail_cnt);
2064 return entry_cnt > 0 ? 1 : 0;
2065 } /* }}} static int journal_replay */
2067 static int journal_sort(const void *v1, const void *v2)
2068 {
2069 char **jn1 = (char **) v1;
2070 char **jn2 = (char **) v2;
2072 return strcmp(*jn1,*jn2);
2073 }
2075 static void journal_init(void) /* {{{ */
2076 {
2077 int had_journal = 0;
2078 DIR *dir;
2079 struct dirent *dent;
2080 char path[PATH_MAX+1];
2082 if (journal_dir == NULL) return;
2084 pthread_mutex_lock(&journal_lock);
2086 journal_cur = calloc(1, sizeof(journal_set));
2087 if (journal_cur == NULL)
2088 {
2089 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2090 return;
2091 }
2093 RRDD_LOG(LOG_INFO, "checking for journal files");
2095 /* Handle old journal files during transition. This gives them the
2096 * correct sort order. TODO: remove after first release
2097 */
2098 {
2099 char old_path[PATH_MAX+1];
2100 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2101 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2102 rename(old_path, path);
2104 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2105 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2106 rename(old_path, path);
2107 }
2109 dir = opendir(journal_dir);
2110 while ((dent = readdir(dir)) != NULL)
2111 {
2112 /* looks like a journal file? */
2113 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2114 continue;
2116 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2118 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2119 {
2120 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2121 dent->d_name);
2122 break;
2123 }
2124 }
2125 closedir(dir);
2127 qsort(journal_cur->files, journal_cur->files_num,
2128 sizeof(journal_cur->files[0]), journal_sort);
2130 for (uint i=0; i < journal_cur->files_num; i++)
2131 had_journal += journal_replay(journal_cur->files[i]);
2133 journal_new_file();
2135 /* it must have been a crash. start a flush */
2136 if (had_journal && config_flush_at_shutdown)
2137 flush_old_values(-1);
2139 pthread_mutex_unlock(&journal_lock);
2141 RRDD_LOG(LOG_INFO, "journal processing complete");
2143 } /* }}} static void journal_init */
2145 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2146 {
2147 assert(sock != NULL);
2149 free(sock->rbuf); sock->rbuf = NULL;
2150 free(sock->wbuf); sock->wbuf = NULL;
2151 free(sock);
2152 } /* }}} void free_listen_socket */
2154 static void close_connection(listen_socket_t *sock) /* {{{ */
2155 {
2156 if (sock->fd >= 0)
2157 {
2158 close(sock->fd);
2159 sock->fd = -1;
2160 }
2162 free_listen_socket(sock);
2164 } /* }}} void close_connection */
2166 static void *connection_thread_main (void *args) /* {{{ */
2167 {
2168 listen_socket_t *sock;
2169 int fd;
2171 sock = (listen_socket_t *) args;
2172 fd = sock->fd;
2174 /* init read buffers */
2175 sock->next_read = sock->next_cmd = 0;
2176 sock->rbuf = malloc(RBUF_SIZE);
2177 if (sock->rbuf == NULL)
2178 {
2179 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2180 close_connection(sock);
2181 return NULL;
2182 }
2184 pthread_mutex_lock (&connection_threads_lock);
2185 connection_threads_num++;
2186 pthread_mutex_unlock (&connection_threads_lock);
2188 while (state == RUNNING)
2189 {
2190 char *cmd;
2191 ssize_t cmd_len;
2192 ssize_t rbytes;
2193 time_t now;
2195 struct pollfd pollfd;
2196 int status;
2198 pollfd.fd = fd;
2199 pollfd.events = POLLIN | POLLPRI;
2200 pollfd.revents = 0;
2202 status = poll (&pollfd, 1, /* timeout = */ 500);
2203 if (state != RUNNING)
2204 break;
2205 else if (status == 0) /* timeout */
2206 continue;
2207 else if (status < 0) /* error */
2208 {
2209 status = errno;
2210 if (status != EINTR)
2211 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2212 continue;
2213 }
2215 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2216 break;
2217 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2218 {
2219 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2220 "poll(2) returned something unexpected: %#04hx",
2221 pollfd.revents);
2222 break;
2223 }
2225 rbytes = read(fd, sock->rbuf + sock->next_read,
2226 RBUF_SIZE - sock->next_read);
2227 if (rbytes < 0)
2228 {
2229 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2230 break;
2231 }
2232 else if (rbytes == 0)
2233 break; /* eof */
2235 sock->next_read += rbytes;
2237 if (sock->batch_start)
2238 now = sock->batch_start;
2239 else
2240 now = time(NULL);
2242 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2243 {
2244 status = handle_request (sock, now, cmd, cmd_len+1);
2245 if (status != 0)
2246 goto out_close;
2247 }
2248 }
2250 out_close:
2251 close_connection(sock);
2253 /* Remove this thread from the connection threads list */
2254 pthread_mutex_lock (&connection_threads_lock);
2255 connection_threads_num--;
2256 if (connection_threads_num <= 0)
2257 pthread_cond_broadcast(&connection_threads_done);
2258 pthread_mutex_unlock (&connection_threads_lock);
2260 return (NULL);
2261 } /* }}} void *connection_thread_main */
2263 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2264 {
2265 int fd;
2266 struct sockaddr_un sa;
2267 listen_socket_t *temp;
2268 int status;
2269 const char *path;
2270 char *path_copy, *dir;
2272 path = sock->addr;
2273 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2274 path += strlen("unix:");
2276 /* dirname may modify its argument */
2277 path_copy = strdup(path);
2278 if (path_copy == NULL)
2279 {
2280 fprintf(stderr, "rrdcached: strdup(): %s\n",
2281 rrd_strerror(errno));
2282 return (-1);
2283 }
2285 dir = dirname(path_copy);
2286 if (rrd_mkdir_p(dir, 0777) != 0)
2287 {
2288 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2289 dir, rrd_strerror(errno));
2290 return (-1);
2291 }
2293 free(path_copy);
2295 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2296 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2297 if (temp == NULL)
2298 {
2299 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2300 return (-1);
2301 }
2302 listen_fds = temp;
2303 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2305 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2306 if (fd < 0)
2307 {
2308 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2309 rrd_strerror(errno));
2310 return (-1);
2311 }
2313 memset (&sa, 0, sizeof (sa));
2314 sa.sun_family = AF_UNIX;
2315 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2317 /* if we've gotten this far, we own the pid file. any daemon started
2318 * with the same args must not be alive. therefore, ensure that we can
2319 * create the socket...
2320 */
2321 unlink(path);
2323 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2324 if (status != 0)
2325 {
2326 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2327 path, rrd_strerror(errno));
2328 close (fd);
2329 return (-1);
2330 }
2332 /* tweak the sockets group ownership */
2333 if (sock->socket_group != (gid_t)-1)
2334 {
2335 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2336 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2337 {
2338 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2339 }
2340 }
2342 status = listen (fd, /* backlog = */ 10);
2343 if (status != 0)
2344 {
2345 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2346 path, rrd_strerror(errno));
2347 close (fd);
2348 unlink (path);
2349 return (-1);
2350 }
2352 listen_fds[listen_fds_num].fd = fd;
2353 listen_fds[listen_fds_num].family = PF_UNIX;
2354 strncpy(listen_fds[listen_fds_num].addr, path,
2355 sizeof (listen_fds[listen_fds_num].addr) - 1);
2356 listen_fds_num++;
2358 return (0);
2359 } /* }}} int open_listen_socket_unix */
2361 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2362 {
2363 struct addrinfo ai_hints;
2364 struct addrinfo *ai_res;
2365 struct addrinfo *ai_ptr;
2366 char addr_copy[NI_MAXHOST];
2367 char *addr;
2368 char *port;
2369 int status;
2371 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2372 addr_copy[sizeof (addr_copy) - 1] = 0;
2373 addr = addr_copy;
2375 memset (&ai_hints, 0, sizeof (ai_hints));
2376 ai_hints.ai_flags = 0;
2377 #ifdef AI_ADDRCONFIG
2378 ai_hints.ai_flags |= AI_ADDRCONFIG;
2379 #endif
2380 ai_hints.ai_family = AF_UNSPEC;
2381 ai_hints.ai_socktype = SOCK_STREAM;
2383 port = NULL;
2384 if (*addr == '[') /* IPv6+port format */
2385 {
2386 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2387 addr++;
2389 port = strchr (addr, ']');
2390 if (port == NULL)
2391 {
2392 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2393 return (-1);
2394 }
2395 *port = 0;
2396 port++;
2398 if (*port == ':')
2399 port++;
2400 else if (*port == 0)
2401 port = NULL;
2402 else
2403 {
2404 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2405 return (-1);
2406 }
2407 } /* if (*addr == '[') */
2408 else
2409 {
2410 port = rindex(addr, ':');
2411 if (port != NULL)
2412 {
2413 *port = 0;
2414 port++;
2415 }
2416 }
2417 ai_res = NULL;
2418 status = getaddrinfo (addr,
2419 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2420 &ai_hints, &ai_res);
2421 if (status != 0)
2422 {
2423 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2424 addr, gai_strerror (status));
2425 return (-1);
2426 }
2428 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2429 {
2430 int fd;
2431 listen_socket_t *temp;
2432 int one = 1;
2434 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2435 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2436 if (temp == NULL)
2437 {
2438 fprintf (stderr,
2439 "rrdcached: open_listen_socket_network: realloc failed.\n");
2440 continue;
2441 }
2442 listen_fds = temp;
2443 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2445 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2446 if (fd < 0)
2447 {
2448 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2449 rrd_strerror(errno));
2450 continue;
2451 }
2453 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2455 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2456 if (status != 0)
2457 {
2458 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2459 sock->addr, rrd_strerror(errno));
2460 close (fd);
2461 continue;
2462 }
2464 status = listen (fd, /* backlog = */ 10);
2465 if (status != 0)
2466 {
2467 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2468 sock->addr, rrd_strerror(errno));
2469 close (fd);
2470 freeaddrinfo(ai_res);
2471 return (-1);
2472 }
2474 listen_fds[listen_fds_num].fd = fd;
2475 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2476 listen_fds_num++;
2477 } /* for (ai_ptr) */
2479 freeaddrinfo(ai_res);
2480 return (0);
2481 } /* }}} static int open_listen_socket_network */
2483 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2484 {
2485 assert(sock != NULL);
2486 assert(sock->addr != NULL);
2488 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2489 || sock->addr[0] == '/')
2490 return (open_listen_socket_unix(sock));
2491 else
2492 return (open_listen_socket_network(sock));
2493 } /* }}} int open_listen_socket */
2495 static int close_listen_sockets (void) /* {{{ */
2496 {
2497 size_t i;
2499 for (i = 0; i < listen_fds_num; i++)
2500 {
2501 close (listen_fds[i].fd);
2503 if (listen_fds[i].family == PF_UNIX)
2504 unlink(listen_fds[i].addr);
2505 }
2507 free (listen_fds);
2508 listen_fds = NULL;
2509 listen_fds_num = 0;
2511 return (0);
2512 } /* }}} int close_listen_sockets */
2514 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2515 {
2516 struct pollfd *pollfds;
2517 int pollfds_num;
2518 int status;
2519 int i;
2521 if (listen_fds_num < 1)
2522 {
2523 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2524 return (NULL);
2525 }
2527 pollfds_num = listen_fds_num;
2528 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2529 if (pollfds == NULL)
2530 {
2531 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2532 return (NULL);
2533 }
2534 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2536 RRDD_LOG(LOG_INFO, "listening for connections");
2538 while (state == RUNNING)
2539 {
2540 for (i = 0; i < pollfds_num; i++)
2541 {
2542 pollfds[i].fd = listen_fds[i].fd;
2543 pollfds[i].events = POLLIN | POLLPRI;
2544 pollfds[i].revents = 0;
2545 }
2547 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2548 if (state != RUNNING)
2549 break;
2550 else if (status == 0) /* timeout */
2551 continue;
2552 else if (status < 0) /* error */
2553 {
2554 status = errno;
2555 if (status != EINTR)
2556 {
2557 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2558 }
2559 continue;
2560 }
2562 for (i = 0; i < pollfds_num; i++)
2563 {
2564 listen_socket_t *client_sock;
2565 struct sockaddr_storage client_sa;
2566 socklen_t client_sa_size;
2567 pthread_t tid;
2568 pthread_attr_t attr;
2570 if (pollfds[i].revents == 0)
2571 continue;
2573 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2574 {
2575 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2576 "poll(2) returned something unexpected for listen FD #%i.",
2577 pollfds[i].fd);
2578 continue;
2579 }
2581 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2582 if (client_sock == NULL)
2583 {
2584 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2585 continue;
2586 }
2587 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2589 client_sa_size = sizeof (client_sa);
2590 client_sock->fd = accept (pollfds[i].fd,
2591 (struct sockaddr *) &client_sa, &client_sa_size);
2592 if (client_sock->fd < 0)
2593 {
2594 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2595 free(client_sock);
2596 continue;
2597 }
2599 pthread_attr_init (&attr);
2600 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2602 status = pthread_create (&tid, &attr, connection_thread_main,
2603 client_sock);
2604 if (status != 0)
2605 {
2606 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2607 close_connection(client_sock);
2608 continue;
2609 }
2610 } /* for (pollfds_num) */
2611 } /* while (state == RUNNING) */
2613 RRDD_LOG(LOG_INFO, "starting shutdown");
2615 close_listen_sockets ();
2617 pthread_mutex_lock (&connection_threads_lock);
2618 while (connection_threads_num > 0)
2619 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2620 pthread_mutex_unlock (&connection_threads_lock);
2622 free(pollfds);
2624 return (NULL);
2625 } /* }}} void *listen_thread_main */
2627 static int daemonize (void) /* {{{ */
2628 {
2629 int pid_fd;
2630 char *base_dir;
2632 daemon_uid = geteuid();
2634 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2635 if (pid_fd < 0)
2636 pid_fd = check_pidfile();
2637 if (pid_fd < 0)
2638 return pid_fd;
2640 /* open all the listen sockets */
2641 if (config_listen_address_list_len > 0)
2642 {
2643 for (size_t i = 0; i < config_listen_address_list_len; i++)
2644 open_listen_socket (config_listen_address_list[i]);
2646 rrd_free_ptrs((void ***) &config_listen_address_list,
2647 &config_listen_address_list_len);
2648 }
2649 else
2650 {
2651 listen_socket_t sock;
2652 memset(&sock, 0, sizeof(sock));
2653 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2654 open_listen_socket (&sock);
2655 }
2657 if (listen_fds_num < 1)
2658 {
2659 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2660 goto error;
2661 }
2663 if (!stay_foreground)
2664 {
2665 pid_t child;
2667 child = fork ();
2668 if (child < 0)
2669 {
2670 fprintf (stderr, "daemonize: fork(2) failed.\n");
2671 goto error;
2672 }
2673 else if (child > 0)
2674 exit(0);
2676 /* Become session leader */
2677 setsid ();
2679 /* Open the first three file descriptors to /dev/null */
2680 close (2);
2681 close (1);
2682 close (0);
2684 open ("/dev/null", O_RDWR);
2685 if (dup(0) == -1 || dup(0) == -1){
2686 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2687 }
2688 } /* if (!stay_foreground) */
2690 /* Change into the /tmp directory. */
2691 base_dir = (config_base_dir != NULL)
2692 ? config_base_dir
2693 : "/tmp";
2695 if (chdir (base_dir) != 0)
2696 {
2697 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2698 goto error;
2699 }
2701 install_signal_handlers();
2703 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2704 RRDD_LOG(LOG_INFO, "starting up");
2706 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2707 (GDestroyNotify) free_cache_item);
2708 if (cache_tree == NULL)
2709 {
2710 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2711 goto error;
2712 }
2714 return write_pidfile (pid_fd);
2716 error:
2717 remove_pidfile();
2718 return -1;
2719 } /* }}} int daemonize */
2721 static int cleanup (void) /* {{{ */
2722 {
2723 pthread_cond_broadcast (&flush_cond);
2724 pthread_join (flush_thread, NULL);
2726 pthread_cond_broadcast (&queue_cond);
2727 for (int i = 0; i < config_queue_threads; i++)
2728 pthread_join (queue_threads[i], NULL);
2730 if (config_flush_at_shutdown)
2731 {
2732 assert(cache_queue_head == NULL);
2733 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2734 }
2736 free(queue_threads);
2737 free(config_base_dir);
2739 pthread_mutex_lock(&cache_lock);
2740 g_tree_destroy(cache_tree);
2742 pthread_mutex_lock(&journal_lock);
2743 journal_done();
2745 RRDD_LOG(LOG_INFO, "goodbye");
2746 closelog ();
2748 remove_pidfile ();
2749 free(config_pid_file);
2751 return (0);
2752 } /* }}} int cleanup */
2754 static int read_options (int argc, char **argv) /* {{{ */
2755 {
2756 int option;
2757 int status = 0;
2759 char **permissions = NULL;
2760 size_t permissions_len = 0;
2762 gid_t socket_group = (gid_t)-1;
2764 while ((option = getopt(argc, argv, "gl:s:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2765 {
2766 switch (option)
2767 {
2768 case 'g':
2769 stay_foreground=1;
2770 break;
2772 case 'l':
2773 {
2774 listen_socket_t *new;
2776 new = malloc(sizeof(listen_socket_t));
2777 if (new == NULL)
2778 {
2779 fprintf(stderr, "read_options: malloc failed.\n");
2780 return(2);
2781 }
2782 memset(new, 0, sizeof(listen_socket_t));
2784 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2786 /* Add permissions to the socket {{{ */
2787 if (permissions_len != 0)
2788 {
2789 size_t i;
2790 for (i = 0; i < permissions_len; i++)
2791 {
2792 status = socket_permission_add (new, permissions[i]);
2793 if (status != 0)
2794 {
2795 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2796 "socket failed. Most likely, this permission doesn't "
2797 "exist. Check your command line.\n", permissions[i]);
2798 status = 4;
2799 }
2800 }
2801 }
2802 else /* if (permissions_len == 0) */
2803 {
2804 /* Add permission for ALL commands to the socket. */
2805 size_t i;
2806 for (i = 0; i < list_of_commands_len; i++)
2807 {
2808 status = socket_permission_add (new, list_of_commands[i].cmd);
2809 if (status != 0)
2810 {
2811 fprintf (stderr, "read_options: Adding permission \"%s\" to "
2812 "socket failed. This should never happen, ever! Sorry.\n",
2813 permissions[i]);
2814 status = 4;
2815 }
2816 }
2817 }
2818 /* }}} Done adding permissions. */
2820 new->socket_group = socket_group;
2822 if (!rrd_add_ptr((void ***)&config_listen_address_list,
2823 &config_listen_address_list_len, new))
2824 {
2825 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2826 return (2);
2827 }
2828 }
2829 break;
2831 /* set socket group permissions */
2832 case 's':
2833 {
2834 gid_t group_gid;
2835 struct group *grp;
2837 group_gid = strtoul(optarg, NULL, 10);
2838 if (errno != EINVAL && group_gid>0)
2839 {
2840 /* we were passed a number */
2841 grp = getgrgid(group_gid);
2842 }
2843 else
2844 {
2845 grp = getgrnam(optarg);
2846 }
2848 if (grp)
2849 {
2850 socket_group = grp->gr_gid;
2851 }
2852 else
2853 {
2854 /* no idea what the user wanted... */
2855 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2856 return (5);
2857 }
2858 }
2859 break;
2861 case 'P':
2862 {
2863 char *optcopy;
2864 char *saveptr;
2865 char *dummy;
2866 char *ptr;
2868 rrd_free_ptrs ((void *) &permissions, &permissions_len);
2870 optcopy = strdup (optarg);
2871 dummy = optcopy;
2872 saveptr = NULL;
2873 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2874 {
2875 dummy = NULL;
2876 rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2877 }
2879 free (optcopy);
2880 }
2881 break;
2883 case 'f':
2884 {
2885 int temp;
2887 temp = atoi (optarg);
2888 if (temp > 0)
2889 config_flush_interval = temp;
2890 else
2891 {
2892 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2893 status = 3;
2894 }
2895 }
2896 break;
2898 case 'w':
2899 {
2900 int temp;
2902 temp = atoi (optarg);
2903 if (temp > 0)
2904 config_write_interval = temp;
2905 else
2906 {
2907 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2908 status = 2;
2909 }
2910 }
2911 break;
2913 case 'z':
2914 {
2915 int temp;
2917 temp = atoi(optarg);
2918 if (temp > 0)
2919 config_write_jitter = temp;
2920 else
2921 {
2922 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2923 status = 2;
2924 }
2926 break;
2927 }
2929 case 't':
2930 {
2931 int threads;
2932 threads = atoi(optarg);
2933 if (threads >= 1)
2934 config_queue_threads = threads;
2935 else
2936 {
2937 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2938 return 1;
2939 }
2940 }
2941 break;
2943 case 'B':
2944 config_write_base_only = 1;
2945 break;
2947 case 'b':
2948 {
2949 size_t len;
2950 char base_realpath[PATH_MAX];
2952 if (config_base_dir != NULL)
2953 free (config_base_dir);
2954 config_base_dir = strdup (optarg);
2955 if (config_base_dir == NULL)
2956 {
2957 fprintf (stderr, "read_options: strdup failed.\n");
2958 return (3);
2959 }
2961 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2962 {
2963 fprintf (stderr, "Failed to create base directory '%s': %s\n",
2964 config_base_dir, rrd_strerror (errno));
2965 return (3);
2966 }
2968 /* make sure that the base directory is not resolved via
2969 * symbolic links. this makes some performance-enhancing
2970 * assumptions possible (we don't have to resolve paths
2971 * that start with a "/")
2972 */
2973 if (realpath(config_base_dir, base_realpath) == NULL)
2974 {
2975 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2976 "%s\n", config_base_dir, rrd_strerror(errno));
2977 return 5;
2978 }
2980 len = strlen (config_base_dir);
2981 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2982 {
2983 config_base_dir[len - 1] = 0;
2984 len--;
2985 }
2987 if (len < 1)
2988 {
2989 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2990 return (4);
2991 }
2993 _config_base_dir_len = len;
2995 len = strlen (base_realpath);
2996 while ((len > 0) && (base_realpath[len - 1] == '/'))
2997 {
2998 base_realpath[len - 1] = '\0';
2999 len--;
3000 }
3002 if (strncmp(config_base_dir,
3003 base_realpath, sizeof(base_realpath)) != 0)
3004 {
3005 fprintf(stderr,
3006 "Base directory (-b) resolved via file system links!\n"
3007 "Please consult rrdcached '-b' documentation!\n"
3008 "Consider specifying the real directory (%s)\n",
3009 base_realpath);
3010 return 5;
3011 }
3012 }
3013 break;
3015 case 'p':
3016 {
3017 if (config_pid_file != NULL)
3018 free (config_pid_file);
3019 config_pid_file = strdup (optarg);
3020 if (config_pid_file == NULL)
3021 {
3022 fprintf (stderr, "read_options: strdup failed.\n");
3023 return (3);
3024 }
3025 }
3026 break;
3028 case 'F':
3029 config_flush_at_shutdown = 1;
3030 break;
3032 case 'j':
3033 {
3034 const char *dir = journal_dir = strdup(optarg);
3036 status = rrd_mkdir_p(dir, 0777);
3037 if (status != 0)
3038 {
3039 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3040 dir, rrd_strerror(errno));
3041 return 6;
3042 }
3044 if (access(dir, R_OK|W_OK|X_OK) != 0)
3045 {
3046 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3047 errno ? rrd_strerror(errno) : "");
3048 return 6;
3049 }
3050 }
3051 break;
3053 case 'h':
3054 case '?':
3055 printf ("RRDCacheD %s\n"
3056 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3057 "\n"
3058 "Usage: rrdcached [options]\n"
3059 "\n"
3060 "Valid options are:\n"
3061 " -l <address> Socket address to listen to.\n"
3062 " -P <perms> Sets the permissions to assign to all following "
3063 "sockets\n"
3064 " -w <seconds> Interval in which to write data.\n"
3065 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3066 " -t <threads> Number of write threads.\n"
3067 " -f <seconds> Interval in which to flush dead data.\n"
3068 " -p <file> Location of the PID-file.\n"
3069 " -b <dir> Base directory to change to.\n"
3070 " -B Restrict file access to paths within -b <dir>\n"
3071 " -g Do not fork and run in the foreground.\n"
3072 " -j <dir> Directory in which to create the journal files.\n"
3073 " -F Always flush all updates at shutdown\n"
3074 " -s <id|name> Make socket g+rw to named group\n"
3075 "\n"
3076 "For more information and a detailed description of all options "
3077 "please refer\n"
3078 "to the rrdcached(1) manual page.\n",
3079 VERSION);
3080 status = -1;
3081 break;
3082 } /* switch (option) */
3083 } /* while (getopt) */
3085 /* advise the user when values are not sane */
3086 if (config_flush_interval < 2 * config_write_interval)
3087 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3088 " 2x write interval (-w) !\n");
3089 if (config_write_jitter > config_write_interval)
3090 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3091 " write interval (-w) !\n");
3093 if (config_write_base_only && config_base_dir == NULL)
3094 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3095 " Consult the rrdcached documentation\n");
3097 if (journal_dir == NULL)
3098 config_flush_at_shutdown = 1;
3100 rrd_free_ptrs ((void *) &permissions, &permissions_len);
3102 return (status);
3103 } /* }}} int read_options */
3105 int main (int argc, char **argv)
3106 {
3107 int status;
3109 status = read_options (argc, argv);
3110 if (status != 0)
3111 {
3112 if (status < 0)
3113 status = 0;
3114 return (status);
3115 }
3117 status = daemonize ();
3118 if (status != 0)
3119 {
3120 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3121 return (1);
3122 }
3124 journal_init();
3126 /* start the queue threads */
3127 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3128 if (queue_threads == NULL)
3129 {
3130 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3131 cleanup();
3132 return (1);
3133 }
3134 for (int i = 0; i < config_queue_threads; i++)
3135 {
3136 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3137 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3138 if (status != 0)
3139 {
3140 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3141 cleanup();
3142 return (1);
3143 }
3144 }
3146 /* start the flush thread */
3147 memset(&flush_thread, 0, sizeof(flush_thread));
3148 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3149 if (status != 0)
3150 {
3151 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3152 cleanup();
3153 return (1);
3154 }
3156 listen_thread_main (NULL);
3157 cleanup ();
3159 return (0);
3160 } /* int main */
3162 /*
3163 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3164 */