1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
/* Privilege level attached to a socket.  has_privilege() compares levels
 * with `>=', so the numeric order matters: PRIV_LOW < PRIV_HIGH. */
typedef enum
{
  PRIV_LOW  = 0,
  PRIV_HIGH = 1
} socket_privilege;
/* Outcome of a command, as handed to send_response(). */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK  = 0
} response_code;
112 struct listen_socket_s
113 {
114 int fd;
115 char addr[PATH_MAX + 1];
116 int family;
117 socket_privilege privilege;
119 /* state for BATCH processing */
120 int batch_mode;
121 int batch_cmd;
123 /* buffered IO */
124 char *rbuf;
125 off_t next_cmd;
126 off_t next_read;
128 char *wbuf;
129 ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
/* Bits stored in cache_item_t.flags. */
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/* One cached RRD file: the update strings waiting to be written and the
 * links that place the item in the doubly linked write queue. */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;              /* file name; also used as the key in the tree */
  char **values;           /* pending update strings */
  int values_num;          /* number of entries in `values' */
  time_t last_flush_time;  /* set by wipe_ci_values() */
  int flags;               /* CI_FLAGS_* bits */
  pthread_cond_t flushed;  /* broadcast after the item has been written */
  cache_item_t *prev;      /* queue neighbour towards the head */
  cache_item_t *next;      /* queue neighbour towards the tail */
};
/* Scratch data handed to tree_callback_flush() while walking the tree. */
typedef struct callback_flush_data_s
{
  time_t now;          /* time at which the walk started */
  time_t abs_timeout;  /* items last flushed at or before this are queued */
  char **keys;         /* keys of the entries to drop from the tree */
  size_t keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should insert at. */
typedef enum queue_side_e
{
  HEAD = 0,
  TAIL = 1
} queue_side_t;
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
169 /*
170 * Variables
171 */
172 static int stay_foreground = 0;
173 static uid_t daemon_uid;
175 static listen_socket_t *listen_fds = NULL;
176 static size_t listen_fds_num = 0;
178 static int do_shutdown = 0;
180 static pthread_t queue_thread;
182 static pthread_t *connection_threads = NULL;
183 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
184 static int connection_threads_num = 0;
186 /* Cache stuff */
187 static GTree *cache_tree = NULL;
188 static cache_item_t *cache_queue_head = NULL;
189 static cache_item_t *cache_queue_tail = NULL;
190 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
193 static int config_write_interval = 300;
194 static int config_write_jitter = 0;
195 static int config_flush_interval = 3600;
196 static int config_flush_at_shutdown = 0;
197 static char *config_pid_file = NULL;
198 static char *config_base_dir = NULL;
199 static size_t _config_base_dir_len = 0;
200 static int config_write_base_only = 0;
202 static listen_socket_t **config_listen_address_list = NULL;
203 static int config_listen_address_list_len = 0;
205 static uint64_t stats_queue_length = 0;
206 static uint64_t stats_updates_received = 0;
207 static uint64_t stats_flush_received = 0;
208 static uint64_t stats_updates_written = 0;
209 static uint64_t stats_data_sets_written = 0;
210 static uint64_t stats_journal_bytes = 0;
211 static uint64_t stats_journal_rotate = 0;
212 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214 /* Journaled updates */
215 static char *journal_cur = NULL;
216 static char *journal_old = NULL;
217 static FILE *journal_fh = NULL;
218 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
219 static int journal_write(char *cmd, char *args);
220 static void journal_done(void);
221 static void journal_rotate(void);
223 /*
224 * Functions
225 */
226 static void sig_common (const char *sig) /* {{{ */
227 {
228 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
229 do_shutdown++;
230 pthread_cond_broadcast(&cache_cond);
231 } /* }}} void sig_common */
233 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
234 {
235 sig_common("INT");
236 } /* }}} void sig_int_handler */
238 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
239 {
240 sig_common("TERM");
241 } /* }}} void sig_term_handler */
243 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
244 {
245 config_flush_at_shutdown = 1;
246 sig_common("USR1");
247 } /* }}} void sig_usr1_handler */
249 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
250 {
251 config_flush_at_shutdown = 0;
252 sig_common("USR2");
253 } /* }}} void sig_usr2_handler */
255 static void install_signal_handlers(void) /* {{{ */
256 {
257 /* These structures are static, because `sigaction' behaves weird if the are
258 * overwritten.. */
259 static struct sigaction sa_int;
260 static struct sigaction sa_term;
261 static struct sigaction sa_pipe;
262 static struct sigaction sa_usr1;
263 static struct sigaction sa_usr2;
265 /* Install signal handlers */
266 memset (&sa_int, 0, sizeof (sa_int));
267 sa_int.sa_handler = sig_int_handler;
268 sigaction (SIGINT, &sa_int, NULL);
270 memset (&sa_term, 0, sizeof (sa_term));
271 sa_term.sa_handler = sig_term_handler;
272 sigaction (SIGTERM, &sa_term, NULL);
274 memset (&sa_pipe, 0, sizeof (sa_pipe));
275 sa_pipe.sa_handler = SIG_IGN;
276 sigaction (SIGPIPE, &sa_pipe, NULL);
278 memset (&sa_pipe, 0, sizeof (sa_usr1));
279 sa_usr1.sa_handler = sig_usr1_handler;
280 sigaction (SIGUSR1, &sa_usr1, NULL);
282 memset (&sa_usr2, 0, sizeof (sa_usr2));
283 sa_usr2.sa_handler = sig_usr2_handler;
284 sigaction (SIGUSR2, &sa_usr2, NULL);
286 } /* }}} void install_signal_handlers */
288 static int open_pidfile(void) /* {{{ */
289 {
290 int fd;
291 char *file;
293 file = (config_pid_file != NULL)
294 ? config_pid_file
295 : LOCALSTATEDIR "/run/rrdcached.pid";
297 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
298 if (fd < 0)
299 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
300 file, rrd_strerror(errno));
302 return(fd);
303 } /* }}} static int open_pidfile */
/*
 * write_pidfile:
 * Write the PID of the calling process, followed by a newline, to `fd'.
 * The descriptor is consumed: it is closed on every path.
 * Returns 0 on success and -1 if the stream could not be opened, written
 * or closed.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *fh;
  int status = 0;

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) getpid ()) < 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fprintf() failed.");
    status = -1;
  }

  /* fclose() flushes the stream, so a failed write may only show up here. */
  if (fclose (fh) != 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fclose() failed.");
    status = -1;
  }

  return (status);
} /* }}} int write_pidfile */
326 static int remove_pidfile (void) /* {{{ */
327 {
328 char *file;
329 int status;
331 file = (config_pid_file != NULL)
332 ? config_pid_file
333 : LOCALSTATEDIR "/run/rrdcached.pid";
335 status = unlink (file);
336 if (status == 0)
337 return (0);
338 return (errno);
339 } /* }}} int remove_pidfile */
341 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
342 {
343 char *eol;
345 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
346 sock->next_read - sock->next_cmd);
348 if (eol == NULL)
349 {
350 /* no commands left, move remainder back to front of rbuf */
351 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
352 sock->next_read - sock->next_cmd);
353 sock->next_read -= sock->next_cmd;
354 sock->next_cmd = 0;
355 *len = 0;
356 return NULL;
357 }
358 else
359 {
360 char *cmd = sock->rbuf + sock->next_cmd;
361 *eol = '\0';
363 sock->next_cmd = eol - sock->rbuf + 1;
365 if (eol > sock->rbuf && *(eol-1) == '\r')
366 *(--eol) = '\0'; /* handle "\r\n" EOL */
368 *len = eol - cmd;
370 return cmd;
371 }
373 /* NOTREACHED */
374 assert(1==0);
375 }
377 /* add the characters directly to the write buffer */
378 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
379 {
380 char *new_buf;
382 assert(sock != NULL);
384 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
385 if (new_buf == NULL)
386 {
387 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
388 return -1;
389 }
391 strncpy(new_buf + sock->wbuf_len, str, len + 1);
393 sock->wbuf = new_buf;
394 sock->wbuf_len += len;
396 return 0;
397 } /* }}} static int add_to_wbuf */
399 /* add the text to the "extra" info that's sent after the status line */
400 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
401 {
402 va_list argp;
403 char buffer[CMD_MAX];
404 int len;
406 if (sock == NULL) return 0; /* journal replay mode */
407 if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
409 va_start(argp, fmt);
410 #ifdef HAVE_VSNPRINTF
411 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
412 #else
413 len = vsprintf(buffer, fmt, argp);
414 #endif
415 va_end(argp);
416 if (len < 0)
417 {
418 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
419 return -1;
420 }
422 return add_to_wbuf(sock, buffer, len);
423 } /* }}} static int add_response_info */
/*
 * count_lines:
 * Return the number of newline characters in `str'; a NULL pointer counts
 * as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
441 /* send the response back to the user.
442 * returns 0 on success, -1 on error
443 * write buffer is always zeroed after this call */
444 static int send_response (listen_socket_t *sock, response_code rc,
445 char *fmt, ...) /* {{{ */
446 {
447 va_list argp;
448 char buffer[CMD_MAX];
449 int lines;
450 ssize_t wrote;
451 int rclen, len;
453 if (sock == NULL) return rc; /* journal replay mode */
455 if (sock->batch_mode)
456 {
457 if (rc == RESP_OK)
458 return rc; /* no response on success during BATCH */
459 lines = sock->batch_cmd;
460 }
461 else if (rc == RESP_OK)
462 lines = count_lines(sock->wbuf);
463 else
464 lines = -1;
466 rclen = sprintf(buffer, "%d ", lines);
467 va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
470 #else
471 len = vsprintf(buffer+rclen, fmt, argp);
472 #endif
473 va_end(argp);
474 if (len < 0)
475 return -1;
477 len += rclen;
479 /* append the result to the wbuf, don't write to the user */
480 if (sock->batch_mode)
481 return add_to_wbuf(sock, buffer, len);
483 /* first write must be complete */
484 if (len != write(sock->fd, buffer, len))
485 {
486 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
487 return -1;
488 }
490 if (sock->wbuf != NULL)
491 {
492 wrote = 0;
493 while (wrote < sock->wbuf_len)
494 {
495 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
496 if (wb <= 0)
497 {
498 RRDD_LOG(LOG_INFO, "send_response: could not write results");
499 return -1;
500 }
501 wrote += wb;
502 }
503 }
505 free(sock->wbuf); sock->wbuf = NULL;
506 sock->wbuf_len = 0;
508 return 0;
509 } /* }}} */
511 static void wipe_ci_values(cache_item_t *ci, time_t when)
512 {
513 ci->values = NULL;
514 ci->values_num = 0;
516 ci->last_flush_time = when;
517 if (config_write_jitter > 0)
518 ci->last_flush_time += (random() % config_write_jitter);
519 }
521 /* remove_from_queue
522 * remove a "cache_item_t" item from the queue.
523 * must hold 'cache_lock' when calling this
524 */
525 static void remove_from_queue(cache_item_t *ci) /* {{{ */
526 {
527 if (ci == NULL) return;
529 if (ci->prev == NULL)
530 cache_queue_head = ci->next; /* reset head */
531 else
532 ci->prev->next = ci->next;
534 if (ci->next == NULL)
535 cache_queue_tail = ci->prev; /* reset the tail */
536 else
537 ci->next->prev = ci->prev;
539 ci->next = ci->prev = NULL;
540 ci->flags &= ~CI_FLAGS_IN_QUEUE;
541 } /* }}} static void remove_from_queue */
543 /*
544 * enqueue_cache_item:
545 * `cache_lock' must be acquired before calling this function!
546 */
547 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
548 queue_side_t side)
549 {
550 if (ci == NULL)
551 return (-1);
553 if (ci->values_num == 0)
554 return (0);
556 if (side == HEAD)
557 {
558 if (cache_queue_head == ci)
559 return 0;
561 /* remove from the double linked list */
562 if (ci->flags & CI_FLAGS_IN_QUEUE)
563 remove_from_queue(ci);
565 ci->prev = NULL;
566 ci->next = cache_queue_head;
567 if (ci->next != NULL)
568 ci->next->prev = ci;
569 cache_queue_head = ci;
571 if (cache_queue_tail == NULL)
572 cache_queue_tail = cache_queue_head;
573 }
574 else /* (side == TAIL) */
575 {
576 /* We don't move values back in the list.. */
577 if (ci->flags & CI_FLAGS_IN_QUEUE)
578 return (0);
580 assert (ci->next == NULL);
581 assert (ci->prev == NULL);
583 ci->prev = cache_queue_tail;
585 if (cache_queue_tail == NULL)
586 cache_queue_head = ci;
587 else
588 cache_queue_tail->next = ci;
590 cache_queue_tail = ci;
591 }
593 ci->flags |= CI_FLAGS_IN_QUEUE;
595 pthread_cond_broadcast(&cache_cond);
596 pthread_mutex_lock (&stats_lock);
597 stats_queue_length++;
598 pthread_mutex_unlock (&stats_lock);
600 return (0);
601 } /* }}} int enqueue_cache_item */
603 /*
604 * tree_callback_flush:
605 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
606 * while this is in progress.
607 */
608 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
609 gpointer data)
610 {
611 cache_item_t *ci;
612 callback_flush_data_t *cfd;
614 ci = (cache_item_t *) value;
615 cfd = (callback_flush_data_t *) data;
617 if ((ci->last_flush_time <= cfd->abs_timeout)
618 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
619 && (ci->values_num > 0))
620 {
621 enqueue_cache_item (ci, TAIL);
622 }
623 else if ((do_shutdown != 0)
624 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
625 && (ci->values_num > 0))
626 {
627 enqueue_cache_item (ci, TAIL);
628 }
629 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
630 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
631 && (ci->values_num <= 0))
632 {
633 char **temp;
635 temp = (char **) realloc (cfd->keys,
636 sizeof (char *) * (cfd->keys_num + 1));
637 if (temp == NULL)
638 {
639 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
640 return (FALSE);
641 }
642 cfd->keys = temp;
643 /* Make really sure this points to the _same_ place */
644 assert ((char *) key == ci->file);
645 cfd->keys[cfd->keys_num] = (char *) key;
646 cfd->keys_num++;
647 }
649 return (FALSE);
650 } /* }}} gboolean tree_callback_flush */
652 static int flush_old_values (int max_age)
653 {
654 callback_flush_data_t cfd;
655 size_t k;
657 memset (&cfd, 0, sizeof (cfd));
658 /* Pass the current time as user data so that we don't need to call
659 * `time' for each node. */
660 cfd.now = time (NULL);
661 cfd.keys = NULL;
662 cfd.keys_num = 0;
664 if (max_age > 0)
665 cfd.abs_timeout = cfd.now - max_age;
666 else
667 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
669 /* `tree_callback_flush' will return the keys of all values that haven't
670 * been touched in the last `config_flush_interval' seconds in `cfd'.
671 * The char*'s in this array point to the same memory as ci->file, so we
672 * don't need to free them separately. */
673 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
675 for (k = 0; k < cfd.keys_num; k++)
676 {
677 cache_item_t *ci;
679 /* This must not fail. */
680 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
681 assert (ci != NULL);
683 /* If we end up here with values available, something's seriously
684 * messed up. */
685 assert (ci->values_num == 0);
687 /* Remove the node from the tree */
688 g_tree_remove (cache_tree, cfd.keys[k]);
689 cfd.keys[k] = NULL;
691 /* Now free and clean up `ci'. */
692 free (ci->file);
693 ci->file = NULL;
694 free (ci);
695 ci = NULL;
696 } /* for (k = 0; k < cfd.keys_num; k++) */
698 if (cfd.keys != NULL)
699 {
700 free (cfd.keys);
701 cfd.keys = NULL;
702 }
704 return (0);
705 } /* int flush_old_values */
707 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
708 {
709 struct timeval now;
710 struct timespec next_flush;
711 int final_flush = 0; /* make sure we only flush once on shutdown */
713 gettimeofday (&now, NULL);
714 next_flush.tv_sec = now.tv_sec + config_flush_interval;
715 next_flush.tv_nsec = 1000 * now.tv_usec;
717 pthread_mutex_lock (&cache_lock);
718 while ((do_shutdown == 0) || (cache_queue_head != NULL))
719 {
720 cache_item_t *ci;
721 char *file;
722 char **values;
723 int values_num;
724 int status;
725 int i;
727 /* First, check if it's time to do the cache flush. */
728 gettimeofday (&now, NULL);
729 if ((now.tv_sec > next_flush.tv_sec)
730 || ((now.tv_sec == next_flush.tv_sec)
731 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
732 {
733 /* Flush all values that haven't been written in the last
734 * `config_write_interval' seconds. */
735 flush_old_values (config_write_interval);
737 /* Determine the time of the next cache flush. */
738 while (next_flush.tv_sec <= now.tv_sec)
739 next_flush.tv_sec += config_flush_interval;
741 /* unlock the cache while we rotate so we don't block incoming
742 * updates if the fsync() blocks on disk I/O */
743 pthread_mutex_unlock(&cache_lock);
744 journal_rotate();
745 pthread_mutex_lock(&cache_lock);
746 }
748 /* Now, check if there's something to store away. If not, wait until
749 * something comes in or it's time to do the cache flush. if we are
750 * shutting down, do not wait around. */
751 if (cache_queue_head == NULL && !do_shutdown)
752 {
753 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
754 if ((status != 0) && (status != ETIMEDOUT))
755 {
756 RRDD_LOG (LOG_ERR, "queue_thread_main: "
757 "pthread_cond_timedwait returned %i.", status);
758 }
759 }
761 /* We're about to shut down */
762 if (do_shutdown != 0 && !final_flush++)
763 {
764 if (config_flush_at_shutdown)
765 flush_old_values (-1); /* flush everything */
766 else
767 break;
768 }
770 /* Check if a value has arrived. This may be NULL if we timed out or there
771 * was an interrupt such as a signal. */
772 if (cache_queue_head == NULL)
773 continue;
775 ci = cache_queue_head;
777 /* copy the relevant parts */
778 file = strdup (ci->file);
779 if (file == NULL)
780 {
781 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
782 continue;
783 }
785 assert(ci->values != NULL);
786 assert(ci->values_num > 0);
788 values = ci->values;
789 values_num = ci->values_num;
791 wipe_ci_values(ci, time(NULL));
792 remove_from_queue(ci);
794 pthread_mutex_lock (&stats_lock);
795 assert (stats_queue_length > 0);
796 stats_queue_length--;
797 pthread_mutex_unlock (&stats_lock);
799 pthread_mutex_unlock (&cache_lock);
801 rrd_clear_error ();
802 status = rrd_update_r (file, NULL, values_num, (void *) values);
803 if (status != 0)
804 {
805 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
806 "rrd_update_r (%s) failed with status %i. (%s)",
807 file, status, rrd_get_error());
808 }
810 journal_write("wrote", file);
811 pthread_cond_broadcast(&ci->flushed);
813 for (i = 0; i < values_num; i++)
814 free (values[i]);
816 free(values);
817 free(file);
819 if (status == 0)
820 {
821 pthread_mutex_lock (&stats_lock);
822 stats_updates_written++;
823 stats_data_sets_written += values_num;
824 pthread_mutex_unlock (&stats_lock);
825 }
827 pthread_mutex_lock (&cache_lock);
829 /* We're about to shut down */
830 if (do_shutdown != 0 && !final_flush++)
831 {
832 if (config_flush_at_shutdown)
833 flush_old_values (-1); /* flush everything */
834 else
835 break;
836 }
837 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
838 pthread_mutex_unlock (&cache_lock);
840 if (config_flush_at_shutdown)
841 {
842 assert(cache_queue_head == NULL);
843 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
844 }
846 journal_done();
848 return (NULL);
849 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Split the next space-separated field off a NUL-terminated buffer.
 *
 * The field is unescaped in place (a backslash makes the following
 * character literal) and terminated with a NUL byte.  On success
 * `*field_ret' points at the field, `*buffer_ret' and `*buffer_size_ret'
 * are advanced past it and 0 is returned.
 *
 * Returns -1, without touching the output arguments, if the buffer is
 * empty, if no terminating space or NUL is found, or if a backslash is the
 * last character.  The buffer contents may already have been modified in
 * that case.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *base = *buffer_ret;
  size_t total = *buffer_size_ret;
  size_t rd = 0; /* read position */
  size_t wr = 0; /* write position, never ahead of `rd' */

  if (total <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (base[total - 1] == '\0');

  for (;;)
  {
    char c;

    /* Ran off the end without seeing a terminator. */
    if (rd >= total)
      return (-1);

    c = base[rd];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
    {
      base[wr] = 0;
      rd++;
      break;
    }

    /* Handle escaped characters. */
    if (c == '\\')
    {
      if (rd >= (total - 1))
        return (-1);
      rd++;
      c = base[rd];
    }

    base[wr] = c;
    wr++;
    rd++;
  }

  *buffer_ret = base + rd;
  *buffer_size_ret = total - rd;
  *field_ret = base;

  return (0);
} /* }}} int buffer_get_field */
914 /* if we're restricting writes to the base directory,
915 * check whether the file falls within the dir
916 * returns 1 if OK, otherwise 0
917 */
918 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
919 {
920 assert(file != NULL);
922 if (!config_write_base_only
923 || sock == NULL /* journal replay */
924 || config_base_dir == NULL)
925 return 1;
927 if (strstr(file, "../") != NULL) goto err;
929 /* relative paths without "../" are ok */
930 if (*file != '/') return 1;
932 /* file must be of the format base + "/" + <1+ char filename> */
933 if (strlen(file) < _config_base_dir_len + 2) goto err;
934 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
935 if (*(file + _config_base_dir_len) != '/') goto err;
937 return 1;
939 err:
940 if (sock != NULL && sock->fd >= 0)
941 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
943 return 0;
944 } /* }}} static int check_file_access */
946 /* returns 1 if we have the required privilege level,
947 * otherwise issue an error to the user on sock */
948 static int has_privilege (listen_socket_t *sock, /* {{{ */
949 socket_privilege priv)
950 {
951 if (sock == NULL) /* journal replay */
952 return 1;
954 if (sock->privilege >= priv)
955 return 1;
957 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
958 } /* }}} static int has_privilege */
960 static int flush_file (const char *filename) /* {{{ */
961 {
962 cache_item_t *ci;
964 pthread_mutex_lock (&cache_lock);
966 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
967 if (ci == NULL)
968 {
969 pthread_mutex_unlock (&cache_lock);
970 return (ENOENT);
971 }
973 if (ci->values_num > 0)
974 {
975 /* Enqueue at head */
976 enqueue_cache_item (ci, HEAD);
977 pthread_cond_wait(&ci->flushed, &cache_lock);
978 }
980 pthread_mutex_unlock(&cache_lock);
982 return (0);
983 } /* }}} int flush_file */
985 static int handle_request_help (listen_socket_t *sock, /* {{{ */
986 char *buffer, size_t buffer_size)
987 {
988 int status;
989 char **help_text;
990 char *command;
992 char *help_help[2] =
993 {
994 "Command overview\n"
995 ,
996 "FLUSH <filename>\n"
997 "FLUSHALL\n"
998 "HELP [<command>]\n"
999 "UPDATE <filename> <values> [<values> ...]\n"
1000 "BATCH\n"
1001 "STATS\n"
1002 };
1004 char *help_flush[2] =
1005 {
1006 "Help for FLUSH\n"
1007 ,
1008 "Usage: FLUSH <filename>\n"
1009 "\n"
1010 "Adds the given filename to the head of the update queue and returns\n"
1011 "after is has been dequeued.\n"
1012 };
1014 char *help_flushall[2] =
1015 {
1016 "Help for FLUSHALL\n"
1017 ,
1018 "Usage: FLUSHALL\n"
1019 "\n"
1020 "Triggers writing of all pending updates. Returns immediately.\n"
1021 };
1023 char *help_update[2] =
1024 {
1025 "Help for UPDATE\n"
1026 ,
1027 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1028 "\n"
1029 "Adds the given file to the internal cache if it is not yet known and\n"
1030 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1031 "for details.\n"
1032 "\n"
1033 "Each <values> has the following form:\n"
1034 " <values> = <time>:<value>[:<value>[...]]\n"
1035 "See the rrdupdate(1) manpage for details.\n"
1036 };
1038 char *help_stats[2] =
1039 {
1040 "Help for STATS\n"
1041 ,
1042 "Usage: STATS\n"
1043 "\n"
1044 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1045 "a description of the values.\n"
1046 };
1048 char *help_batch[2] =
1049 {
1050 "Help for BATCH\n"
1051 ,
1052 "The 'BATCH' command permits the client to initiate a bulk load\n"
1053 " of commands to rrdcached.\n"
1054 "\n"
1055 "Usage:\n"
1056 "\n"
1057 " client: BATCH\n"
1058 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1059 " client: command #1\n"
1060 " client: command #2\n"
1061 " client: ... and so on\n"
1062 " client: .\n"
1063 " server: 2 errors\n"
1064 " server: 7 message for command #7\n"
1065 " server: 9 message for command #9\n"
1066 "\n"
1067 "For more information, consult the rrdcached(1) documentation.\n"
1068 };
1070 status = buffer_get_field (&buffer, &buffer_size, &command);
1071 if (status != 0)
1072 help_text = help_help;
1073 else
1074 {
1075 if (strcasecmp (command, "update") == 0)
1076 help_text = help_update;
1077 else if (strcasecmp (command, "flush") == 0)
1078 help_text = help_flush;
1079 else if (strcasecmp (command, "flushall") == 0)
1080 help_text = help_flushall;
1081 else if (strcasecmp (command, "stats") == 0)
1082 help_text = help_stats;
1083 else if (strcasecmp (command, "batch") == 0)
1084 help_text = help_batch;
1085 else
1086 help_text = help_help;
1087 }
1089 add_response_info(sock, help_text[1]);
1090 return send_response(sock, RESP_OK, help_text[0]);
1091 } /* }}} int handle_request_help */
1093 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1094 {
1095 uint64_t copy_queue_length;
1096 uint64_t copy_updates_received;
1097 uint64_t copy_flush_received;
1098 uint64_t copy_updates_written;
1099 uint64_t copy_data_sets_written;
1100 uint64_t copy_journal_bytes;
1101 uint64_t copy_journal_rotate;
1103 uint64_t tree_nodes_number;
1104 uint64_t tree_depth;
1106 pthread_mutex_lock (&stats_lock);
1107 copy_queue_length = stats_queue_length;
1108 copy_updates_received = stats_updates_received;
1109 copy_flush_received = stats_flush_received;
1110 copy_updates_written = stats_updates_written;
1111 copy_data_sets_written = stats_data_sets_written;
1112 copy_journal_bytes = stats_journal_bytes;
1113 copy_journal_rotate = stats_journal_rotate;
1114 pthread_mutex_unlock (&stats_lock);
1116 pthread_mutex_lock (&cache_lock);
1117 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1118 tree_depth = (uint64_t) g_tree_height (cache_tree);
1119 pthread_mutex_unlock (&cache_lock);
1121 add_response_info(sock,
1122 "QueueLength: %"PRIu64"\n", copy_queue_length);
1123 add_response_info(sock,
1124 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1125 add_response_info(sock,
1126 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1127 add_response_info(sock,
1128 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1129 add_response_info(sock,
1130 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1131 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1132 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1133 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1134 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1136 send_response(sock, RESP_OK, "Statistics follow\n");
1138 return (0);
1139 } /* }}} int handle_request_stats */
1141 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1142 char *buffer, size_t buffer_size)
1143 {
1144 char *file;
1145 int status;
1147 status = buffer_get_field (&buffer, &buffer_size, &file);
1148 if (status != 0)
1149 {
1150 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1151 }
1152 else
1153 {
1154 pthread_mutex_lock(&stats_lock);
1155 stats_flush_received++;
1156 pthread_mutex_unlock(&stats_lock);
1158 if (!check_file_access(file, sock)) return 0;
1160 status = flush_file (file);
1161 if (status == 0)
1162 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1163 else if (status == ENOENT)
1164 {
1165 /* no file in our tree; see whether it exists at all */
1166 struct stat statbuf;
1168 memset(&statbuf, 0, sizeof(statbuf));
1169 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1170 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1171 else
1172 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1173 }
1174 else if (status < 0)
1175 return send_response(sock, RESP_ERR, "Internal error.\n");
1176 else
1177 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1178 }
1180 /* NOTREACHED */
1181 assert(1==0);
1182 } /* }}} int handle_request_slurp */
1184 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1185 {
1186 int status;
1188 status = has_privilege(sock, PRIV_HIGH);
1189 if (status <= 0)
1190 return status;
1192 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1194 pthread_mutex_lock(&cache_lock);
1195 flush_old_values(-1);
1196 pthread_mutex_unlock(&cache_lock);
1198 return send_response(sock, RESP_OK, "Started flush.\n");
1199 } /* }}} static int handle_request_flushall */
1201 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1202 char *buffer, size_t buffer_size)
1203 {
1204 char *file;
1205 int values_num = 0;
1206 int status;
1207 char orig_buf[CMD_MAX];
1209 time_t now;
1210 cache_item_t *ci;
1212 now = time (NULL);
1214 status = has_privilege(sock, PRIV_HIGH);
1215 if (status <= 0)
1216 return status;
1218 /* save it for the journal later */
1219 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1221 status = buffer_get_field (&buffer, &buffer_size, &file);
1222 if (status != 0)
1223 return send_response(sock, RESP_ERR,
1224 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1226 pthread_mutex_lock(&stats_lock);
1227 stats_updates_received++;
1228 pthread_mutex_unlock(&stats_lock);
1230 if (!check_file_access(file, sock)) return 0;
1232 pthread_mutex_lock (&cache_lock);
1233 ci = g_tree_lookup (cache_tree, file);
1235 if (ci == NULL) /* {{{ */
1236 {
1237 struct stat statbuf;
1239 /* don't hold the lock while we setup; stat(2) might block */
1240 pthread_mutex_unlock(&cache_lock);
1242 memset (&statbuf, 0, sizeof (statbuf));
1243 status = stat (file, &statbuf);
1244 if (status != 0)
1245 {
1246 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1248 status = errno;
1249 if (status == ENOENT)
1250 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1251 else
1252 return send_response(sock, RESP_ERR,
1253 "stat failed with error %i.\n", status);
1254 }
1255 if (!S_ISREG (statbuf.st_mode))
1256 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1258 if (access(file, R_OK|W_OK) != 0)
1259 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1260 file, rrd_strerror(errno));
1262 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1263 if (ci == NULL)
1264 {
1265 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1267 return send_response(sock, RESP_ERR, "malloc failed.\n");
1268 }
1269 memset (ci, 0, sizeof (cache_item_t));
1271 ci->file = strdup (file);
1272 if (ci->file == NULL)
1273 {
1274 free (ci);
1275 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1277 return send_response(sock, RESP_ERR, "strdup failed.\n");
1278 }
1280 wipe_ci_values(ci, now);
1281 ci->flags = CI_FLAGS_IN_TREE;
1283 pthread_mutex_lock(&cache_lock);
1284 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1285 } /* }}} */
1286 assert (ci != NULL);
1288 /* don't re-write updates in replay mode */
1289 if (sock != NULL)
1290 journal_write("update", orig_buf);
1292 while (buffer_size > 0)
1293 {
1294 char **temp;
1295 char *value;
1297 status = buffer_get_field (&buffer, &buffer_size, &value);
1298 if (status != 0)
1299 {
1300 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1301 break;
1302 }
1304 temp = (char **) realloc (ci->values,
1305 sizeof (char *) * (ci->values_num + 1));
1306 if (temp == NULL)
1307 {
1308 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1309 continue;
1310 }
1311 ci->values = temp;
1313 ci->values[ci->values_num] = strdup (value);
1314 if (ci->values[ci->values_num] == NULL)
1315 {
1316 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1317 continue;
1318 }
1319 ci->values_num++;
1321 values_num++;
1322 }
1324 if (((now - ci->last_flush_time) >= config_write_interval)
1325 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1326 && (ci->values_num > 0))
1327 {
1328 enqueue_cache_item (ci, TAIL);
1329 }
1331 pthread_mutex_unlock (&cache_lock);
1333 if (values_num < 1)
1334 return send_response(sock, RESP_ERR, "No values updated.\n");
1335 else
1336 return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1338 /* NOTREACHED */
1339 assert(1==0);
1341 } /* }}} int handle_request_update */
1343 /* we came across a "WROTE" entry during journal replay.
1344 * throw away any values that we have accumulated for this file
1345 */
1346 static int handle_request_wrote (const char *buffer) /* {{{ */
1347 {
1348 int i;
1349 cache_item_t *ci;
1350 const char *file = buffer;
1352 pthread_mutex_lock(&cache_lock);
1354 ci = g_tree_lookup(cache_tree, file);
1355 if (ci == NULL)
1356 {
1357 pthread_mutex_unlock(&cache_lock);
1358 return (0);
1359 }
1361 if (ci->values)
1362 {
1363 for (i=0; i < ci->values_num; i++)
1364 free(ci->values[i]);
1366 free(ci->values);
1367 }
1369 wipe_ci_values(ci, time(NULL));
1370 remove_from_queue(ci);
1372 pthread_mutex_unlock(&cache_lock);
1373 return (0);
1374 } /* }}} int handle_request_wrote */
1376 /* start "BATCH" processing */
1377 static int batch_start (listen_socket_t *sock) /* {{{ */
1378 {
1379 int status;
1380 if (sock->batch_mode)
1381 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1383 status = send_response(sock, RESP_OK,
1384 "Go ahead. End with dot '.' on its own line.\n");
1385 sock->batch_mode = 1;
1386 sock->batch_cmd = 0;
1388 return status;
1389 } /* }}} static int batch_start */
1391 /* finish "BATCH" processing and return results to the client */
1392 static int batch_done (listen_socket_t *sock) /* {{{ */
1393 {
1394 assert(sock->batch_mode);
1395 sock->batch_mode = 0;
1396 sock->batch_cmd = 0;
1397 return send_response(sock, RESP_OK, "errors\n");
1398 } /* }}} static int batch_done */
1400 /* if sock==NULL, we are in journal replay mode */
1401 static int handle_request (listen_socket_t *sock, /* {{{ */
1402 char *buffer, size_t buffer_size)
1403 {
1404 char *buffer_ptr;
1405 char *command;
1406 int status;
1408 assert (buffer[buffer_size - 1] == '\0');
1410 buffer_ptr = buffer;
1411 command = NULL;
1412 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1413 if (status != 0)
1414 {
1415 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1416 return (-1);
1417 }
1419 if (sock != NULL && sock->batch_mode)
1420 sock->batch_cmd++;
1422 if (strcasecmp (command, "update") == 0)
1423 return (handle_request_update (sock, buffer_ptr, buffer_size));
1424 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1425 {
1426 /* this is only valid in replay mode */
1427 return (handle_request_wrote (buffer_ptr));
1428 }
1429 else if (strcasecmp (command, "flush") == 0)
1430 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1431 else if (strcasecmp (command, "flushall") == 0)
1432 return (handle_request_flushall(sock));
1433 else if (strcasecmp (command, "stats") == 0)
1434 return (handle_request_stats (sock));
1435 else if (strcasecmp (command, "help") == 0)
1436 return (handle_request_help (sock, buffer_ptr, buffer_size));
1437 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1438 return batch_start(sock);
1439 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1440 return batch_done(sock);
1441 else
1442 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1444 /* NOTREACHED */
1445 assert(1==0);
1446 } /* }}} int handle_request */
1448 /* MUST NOT hold journal_lock before calling this */
1449 static void journal_rotate(void) /* {{{ */
1450 {
1451 FILE *old_fh = NULL;
1452 int new_fd;
1454 if (journal_cur == NULL || journal_old == NULL)
1455 return;
1457 pthread_mutex_lock(&journal_lock);
1459 /* we rotate this way (rename before close) so that the we can release
1460 * the journal lock as fast as possible. Journal writes to the new
1461 * journal can proceed immediately after the new file is opened. The
1462 * fclose can then block without affecting new updates.
1463 */
1464 if (journal_fh != NULL)
1465 {
1466 old_fh = journal_fh;
1467 journal_fh = NULL;
1468 rename(journal_cur, journal_old);
1469 ++stats_journal_rotate;
1470 }
1472 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1473 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1474 if (new_fd >= 0)
1475 {
1476 journal_fh = fdopen(new_fd, "a");
1477 if (journal_fh == NULL)
1478 close(new_fd);
1479 }
1481 pthread_mutex_unlock(&journal_lock);
1483 if (old_fh != NULL)
1484 fclose(old_fh);
1486 if (journal_fh == NULL)
1487 {
1488 RRDD_LOG(LOG_CRIT,
1489 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1490 journal_cur, rrd_strerror(errno));
1492 RRDD_LOG(LOG_ERR,
1493 "JOURNALING DISABLED: All values will be flushed at shutdown");
1494 config_flush_at_shutdown = 1;
1495 }
1497 } /* }}} static void journal_rotate */
1499 static void journal_done(void) /* {{{ */
1500 {
1501 if (journal_cur == NULL)
1502 return;
1504 pthread_mutex_lock(&journal_lock);
1505 if (journal_fh != NULL)
1506 {
1507 fclose(journal_fh);
1508 journal_fh = NULL;
1509 }
1511 if (config_flush_at_shutdown)
1512 {
1513 RRDD_LOG(LOG_INFO, "removing journals");
1514 unlink(journal_old);
1515 unlink(journal_cur);
1516 }
1517 else
1518 {
1519 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1520 "journals will be used at next startup");
1521 }
1523 pthread_mutex_unlock(&journal_lock);
1525 } /* }}} static void journal_done */
1527 static int journal_write(char *cmd, char *args) /* {{{ */
1528 {
1529 int chars;
1531 if (journal_fh == NULL)
1532 return 0;
1534 pthread_mutex_lock(&journal_lock);
1535 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1536 pthread_mutex_unlock(&journal_lock);
1538 if (chars > 0)
1539 {
1540 pthread_mutex_lock(&stats_lock);
1541 stats_journal_bytes += chars;
1542 pthread_mutex_unlock(&stats_lock);
1543 }
1545 return chars;
1546 } /* }}} static int journal_write */
1548 static int journal_replay (const char *file) /* {{{ */
1549 {
1550 FILE *fh;
1551 int entry_cnt = 0;
1552 int fail_cnt = 0;
1553 uint64_t line = 0;
1554 char entry[CMD_MAX];
1556 if (file == NULL) return 0;
1558 {
1559 char *reason;
1560 int status = 0;
1561 struct stat statbuf;
1563 memset(&statbuf, 0, sizeof(statbuf));
1564 if (stat(file, &statbuf) != 0)
1565 {
1566 if (errno == ENOENT)
1567 return 0;
1569 reason = "stat error";
1570 status = errno;
1571 }
1572 else if (!S_ISREG(statbuf.st_mode))
1573 {
1574 reason = "not a regular file";
1575 status = EPERM;
1576 }
1577 if (statbuf.st_uid != daemon_uid)
1578 {
1579 reason = "not owned by daemon user";
1580 status = EACCES;
1581 }
1582 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1583 {
1584 reason = "must not be user/group writable";
1585 status = EACCES;
1586 }
1588 if (status != 0)
1589 {
1590 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1591 file, rrd_strerror(status), reason);
1592 return 0;
1593 }
1594 }
1596 fh = fopen(file, "r");
1597 if (fh == NULL)
1598 {
1599 if (errno != ENOENT)
1600 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1601 file, rrd_strerror(errno));
1602 return 0;
1603 }
1604 else
1605 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1607 while(!feof(fh))
1608 {
1609 size_t entry_len;
1611 ++line;
1612 if (fgets(entry, sizeof(entry), fh) == NULL)
1613 break;
1614 entry_len = strlen(entry);
1616 /* check \n termination in case journal writing crashed mid-line */
1617 if (entry_len == 0)
1618 continue;
1619 else if (entry[entry_len - 1] != '\n')
1620 {
1621 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1622 ++fail_cnt;
1623 continue;
1624 }
1626 entry[entry_len - 1] = '\0';
1628 if (handle_request(NULL, entry, entry_len) == 0)
1629 ++entry_cnt;
1630 else
1631 ++fail_cnt;
1632 }
1634 fclose(fh);
1636 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1637 entry_cnt, fail_cnt);
1639 return entry_cnt > 0 ? 1 : 0;
1640 } /* }}} static int journal_replay */
1642 static void journal_init(void) /* {{{ */
1643 {
1644 int had_journal = 0;
1646 if (journal_cur == NULL) return;
1648 pthread_mutex_lock(&journal_lock);
1650 RRDD_LOG(LOG_INFO, "checking for journal files");
1652 had_journal += journal_replay(journal_old);
1653 had_journal += journal_replay(journal_cur);
1655 /* it must have been a crash. start a flush */
1656 if (had_journal && config_flush_at_shutdown)
1657 flush_old_values(-1);
1659 pthread_mutex_unlock(&journal_lock);
1660 journal_rotate();
1662 RRDD_LOG(LOG_INFO, "journal processing complete");
1664 } /* }}} static void journal_init */
1666 static void close_connection(listen_socket_t *sock)
1667 {
1668 close(sock->fd) ; sock->fd = -1;
1669 free(sock->rbuf); sock->rbuf = NULL;
1670 free(sock->wbuf); sock->wbuf = NULL;
1672 free(sock);
1673 }
1675 static void *connection_thread_main (void *args) /* {{{ */
1676 {
1677 pthread_t self;
1678 listen_socket_t *sock;
1679 int i;
1680 int fd;
1682 sock = (listen_socket_t *) args;
1683 fd = sock->fd;
1685 /* init read buffers */
1686 sock->next_read = sock->next_cmd = 0;
1687 sock->rbuf = malloc(RBUF_SIZE);
1688 if (sock->rbuf == NULL)
1689 {
1690 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1691 close_connection(sock);
1692 return NULL;
1693 }
1695 pthread_mutex_lock (&connection_threads_lock);
1696 {
1697 pthread_t *temp;
1699 temp = (pthread_t *) realloc (connection_threads,
1700 sizeof (pthread_t) * (connection_threads_num + 1));
1701 if (temp == NULL)
1702 {
1703 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1704 }
1705 else
1706 {
1707 connection_threads = temp;
1708 connection_threads[connection_threads_num] = pthread_self ();
1709 connection_threads_num++;
1710 }
1711 }
1712 pthread_mutex_unlock (&connection_threads_lock);
1714 while (do_shutdown == 0)
1715 {
1716 char *cmd;
1717 ssize_t cmd_len;
1718 ssize_t rbytes;
1720 struct pollfd pollfd;
1721 int status;
1723 pollfd.fd = fd;
1724 pollfd.events = POLLIN | POLLPRI;
1725 pollfd.revents = 0;
1727 status = poll (&pollfd, 1, /* timeout = */ 500);
1728 if (do_shutdown)
1729 break;
1730 else if (status == 0) /* timeout */
1731 continue;
1732 else if (status < 0) /* error */
1733 {
1734 status = errno;
1735 if (status == EINTR)
1736 continue;
1737 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1738 continue;
1739 }
1741 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1742 {
1743 close_connection(sock);
1744 break;
1745 }
1746 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1747 {
1748 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1749 "poll(2) returned something unexpected: %#04hx",
1750 pollfd.revents);
1751 close_connection(sock);
1752 break;
1753 }
1755 rbytes = read(fd, sock->rbuf + sock->next_read,
1756 RBUF_SIZE - sock->next_read);
1757 if (rbytes < 0)
1758 {
1759 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1760 break;
1761 }
1762 else if (rbytes == 0)
1763 break; /* eof */
1765 sock->next_read += rbytes;
1767 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1768 {
1769 status = handle_request (sock, cmd, cmd_len+1);
1770 if (status != 0)
1771 goto out_close;
1772 }
1773 }
1775 out_close:
1776 close_connection(sock);
1778 self = pthread_self ();
1779 /* Remove this thread from the connection threads list */
1780 pthread_mutex_lock (&connection_threads_lock);
1781 /* Find out own index in the array */
1782 for (i = 0; i < connection_threads_num; i++)
1783 if (pthread_equal (connection_threads[i], self) != 0)
1784 break;
1785 assert (i < connection_threads_num);
1787 /* Move the trailing threads forward. */
1788 if (i < (connection_threads_num - 1))
1789 {
1790 memmove (connection_threads + i,
1791 connection_threads + i + 1,
1792 sizeof (pthread_t) * (connection_threads_num - i - 1));
1793 }
1795 connection_threads_num--;
1796 pthread_mutex_unlock (&connection_threads_lock);
1798 return (NULL);
1799 } /* }}} void *connection_thread_main */
1801 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1802 {
1803 int fd;
1804 struct sockaddr_un sa;
1805 listen_socket_t *temp;
1806 int status;
1807 const char *path;
1809 path = sock->addr;
1810 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1811 path += strlen("unix:");
1813 temp = (listen_socket_t *) realloc (listen_fds,
1814 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1815 if (temp == NULL)
1816 {
1817 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1818 return (-1);
1819 }
1820 listen_fds = temp;
1821 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1823 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1824 if (fd < 0)
1825 {
1826 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1827 return (-1);
1828 }
1830 memset (&sa, 0, sizeof (sa));
1831 sa.sun_family = AF_UNIX;
1832 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1834 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1835 if (status != 0)
1836 {
1837 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1838 close (fd);
1839 unlink (path);
1840 return (-1);
1841 }
1843 status = listen (fd, /* backlog = */ 10);
1844 if (status != 0)
1845 {
1846 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1847 close (fd);
1848 unlink (path);
1849 return (-1);
1850 }
1852 listen_fds[listen_fds_num].fd = fd;
1853 listen_fds[listen_fds_num].family = PF_UNIX;
1854 strncpy(listen_fds[listen_fds_num].addr, path,
1855 sizeof (listen_fds[listen_fds_num].addr) - 1);
1856 listen_fds_num++;
1858 return (0);
1859 } /* }}} int open_listen_socket_unix */
/*
 * Create listening TCP sockets for a network address and append them to
 * listen_fds.
 *
 * sock: template for the new entries; sock->addr is either
 *       "[<IPv6 address>]" with an optional ":<port>" suffix, or a host
 *       name / IPv4 address containing a '.', with an optional ":<port>"
 *       suffix.  RRDCACHED_DEFAULT_PORT is used when no port is given.
 *
 * One socket is attempted for every address returned by getaddrinfo().
 * Addresses for which realloc, socket() or bind() fail are skipped.
 *
 * Returns -1 for a malformed address, a getaddrinfo() failure or a listen()
 * failure, and 0 otherwise (also when no socket could be bound).
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
          sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    port = rindex(addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
      port == NULL ? RRDCACHED_DEFAULT_PORT : port,
      &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
      close (fd);
      /* the address list has to be released on this exit path as well */
      freeaddrinfo (ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* the list returned by getaddrinfo() is owned by the caller */
  freeaddrinfo (ai_res);

  return (0);
} /* }}} static int open_listen_socket_network */
1979 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1980 {
1981 assert(sock != NULL);
1982 assert(sock->addr != NULL);
1984 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1985 || sock->addr[0] == '/')
1986 return (open_listen_socket_unix(sock));
1987 else
1988 return (open_listen_socket_network(sock));
1989 } /* }}} int open_listen_socket */
1991 static int close_listen_sockets (void) /* {{{ */
1992 {
1993 size_t i;
1995 for (i = 0; i < listen_fds_num; i++)
1996 {
1997 close (listen_fds[i].fd);
1999 if (listen_fds[i].family == PF_UNIX)
2000 unlink(listen_fds[i].addr);
2001 }
2003 free (listen_fds);
2004 listen_fds = NULL;
2005 listen_fds_num = 0;
2007 return (0);
2008 } /* }}} int close_listen_sockets */
2010 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2011 {
2012 struct pollfd *pollfds;
2013 int pollfds_num;
2014 int status;
2015 int i;
2017 for (i = 0; i < config_listen_address_list_len; i++)
2018 open_listen_socket (config_listen_address_list[i]);
2020 if (config_listen_address_list_len < 1)
2021 {
2022 listen_socket_t sock;
2023 memset(&sock, 0, sizeof(sock));
2024 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2025 open_listen_socket (&sock);
2026 }
2028 if (listen_fds_num < 1)
2029 {
2030 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2031 "could be opened. Sorry.");
2032 return (NULL);
2033 }
2035 pollfds_num = listen_fds_num;
2036 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2037 if (pollfds == NULL)
2038 {
2039 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2040 return (NULL);
2041 }
2042 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2044 RRDD_LOG(LOG_INFO, "listening for connections");
2046 while (do_shutdown == 0)
2047 {
2048 assert (pollfds_num == ((int) listen_fds_num));
2049 for (i = 0; i < pollfds_num; i++)
2050 {
2051 pollfds[i].fd = listen_fds[i].fd;
2052 pollfds[i].events = POLLIN | POLLPRI;
2053 pollfds[i].revents = 0;
2054 }
2056 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2057 if (do_shutdown)
2058 break;
2059 else if (status == 0) /* timeout */
2060 continue;
2061 else if (status < 0) /* error */
2062 {
2063 status = errno;
2064 if (status != EINTR)
2065 {
2066 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2067 }
2068 continue;
2069 }
2071 for (i = 0; i < pollfds_num; i++)
2072 {
2073 listen_socket_t *client_sock;
2074 struct sockaddr_storage client_sa;
2075 socklen_t client_sa_size;
2076 pthread_t tid;
2077 pthread_attr_t attr;
2079 if (pollfds[i].revents == 0)
2080 continue;
2082 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2083 {
2084 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2085 "poll(2) returned something unexpected for listen FD #%i.",
2086 pollfds[i].fd);
2087 continue;
2088 }
2090 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2091 if (client_sock == NULL)
2092 {
2093 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2094 continue;
2095 }
2096 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2098 client_sa_size = sizeof (client_sa);
2099 client_sock->fd = accept (pollfds[i].fd,
2100 (struct sockaddr *) &client_sa, &client_sa_size);
2101 if (client_sock->fd < 0)
2102 {
2103 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2104 free(client_sock);
2105 continue;
2106 }
2108 pthread_attr_init (&attr);
2109 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2111 status = pthread_create (&tid, &attr, connection_thread_main,
2112 client_sock);
2113 if (status != 0)
2114 {
2115 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2116 close_connection(client_sock);
2117 continue;
2118 }
2119 } /* for (pollfds_num) */
2120 } /* while (do_shutdown == 0) */
2122 RRDD_LOG(LOG_INFO, "starting shutdown");
2124 close_listen_sockets ();
2126 pthread_mutex_lock (&connection_threads_lock);
2127 while (connection_threads_num > 0)
2128 {
2129 pthread_t wait_for;
2131 wait_for = connection_threads[0];
2133 pthread_mutex_unlock (&connection_threads_lock);
2134 pthread_join (wait_for, /* retval = */ NULL);
2135 pthread_mutex_lock (&connection_threads_lock);
2136 }
2137 pthread_mutex_unlock (&connection_threads_lock);
2139 return (NULL);
2140 } /* }}} void *listen_thread_main */
2142 static int daemonize (void) /* {{{ */
2143 {
2144 int status;
2145 int fd;
2146 char *base_dir;
2148 daemon_uid = geteuid();
2150 fd = open_pidfile();
2151 if (fd < 0) return fd;
2153 if (!stay_foreground)
2154 {
2155 pid_t child;
2157 child = fork ();
2158 if (child < 0)
2159 {
2160 fprintf (stderr, "daemonize: fork(2) failed.\n");
2161 return (-1);
2162 }
2163 else if (child > 0)
2164 {
2165 return (1);
2166 }
2168 /* Become session leader */
2169 setsid ();
2171 /* Open the first three file descriptors to /dev/null */
2172 close (2);
2173 close (1);
2174 close (0);
2176 open ("/dev/null", O_RDWR);
2177 dup (0);
2178 dup (0);
2179 } /* if (!stay_foreground) */
2181 /* Change into the /tmp directory. */
2182 base_dir = (config_base_dir != NULL)
2183 ? config_base_dir
2184 : "/tmp";
2185 status = chdir (base_dir);
2186 if (status != 0)
2187 {
2188 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2189 return (-1);
2190 }
2192 install_signal_handlers();
2194 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2195 RRDD_LOG(LOG_INFO, "starting up");
2197 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2198 if (cache_tree == NULL)
2199 {
2200 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2201 return (-1);
2202 }
2204 status = write_pidfile (fd);
2205 return status;
2206 } /* }}} int daemonize */
2208 static int cleanup (void) /* {{{ */
2209 {
2210 do_shutdown++;
2212 pthread_cond_signal (&cache_cond);
2213 pthread_join (queue_thread, /* return = */ NULL);
2215 remove_pidfile ();
2217 RRDD_LOG(LOG_INFO, "goodbye");
2218 closelog ();
2220 return (0);
2221 } /* }}} int cleanup */
2223 static int read_options (int argc, char **argv) /* {{{ */
2224 {
2225 int option;
2226 int status = 0;
2228 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2229 {
2230 switch (option)
2231 {
2232 case 'g':
2233 stay_foreground=1;
2234 break;
2236 case 'L':
2237 case 'l':
2238 {
2239 listen_socket_t **temp;
2240 listen_socket_t *new;
2242 new = malloc(sizeof(listen_socket_t));
2243 if (new == NULL)
2244 {
2245 fprintf(stderr, "read_options: malloc failed.\n");
2246 return(2);
2247 }
2248 memset(new, 0, sizeof(listen_socket_t));
2250 temp = (listen_socket_t **) realloc (config_listen_address_list,
2251 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2252 if (temp == NULL)
2253 {
2254 fprintf (stderr, "read_options: realloc failed.\n");
2255 return (2);
2256 }
2257 config_listen_address_list = temp;
2259 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2260 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2262 temp[config_listen_address_list_len] = new;
2263 config_listen_address_list_len++;
2264 }
2265 break;
2267 case 'f':
2268 {
2269 int temp;
2271 temp = atoi (optarg);
2272 if (temp > 0)
2273 config_flush_interval = temp;
2274 else
2275 {
2276 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2277 status = 3;
2278 }
2279 }
2280 break;
2282 case 'w':
2283 {
2284 int temp;
2286 temp = atoi (optarg);
2287 if (temp > 0)
2288 config_write_interval = temp;
2289 else
2290 {
2291 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2292 status = 2;
2293 }
2294 }
2295 break;
2297 case 'z':
2298 {
2299 int temp;
2301 temp = atoi(optarg);
2302 if (temp > 0)
2303 config_write_jitter = temp;
2304 else
2305 {
2306 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2307 status = 2;
2308 }
2310 break;
2311 }
2313 case 'B':
2314 config_write_base_only = 1;
2315 break;
2317 case 'b':
2318 {
2319 size_t len;
2321 if (config_base_dir != NULL)
2322 free (config_base_dir);
2323 config_base_dir = strdup (optarg);
2324 if (config_base_dir == NULL)
2325 {
2326 fprintf (stderr, "read_options: strdup failed.\n");
2327 return (3);
2328 }
2330 len = strlen (config_base_dir);
2331 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2332 {
2333 config_base_dir[len - 1] = 0;
2334 len--;
2335 }
2337 if (len < 1)
2338 {
2339 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2340 return (4);
2341 }
2343 _config_base_dir_len = len;
2344 }
2345 break;
2347 case 'p':
2348 {
2349 if (config_pid_file != NULL)
2350 free (config_pid_file);
2351 config_pid_file = strdup (optarg);
2352 if (config_pid_file == NULL)
2353 {
2354 fprintf (stderr, "read_options: strdup failed.\n");
2355 return (3);
2356 }
2357 }
2358 break;
2360 case 'F':
2361 config_flush_at_shutdown = 1;
2362 break;
2364 case 'j':
2365 {
2366 struct stat statbuf;
2367 const char *dir = optarg;
2369 status = stat(dir, &statbuf);
2370 if (status != 0)
2371 {
2372 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2373 return 6;
2374 }
2376 if (!S_ISDIR(statbuf.st_mode)
2377 || access(dir, R_OK|W_OK|X_OK) != 0)
2378 {
2379 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2380 errno ? rrd_strerror(errno) : "");
2381 return 6;
2382 }
2384 journal_cur = malloc(PATH_MAX + 1);
2385 journal_old = malloc(PATH_MAX + 1);
2386 if (journal_cur == NULL || journal_old == NULL)
2387 {
2388 fprintf(stderr, "malloc failure for journal files\n");
2389 return 6;
2390 }
2391 else
2392 {
2393 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2394 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2395 }
2396 }
2397 break;
2399 case 'h':
2400 case '?':
2401 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2402 "\n"
2403 "Usage: rrdcached [options]\n"
2404 "\n"
2405 "Valid options are:\n"
2406 " -l <address> Socket address to listen to.\n"
2407 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2408 " -w <seconds> Interval in which to write data.\n"
2409 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2410 " -f <seconds> Interval in which to flush dead data.\n"
2411 " -p <file> Location of the PID-file.\n"
2412 " -b <dir> Base directory to change to.\n"
2413 " -B Restrict file access to paths within -b <dir>\n"
2414 " -g Do not fork and run in the foreground.\n"
2415 " -j <dir> Directory in which to create the journal files.\n"
2416 " -F Always flush all updates at shutdown\n"
2417 "\n"
2418 "For more information and a detailed description of all options "
2419 "please refer\n"
2420 "to the rrdcached(1) manual page.\n",
2421 VERSION);
2422 status = -1;
2423 break;
2424 } /* switch (option) */
2425 } /* while (getopt) */
2427 /* advise the user when values are not sane */
2428 if (config_flush_interval < 2 * config_write_interval)
2429 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2430 " 2x write interval (-w) !\n");
2431 if (config_write_jitter > config_write_interval)
2432 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2433 " write interval (-w) !\n");
2435 if (config_write_base_only && config_base_dir == NULL)
2436 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2437 " Consult the rrdcached documentation\n");
2439 if (journal_cur == NULL)
2440 config_flush_at_shutdown = 1;
2442 return (status);
2443 } /* }}} int read_options */
2445 int main (int argc, char **argv)
2446 {
2447 int status;
2449 status = read_options (argc, argv);
2450 if (status != 0)
2451 {
2452 if (status < 0)
2453 status = 0;
2454 return (status);
2455 }
2457 status = daemonize ();
2458 if (status == 1)
2459 {
2460 struct sigaction sigchld;
2462 memset (&sigchld, 0, sizeof (sigchld));
2463 sigchld.sa_handler = SIG_IGN;
2464 sigaction (SIGCHLD, &sigchld, NULL);
2466 return (0);
2467 }
2468 else if (status != 0)
2469 {
2470 fprintf (stderr, "daemonize failed, exiting.\n");
2471 return (1);
2472 }
2474 journal_init();
2476 /* start the queue thread */
2477 memset (&queue_thread, 0, sizeof (queue_thread));
2478 status = pthread_create (&queue_thread,
2479 NULL, /* attr */
2480 queue_thread_main,
2481 NULL); /* args */
2482 if (status != 0)
2483 {
2484 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2485 cleanup();
2486 return (1);
2487 }
2489 listen_thread_main (NULL);
2490 cleanup ();
2492 return (0);
2493 } /* int main */
2495 /*
2496 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2497 */