1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 typedef enum
105 {
106 PRIV_LOW,
107 PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
113 {
114 int fd;
115 char addr[PATH_MAX + 1];
116 int family;
117 socket_privilege privilege;
119 /* state for BATCH processing */
120 int batch_mode;
121 int batch_cmd;
123 /* buffered IO */
124 char *rbuf;
125 off_t next_cmd;
126 off_t next_read;
128 char *wbuf;
129 ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137 char *file;
138 char **values;
139 int values_num;
140 time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143 int flags;
144 pthread_cond_t flushed;
145 cache_item_t *prev;
146 cache_item_t *next;
147 };
149 struct callback_flush_data_s
150 {
151 time_t now;
152 time_t abs_timeout;
153 char **keys;
154 size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
158 enum queue_side_e
159 {
160 HEAD,
161 TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
169 /*
170 * Variables
171 */
172 static int stay_foreground = 0;
174 static listen_socket_t *listen_fds = NULL;
175 static size_t listen_fds_num = 0;
177 static int do_shutdown = 0;
179 static pthread_t queue_thread;
181 static pthread_t *connection_threads = NULL;
182 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
183 static int connection_threads_num = 0;
185 /* Cache stuff */
186 static GTree *cache_tree = NULL;
187 static cache_item_t *cache_queue_head = NULL;
188 static cache_item_t *cache_queue_tail = NULL;
189 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
190 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
192 static int config_write_interval = 300;
193 static int config_write_jitter = 0;
194 static int config_flush_interval = 3600;
195 static int config_flush_at_shutdown = 0;
196 static char *config_pid_file = NULL;
197 static char *config_base_dir = NULL;
198 static size_t _config_base_dir_len = 0;
199 static int config_write_base_only = 0;
201 static listen_socket_t **config_listen_address_list = NULL;
202 static int config_listen_address_list_len = 0;
204 static uint64_t stats_queue_length = 0;
205 static uint64_t stats_updates_received = 0;
206 static uint64_t stats_flush_received = 0;
207 static uint64_t stats_updates_written = 0;
208 static uint64_t stats_data_sets_written = 0;
209 static uint64_t stats_journal_bytes = 0;
210 static uint64_t stats_journal_rotate = 0;
211 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
213 /* Journaled updates */
214 static char *journal_cur = NULL;
215 static char *journal_old = NULL;
216 static FILE *journal_fh = NULL;
217 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
218 static int journal_write(char *cmd, char *args);
219 static void journal_done(void);
220 static void journal_rotate(void);
222 /*
223 * Functions
224 */
225 static void sig_common (const char *sig) /* {{{ */
226 {
227 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
228 do_shutdown++;
229 pthread_cond_broadcast(&cache_cond);
230 } /* }}} void sig_common */
232 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
233 {
234 sig_common("INT");
235 } /* }}} void sig_int_handler */
237 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
238 {
239 sig_common("TERM");
240 } /* }}} void sig_term_handler */
242 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
243 {
244 config_flush_at_shutdown = 1;
245 sig_common("USR1");
246 } /* }}} void sig_usr1_handler */
248 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
249 {
250 config_flush_at_shutdown = 0;
251 sig_common("USR2");
252 } /* }}} void sig_usr2_handler */
254 static void install_signal_handlers(void) /* {{{ */
255 {
256 /* These structures are static, because `sigaction' behaves weird if the are
257 * overwritten.. */
258 static struct sigaction sa_int;
259 static struct sigaction sa_term;
260 static struct sigaction sa_pipe;
261 static struct sigaction sa_usr1;
262 static struct sigaction sa_usr2;
264 /* Install signal handlers */
265 memset (&sa_int, 0, sizeof (sa_int));
266 sa_int.sa_handler = sig_int_handler;
267 sigaction (SIGINT, &sa_int, NULL);
269 memset (&sa_term, 0, sizeof (sa_term));
270 sa_term.sa_handler = sig_term_handler;
271 sigaction (SIGTERM, &sa_term, NULL);
273 memset (&sa_pipe, 0, sizeof (sa_pipe));
274 sa_pipe.sa_handler = SIG_IGN;
275 sigaction (SIGPIPE, &sa_pipe, NULL);
277 memset (&sa_pipe, 0, sizeof (sa_usr1));
278 sa_usr1.sa_handler = sig_usr1_handler;
279 sigaction (SIGUSR1, &sa_usr1, NULL);
281 memset (&sa_usr2, 0, sizeof (sa_usr2));
282 sa_usr2.sa_handler = sig_usr2_handler;
283 sigaction (SIGUSR2, &sa_usr2, NULL);
285 } /* }}} void install_signal_handlers */
287 static int open_pidfile(void) /* {{{ */
288 {
289 int fd;
290 char *file;
292 file = (config_pid_file != NULL)
293 ? config_pid_file
294 : LOCALSTATEDIR "/run/rrdcached.pid";
296 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
297 if (fd < 0)
298 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
299 file, rrd_strerror(errno));
301 return(fd);
302 } /* }}} static int open_pidfile */
304 static int write_pidfile (int fd) /* {{{ */
305 {
306 pid_t pid;
307 FILE *fh;
309 pid = getpid ();
311 fh = fdopen (fd, "w");
312 if (fh == NULL)
313 {
314 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
315 close(fd);
316 return (-1);
317 }
319 fprintf (fh, "%i\n", (int) pid);
320 fclose (fh);
322 return (0);
323 } /* }}} int write_pidfile */
325 static int remove_pidfile (void) /* {{{ */
326 {
327 char *file;
328 int status;
330 file = (config_pid_file != NULL)
331 ? config_pid_file
332 : LOCALSTATEDIR "/run/rrdcached.pid";
334 status = unlink (file);
335 if (status == 0)
336 return (0);
337 return (errno);
338 } /* }}} int remove_pidfile */
340 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
341 {
342 char *eol;
344 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
345 sock->next_read - sock->next_cmd);
347 if (eol == NULL)
348 {
349 /* no commands left, move remainder back to front of rbuf */
350 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
351 sock->next_read - sock->next_cmd);
352 sock->next_read -= sock->next_cmd;
353 sock->next_cmd = 0;
354 *len = 0;
355 return NULL;
356 }
357 else
358 {
359 char *cmd = sock->rbuf + sock->next_cmd;
360 *eol = '\0';
362 sock->next_cmd = eol - sock->rbuf + 1;
364 if (eol > sock->rbuf && *(eol-1) == '\r')
365 *(--eol) = '\0'; /* handle "\r\n" EOL */
367 *len = eol - cmd;
369 return cmd;
370 }
372 /* NOTREACHED */
373 assert(1==0);
374 }
376 /* add the characters directly to the write buffer */
377 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
378 {
379 char *new_buf;
381 assert(sock != NULL);
383 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
384 if (new_buf == NULL)
385 {
386 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
387 return -1;
388 }
390 strncpy(new_buf + sock->wbuf_len, str, len + 1);
392 sock->wbuf = new_buf;
393 sock->wbuf_len += len;
395 return 0;
396 } /* }}} static int add_to_wbuf */
398 /* add the text to the "extra" info that's sent after the status line */
399 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
400 {
401 va_list argp;
402 char buffer[CMD_MAX];
403 int len;
405 if (sock == NULL) return 0; /* journal replay mode */
406 if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
408 va_start(argp, fmt);
409 #ifdef HAVE_VSNPRINTF
410 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
411 #else
412 len = vsprintf(buffer, fmt, argp);
413 #endif
414 va_end(argp);
415 if (len < 0)
416 {
417 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
418 return -1;
419 }
421 return add_to_wbuf(sock, buffer, len);
422 } /* }}} static int add_response_info */
424 static int count_lines(char *str) /* {{{ */
425 {
426 int lines = 0;
428 if (str != NULL)
429 {
430 while ((str = strchr(str, '\n')) != NULL)
431 {
432 ++lines;
433 ++str;
434 }
435 }
437 return lines;
438 } /* }}} static int count_lines */
440 /* send the response back to the user.
441 * returns 0 on success, -1 on error
442 * write buffer is always zeroed after this call */
443 static int send_response (listen_socket_t *sock, response_code rc,
444 char *fmt, ...) /* {{{ */
445 {
446 va_list argp;
447 char buffer[CMD_MAX];
448 int lines;
449 ssize_t wrote;
450 int rclen, len;
452 if (sock == NULL) return rc; /* journal replay mode */
454 if (sock->batch_mode)
455 {
456 if (rc == RESP_OK)
457 return rc; /* no response on success during BATCH */
458 lines = sock->batch_cmd;
459 }
460 else if (rc == RESP_OK)
461 lines = count_lines(sock->wbuf);
462 else
463 lines = -1;
465 rclen = sprintf(buffer, "%d ", lines);
466 va_start(argp, fmt);
467 #ifdef HAVE_VSNPRINTF
468 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
469 #else
470 len = vsprintf(buffer+rclen, fmt, argp);
471 #endif
472 va_end(argp);
473 if (len < 0)
474 return -1;
476 len += rclen;
478 /* append the result to the wbuf, don't write to the user */
479 if (sock->batch_mode)
480 return add_to_wbuf(sock, buffer, len);
482 /* first write must be complete */
483 if (len != write(sock->fd, buffer, len))
484 {
485 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
486 return -1;
487 }
489 if (sock->wbuf != NULL)
490 {
491 wrote = 0;
492 while (wrote < sock->wbuf_len)
493 {
494 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
495 if (wb <= 0)
496 {
497 RRDD_LOG(LOG_INFO, "send_response: could not write results");
498 return -1;
499 }
500 wrote += wb;
501 }
502 }
504 free(sock->wbuf); sock->wbuf = NULL;
505 sock->wbuf_len = 0;
507 return 0;
508 } /* }}} */
510 static void wipe_ci_values(cache_item_t *ci, time_t when)
511 {
512 ci->values = NULL;
513 ci->values_num = 0;
515 ci->last_flush_time = when;
516 if (config_write_jitter > 0)
517 ci->last_flush_time += (random() % config_write_jitter);
518 }
520 /* remove_from_queue
521 * remove a "cache_item_t" item from the queue.
522 * must hold 'cache_lock' when calling this
523 */
524 static void remove_from_queue(cache_item_t *ci) /* {{{ */
525 {
526 if (ci == NULL) return;
528 if (ci->prev == NULL)
529 cache_queue_head = ci->next; /* reset head */
530 else
531 ci->prev->next = ci->next;
533 if (ci->next == NULL)
534 cache_queue_tail = ci->prev; /* reset the tail */
535 else
536 ci->next->prev = ci->prev;
538 ci->next = ci->prev = NULL;
539 ci->flags &= ~CI_FLAGS_IN_QUEUE;
540 } /* }}} static void remove_from_queue */
542 /*
543 * enqueue_cache_item:
544 * `cache_lock' must be acquired before calling this function!
545 */
546 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
547 queue_side_t side)
548 {
549 if (ci == NULL)
550 return (-1);
552 if (ci->values_num == 0)
553 return (0);
555 if (side == HEAD)
556 {
557 if (cache_queue_head == ci)
558 return 0;
560 /* remove from the double linked list */
561 if (ci->flags & CI_FLAGS_IN_QUEUE)
562 remove_from_queue(ci);
564 ci->prev = NULL;
565 ci->next = cache_queue_head;
566 if (ci->next != NULL)
567 ci->next->prev = ci;
568 cache_queue_head = ci;
570 if (cache_queue_tail == NULL)
571 cache_queue_tail = cache_queue_head;
572 }
573 else /* (side == TAIL) */
574 {
575 /* We don't move values back in the list.. */
576 if (ci->flags & CI_FLAGS_IN_QUEUE)
577 return (0);
579 assert (ci->next == NULL);
580 assert (ci->prev == NULL);
582 ci->prev = cache_queue_tail;
584 if (cache_queue_tail == NULL)
585 cache_queue_head = ci;
586 else
587 cache_queue_tail->next = ci;
589 cache_queue_tail = ci;
590 }
592 ci->flags |= CI_FLAGS_IN_QUEUE;
594 pthread_cond_broadcast(&cache_cond);
595 pthread_mutex_lock (&stats_lock);
596 stats_queue_length++;
597 pthread_mutex_unlock (&stats_lock);
599 return (0);
600 } /* }}} int enqueue_cache_item */
602 /*
603 * tree_callback_flush:
604 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
605 * while this is in progress.
606 */
607 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
608 gpointer data)
609 {
610 cache_item_t *ci;
611 callback_flush_data_t *cfd;
613 ci = (cache_item_t *) value;
614 cfd = (callback_flush_data_t *) data;
616 if ((ci->last_flush_time <= cfd->abs_timeout)
617 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
618 && (ci->values_num > 0))
619 {
620 enqueue_cache_item (ci, TAIL);
621 }
622 else if ((do_shutdown != 0)
623 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
624 && (ci->values_num > 0))
625 {
626 enqueue_cache_item (ci, TAIL);
627 }
628 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
629 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
630 && (ci->values_num <= 0))
631 {
632 char **temp;
634 temp = (char **) realloc (cfd->keys,
635 sizeof (char *) * (cfd->keys_num + 1));
636 if (temp == NULL)
637 {
638 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
639 return (FALSE);
640 }
641 cfd->keys = temp;
642 /* Make really sure this points to the _same_ place */
643 assert ((char *) key == ci->file);
644 cfd->keys[cfd->keys_num] = (char *) key;
645 cfd->keys_num++;
646 }
648 return (FALSE);
649 } /* }}} gboolean tree_callback_flush */
651 static int flush_old_values (int max_age)
652 {
653 callback_flush_data_t cfd;
654 size_t k;
656 memset (&cfd, 0, sizeof (cfd));
657 /* Pass the current time as user data so that we don't need to call
658 * `time' for each node. */
659 cfd.now = time (NULL);
660 cfd.keys = NULL;
661 cfd.keys_num = 0;
663 if (max_age > 0)
664 cfd.abs_timeout = cfd.now - max_age;
665 else
666 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
668 /* `tree_callback_flush' will return the keys of all values that haven't
669 * been touched in the last `config_flush_interval' seconds in `cfd'.
670 * The char*'s in this array point to the same memory as ci->file, so we
671 * don't need to free them separately. */
672 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
674 for (k = 0; k < cfd.keys_num; k++)
675 {
676 cache_item_t *ci;
678 /* This must not fail. */
679 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
680 assert (ci != NULL);
682 /* If we end up here with values available, something's seriously
683 * messed up. */
684 assert (ci->values_num == 0);
686 /* Remove the node from the tree */
687 g_tree_remove (cache_tree, cfd.keys[k]);
688 cfd.keys[k] = NULL;
690 /* Now free and clean up `ci'. */
691 free (ci->file);
692 ci->file = NULL;
693 free (ci);
694 ci = NULL;
695 } /* for (k = 0; k < cfd.keys_num; k++) */
697 if (cfd.keys != NULL)
698 {
699 free (cfd.keys);
700 cfd.keys = NULL;
701 }
703 return (0);
704 } /* int flush_old_values */
706 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
707 {
708 struct timeval now;
709 struct timespec next_flush;
710 int final_flush = 0; /* make sure we only flush once on shutdown */
712 gettimeofday (&now, NULL);
713 next_flush.tv_sec = now.tv_sec + config_flush_interval;
714 next_flush.tv_nsec = 1000 * now.tv_usec;
716 pthread_mutex_lock (&cache_lock);
717 while ((do_shutdown == 0) || (cache_queue_head != NULL))
718 {
719 cache_item_t *ci;
720 char *file;
721 char **values;
722 int values_num;
723 int status;
724 int i;
726 /* First, check if it's time to do the cache flush. */
727 gettimeofday (&now, NULL);
728 if ((now.tv_sec > next_flush.tv_sec)
729 || ((now.tv_sec == next_flush.tv_sec)
730 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
731 {
732 /* Flush all values that haven't been written in the last
733 * `config_write_interval' seconds. */
734 flush_old_values (config_write_interval);
736 /* Determine the time of the next cache flush. */
737 while (next_flush.tv_sec <= now.tv_sec)
738 next_flush.tv_sec += config_flush_interval;
740 /* unlock the cache while we rotate so we don't block incoming
741 * updates if the fsync() blocks on disk I/O */
742 pthread_mutex_unlock(&cache_lock);
743 journal_rotate();
744 pthread_mutex_lock(&cache_lock);
745 }
747 /* Now, check if there's something to store away. If not, wait until
748 * something comes in or it's time to do the cache flush. if we are
749 * shutting down, do not wait around. */
750 if (cache_queue_head == NULL && !do_shutdown)
751 {
752 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
753 if ((status != 0) && (status != ETIMEDOUT))
754 {
755 RRDD_LOG (LOG_ERR, "queue_thread_main: "
756 "pthread_cond_timedwait returned %i.", status);
757 }
758 }
760 /* We're about to shut down */
761 if (do_shutdown != 0 && !final_flush++)
762 {
763 if (config_flush_at_shutdown)
764 flush_old_values (-1); /* flush everything */
765 else
766 break;
767 }
769 /* Check if a value has arrived. This may be NULL if we timed out or there
770 * was an interrupt such as a signal. */
771 if (cache_queue_head == NULL)
772 continue;
774 ci = cache_queue_head;
776 /* copy the relevant parts */
777 file = strdup (ci->file);
778 if (file == NULL)
779 {
780 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
781 continue;
782 }
784 assert(ci->values != NULL);
785 assert(ci->values_num > 0);
787 values = ci->values;
788 values_num = ci->values_num;
790 wipe_ci_values(ci, time(NULL));
791 remove_from_queue(ci);
793 pthread_mutex_lock (&stats_lock);
794 assert (stats_queue_length > 0);
795 stats_queue_length--;
796 pthread_mutex_unlock (&stats_lock);
798 pthread_mutex_unlock (&cache_lock);
800 rrd_clear_error ();
801 status = rrd_update_r (file, NULL, values_num, (void *) values);
802 if (status != 0)
803 {
804 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
805 "rrd_update_r (%s) failed with status %i. (%s)",
806 file, status, rrd_get_error());
807 }
809 journal_write("wrote", file);
810 pthread_cond_broadcast(&ci->flushed);
812 for (i = 0; i < values_num; i++)
813 free (values[i]);
815 free(values);
816 free(file);
818 if (status == 0)
819 {
820 pthread_mutex_lock (&stats_lock);
821 stats_updates_written++;
822 stats_data_sets_written += values_num;
823 pthread_mutex_unlock (&stats_lock);
824 }
826 pthread_mutex_lock (&cache_lock);
828 /* We're about to shut down */
829 if (do_shutdown != 0 && !final_flush++)
830 {
831 if (config_flush_at_shutdown)
832 flush_old_values (-1); /* flush everything */
833 else
834 break;
835 }
836 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
837 pthread_mutex_unlock (&cache_lock);
839 if (config_flush_at_shutdown)
840 {
841 assert(cache_queue_head == NULL);
842 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
843 }
845 journal_done();
847 return (NULL);
848 } /* }}} void *queue_thread_main */
850 static int buffer_get_field (char **buffer_ret, /* {{{ */
851 size_t *buffer_size_ret, char **field_ret)
852 {
853 char *buffer;
854 size_t buffer_pos;
855 size_t buffer_size;
856 char *field;
857 size_t field_size;
858 int status;
860 buffer = *buffer_ret;
861 buffer_pos = 0;
862 buffer_size = *buffer_size_ret;
863 field = *buffer_ret;
864 field_size = 0;
866 if (buffer_size <= 0)
867 return (-1);
869 /* This is ensured by `handle_request'. */
870 assert (buffer[buffer_size - 1] == '\0');
872 status = -1;
873 while (buffer_pos < buffer_size)
874 {
875 /* Check for end-of-field or end-of-buffer */
876 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
877 {
878 field[field_size] = 0;
879 field_size++;
880 buffer_pos++;
881 status = 0;
882 break;
883 }
884 /* Handle escaped characters. */
885 else if (buffer[buffer_pos] == '\\')
886 {
887 if (buffer_pos >= (buffer_size - 1))
888 break;
889 buffer_pos++;
890 field[field_size] = buffer[buffer_pos];
891 field_size++;
892 buffer_pos++;
893 }
894 /* Normal operation */
895 else
896 {
897 field[field_size] = buffer[buffer_pos];
898 field_size++;
899 buffer_pos++;
900 }
901 } /* while (buffer_pos < buffer_size) */
903 if (status != 0)
904 return (status);
906 *buffer_ret = buffer + buffer_pos;
907 *buffer_size_ret = buffer_size - buffer_pos;
908 *field_ret = field;
910 return (0);
911 } /* }}} int buffer_get_field */
913 /* if we're restricting writes to the base directory,
914 * check whether the file falls within the dir
915 * returns 1 if OK, otherwise 0
916 */
917 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
918 {
919 assert(file != NULL);
921 if (!config_write_base_only
922 || sock == NULL /* journal replay */
923 || config_base_dir == NULL)
924 return 1;
926 if (strstr(file, "../") != NULL) goto err;
928 /* relative paths without "../" are ok */
929 if (*file != '/') return 1;
931 /* file must be of the format base + "/" + <1+ char filename> */
932 if (strlen(file) < _config_base_dir_len + 2) goto err;
933 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
934 if (*(file + _config_base_dir_len) != '/') goto err;
936 return 1;
938 err:
939 if (sock != NULL && sock->fd >= 0)
940 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
942 return 0;
943 } /* }}} static int check_file_access */
945 static int flush_file (const char *filename) /* {{{ */
946 {
947 cache_item_t *ci;
949 pthread_mutex_lock (&cache_lock);
951 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
952 if (ci == NULL)
953 {
954 pthread_mutex_unlock (&cache_lock);
955 return (ENOENT);
956 }
958 if (ci->values_num > 0)
959 {
960 /* Enqueue at head */
961 enqueue_cache_item (ci, HEAD);
962 pthread_cond_wait(&ci->flushed, &cache_lock);
963 }
965 pthread_mutex_unlock(&cache_lock);
967 return (0);
968 } /* }}} int flush_file */
970 static int handle_request_help (listen_socket_t *sock, /* {{{ */
971 char *buffer, size_t buffer_size)
972 {
973 int status;
974 char **help_text;
975 char *command;
977 char *help_help[2] =
978 {
979 "Command overview\n"
980 ,
981 "FLUSH <filename>\n"
982 "FLUSHALL\n"
983 "HELP [<command>]\n"
984 "UPDATE <filename> <values> [<values> ...]\n"
985 "BATCH\n"
986 "STATS\n"
987 };
989 char *help_flush[2] =
990 {
991 "Help for FLUSH\n"
992 ,
993 "Usage: FLUSH <filename>\n"
994 "\n"
995 "Adds the given filename to the head of the update queue and returns\n"
996 "after is has been dequeued.\n"
997 };
999 char *help_flushall[2] =
1000 {
1001 "Help for FLUSHALL\n"
1002 ,
1003 "Usage: FLUSHALL\n"
1004 "\n"
1005 "Triggers writing of all pending updates. Returns immediately.\n"
1006 };
1008 char *help_update[2] =
1009 {
1010 "Help for UPDATE\n"
1011 ,
1012 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1013 "\n"
1014 "Adds the given file to the internal cache if it is not yet known and\n"
1015 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1016 "for details.\n"
1017 "\n"
1018 "Each <values> has the following form:\n"
1019 " <values> = <time>:<value>[:<value>[...]]\n"
1020 "See the rrdupdate(1) manpage for details.\n"
1021 };
1023 char *help_stats[2] =
1024 {
1025 "Help for STATS\n"
1026 ,
1027 "Usage: STATS\n"
1028 "\n"
1029 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1030 "a description of the values.\n"
1031 };
1033 char *help_batch[2] =
1034 {
1035 "Help for BATCH\n"
1036 ,
1037 "The 'BATCH' command permits the client to initiate a bulk load\n"
1038 " of commands to rrdcached.\n"
1039 "\n"
1040 "Usage:\n"
1041 "\n"
1042 " client: BATCH\n"
1043 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1044 " client: command #1\n"
1045 " client: command #2\n"
1046 " client: ... and so on\n"
1047 " client: .\n"
1048 " server: 2 errors\n"
1049 " server: 7 message for command #7\n"
1050 " server: 9 message for command #9\n"
1051 "\n"
1052 "For more information, consult the rrdcached(1) documentation.\n"
1053 };
1055 status = buffer_get_field (&buffer, &buffer_size, &command);
1056 if (status != 0)
1057 help_text = help_help;
1058 else
1059 {
1060 if (strcasecmp (command, "update") == 0)
1061 help_text = help_update;
1062 else if (strcasecmp (command, "flush") == 0)
1063 help_text = help_flush;
1064 else if (strcasecmp (command, "flushall") == 0)
1065 help_text = help_flushall;
1066 else if (strcasecmp (command, "stats") == 0)
1067 help_text = help_stats;
1068 else if (strcasecmp (command, "batch") == 0)
1069 help_text = help_batch;
1070 else
1071 help_text = help_help;
1072 }
1074 add_response_info(sock, help_text[1]);
1075 return send_response(sock, RESP_OK, help_text[0]);
1076 } /* }}} int handle_request_help */
1078 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1079 {
1080 uint64_t copy_queue_length;
1081 uint64_t copy_updates_received;
1082 uint64_t copy_flush_received;
1083 uint64_t copy_updates_written;
1084 uint64_t copy_data_sets_written;
1085 uint64_t copy_journal_bytes;
1086 uint64_t copy_journal_rotate;
1088 uint64_t tree_nodes_number;
1089 uint64_t tree_depth;
1091 pthread_mutex_lock (&stats_lock);
1092 copy_queue_length = stats_queue_length;
1093 copy_updates_received = stats_updates_received;
1094 copy_flush_received = stats_flush_received;
1095 copy_updates_written = stats_updates_written;
1096 copy_data_sets_written = stats_data_sets_written;
1097 copy_journal_bytes = stats_journal_bytes;
1098 copy_journal_rotate = stats_journal_rotate;
1099 pthread_mutex_unlock (&stats_lock);
1101 pthread_mutex_lock (&cache_lock);
1102 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1103 tree_depth = (uint64_t) g_tree_height (cache_tree);
1104 pthread_mutex_unlock (&cache_lock);
1106 add_response_info(sock,
1107 "QueueLength: %"PRIu64"\n", copy_queue_length);
1108 add_response_info(sock,
1109 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1110 add_response_info(sock,
1111 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1112 add_response_info(sock,
1113 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1114 add_response_info(sock,
1115 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1116 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1117 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1118 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1119 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1121 send_response(sock, RESP_OK, "Statistics follow\n");
1123 return (0);
1124 } /* }}} int handle_request_stats */
1126 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1127 char *buffer, size_t buffer_size)
1128 {
1129 char *file;
1130 int status;
1132 status = buffer_get_field (&buffer, &buffer_size, &file);
1133 if (status != 0)
1134 {
1135 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1136 }
1137 else
1138 {
1139 pthread_mutex_lock(&stats_lock);
1140 stats_flush_received++;
1141 pthread_mutex_unlock(&stats_lock);
1143 if (!check_file_access(file, sock)) return 0;
1145 status = flush_file (file);
1146 if (status == 0)
1147 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1148 else if (status == ENOENT)
1149 {
1150 /* no file in our tree; see whether it exists at all */
1151 struct stat statbuf;
1153 memset(&statbuf, 0, sizeof(statbuf));
1154 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1155 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1156 else
1157 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1158 }
1159 else if (status < 0)
1160 return send_response(sock, RESP_ERR, "Internal error.\n");
1161 else
1162 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1163 }
1165 /* NOTREACHED */
1166 assert(1==0);
1167 } /* }}} int handle_request_slurp */
1169 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1170 {
1172 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1174 pthread_mutex_lock(&cache_lock);
1175 flush_old_values(-1);
1176 pthread_mutex_unlock(&cache_lock);
1178 return send_response(sock, RESP_OK, "Started flush.\n");
1179 } /* }}} static int handle_request_flushall */
1181 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1182 char *buffer, size_t buffer_size)
1183 {
1184 char *file;
1185 int values_num = 0;
1186 int status;
1188 time_t now;
1189 cache_item_t *ci;
1191 now = time (NULL);
1193 status = buffer_get_field (&buffer, &buffer_size, &file);
1194 if (status != 0)
1195 return send_response(sock, RESP_ERR,
1196 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1198 pthread_mutex_lock(&stats_lock);
1199 stats_updates_received++;
1200 pthread_mutex_unlock(&stats_lock);
1202 if (!check_file_access(file, sock)) return 0;
1204 pthread_mutex_lock (&cache_lock);
1205 ci = g_tree_lookup (cache_tree, file);
1207 if (ci == NULL) /* {{{ */
1208 {
1209 struct stat statbuf;
1211 /* don't hold the lock while we setup; stat(2) might block */
1212 pthread_mutex_unlock(&cache_lock);
1214 memset (&statbuf, 0, sizeof (statbuf));
1215 status = stat (file, &statbuf);
1216 if (status != 0)
1217 {
1218 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1220 status = errno;
1221 if (status == ENOENT)
1222 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1223 else
1224 return send_response(sock, RESP_ERR,
1225 "stat failed with error %i.\n", status);
1226 }
1227 if (!S_ISREG (statbuf.st_mode))
1228 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1230 if (access(file, R_OK|W_OK) != 0)
1231 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1232 file, rrd_strerror(errno));
1234 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1235 if (ci == NULL)
1236 {
1237 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1239 return send_response(sock, RESP_ERR, "malloc failed.\n");
1240 }
1241 memset (ci, 0, sizeof (cache_item_t));
1243 ci->file = strdup (file);
1244 if (ci->file == NULL)
1245 {
1246 free (ci);
1247 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1249 return send_response(sock, RESP_ERR, "strdup failed.\n");
1250 }
1252 wipe_ci_values(ci, now);
1253 ci->flags = CI_FLAGS_IN_TREE;
1255 pthread_mutex_lock(&cache_lock);
1256 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1257 } /* }}} */
1258 assert (ci != NULL);
1260 while (buffer_size > 0)
1261 {
1262 char **temp;
1263 char *value;
1265 status = buffer_get_field (&buffer, &buffer_size, &value);
1266 if (status != 0)
1267 {
1268 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1269 break;
1270 }
1272 temp = (char **) realloc (ci->values,
1273 sizeof (char *) * (ci->values_num + 1));
1274 if (temp == NULL)
1275 {
1276 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1277 continue;
1278 }
1279 ci->values = temp;
1281 ci->values[ci->values_num] = strdup (value);
1282 if (ci->values[ci->values_num] == NULL)
1283 {
1284 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1285 continue;
1286 }
1287 ci->values_num++;
1289 values_num++;
1290 }
1292 if (((now - ci->last_flush_time) >= config_write_interval)
1293 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1294 && (ci->values_num > 0))
1295 {
1296 enqueue_cache_item (ci, TAIL);
1297 }
1299 pthread_mutex_unlock (&cache_lock);
1301 if (values_num < 1)
1302 return send_response(sock, RESP_ERR, "No values updated.\n");
1303 else
1304 return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1306 /* NOTREACHED */
1307 assert(1==0);
1309 } /* }}} int handle_request_update */
1311 /* we came across a "WROTE" entry during journal replay.
1312 * throw away any values that we have accumulated for this file
1313 */
1314 static int handle_request_wrote (const char *buffer) /* {{{ */
1315 {
1316 int i;
1317 cache_item_t *ci;
1318 const char *file = buffer;
1320 pthread_mutex_lock(&cache_lock);
1322 ci = g_tree_lookup(cache_tree, file);
1323 if (ci == NULL)
1324 {
1325 pthread_mutex_unlock(&cache_lock);
1326 return (0);
1327 }
1329 if (ci->values)
1330 {
1331 for (i=0; i < ci->values_num; i++)
1332 free(ci->values[i]);
1334 free(ci->values);
1335 }
1337 wipe_ci_values(ci, time(NULL));
1338 remove_from_queue(ci);
1340 pthread_mutex_unlock(&cache_lock);
1341 return (0);
1342 } /* }}} int handle_request_wrote */
1344 /* start "BATCH" processing */
1345 static int batch_start (listen_socket_t *sock) /* {{{ */
1346 {
1347 int status;
1348 if (sock->batch_mode)
1349 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1351 status = send_response(sock, RESP_OK,
1352 "Go ahead. End with dot '.' on its own line.\n");
1353 sock->batch_mode = 1;
1354 sock->batch_cmd = 0;
1356 return status;
1357 } /* }}} static int batch_start */
1359 /* finish "BATCH" processing and return results to the client */
1360 static int batch_done (listen_socket_t *sock) /* {{{ */
1361 {
1362 assert(sock->batch_mode);
1363 sock->batch_mode = 0;
1364 sock->batch_cmd = 0;
1365 return send_response(sock, RESP_OK, "errors\n");
1366 } /* }}} static int batch_done */
1368 /* returns 1 if we have the required privilege level */
1369 static int has_privilege (listen_socket_t *sock, /* {{{ */
1370 socket_privilege priv)
1371 {
1372 if (sock == NULL) /* journal replay */
1373 return 1;
1375 if (sock->privilege >= priv)
1376 return 1;
1378 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1379 } /* }}} static int has_privilege */
1381 /* if sock==NULL, we are in journal replay mode */
1382 static int handle_request (listen_socket_t *sock, /* {{{ */
1383 char *buffer, size_t buffer_size)
1384 {
1385 char *buffer_ptr;
1386 char *command;
1387 int status;
1389 assert (buffer[buffer_size - 1] == '\0');
1391 buffer_ptr = buffer;
1392 command = NULL;
1393 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1394 if (status != 0)
1395 {
1396 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1397 return (-1);
1398 }
1400 if (sock != NULL && sock->batch_mode)
1401 sock->batch_cmd++;
1403 if (strcasecmp (command, "update") == 0)
1404 {
1405 status = has_privilege(sock, PRIV_HIGH);
1406 if (status <= 0)
1407 return status;
1409 /* don't re-write updates in replay mode */
1410 if (sock != NULL)
1411 journal_write(command, buffer_ptr);
1413 return (handle_request_update (sock, buffer_ptr, buffer_size));
1414 }
1415 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1416 {
1417 /* this is only valid in replay mode */
1418 return (handle_request_wrote (buffer_ptr));
1419 }
1420 else if (strcasecmp (command, "flush") == 0)
1421 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1422 else if (strcasecmp (command, "flushall") == 0)
1423 {
1424 status = has_privilege(sock, PRIV_HIGH);
1425 if (status <= 0)
1426 return status;
1428 return (handle_request_flushall(sock));
1429 }
1430 else if (strcasecmp (command, "stats") == 0)
1431 return (handle_request_stats (sock));
1432 else if (strcasecmp (command, "help") == 0)
1433 return (handle_request_help (sock, buffer_ptr, buffer_size));
1434 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1435 return batch_start(sock);
1436 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1437 return batch_done(sock);
1438 else
1439 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1441 /* NOTREACHED */
1442 assert(1==0);
1443 } /* }}} int handle_request */
1445 /* MUST NOT hold journal_lock before calling this */
1446 static void journal_rotate(void) /* {{{ */
1447 {
1448 FILE *old_fh = NULL;
1450 if (journal_cur == NULL || journal_old == NULL)
1451 return;
1453 pthread_mutex_lock(&journal_lock);
1455 /* we rotate this way (rename before close) so that the we can release
1456 * the journal lock as fast as possible. Journal writes to the new
1457 * journal can proceed immediately after the new file is opened. The
1458 * fclose can then block without affecting new updates.
1459 */
1460 if (journal_fh != NULL)
1461 {
1462 old_fh = journal_fh;
1463 rename(journal_cur, journal_old);
1464 ++stats_journal_rotate;
1465 }
1467 journal_fh = fopen(journal_cur, "a");
1468 pthread_mutex_unlock(&journal_lock);
1470 if (old_fh != NULL)
1471 fclose(old_fh);
1473 if (journal_fh == NULL)
1474 {
1475 RRDD_LOG(LOG_CRIT,
1476 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1477 journal_cur, rrd_strerror(errno));
1479 RRDD_LOG(LOG_ERR,
1480 "JOURNALING DISABLED: All values will be flushed at shutdown");
1481 config_flush_at_shutdown = 1;
1482 }
1484 } /* }}} static void journal_rotate */
1486 static void journal_done(void) /* {{{ */
1487 {
1488 if (journal_cur == NULL)
1489 return;
1491 pthread_mutex_lock(&journal_lock);
1492 if (journal_fh != NULL)
1493 {
1494 fclose(journal_fh);
1495 journal_fh = NULL;
1496 }
1498 if (config_flush_at_shutdown)
1499 {
1500 RRDD_LOG(LOG_INFO, "removing journals");
1501 unlink(journal_old);
1502 unlink(journal_cur);
1503 }
1504 else
1505 {
1506 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1507 "journals will be used at next startup");
1508 }
1510 pthread_mutex_unlock(&journal_lock);
1512 } /* }}} static void journal_done */
1514 static int journal_write(char *cmd, char *args) /* {{{ */
1515 {
1516 int chars;
1518 if (journal_fh == NULL)
1519 return 0;
1521 pthread_mutex_lock(&journal_lock);
1522 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1523 pthread_mutex_unlock(&journal_lock);
1525 if (chars > 0)
1526 {
1527 pthread_mutex_lock(&stats_lock);
1528 stats_journal_bytes += chars;
1529 pthread_mutex_unlock(&stats_lock);
1530 }
1532 return chars;
1533 } /* }}} static int journal_write */
1535 static int journal_replay (const char *file) /* {{{ */
1536 {
1537 FILE *fh;
1538 int entry_cnt = 0;
1539 int fail_cnt = 0;
1540 uint64_t line = 0;
1541 char entry[CMD_MAX];
1543 if (file == NULL) return 0;
1545 fh = fopen(file, "r");
1546 if (fh == NULL)
1547 {
1548 if (errno != ENOENT)
1549 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1550 file, rrd_strerror(errno));
1551 return 0;
1552 }
1553 else
1554 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1556 while(!feof(fh))
1557 {
1558 size_t entry_len;
1560 ++line;
1561 if (fgets(entry, sizeof(entry), fh) == NULL)
1562 break;
1563 entry_len = strlen(entry);
1565 /* check \n termination in case journal writing crashed mid-line */
1566 if (entry_len == 0)
1567 continue;
1568 else if (entry[entry_len - 1] != '\n')
1569 {
1570 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1571 ++fail_cnt;
1572 continue;
1573 }
1575 entry[entry_len - 1] = '\0';
1577 if (handle_request(NULL, entry, entry_len) == 0)
1578 ++entry_cnt;
1579 else
1580 ++fail_cnt;
1581 }
1583 fclose(fh);
1585 if (entry_cnt > 0)
1586 {
1587 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1588 entry_cnt, fail_cnt);
1589 return 1;
1590 }
1591 else
1592 return 0;
1594 } /* }}} static int journal_replay */
1596 static void close_connection(listen_socket_t *sock)
1597 {
1598 close(sock->fd) ; sock->fd = -1;
1599 free(sock->rbuf); sock->rbuf = NULL;
1600 free(sock->wbuf); sock->wbuf = NULL;
1602 free(sock);
1603 }
1605 static void *connection_thread_main (void *args) /* {{{ */
1606 {
1607 pthread_t self;
1608 listen_socket_t *sock;
1609 int i;
1610 int fd;
1612 sock = (listen_socket_t *) args;
1613 fd = sock->fd;
1615 /* init read buffers */
1616 sock->next_read = sock->next_cmd = 0;
1617 sock->rbuf = malloc(RBUF_SIZE);
1618 if (sock->rbuf == NULL)
1619 {
1620 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1621 close_connection(sock);
1622 return NULL;
1623 }
1625 pthread_mutex_lock (&connection_threads_lock);
1626 {
1627 pthread_t *temp;
1629 temp = (pthread_t *) realloc (connection_threads,
1630 sizeof (pthread_t) * (connection_threads_num + 1));
1631 if (temp == NULL)
1632 {
1633 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1634 }
1635 else
1636 {
1637 connection_threads = temp;
1638 connection_threads[connection_threads_num] = pthread_self ();
1639 connection_threads_num++;
1640 }
1641 }
1642 pthread_mutex_unlock (&connection_threads_lock);
1644 while (do_shutdown == 0)
1645 {
1646 char *cmd;
1647 ssize_t cmd_len;
1648 ssize_t rbytes;
1650 struct pollfd pollfd;
1651 int status;
1653 pollfd.fd = fd;
1654 pollfd.events = POLLIN | POLLPRI;
1655 pollfd.revents = 0;
1657 status = poll (&pollfd, 1, /* timeout = */ 500);
1658 if (do_shutdown)
1659 break;
1660 else if (status == 0) /* timeout */
1661 continue;
1662 else if (status < 0) /* error */
1663 {
1664 status = errno;
1665 if (status == EINTR)
1666 continue;
1667 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1668 continue;
1669 }
1671 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1672 {
1673 close_connection(sock);
1674 break;
1675 }
1676 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1677 {
1678 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1679 "poll(2) returned something unexpected: %#04hx",
1680 pollfd.revents);
1681 close_connection(sock);
1682 break;
1683 }
1685 rbytes = read(fd, sock->rbuf + sock->next_read,
1686 RBUF_SIZE - sock->next_read);
1687 if (rbytes < 0)
1688 {
1689 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1690 break;
1691 }
1692 else if (rbytes == 0)
1693 break; /* eof */
1695 sock->next_read += rbytes;
1697 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1698 {
1699 status = handle_request (sock, cmd, cmd_len+1);
1700 if (status != 0)
1701 goto out_close;
1702 }
1703 }
1705 out_close:
1706 close_connection(sock);
1708 self = pthread_self ();
1709 /* Remove this thread from the connection threads list */
1710 pthread_mutex_lock (&connection_threads_lock);
1711 /* Find out own index in the array */
1712 for (i = 0; i < connection_threads_num; i++)
1713 if (pthread_equal (connection_threads[i], self) != 0)
1714 break;
1715 assert (i < connection_threads_num);
1717 /* Move the trailing threads forward. */
1718 if (i < (connection_threads_num - 1))
1719 {
1720 memmove (connection_threads + i,
1721 connection_threads + i + 1,
1722 sizeof (pthread_t) * (connection_threads_num - i - 1));
1723 }
1725 connection_threads_num--;
1726 pthread_mutex_unlock (&connection_threads_lock);
1728 return (NULL);
1729 } /* }}} void *connection_thread_main */
1731 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1732 {
1733 int fd;
1734 struct sockaddr_un sa;
1735 listen_socket_t *temp;
1736 int status;
1737 const char *path;
1739 path = sock->addr;
1740 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1741 path += strlen("unix:");
1743 temp = (listen_socket_t *) realloc (listen_fds,
1744 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1745 if (temp == NULL)
1746 {
1747 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1748 return (-1);
1749 }
1750 listen_fds = temp;
1751 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1753 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1754 if (fd < 0)
1755 {
1756 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1757 return (-1);
1758 }
1760 memset (&sa, 0, sizeof (sa));
1761 sa.sun_family = AF_UNIX;
1762 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1764 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1765 if (status != 0)
1766 {
1767 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1768 close (fd);
1769 unlink (path);
1770 return (-1);
1771 }
1773 status = listen (fd, /* backlog = */ 10);
1774 if (status != 0)
1775 {
1776 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1777 close (fd);
1778 unlink (path);
1779 return (-1);
1780 }
1782 listen_fds[listen_fds_num].fd = fd;
1783 listen_fds[listen_fds_num].family = PF_UNIX;
1784 strncpy(listen_fds[listen_fds_num].addr, path,
1785 sizeof (listen_fds[listen_fds_num].addr) - 1);
1786 listen_fds_num++;
1788 return (0);
1789 } /* }}} int open_listen_socket_unix */
1791 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1792 {
1793 struct addrinfo ai_hints;
1794 struct addrinfo *ai_res;
1795 struct addrinfo *ai_ptr;
1796 char addr_copy[NI_MAXHOST];
1797 char *addr;
1798 char *port;
1799 int status;
1801 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1802 addr_copy[sizeof (addr_copy) - 1] = 0;
1803 addr = addr_copy;
1805 memset (&ai_hints, 0, sizeof (ai_hints));
1806 ai_hints.ai_flags = 0;
1807 #ifdef AI_ADDRCONFIG
1808 ai_hints.ai_flags |= AI_ADDRCONFIG;
1809 #endif
1810 ai_hints.ai_family = AF_UNSPEC;
1811 ai_hints.ai_socktype = SOCK_STREAM;
1813 port = NULL;
1814 if (*addr == '[') /* IPv6+port format */
1815 {
1816 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1817 addr++;
1819 port = strchr (addr, ']');
1820 if (port == NULL)
1821 {
1822 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1823 sock->addr);
1824 return (-1);
1825 }
1826 *port = 0;
1827 port++;
1829 if (*port == ':')
1830 port++;
1831 else if (*port == 0)
1832 port = NULL;
1833 else
1834 {
1835 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1836 port);
1837 return (-1);
1838 }
1839 } /* if (*addr = ']') */
1840 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1841 {
1842 port = rindex(addr, ':');
1843 if (port != NULL)
1844 {
1845 *port = 0;
1846 port++;
1847 }
1848 }
1849 ai_res = NULL;
1850 status = getaddrinfo (addr,
1851 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1852 &ai_hints, &ai_res);
1853 if (status != 0)
1854 {
1855 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1856 "%s", addr, gai_strerror (status));
1857 return (-1);
1858 }
1860 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1861 {
1862 int fd;
1863 listen_socket_t *temp;
1864 int one = 1;
1866 temp = (listen_socket_t *) realloc (listen_fds,
1867 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1868 if (temp == NULL)
1869 {
1870 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1871 continue;
1872 }
1873 listen_fds = temp;
1874 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1876 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1877 if (fd < 0)
1878 {
1879 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1880 continue;
1881 }
1883 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1885 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1886 if (status != 0)
1887 {
1888 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1889 close (fd);
1890 continue;
1891 }
1893 status = listen (fd, /* backlog = */ 10);
1894 if (status != 0)
1895 {
1896 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1897 close (fd);
1898 return (-1);
1899 }
1901 listen_fds[listen_fds_num].fd = fd;
1902 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1903 listen_fds_num++;
1904 } /* for (ai_ptr) */
1906 return (0);
1907 } /* }}} static int open_listen_socket_network */
1909 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1910 {
1911 assert(sock != NULL);
1912 assert(sock->addr != NULL);
1914 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1915 || sock->addr[0] == '/')
1916 return (open_listen_socket_unix(sock));
1917 else
1918 return (open_listen_socket_network(sock));
1919 } /* }}} int open_listen_socket */
1921 static int close_listen_sockets (void) /* {{{ */
1922 {
1923 size_t i;
1925 for (i = 0; i < listen_fds_num; i++)
1926 {
1927 close (listen_fds[i].fd);
1929 if (listen_fds[i].family == PF_UNIX)
1930 unlink(listen_fds[i].addr);
1931 }
1933 free (listen_fds);
1934 listen_fds = NULL;
1935 listen_fds_num = 0;
1937 return (0);
1938 } /* }}} int close_listen_sockets */
1940 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1941 {
1942 struct pollfd *pollfds;
1943 int pollfds_num;
1944 int status;
1945 int i;
1947 for (i = 0; i < config_listen_address_list_len; i++)
1948 open_listen_socket (config_listen_address_list[i]);
1950 if (config_listen_address_list_len < 1)
1951 {
1952 listen_socket_t sock;
1953 memset(&sock, 0, sizeof(sock));
1954 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1955 open_listen_socket (&sock);
1956 }
1958 if (listen_fds_num < 1)
1959 {
1960 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1961 "could be opened. Sorry.");
1962 return (NULL);
1963 }
1965 pollfds_num = listen_fds_num;
1966 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1967 if (pollfds == NULL)
1968 {
1969 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1970 return (NULL);
1971 }
1972 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1974 RRDD_LOG(LOG_INFO, "listening for connections");
1976 while (do_shutdown == 0)
1977 {
1978 assert (pollfds_num == ((int) listen_fds_num));
1979 for (i = 0; i < pollfds_num; i++)
1980 {
1981 pollfds[i].fd = listen_fds[i].fd;
1982 pollfds[i].events = POLLIN | POLLPRI;
1983 pollfds[i].revents = 0;
1984 }
1986 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1987 if (do_shutdown)
1988 break;
1989 else if (status == 0) /* timeout */
1990 continue;
1991 else if (status < 0) /* error */
1992 {
1993 status = errno;
1994 if (status != EINTR)
1995 {
1996 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1997 }
1998 continue;
1999 }
2001 for (i = 0; i < pollfds_num; i++)
2002 {
2003 listen_socket_t *client_sock;
2004 struct sockaddr_storage client_sa;
2005 socklen_t client_sa_size;
2006 pthread_t tid;
2007 pthread_attr_t attr;
2009 if (pollfds[i].revents == 0)
2010 continue;
2012 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2013 {
2014 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2015 "poll(2) returned something unexpected for listen FD #%i.",
2016 pollfds[i].fd);
2017 continue;
2018 }
2020 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2021 if (client_sock == NULL)
2022 {
2023 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2024 continue;
2025 }
2026 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2028 client_sa_size = sizeof (client_sa);
2029 client_sock->fd = accept (pollfds[i].fd,
2030 (struct sockaddr *) &client_sa, &client_sa_size);
2031 if (client_sock->fd < 0)
2032 {
2033 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2034 free(client_sock);
2035 continue;
2036 }
2038 pthread_attr_init (&attr);
2039 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2041 status = pthread_create (&tid, &attr, connection_thread_main,
2042 client_sock);
2043 if (status != 0)
2044 {
2045 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2046 close_connection(client_sock);
2047 continue;
2048 }
2049 } /* for (pollfds_num) */
2050 } /* while (do_shutdown == 0) */
2052 RRDD_LOG(LOG_INFO, "starting shutdown");
2054 close_listen_sockets ();
2056 pthread_mutex_lock (&connection_threads_lock);
2057 while (connection_threads_num > 0)
2058 {
2059 pthread_t wait_for;
2061 wait_for = connection_threads[0];
2063 pthread_mutex_unlock (&connection_threads_lock);
2064 pthread_join (wait_for, /* retval = */ NULL);
2065 pthread_mutex_lock (&connection_threads_lock);
2066 }
2067 pthread_mutex_unlock (&connection_threads_lock);
2069 return (NULL);
2070 } /* }}} void *listen_thread_main */
2072 static int daemonize (void) /* {{{ */
2073 {
2074 int status;
2075 int fd;
2076 char *base_dir;
2078 fd = open_pidfile();
2079 if (fd < 0) return fd;
2081 if (!stay_foreground)
2082 {
2083 pid_t child;
2085 child = fork ();
2086 if (child < 0)
2087 {
2088 fprintf (stderr, "daemonize: fork(2) failed.\n");
2089 return (-1);
2090 }
2091 else if (child > 0)
2092 {
2093 return (1);
2094 }
2096 /* Become session leader */
2097 setsid ();
2099 /* Open the first three file descriptors to /dev/null */
2100 close (2);
2101 close (1);
2102 close (0);
2104 open ("/dev/null", O_RDWR);
2105 dup (0);
2106 dup (0);
2107 } /* if (!stay_foreground) */
2109 /* Change into the /tmp directory. */
2110 base_dir = (config_base_dir != NULL)
2111 ? config_base_dir
2112 : "/tmp";
2113 status = chdir (base_dir);
2114 if (status != 0)
2115 {
2116 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2117 return (-1);
2118 }
2120 install_signal_handlers();
2122 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2123 RRDD_LOG(LOG_INFO, "starting up");
2125 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2126 if (cache_tree == NULL)
2127 {
2128 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2129 return (-1);
2130 }
2132 status = write_pidfile (fd);
2133 return status;
2134 } /* }}} int daemonize */
2136 static int cleanup (void) /* {{{ */
2137 {
2138 do_shutdown++;
2140 pthread_cond_signal (&cache_cond);
2141 pthread_join (queue_thread, /* return = */ NULL);
2143 remove_pidfile ();
2145 RRDD_LOG(LOG_INFO, "goodbye");
2146 closelog ();
2148 return (0);
2149 } /* }}} int cleanup */
2151 static int read_options (int argc, char **argv) /* {{{ */
2152 {
2153 int option;
2154 int status = 0;
2156 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2157 {
2158 switch (option)
2159 {
2160 case 'g':
2161 stay_foreground=1;
2162 break;
2164 case 'L':
2165 case 'l':
2166 {
2167 listen_socket_t **temp;
2168 listen_socket_t *new;
2170 new = malloc(sizeof(listen_socket_t));
2171 if (new == NULL)
2172 {
2173 fprintf(stderr, "read_options: malloc failed.\n");
2174 return(2);
2175 }
2176 memset(new, 0, sizeof(listen_socket_t));
2178 temp = (listen_socket_t **) realloc (config_listen_address_list,
2179 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2180 if (temp == NULL)
2181 {
2182 fprintf (stderr, "read_options: realloc failed.\n");
2183 return (2);
2184 }
2185 config_listen_address_list = temp;
2187 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2188 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2190 temp[config_listen_address_list_len] = new;
2191 config_listen_address_list_len++;
2192 }
2193 break;
2195 case 'f':
2196 {
2197 int temp;
2199 temp = atoi (optarg);
2200 if (temp > 0)
2201 config_flush_interval = temp;
2202 else
2203 {
2204 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2205 status = 3;
2206 }
2207 }
2208 break;
2210 case 'w':
2211 {
2212 int temp;
2214 temp = atoi (optarg);
2215 if (temp > 0)
2216 config_write_interval = temp;
2217 else
2218 {
2219 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2220 status = 2;
2221 }
2222 }
2223 break;
2225 case 'z':
2226 {
2227 int temp;
2229 temp = atoi(optarg);
2230 if (temp > 0)
2231 config_write_jitter = temp;
2232 else
2233 {
2234 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2235 status = 2;
2236 }
2238 break;
2239 }
2241 case 'B':
2242 config_write_base_only = 1;
2243 break;
2245 case 'b':
2246 {
2247 size_t len;
2249 if (config_base_dir != NULL)
2250 free (config_base_dir);
2251 config_base_dir = strdup (optarg);
2252 if (config_base_dir == NULL)
2253 {
2254 fprintf (stderr, "read_options: strdup failed.\n");
2255 return (3);
2256 }
2258 len = strlen (config_base_dir);
2259 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2260 {
2261 config_base_dir[len - 1] = 0;
2262 len--;
2263 }
2265 if (len < 1)
2266 {
2267 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2268 return (4);
2269 }
2271 _config_base_dir_len = len;
2272 }
2273 break;
2275 case 'p':
2276 {
2277 if (config_pid_file != NULL)
2278 free (config_pid_file);
2279 config_pid_file = strdup (optarg);
2280 if (config_pid_file == NULL)
2281 {
2282 fprintf (stderr, "read_options: strdup failed.\n");
2283 return (3);
2284 }
2285 }
2286 break;
2288 case 'F':
2289 config_flush_at_shutdown = 1;
2290 break;
2292 case 'j':
2293 {
2294 struct stat statbuf;
2295 const char *dir = optarg;
2297 status = stat(dir, &statbuf);
2298 if (status != 0)
2299 {
2300 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2301 return 6;
2302 }
2304 if (!S_ISDIR(statbuf.st_mode)
2305 || access(dir, R_OK|W_OK|X_OK) != 0)
2306 {
2307 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2308 errno ? rrd_strerror(errno) : "");
2309 return 6;
2310 }
2312 journal_cur = malloc(PATH_MAX + 1);
2313 journal_old = malloc(PATH_MAX + 1);
2314 if (journal_cur == NULL || journal_old == NULL)
2315 {
2316 fprintf(stderr, "malloc failure for journal files\n");
2317 return 6;
2318 }
2319 else
2320 {
2321 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2322 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2323 }
2324 }
2325 break;
2327 case 'h':
2328 case '?':
2329 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2330 "\n"
2331 "Usage: rrdcached [options]\n"
2332 "\n"
2333 "Valid options are:\n"
2334 " -l <address> Socket address to listen to.\n"
2335 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2336 " -w <seconds> Interval in which to write data.\n"
2337 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2338 " -f <seconds> Interval in which to flush dead data.\n"
2339 " -p <file> Location of the PID-file.\n"
2340 " -b <dir> Base directory to change to.\n"
2341 " -B Restrict file access to paths within -b <dir>\n"
2342 " -g Do not fork and run in the foreground.\n"
2343 " -j <dir> Directory in which to create the journal files.\n"
2344 " -F Always flush all updates at shutdown\n"
2345 "\n"
2346 "For more information and a detailed description of all options "
2347 "please refer\n"
2348 "to the rrdcached(1) manual page.\n",
2349 VERSION);
2350 status = -1;
2351 break;
2352 } /* switch (option) */
2353 } /* while (getopt) */
2355 /* advise the user when values are not sane */
2356 if (config_flush_interval < 2 * config_write_interval)
2357 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2358 " 2x write interval (-w) !\n");
2359 if (config_write_jitter > config_write_interval)
2360 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2361 " write interval (-w) !\n");
2363 if (config_write_base_only && config_base_dir == NULL)
2364 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2365 " Consult the rrdcached documentation\n");
2367 if (journal_cur == NULL)
2368 config_flush_at_shutdown = 1;
2370 return (status);
2371 } /* }}} int read_options */
2373 int main (int argc, char **argv)
2374 {
2375 int status;
2377 status = read_options (argc, argv);
2378 if (status != 0)
2379 {
2380 if (status < 0)
2381 status = 0;
2382 return (status);
2383 }
2385 status = daemonize ();
2386 if (status == 1)
2387 {
2388 struct sigaction sigchld;
2390 memset (&sigchld, 0, sizeof (sigchld));
2391 sigchld.sa_handler = SIG_IGN;
2392 sigaction (SIGCHLD, &sigchld, NULL);
2394 return (0);
2395 }
2396 else if (status != 0)
2397 {
2398 fprintf (stderr, "daemonize failed, exiting.\n");
2399 return (1);
2400 }
2402 if (journal_cur != NULL)
2403 {
2404 int had_journal = 0;
2406 pthread_mutex_lock(&journal_lock);
2408 RRDD_LOG(LOG_INFO, "checking for journal files");
2410 had_journal += journal_replay(journal_old);
2411 had_journal += journal_replay(journal_cur);
2413 if (had_journal)
2414 flush_old_values(-1);
2416 pthread_mutex_unlock(&journal_lock);
2417 journal_rotate();
2419 RRDD_LOG(LOG_INFO, "journal processing complete");
2420 }
2422 /* start the queue thread */
2423 memset (&queue_thread, 0, sizeof (queue_thread));
2424 status = pthread_create (&queue_thread,
2425 NULL, /* attr */
2426 queue_thread_main,
2427 NULL); /* args */
2428 if (status != 0)
2429 {
2430 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2431 cleanup();
2432 return (1);
2433 }
2435 listen_thread_main (NULL);
2436 cleanup ();
2438 return (0);
2439 } /* int main */
2441 /*
2442 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2443 */