1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* Log a printf-style message through syslog(3) at the given severity. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* `__attribute__' is a GCC extension; when __GNUC__ is not defined, make it
 * expand to nothing so the annotated declarations below still compile. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
101 /*
102 * Types
103 */
/* Privilege level recorded per listening socket (see
 * listen_socket_s.privilege).  NOTE(review): how the level is enforced is not
 * visible in this part of the file. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Result code handed to send_response(): RESP_OK (0) for success,
 * RESP_ERR (-1) for failure. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/* Per-socket state: the descriptor itself plus the buffers used to read
 * commands from, and assemble responses for, the peer. */
struct listen_socket_s
{
  int fd;                     /* file descriptor of the socket */
  char addr[PATH_MAX + 1];    /* address string of the socket */
  int family;                 /* presumably the address family - TODO confirm */
  socket_privilege privilege;

  /* state for BATCH processing */
  int batch_mode;  /* non-zero: send_response() suppresses success replies and
                    * buffers error replies instead of writing them */
  int batch_cmd;   /* number reported in front of buffered error replies;
                    * presumably the index of the current batch command */

  /* buffered IO */
  char *rbuf;        /* read buffer */
  off_t next_cmd;    /* offset in rbuf where the next unparsed command starts */
  off_t next_read;   /* offset in rbuf one past the last byte of valid data */

  char *wbuf;        /* heap buffer with the "extra" response lines */
  ssize_t wbuf_len;  /* number of bytes in wbuf (terminator not counted) */
};
typedef struct listen_socket_s listen_socket_t;
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/* One cached RRD file: the update strings received for it that have not been
 * written yet, plus the links that place it in the write queue. */
struct cache_item_s
{
  char *file;              /* heap copy of the file name; also the tree key */
  char **values;           /* heap array of heap-allocated update strings */
  int values_num;          /* number of entries in `values' */
  time_t last_flush_time;  /* time the values were last handed to the writer
                            * (plus optional jitter, see wipe_ci_values) */
#define CI_FLAGS_IN_TREE (1<<0)   /* item has been inserted into cache_tree */
#define CI_FLAGS_IN_QUEUE (1<<1)  /* item is linked into the write queue */
  int flags;
  pthread_cond_t flushed;  /* broadcast by the queue thread after it wrote
                            * this item's values; flush_file() waits on it */
  cache_item_t *prev;      /* neighbours in the doubly linked write queue; */
  cache_item_t *next;      /* both NULL while the item is not queued */
};
/* User data passed from flush_old_values() to tree_callback_flush() during
 * the g_tree_foreach() traversal. */
struct callback_flush_data_s
{
  time_t now;          /* time at which the traversal was started */
  time_t abs_timeout;  /* items last flushed at or before this time get queued */
  char **keys;         /* keys of idle, empty items to remove from the tree;
                        * each entry aliases the item's `file' member */
  size_t keys_num;     /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should insert at. */
enum queue_side_e
{
  HEAD,  /* front of the queue: written next */
  TAIL   /* back of the queue: written after everything already queued */
};
typedef enum queue_side_e queue_side_t;
/* max length of socket command or response */
#define CMD_MAX 4096
/* twice CMD_MAX; presumably the size of listen_socket_t.rbuf - the
 * allocation is not visible in this part of the file */
#define RBUF_SIZE (CMD_MAX*2)
169 /*
170 * Variables
171 */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Incremented by the signal handlers; a non-zero value makes the queue
 * thread leave its main loop. */
static int do_shutdown = 0;

static pthread_t queue_thread;

static pthread_t *connection_threads = NULL;
static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
/* cache_tree maps a file name (cache_item_t.file) to its cache_item_t.  The
 * write queue is a doubly linked list through cache_item_t.prev/next.  Tree
 * and queue are accessed with cache_lock held; cache_cond is broadcast when
 * an item is queued and when a shutdown signal arrives. */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;

/* Age in seconds after which a file's pending values are queued for writing. */
static int config_write_interval = 300;
/* When > 0, a random number of seconds below this value is added to
 * last_flush_time whenever an item's values are wiped. */
static int config_write_jitter = 0;
/* Period in seconds of the queue thread's scan of the whole tree; also the
 * idle time after which an empty item is removed from the tree. */
static int config_flush_interval = 3600;
/* Non-zero: write out all pending values before the queue thread exits. */
static int config_flush_at_shutdown = 0;
static char *config_pid_file = NULL;
/* Together with config_write_base_only: directory that absolute file names
 * must live in (see check_file_access). */
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static int config_listen_address_list_len = 0;

/* Counters reported by the STATS command; guarded by stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
/* Forward declarations; the definitions are further down in the file. */
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);
223 /*
224 * Functions
225 */
226 static void sig_common (const char *sig) /* {{{ */
227 {
228 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
229 do_shutdown++;
230 pthread_cond_broadcast(&cache_cond);
231 } /* }}} void sig_common */
/* SIGINT: request shutdown; the flush-at-shutdown setting is left alone. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */

/* SIGTERM: request shutdown; the flush-at-shutdown setting is left alone. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */

/* SIGUSR1: request shutdown and force flushing of all pending values. */
static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 1;
  sig_common("USR1");
} /* }}} void sig_usr1_handler */

/* SIGUSR2: request shutdown and disable flushing of pending values. */
static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 0;
  sig_common("USR2");
} /* }}} void sig_usr2_handler */
255 static void install_signal_handlers(void) /* {{{ */
256 {
257 /* These structures are static, because `sigaction' behaves weird if the are
258 * overwritten.. */
259 static struct sigaction sa_int;
260 static struct sigaction sa_term;
261 static struct sigaction sa_pipe;
262 static struct sigaction sa_usr1;
263 static struct sigaction sa_usr2;
265 /* Install signal handlers */
266 memset (&sa_int, 0, sizeof (sa_int));
267 sa_int.sa_handler = sig_int_handler;
268 sigaction (SIGINT, &sa_int, NULL);
270 memset (&sa_term, 0, sizeof (sa_term));
271 sa_term.sa_handler = sig_term_handler;
272 sigaction (SIGTERM, &sa_term, NULL);
274 memset (&sa_pipe, 0, sizeof (sa_pipe));
275 sa_pipe.sa_handler = SIG_IGN;
276 sigaction (SIGPIPE, &sa_pipe, NULL);
278 memset (&sa_pipe, 0, sizeof (sa_usr1));
279 sa_usr1.sa_handler = sig_usr1_handler;
280 sigaction (SIGUSR1, &sa_usr1, NULL);
282 memset (&sa_usr2, 0, sizeof (sa_usr2));
283 sa_usr2.sa_handler = sig_usr2_handler;
284 sigaction (SIGUSR2, &sa_usr2, NULL);
286 } /* }}} void install_signal_handlers */
288 static int open_pidfile(void) /* {{{ */
289 {
290 int fd;
291 char *file;
293 file = (config_pid_file != NULL)
294 ? config_pid_file
295 : LOCALSTATEDIR "/run/rrdcached.pid";
297 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
298 if (fd < 0)
299 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
300 file, rrd_strerror(errno));
302 return(fd);
303 } /* }}} static int open_pidfile */
/*
 * write_pidfile:
 *   Writes the PID of the calling process, followed by a newline, to the
 *   descriptor `fd' and closes it.  Returns 0 on success; if the descriptor
 *   cannot be wrapped in a stdio stream it is closed and -1 is returned.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream != NULL)
  {
    fprintf (stream, "%i\n", (int) getpid ());
    fclose (stream);
    return 0;
  }

  RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
  close (fd);
  return -1;
} /* }}} int write_pidfile */
326 static int remove_pidfile (void) /* {{{ */
327 {
328 char *file;
329 int status;
331 file = (config_pid_file != NULL)
332 ? config_pid_file
333 : LOCALSTATEDIR "/run/rrdcached.pid";
335 status = unlink (file);
336 if (status == 0)
337 return (0);
338 return (errno);
339 } /* }}} int remove_pidfile */
341 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
342 {
343 char *eol;
345 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
346 sock->next_read - sock->next_cmd);
348 if (eol == NULL)
349 {
350 /* no commands left, move remainder back to front of rbuf */
351 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
352 sock->next_read - sock->next_cmd);
353 sock->next_read -= sock->next_cmd;
354 sock->next_cmd = 0;
355 *len = 0;
356 return NULL;
357 }
358 else
359 {
360 char *cmd = sock->rbuf + sock->next_cmd;
361 *eol = '\0';
363 sock->next_cmd = eol - sock->rbuf + 1;
365 if (eol > sock->rbuf && *(eol-1) == '\r')
366 *(--eol) = '\0'; /* handle "\r\n" EOL */
368 *len = eol - cmd;
370 return cmd;
371 }
373 /* NOTREACHED */
374 assert(1==0);
375 }
377 /* add the characters directly to the write buffer */
378 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
379 {
380 char *new_buf;
382 assert(sock != NULL);
384 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
385 if (new_buf == NULL)
386 {
387 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
388 return -1;
389 }
391 strncpy(new_buf + sock->wbuf_len, str, len + 1);
393 sock->wbuf = new_buf;
394 sock->wbuf_len += len;
396 return 0;
397 } /* }}} static int add_to_wbuf */
399 /* add the text to the "extra" info that's sent after the status line */
400 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
401 {
402 va_list argp;
403 char buffer[CMD_MAX];
404 int len;
406 if (sock == NULL) return 0; /* journal replay mode */
407 if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
409 va_start(argp, fmt);
410 #ifdef HAVE_VSNPRINTF
411 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
412 #else
413 len = vsprintf(buffer, fmt, argp);
414 #endif
415 va_end(argp);
416 if (len < 0)
417 {
418 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
419 return -1;
420 }
422 return add_to_wbuf(sock, buffer, len);
423 } /* }}} static int add_response_info */
/*
 * count_lines:
 *   Returns the number of newline characters in the NUL-terminated string
 *   `str'; a NULL pointer counts as zero lines.
 */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *cursor;

  if (str == NULL)
    return 0;

  for (cursor = strchr(str, '\n');
      cursor != NULL;
      cursor = strchr(cursor + 1, '\n'))
  {
    total++;
  }

  return total;
} /* }}} static int count_lines */
441 /* send the response back to the user.
442 * returns 0 on success, -1 on error
443 * write buffer is always zeroed after this call */
444 static int send_response (listen_socket_t *sock, response_code rc,
445 char *fmt, ...) /* {{{ */
446 {
447 va_list argp;
448 char buffer[CMD_MAX];
449 int lines;
450 ssize_t wrote;
451 int rclen, len;
453 if (sock == NULL) return rc; /* journal replay mode */
455 if (sock->batch_mode)
456 {
457 if (rc == RESP_OK)
458 return rc; /* no response on success during BATCH */
459 lines = sock->batch_cmd;
460 }
461 else if (rc == RESP_OK)
462 lines = count_lines(sock->wbuf);
463 else
464 lines = -1;
466 rclen = sprintf(buffer, "%d ", lines);
467 va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
470 #else
471 len = vsprintf(buffer+rclen, fmt, argp);
472 #endif
473 va_end(argp);
474 if (len < 0)
475 return -1;
477 len += rclen;
479 /* append the result to the wbuf, don't write to the user */
480 if (sock->batch_mode)
481 return add_to_wbuf(sock, buffer, len);
483 /* first write must be complete */
484 if (len != write(sock->fd, buffer, len))
485 {
486 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
487 return -1;
488 }
490 if (sock->wbuf != NULL)
491 {
492 wrote = 0;
493 while (wrote < sock->wbuf_len)
494 {
495 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
496 if (wb <= 0)
497 {
498 RRDD_LOG(LOG_INFO, "send_response: could not write results");
499 return -1;
500 }
501 wrote += wb;
502 }
503 }
505 free(sock->wbuf); sock->wbuf = NULL;
506 sock->wbuf_len = 0;
508 return 0;
509 } /* }}} */
511 static void wipe_ci_values(cache_item_t *ci, time_t when)
512 {
513 ci->values = NULL;
514 ci->values_num = 0;
516 ci->last_flush_time = when;
517 if (config_write_jitter > 0)
518 ci->last_flush_time += (random() % config_write_jitter);
519 }
521 /* remove_from_queue
522 * remove a "cache_item_t" item from the queue.
523 * must hold 'cache_lock' when calling this
524 */
525 static void remove_from_queue(cache_item_t *ci) /* {{{ */
526 {
527 if (ci == NULL) return;
529 if (ci->prev == NULL)
530 cache_queue_head = ci->next; /* reset head */
531 else
532 ci->prev->next = ci->next;
534 if (ci->next == NULL)
535 cache_queue_tail = ci->prev; /* reset the tail */
536 else
537 ci->next->prev = ci->prev;
539 ci->next = ci->prev = NULL;
540 ci->flags &= ~CI_FLAGS_IN_QUEUE;
541 } /* }}} static void remove_from_queue */
543 /*
544 * enqueue_cache_item:
545 * `cache_lock' must be acquired before calling this function!
546 */
547 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
548 queue_side_t side)
549 {
550 if (ci == NULL)
551 return (-1);
553 if (ci->values_num == 0)
554 return (0);
556 if (side == HEAD)
557 {
558 if (cache_queue_head == ci)
559 return 0;
561 /* remove from the double linked list */
562 if (ci->flags & CI_FLAGS_IN_QUEUE)
563 remove_from_queue(ci);
565 ci->prev = NULL;
566 ci->next = cache_queue_head;
567 if (ci->next != NULL)
568 ci->next->prev = ci;
569 cache_queue_head = ci;
571 if (cache_queue_tail == NULL)
572 cache_queue_tail = cache_queue_head;
573 }
574 else /* (side == TAIL) */
575 {
576 /* We don't move values back in the list.. */
577 if (ci->flags & CI_FLAGS_IN_QUEUE)
578 return (0);
580 assert (ci->next == NULL);
581 assert (ci->prev == NULL);
583 ci->prev = cache_queue_tail;
585 if (cache_queue_tail == NULL)
586 cache_queue_head = ci;
587 else
588 cache_queue_tail->next = ci;
590 cache_queue_tail = ci;
591 }
593 ci->flags |= CI_FLAGS_IN_QUEUE;
595 pthread_cond_broadcast(&cache_cond);
596 pthread_mutex_lock (&stats_lock);
597 stats_queue_length++;
598 pthread_mutex_unlock (&stats_lock);
600 return (0);
601 } /* }}} int enqueue_cache_item */
603 /*
604 * tree_callback_flush:
605 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
606 * while this is in progress.
607 */
608 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
609 gpointer data)
610 {
611 cache_item_t *ci;
612 callback_flush_data_t *cfd;
614 ci = (cache_item_t *) value;
615 cfd = (callback_flush_data_t *) data;
617 if ((ci->last_flush_time <= cfd->abs_timeout)
618 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
619 && (ci->values_num > 0))
620 {
621 enqueue_cache_item (ci, TAIL);
622 }
623 else if ((do_shutdown != 0)
624 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
625 && (ci->values_num > 0))
626 {
627 enqueue_cache_item (ci, TAIL);
628 }
629 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
630 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
631 && (ci->values_num <= 0))
632 {
633 char **temp;
635 temp = (char **) realloc (cfd->keys,
636 sizeof (char *) * (cfd->keys_num + 1));
637 if (temp == NULL)
638 {
639 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
640 return (FALSE);
641 }
642 cfd->keys = temp;
643 /* Make really sure this points to the _same_ place */
644 assert ((char *) key == ci->file);
645 cfd->keys[cfd->keys_num] = (char *) key;
646 cfd->keys_num++;
647 }
649 return (FALSE);
650 } /* }}} gboolean tree_callback_flush */
652 static int flush_old_values (int max_age)
653 {
654 callback_flush_data_t cfd;
655 size_t k;
657 memset (&cfd, 0, sizeof (cfd));
658 /* Pass the current time as user data so that we don't need to call
659 * `time' for each node. */
660 cfd.now = time (NULL);
661 cfd.keys = NULL;
662 cfd.keys_num = 0;
664 if (max_age > 0)
665 cfd.abs_timeout = cfd.now - max_age;
666 else
667 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
669 /* `tree_callback_flush' will return the keys of all values that haven't
670 * been touched in the last `config_flush_interval' seconds in `cfd'.
671 * The char*'s in this array point to the same memory as ci->file, so we
672 * don't need to free them separately. */
673 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
675 for (k = 0; k < cfd.keys_num; k++)
676 {
677 cache_item_t *ci;
679 /* This must not fail. */
680 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
681 assert (ci != NULL);
683 /* If we end up here with values available, something's seriously
684 * messed up. */
685 assert (ci->values_num == 0);
687 /* Remove the node from the tree */
688 g_tree_remove (cache_tree, cfd.keys[k]);
689 cfd.keys[k] = NULL;
691 /* Now free and clean up `ci'. */
692 free (ci->file);
693 ci->file = NULL;
694 free (ci);
695 ci = NULL;
696 } /* for (k = 0; k < cfd.keys_num; k++) */
698 if (cfd.keys != NULL)
699 {
700 free (cfd.keys);
701 cfd.keys = NULL;
702 }
704 return (0);
705 } /* int flush_old_values */
/*
 * queue_thread_main:
 *   Body of the writer thread.  It repeatedly takes the item at the head of
 *   the write queue and writes its values with `rrd_update_r'.  Every
 *   `config_flush_interval' seconds it additionally scans the whole tree for
 *   old values (`flush_old_values') and rotates the journal.  The loop runs
 *   until a shutdown has been requested and - if flushing at shutdown is
 *   enabled - the queue has been drained.  `journal_done' is called before
 *   the thread returns.  The argument is unused; NULL is always returned.
 *
 *   `cache_lock' is held while tree and queue are inspected and released
 *   around the journal rotation and the actual RRD update.
 */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int final_flush = 0; /* make sure we only flush once on shutdown */

  /* absolute time of the first periodic tree scan */
  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock (&cache_lock);
  while ((do_shutdown == 0) || (cache_queue_head != NULL))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    int values_num;
    int status;
    int i;

    /* First, check if it's time to do the cache flush. */
    gettimeofday (&now, NULL);
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* Determine the time of the next cache flush. */
      while (next_flush.tv_sec <= now.tv_sec)
        next_flush.tv_sec += config_flush_interval;

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* Now, check if there's something to store away. If not, wait until
     * something comes in or it's time to do the cache flush.  if we are
     * shutting down, do not wait around. */
    if (cache_queue_head == NULL && !do_shutdown)
    {
      status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_timedwait returned %i.", status);
      }
    }

    /* We're about to shut down */
    /* `final_flush' is zero only the first time a shutdown is noticed (here
     * or at the bottom of the loop): either queue everything once, or stop
     * right away and leave the pending values unwritten. */
    if (do_shutdown != 0 && !final_flush++)
    {
      if (config_flush_at_shutdown)
        flush_old_values (-1); /* flush everything */
      else
        break;
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      /* NOTE(review): the item stays at the head of the queue and the lock
       * is still held, so the loop retries immediately - confirm this
       * cannot spin under sustained memory pressure. */
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* Take ownership of the value array; the item is reset and dequeued so
     * that new updates can be collected while this batch is written. */
    values = ci->values;
    values_num = ci->values_num;

    wipe_ci_values(ci, time(NULL));
    remove_from_queue(ci);

    pthread_mutex_lock (&stats_lock);
    assert (stats_queue_length > 0);
    stats_queue_length--;
    pthread_mutex_unlock (&stats_lock);

    /* do the disk I/O without holding the cache lock */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    /* Record the write in the journal and wake any flush_file() callers
     * waiting for this item.
     * NOTE(review): `ci' is used here without `cache_lock' held - confirm
     * the item cannot be freed by a concurrent flush_old_values(). */
    journal_write("wrote", file);
    pthread_cond_broadcast(&ci->flushed);

    for (i = 0; i < values_num; i++)
      free (values[i]);

    free(values);
    free(file);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    pthread_mutex_lock (&cache_lock);

    /* We're about to shut down */
    /* same one-shot check as above, for a shutdown that was requested while
     * the update was being written */
    if (do_shutdown != 0 && !final_flush++)
    {
      if (config_flush_at_shutdown)
        flush_old_values (-1); /* flush everything */
      else
        break;
    }
  } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
  pthread_mutex_unlock (&cache_lock);

  if (config_flush_at_shutdown)
  {
    assert(cache_queue_head == NULL);
    RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
  }

  journal_done();

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 *   Splits the next field off the front of a NUL-terminated command buffer.
 *   Fields are separated by a single space; a backslash makes the following
 *   character part of the field.  The field is unescaped and terminated in
 *   place, i.e. the buffer is modified.
 *
 *   On success 0 is returned, `*field_ret' points to the field and
 *   `*buffer_ret' / `*buffer_size_ret' are advanced past it (and past its
 *   separator).  -1 is returned, and the output arguments are left alone, if
 *   the buffer is empty or no end of field could be found.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *text = *buffer_ret;
  size_t avail = *buffer_size_ret;
  size_t rd = 0;   /* read position */
  size_t wr = 0;   /* write position of the unescaped field */
  int complete = 0;

  if (avail <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (text[avail - 1] == '\0');

  while (!complete && (rd < avail))
  {
    char c = text[rd];

    if ((c == ' ') || (c == '\0'))
    {
      /* end-of-field or end-of-buffer: terminate and skip the separator */
      text[wr] = 0;
      rd++;
      complete = 1;
    }
    else if (c == '\\')
    {
      /* a backslash as the very last byte has nothing to escape */
      if (rd >= (avail - 1))
        break;
      text[wr++] = text[rd + 1];
      rd += 2;
    }
    else
    {
      text[wr++] = text[rd++];
    }
  }

  if (!complete)
    return (-1);

  *buffer_ret = text + rd;
  *buffer_size_ret = avail - rd;
  *field_ret = text;

  return (0);
} /* }}} int buffer_get_field */
914 /* if we're restricting writes to the base directory,
915 * check whether the file falls within the dir
916 * returns 1 if OK, otherwise 0
917 */
918 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
919 {
920 assert(file != NULL);
922 if (!config_write_base_only
923 || sock == NULL /* journal replay */
924 || config_base_dir == NULL)
925 return 1;
927 if (strstr(file, "../") != NULL) goto err;
929 /* relative paths without "../" are ok */
930 if (*file != '/') return 1;
932 /* file must be of the format base + "/" + <1+ char filename> */
933 if (strlen(file) < _config_base_dir_len + 2) goto err;
934 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
935 if (*(file + _config_base_dir_len) != '/') goto err;
937 return 1;
939 err:
940 if (sock != NULL && sock->fd >= 0)
941 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
943 return 0;
944 } /* }}} static int check_file_access */
946 static int flush_file (const char *filename) /* {{{ */
947 {
948 cache_item_t *ci;
950 pthread_mutex_lock (&cache_lock);
952 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
953 if (ci == NULL)
954 {
955 pthread_mutex_unlock (&cache_lock);
956 return (ENOENT);
957 }
959 if (ci->values_num > 0)
960 {
961 /* Enqueue at head */
962 enqueue_cache_item (ci, HEAD);
963 pthread_cond_wait(&ci->flushed, &cache_lock);
964 }
966 pthread_mutex_unlock(&cache_lock);
968 return (0);
969 } /* }}} int flush_file */
971 static int handle_request_help (listen_socket_t *sock, /* {{{ */
972 char *buffer, size_t buffer_size)
973 {
974 int status;
975 char **help_text;
976 char *command;
978 char *help_help[2] =
979 {
980 "Command overview\n"
981 ,
982 "FLUSH <filename>\n"
983 "FLUSHALL\n"
984 "HELP [<command>]\n"
985 "UPDATE <filename> <values> [<values> ...]\n"
986 "BATCH\n"
987 "STATS\n"
988 };
990 char *help_flush[2] =
991 {
992 "Help for FLUSH\n"
993 ,
994 "Usage: FLUSH <filename>\n"
995 "\n"
996 "Adds the given filename to the head of the update queue and returns\n"
997 "after is has been dequeued.\n"
998 };
1000 char *help_flushall[2] =
1001 {
1002 "Help for FLUSHALL\n"
1003 ,
1004 "Usage: FLUSHALL\n"
1005 "\n"
1006 "Triggers writing of all pending updates. Returns immediately.\n"
1007 };
1009 char *help_update[2] =
1010 {
1011 "Help for UPDATE\n"
1012 ,
1013 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1014 "\n"
1015 "Adds the given file to the internal cache if it is not yet known and\n"
1016 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1017 "for details.\n"
1018 "\n"
1019 "Each <values> has the following form:\n"
1020 " <values> = <time>:<value>[:<value>[...]]\n"
1021 "See the rrdupdate(1) manpage for details.\n"
1022 };
1024 char *help_stats[2] =
1025 {
1026 "Help for STATS\n"
1027 ,
1028 "Usage: STATS\n"
1029 "\n"
1030 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1031 "a description of the values.\n"
1032 };
1034 char *help_batch[2] =
1035 {
1036 "Help for BATCH\n"
1037 ,
1038 "The 'BATCH' command permits the client to initiate a bulk load\n"
1039 " of commands to rrdcached.\n"
1040 "\n"
1041 "Usage:\n"
1042 "\n"
1043 " client: BATCH\n"
1044 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1045 " client: command #1\n"
1046 " client: command #2\n"
1047 " client: ... and so on\n"
1048 " client: .\n"
1049 " server: 2 errors\n"
1050 " server: 7 message for command #7\n"
1051 " server: 9 message for command #9\n"
1052 "\n"
1053 "For more information, consult the rrdcached(1) documentation.\n"
1054 };
1056 status = buffer_get_field (&buffer, &buffer_size, &command);
1057 if (status != 0)
1058 help_text = help_help;
1059 else
1060 {
1061 if (strcasecmp (command, "update") == 0)
1062 help_text = help_update;
1063 else if (strcasecmp (command, "flush") == 0)
1064 help_text = help_flush;
1065 else if (strcasecmp (command, "flushall") == 0)
1066 help_text = help_flushall;
1067 else if (strcasecmp (command, "stats") == 0)
1068 help_text = help_stats;
1069 else if (strcasecmp (command, "batch") == 0)
1070 help_text = help_batch;
1071 else
1072 help_text = help_help;
1073 }
1075 add_response_info(sock, help_text[1]);
1076 return send_response(sock, RESP_OK, help_text[0]);
1077 } /* }}} int handle_request_help */
1079 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1080 {
1081 uint64_t copy_queue_length;
1082 uint64_t copy_updates_received;
1083 uint64_t copy_flush_received;
1084 uint64_t copy_updates_written;
1085 uint64_t copy_data_sets_written;
1086 uint64_t copy_journal_bytes;
1087 uint64_t copy_journal_rotate;
1089 uint64_t tree_nodes_number;
1090 uint64_t tree_depth;
1092 pthread_mutex_lock (&stats_lock);
1093 copy_queue_length = stats_queue_length;
1094 copy_updates_received = stats_updates_received;
1095 copy_flush_received = stats_flush_received;
1096 copy_updates_written = stats_updates_written;
1097 copy_data_sets_written = stats_data_sets_written;
1098 copy_journal_bytes = stats_journal_bytes;
1099 copy_journal_rotate = stats_journal_rotate;
1100 pthread_mutex_unlock (&stats_lock);
1102 pthread_mutex_lock (&cache_lock);
1103 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1104 tree_depth = (uint64_t) g_tree_height (cache_tree);
1105 pthread_mutex_unlock (&cache_lock);
1107 add_response_info(sock,
1108 "QueueLength: %"PRIu64"\n", copy_queue_length);
1109 add_response_info(sock,
1110 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1111 add_response_info(sock,
1112 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1113 add_response_info(sock,
1114 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1115 add_response_info(sock,
1116 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1117 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1118 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1119 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1120 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1122 send_response(sock, RESP_OK, "Statistics follow\n");
1124 return (0);
1125 } /* }}} int handle_request_stats */
1127 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1128 char *buffer, size_t buffer_size)
1129 {
1130 char *file;
1131 int status;
1133 status = buffer_get_field (&buffer, &buffer_size, &file);
1134 if (status != 0)
1135 {
1136 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1137 }
1138 else
1139 {
1140 pthread_mutex_lock(&stats_lock);
1141 stats_flush_received++;
1142 pthread_mutex_unlock(&stats_lock);
1144 if (!check_file_access(file, sock)) return 0;
1146 status = flush_file (file);
1147 if (status == 0)
1148 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1149 else if (status == ENOENT)
1150 {
1151 /* no file in our tree; see whether it exists at all */
1152 struct stat statbuf;
1154 memset(&statbuf, 0, sizeof(statbuf));
1155 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1156 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1157 else
1158 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1159 }
1160 else if (status < 0)
1161 return send_response(sock, RESP_ERR, "Internal error.\n");
1162 else
1163 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1164 }
1166 /* NOTREACHED */
1167 assert(1==0);
1168 } /* }}} int handle_request_slurp */
1170 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1171 {
1173 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1175 pthread_mutex_lock(&cache_lock);
1176 flush_old_values(-1);
1177 pthread_mutex_unlock(&cache_lock);
1179 return send_response(sock, RESP_OK, "Started flush.\n");
1180 } /* }}} static int handle_request_flushall */
1182 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1183 char *buffer, size_t buffer_size)
1184 {
1185 char *file;
1186 int values_num = 0;
1187 int status;
1189 time_t now;
1190 cache_item_t *ci;
1192 now = time (NULL);
1194 status = buffer_get_field (&buffer, &buffer_size, &file);
1195 if (status != 0)
1196 return send_response(sock, RESP_ERR,
1197 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1199 pthread_mutex_lock(&stats_lock);
1200 stats_updates_received++;
1201 pthread_mutex_unlock(&stats_lock);
1203 if (!check_file_access(file, sock)) return 0;
1205 pthread_mutex_lock (&cache_lock);
1206 ci = g_tree_lookup (cache_tree, file);
1208 if (ci == NULL) /* {{{ */
1209 {
1210 struct stat statbuf;
1212 /* don't hold the lock while we setup; stat(2) might block */
1213 pthread_mutex_unlock(&cache_lock);
1215 memset (&statbuf, 0, sizeof (statbuf));
1216 status = stat (file, &statbuf);
1217 if (status != 0)
1218 {
1219 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1221 status = errno;
1222 if (status == ENOENT)
1223 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1224 else
1225 return send_response(sock, RESP_ERR,
1226 "stat failed with error %i.\n", status);
1227 }
1228 if (!S_ISREG (statbuf.st_mode))
1229 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1231 if (access(file, R_OK|W_OK) != 0)
1232 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1233 file, rrd_strerror(errno));
1235 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1236 if (ci == NULL)
1237 {
1238 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1240 return send_response(sock, RESP_ERR, "malloc failed.\n");
1241 }
1242 memset (ci, 0, sizeof (cache_item_t));
1244 ci->file = strdup (file);
1245 if (ci->file == NULL)
1246 {
1247 free (ci);
1248 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1250 return send_response(sock, RESP_ERR, "strdup failed.\n");
1251 }
1253 wipe_ci_values(ci, now);
1254 ci->flags = CI_FLAGS_IN_TREE;
1256 pthread_mutex_lock(&cache_lock);
1257 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1258 } /* }}} */
1259 assert (ci != NULL);
1261 while (buffer_size > 0)
1262 {
1263 char **temp;
1264 char *value;
1266 status = buffer_get_field (&buffer, &buffer_size, &value);
1267 if (status != 0)
1268 {
1269 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1270 break;
1271 }
1273 temp = (char **) realloc (ci->values,
1274 sizeof (char *) * (ci->values_num + 1));
1275 if (temp == NULL)
1276 {
1277 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1278 continue;
1279 }
1280 ci->values = temp;
1282 ci->values[ci->values_num] = strdup (value);
1283 if (ci->values[ci->values_num] == NULL)
1284 {
1285 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1286 continue;
1287 }
1288 ci->values_num++;
1290 values_num++;
1291 }
1293 if (((now - ci->last_flush_time) >= config_write_interval)
1294 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1295 && (ci->values_num > 0))
1296 {
1297 enqueue_cache_item (ci, TAIL);
1298 }
1300 pthread_mutex_unlock (&cache_lock);
1302 if (values_num < 1)
1303 return send_response(sock, RESP_ERR, "No values updated.\n");
1304 else
1305 return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1307 /* NOTREACHED */
1308 assert(1==0);
1310 } /* }}} int handle_request_update */
1312 /* we came across a "WROTE" entry during journal replay.
1313 * throw away any values that we have accumulated for this file
1314 */
1315 static int handle_request_wrote (const char *buffer) /* {{{ */
1316 {
1317 int i;
1318 cache_item_t *ci;
1319 const char *file = buffer;
1321 pthread_mutex_lock(&cache_lock);
1323 ci = g_tree_lookup(cache_tree, file);
1324 if (ci == NULL)
1325 {
1326 pthread_mutex_unlock(&cache_lock);
1327 return (0);
1328 }
1330 if (ci->values)
1331 {
1332 for (i=0; i < ci->values_num; i++)
1333 free(ci->values[i]);
1335 free(ci->values);
1336 }
1338 wipe_ci_values(ci, time(NULL));
1339 remove_from_queue(ci);
1341 pthread_mutex_unlock(&cache_lock);
1342 return (0);
1343 } /* }}} int handle_request_wrote */
1345 /* start "BATCH" processing */
1346 static int batch_start (listen_socket_t *sock) /* {{{ */
1347 {
1348 int status;
1349 if (sock->batch_mode)
1350 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1352 status = send_response(sock, RESP_OK,
1353 "Go ahead. End with dot '.' on its own line.\n");
1354 sock->batch_mode = 1;
1355 sock->batch_cmd = 0;
1357 return status;
1358 } /* }}} static int batch_start */
1360 /* finish "BATCH" processing and return results to the client */
1361 static int batch_done (listen_socket_t *sock) /* {{{ */
1362 {
1363 assert(sock->batch_mode);
1364 sock->batch_mode = 0;
1365 sock->batch_cmd = 0;
1366 return send_response(sock, RESP_OK, "errors\n");
1367 } /* }}} static int batch_done */
1369 /* returns 1 if we have the required privilege level */
1370 static int has_privilege (listen_socket_t *sock, /* {{{ */
1371 socket_privilege priv)
1372 {
1373 if (sock == NULL) /* journal replay */
1374 return 1;
1376 if (sock->privilege >= priv)
1377 return 1;
1379 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1380 } /* }}} static int has_privilege */
1382 /* if sock==NULL, we are in journal replay mode */
1383 static int handle_request (listen_socket_t *sock, /* {{{ */
1384 char *buffer, size_t buffer_size)
1385 {
1386 char *buffer_ptr;
1387 char *command;
1388 int status;
1390 assert (buffer[buffer_size - 1] == '\0');
1392 buffer_ptr = buffer;
1393 command = NULL;
1394 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1395 if (status != 0)
1396 {
1397 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1398 return (-1);
1399 }
1401 if (sock != NULL && sock->batch_mode)
1402 sock->batch_cmd++;
1404 if (strcasecmp (command, "update") == 0)
1405 {
1406 status = has_privilege(sock, PRIV_HIGH);
1407 if (status <= 0)
1408 return status;
1410 /* don't re-write updates in replay mode */
1411 if (sock != NULL)
1412 journal_write(command, buffer_ptr);
1414 return (handle_request_update (sock, buffer_ptr, buffer_size));
1415 }
1416 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1417 {
1418 /* this is only valid in replay mode */
1419 return (handle_request_wrote (buffer_ptr));
1420 }
1421 else if (strcasecmp (command, "flush") == 0)
1422 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1423 else if (strcasecmp (command, "flushall") == 0)
1424 {
1425 status = has_privilege(sock, PRIV_HIGH);
1426 if (status <= 0)
1427 return status;
1429 return (handle_request_flushall(sock));
1430 }
1431 else if (strcasecmp (command, "stats") == 0)
1432 return (handle_request_stats (sock));
1433 else if (strcasecmp (command, "help") == 0)
1434 return (handle_request_help (sock, buffer_ptr, buffer_size));
1435 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1436 return batch_start(sock);
1437 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1438 return batch_done(sock);
1439 else
1440 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1442 /* NOTREACHED */
1443 assert(1==0);
1444 } /* }}} int handle_request */
1446 /* MUST NOT hold journal_lock before calling this */
1447 static void journal_rotate(void) /* {{{ */
1448 {
1449 FILE *old_fh = NULL;
1450 int new_fd;
1452 if (journal_cur == NULL || journal_old == NULL)
1453 return;
1455 pthread_mutex_lock(&journal_lock);
1457 /* we rotate this way (rename before close) so that the we can release
1458 * the journal lock as fast as possible. Journal writes to the new
1459 * journal can proceed immediately after the new file is opened. The
1460 * fclose can then block without affecting new updates.
1461 */
1462 if (journal_fh != NULL)
1463 {
1464 old_fh = journal_fh;
1465 journal_fh = NULL;
1466 rename(journal_cur, journal_old);
1467 ++stats_journal_rotate;
1468 }
1470 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1471 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1472 if (new_fd >= 0)
1473 {
1474 journal_fh = fdopen(new_fd, "a");
1475 if (journal_fh == NULL)
1476 close(new_fd);
1477 }
1479 pthread_mutex_unlock(&journal_lock);
1481 if (old_fh != NULL)
1482 fclose(old_fh);
1484 if (journal_fh == NULL)
1485 {
1486 RRDD_LOG(LOG_CRIT,
1487 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1488 journal_cur, rrd_strerror(errno));
1490 RRDD_LOG(LOG_ERR,
1491 "JOURNALING DISABLED: All values will be flushed at shutdown");
1492 config_flush_at_shutdown = 1;
1493 }
1495 } /* }}} static void journal_rotate */
1497 static void journal_done(void) /* {{{ */
1498 {
1499 if (journal_cur == NULL)
1500 return;
1502 pthread_mutex_lock(&journal_lock);
1503 if (journal_fh != NULL)
1504 {
1505 fclose(journal_fh);
1506 journal_fh = NULL;
1507 }
1509 if (config_flush_at_shutdown)
1510 {
1511 RRDD_LOG(LOG_INFO, "removing journals");
1512 unlink(journal_old);
1513 unlink(journal_cur);
1514 }
1515 else
1516 {
1517 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1518 "journals will be used at next startup");
1519 }
1521 pthread_mutex_unlock(&journal_lock);
1523 } /* }}} static void journal_done */
1525 static int journal_write(char *cmd, char *args) /* {{{ */
1526 {
1527 int chars;
1529 if (journal_fh == NULL)
1530 return 0;
1532 pthread_mutex_lock(&journal_lock);
1533 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1534 pthread_mutex_unlock(&journal_lock);
1536 if (chars > 0)
1537 {
1538 pthread_mutex_lock(&stats_lock);
1539 stats_journal_bytes += chars;
1540 pthread_mutex_unlock(&stats_lock);
1541 }
1543 return chars;
1544 } /* }}} static int journal_write */
1546 static int journal_replay (const char *file) /* {{{ */
1547 {
1548 FILE *fh;
1549 int entry_cnt = 0;
1550 int fail_cnt = 0;
1551 uint64_t line = 0;
1552 char entry[CMD_MAX];
1554 if (file == NULL) return 0;
1556 {
1557 char *reason;
1558 int status = 0;
1559 struct stat statbuf;
1561 memset(&statbuf, 0, sizeof(statbuf));
1562 if (stat(file, &statbuf) != 0)
1563 {
1564 if (errno == ENOENT)
1565 return 0;
1567 reason = "stat error";
1568 status = errno;
1569 }
1570 else if (!S_ISREG(statbuf.st_mode))
1571 {
1572 reason = "not a regular file";
1573 status = EPERM;
1574 }
1575 if (statbuf.st_uid != daemon_uid)
1576 {
1577 reason = "not owned by daemon user";
1578 status = EACCES;
1579 }
1580 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1581 {
1582 reason = "must not be user/group writable";
1583 status = EACCES;
1584 }
1586 if (status != 0)
1587 {
1588 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1589 file, rrd_strerror(status), reason);
1590 return 0;
1591 }
1592 }
1594 fh = fopen(file, "r");
1595 if (fh == NULL)
1596 {
1597 if (errno != ENOENT)
1598 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1599 file, rrd_strerror(errno));
1600 return 0;
1601 }
1602 else
1603 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1605 while(!feof(fh))
1606 {
1607 size_t entry_len;
1609 ++line;
1610 if (fgets(entry, sizeof(entry), fh) == NULL)
1611 break;
1612 entry_len = strlen(entry);
1614 /* check \n termination in case journal writing crashed mid-line */
1615 if (entry_len == 0)
1616 continue;
1617 else if (entry[entry_len - 1] != '\n')
1618 {
1619 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1620 ++fail_cnt;
1621 continue;
1622 }
1624 entry[entry_len - 1] = '\0';
1626 if (handle_request(NULL, entry, entry_len) == 0)
1627 ++entry_cnt;
1628 else
1629 ++fail_cnt;
1630 }
1632 fclose(fh);
1634 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1635 entry_cnt, fail_cnt);
1637 return entry_cnt > 0 ? 1 : 0;
1638 } /* }}} static int journal_replay */
1640 static void journal_init(void) /* {{{ */
1641 {
1642 int had_journal = 0;
1644 if (journal_cur == NULL) return;
1646 pthread_mutex_lock(&journal_lock);
1648 RRDD_LOG(LOG_INFO, "checking for journal files");
1650 had_journal += journal_replay(journal_old);
1651 had_journal += journal_replay(journal_cur);
1653 /* it must have been a crash. start a flush */
1654 if (had_journal && config_flush_at_shutdown)
1655 flush_old_values(-1);
1657 pthread_mutex_unlock(&journal_lock);
1658 journal_rotate();
1660 RRDD_LOG(LOG_INFO, "journal processing complete");
1662 } /* }}} static void journal_init */
1664 static void close_connection(listen_socket_t *sock)
1665 {
1666 close(sock->fd) ; sock->fd = -1;
1667 free(sock->rbuf); sock->rbuf = NULL;
1668 free(sock->wbuf); sock->wbuf = NULL;
1670 free(sock);
1671 }
1673 static void *connection_thread_main (void *args) /* {{{ */
1674 {
1675 pthread_t self;
1676 listen_socket_t *sock;
1677 int i;
1678 int fd;
1680 sock = (listen_socket_t *) args;
1681 fd = sock->fd;
1683 /* init read buffers */
1684 sock->next_read = sock->next_cmd = 0;
1685 sock->rbuf = malloc(RBUF_SIZE);
1686 if (sock->rbuf == NULL)
1687 {
1688 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1689 close_connection(sock);
1690 return NULL;
1691 }
1693 pthread_mutex_lock (&connection_threads_lock);
1694 {
1695 pthread_t *temp;
1697 temp = (pthread_t *) realloc (connection_threads,
1698 sizeof (pthread_t) * (connection_threads_num + 1));
1699 if (temp == NULL)
1700 {
1701 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1702 }
1703 else
1704 {
1705 connection_threads = temp;
1706 connection_threads[connection_threads_num] = pthread_self ();
1707 connection_threads_num++;
1708 }
1709 }
1710 pthread_mutex_unlock (&connection_threads_lock);
1712 while (do_shutdown == 0)
1713 {
1714 char *cmd;
1715 ssize_t cmd_len;
1716 ssize_t rbytes;
1718 struct pollfd pollfd;
1719 int status;
1721 pollfd.fd = fd;
1722 pollfd.events = POLLIN | POLLPRI;
1723 pollfd.revents = 0;
1725 status = poll (&pollfd, 1, /* timeout = */ 500);
1726 if (do_shutdown)
1727 break;
1728 else if (status == 0) /* timeout */
1729 continue;
1730 else if (status < 0) /* error */
1731 {
1732 status = errno;
1733 if (status == EINTR)
1734 continue;
1735 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1736 continue;
1737 }
1739 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1740 {
1741 close_connection(sock);
1742 break;
1743 }
1744 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1745 {
1746 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1747 "poll(2) returned something unexpected: %#04hx",
1748 pollfd.revents);
1749 close_connection(sock);
1750 break;
1751 }
1753 rbytes = read(fd, sock->rbuf + sock->next_read,
1754 RBUF_SIZE - sock->next_read);
1755 if (rbytes < 0)
1756 {
1757 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1758 break;
1759 }
1760 else if (rbytes == 0)
1761 break; /* eof */
1763 sock->next_read += rbytes;
1765 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1766 {
1767 status = handle_request (sock, cmd, cmd_len+1);
1768 if (status != 0)
1769 goto out_close;
1770 }
1771 }
1773 out_close:
1774 close_connection(sock);
1776 self = pthread_self ();
1777 /* Remove this thread from the connection threads list */
1778 pthread_mutex_lock (&connection_threads_lock);
1779 /* Find out own index in the array */
1780 for (i = 0; i < connection_threads_num; i++)
1781 if (pthread_equal (connection_threads[i], self) != 0)
1782 break;
1783 assert (i < connection_threads_num);
1785 /* Move the trailing threads forward. */
1786 if (i < (connection_threads_num - 1))
1787 {
1788 memmove (connection_threads + i,
1789 connection_threads + i + 1,
1790 sizeof (pthread_t) * (connection_threads_num - i - 1));
1791 }
1793 connection_threads_num--;
1794 pthread_mutex_unlock (&connection_threads_lock);
1796 return (NULL);
1797 } /* }}} void *connection_thread_main */
1799 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1800 {
1801 int fd;
1802 struct sockaddr_un sa;
1803 listen_socket_t *temp;
1804 int status;
1805 const char *path;
1807 path = sock->addr;
1808 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1809 path += strlen("unix:");
1811 temp = (listen_socket_t *) realloc (listen_fds,
1812 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1813 if (temp == NULL)
1814 {
1815 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1816 return (-1);
1817 }
1818 listen_fds = temp;
1819 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1821 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1822 if (fd < 0)
1823 {
1824 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1825 return (-1);
1826 }
1828 memset (&sa, 0, sizeof (sa));
1829 sa.sun_family = AF_UNIX;
1830 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1832 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1833 if (status != 0)
1834 {
1835 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1836 close (fd);
1837 unlink (path);
1838 return (-1);
1839 }
1841 status = listen (fd, /* backlog = */ 10);
1842 if (status != 0)
1843 {
1844 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1845 close (fd);
1846 unlink (path);
1847 return (-1);
1848 }
1850 listen_fds[listen_fds_num].fd = fd;
1851 listen_fds[listen_fds_num].family = PF_UNIX;
1852 strncpy(listen_fds[listen_fds_num].addr, path,
1853 sizeof (listen_fds[listen_fds_num].addr) - 1);
1854 listen_fds_num++;
1856 return (0);
1857 } /* }}} int open_listen_socket_unix */
/*
 * Resolve a network listen address and open a listening TCP socket for
 * each address returned by getaddrinfo(), appending each to listen_fds.
 *
 * Accepted forms of sock->addr:
 *   "[<ipv6>]" or "[<ipv6>]:<port>"
 *   "<host-or-ipv4>" or "<host-or-ipv4>:<port>"  (only split when the
 *                                                 string contains a '.')
 *   anything else is passed to getaddrinfo() unchanged
 * RRDCACHED_DEFAULT_PORT is used when no port is given.
 *
 * Returns -1 for a malformed address, a resolver failure or a listen(2)
 * failure; otherwise 0, even when not every address could be bound.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy; the parsing below writes NUL bytes into it */
  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
          sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* split at the last colon, if any */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
      close (fd);
      /* release the resolver results before bailing out */
      freeaddrinfo (ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* the address list is no longer needed once the sockets are set up */
  freeaddrinfo (ai_res);

  return (0);
} /* }}} static int open_listen_socket_network */
1977 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1978 {
1979 assert(sock != NULL);
1980 assert(sock->addr != NULL);
1982 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1983 || sock->addr[0] == '/')
1984 return (open_listen_socket_unix(sock));
1985 else
1986 return (open_listen_socket_network(sock));
1987 } /* }}} int open_listen_socket */
1989 static int close_listen_sockets (void) /* {{{ */
1990 {
1991 size_t i;
1993 for (i = 0; i < listen_fds_num; i++)
1994 {
1995 close (listen_fds[i].fd);
1997 if (listen_fds[i].family == PF_UNIX)
1998 unlink(listen_fds[i].addr);
1999 }
2001 free (listen_fds);
2002 listen_fds = NULL;
2003 listen_fds_num = 0;
2005 return (0);
2006 } /* }}} int close_listen_sockets */
2008 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2009 {
2010 struct pollfd *pollfds;
2011 int pollfds_num;
2012 int status;
2013 int i;
2015 for (i = 0; i < config_listen_address_list_len; i++)
2016 open_listen_socket (config_listen_address_list[i]);
2018 if (config_listen_address_list_len < 1)
2019 {
2020 listen_socket_t sock;
2021 memset(&sock, 0, sizeof(sock));
2022 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2023 open_listen_socket (&sock);
2024 }
2026 if (listen_fds_num < 1)
2027 {
2028 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2029 "could be opened. Sorry.");
2030 return (NULL);
2031 }
2033 pollfds_num = listen_fds_num;
2034 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2035 if (pollfds == NULL)
2036 {
2037 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2038 return (NULL);
2039 }
2040 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2042 RRDD_LOG(LOG_INFO, "listening for connections");
2044 while (do_shutdown == 0)
2045 {
2046 assert (pollfds_num == ((int) listen_fds_num));
2047 for (i = 0; i < pollfds_num; i++)
2048 {
2049 pollfds[i].fd = listen_fds[i].fd;
2050 pollfds[i].events = POLLIN | POLLPRI;
2051 pollfds[i].revents = 0;
2052 }
2054 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2055 if (do_shutdown)
2056 break;
2057 else if (status == 0) /* timeout */
2058 continue;
2059 else if (status < 0) /* error */
2060 {
2061 status = errno;
2062 if (status != EINTR)
2063 {
2064 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2065 }
2066 continue;
2067 }
2069 for (i = 0; i < pollfds_num; i++)
2070 {
2071 listen_socket_t *client_sock;
2072 struct sockaddr_storage client_sa;
2073 socklen_t client_sa_size;
2074 pthread_t tid;
2075 pthread_attr_t attr;
2077 if (pollfds[i].revents == 0)
2078 continue;
2080 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2081 {
2082 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2083 "poll(2) returned something unexpected for listen FD #%i.",
2084 pollfds[i].fd);
2085 continue;
2086 }
2088 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2089 if (client_sock == NULL)
2090 {
2091 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2092 continue;
2093 }
2094 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2096 client_sa_size = sizeof (client_sa);
2097 client_sock->fd = accept (pollfds[i].fd,
2098 (struct sockaddr *) &client_sa, &client_sa_size);
2099 if (client_sock->fd < 0)
2100 {
2101 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2102 free(client_sock);
2103 continue;
2104 }
2106 pthread_attr_init (&attr);
2107 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2109 status = pthread_create (&tid, &attr, connection_thread_main,
2110 client_sock);
2111 if (status != 0)
2112 {
2113 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2114 close_connection(client_sock);
2115 continue;
2116 }
2117 } /* for (pollfds_num) */
2118 } /* while (do_shutdown == 0) */
2120 RRDD_LOG(LOG_INFO, "starting shutdown");
2122 close_listen_sockets ();
2124 pthread_mutex_lock (&connection_threads_lock);
2125 while (connection_threads_num > 0)
2126 {
2127 pthread_t wait_for;
2129 wait_for = connection_threads[0];
2131 pthread_mutex_unlock (&connection_threads_lock);
2132 pthread_join (wait_for, /* retval = */ NULL);
2133 pthread_mutex_lock (&connection_threads_lock);
2134 }
2135 pthread_mutex_unlock (&connection_threads_lock);
2137 return (NULL);
2138 } /* }}} void *listen_thread_main */
2140 static int daemonize (void) /* {{{ */
2141 {
2142 int status;
2143 int fd;
2144 char *base_dir;
2146 daemon_uid = geteuid();
2148 fd = open_pidfile();
2149 if (fd < 0) return fd;
2151 if (!stay_foreground)
2152 {
2153 pid_t child;
2155 child = fork ();
2156 if (child < 0)
2157 {
2158 fprintf (stderr, "daemonize: fork(2) failed.\n");
2159 return (-1);
2160 }
2161 else if (child > 0)
2162 {
2163 return (1);
2164 }
2166 /* Become session leader */
2167 setsid ();
2169 /* Open the first three file descriptors to /dev/null */
2170 close (2);
2171 close (1);
2172 close (0);
2174 open ("/dev/null", O_RDWR);
2175 dup (0);
2176 dup (0);
2177 } /* if (!stay_foreground) */
2179 /* Change into the /tmp directory. */
2180 base_dir = (config_base_dir != NULL)
2181 ? config_base_dir
2182 : "/tmp";
2183 status = chdir (base_dir);
2184 if (status != 0)
2185 {
2186 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2187 return (-1);
2188 }
2190 install_signal_handlers();
2192 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2193 RRDD_LOG(LOG_INFO, "starting up");
2195 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2196 if (cache_tree == NULL)
2197 {
2198 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2199 return (-1);
2200 }
2202 status = write_pidfile (fd);
2203 return status;
2204 } /* }}} int daemonize */
2206 static int cleanup (void) /* {{{ */
2207 {
2208 do_shutdown++;
2210 pthread_cond_signal (&cache_cond);
2211 pthread_join (queue_thread, /* return = */ NULL);
2213 remove_pidfile ();
2215 RRDD_LOG(LOG_INFO, "goodbye");
2216 closelog ();
2218 return (0);
2219 } /* }}} int cleanup */
2221 static int read_options (int argc, char **argv) /* {{{ */
2222 {
2223 int option;
2224 int status = 0;
2226 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2227 {
2228 switch (option)
2229 {
2230 case 'g':
2231 stay_foreground=1;
2232 break;
2234 case 'L':
2235 case 'l':
2236 {
2237 listen_socket_t **temp;
2238 listen_socket_t *new;
2240 new = malloc(sizeof(listen_socket_t));
2241 if (new == NULL)
2242 {
2243 fprintf(stderr, "read_options: malloc failed.\n");
2244 return(2);
2245 }
2246 memset(new, 0, sizeof(listen_socket_t));
2248 temp = (listen_socket_t **) realloc (config_listen_address_list,
2249 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2250 if (temp == NULL)
2251 {
2252 fprintf (stderr, "read_options: realloc failed.\n");
2253 return (2);
2254 }
2255 config_listen_address_list = temp;
2257 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2258 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2260 temp[config_listen_address_list_len] = new;
2261 config_listen_address_list_len++;
2262 }
2263 break;
2265 case 'f':
2266 {
2267 int temp;
2269 temp = atoi (optarg);
2270 if (temp > 0)
2271 config_flush_interval = temp;
2272 else
2273 {
2274 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2275 status = 3;
2276 }
2277 }
2278 break;
2280 case 'w':
2281 {
2282 int temp;
2284 temp = atoi (optarg);
2285 if (temp > 0)
2286 config_write_interval = temp;
2287 else
2288 {
2289 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2290 status = 2;
2291 }
2292 }
2293 break;
2295 case 'z':
2296 {
2297 int temp;
2299 temp = atoi(optarg);
2300 if (temp > 0)
2301 config_write_jitter = temp;
2302 else
2303 {
2304 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2305 status = 2;
2306 }
2308 break;
2309 }
2311 case 'B':
2312 config_write_base_only = 1;
2313 break;
2315 case 'b':
2316 {
2317 size_t len;
2319 if (config_base_dir != NULL)
2320 free (config_base_dir);
2321 config_base_dir = strdup (optarg);
2322 if (config_base_dir == NULL)
2323 {
2324 fprintf (stderr, "read_options: strdup failed.\n");
2325 return (3);
2326 }
2328 len = strlen (config_base_dir);
2329 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2330 {
2331 config_base_dir[len - 1] = 0;
2332 len--;
2333 }
2335 if (len < 1)
2336 {
2337 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2338 return (4);
2339 }
2341 _config_base_dir_len = len;
2342 }
2343 break;
2345 case 'p':
2346 {
2347 if (config_pid_file != NULL)
2348 free (config_pid_file);
2349 config_pid_file = strdup (optarg);
2350 if (config_pid_file == NULL)
2351 {
2352 fprintf (stderr, "read_options: strdup failed.\n");
2353 return (3);
2354 }
2355 }
2356 break;
2358 case 'F':
2359 config_flush_at_shutdown = 1;
2360 break;
2362 case 'j':
2363 {
2364 struct stat statbuf;
2365 const char *dir = optarg;
2367 status = stat(dir, &statbuf);
2368 if (status != 0)
2369 {
2370 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2371 return 6;
2372 }
2374 if (!S_ISDIR(statbuf.st_mode)
2375 || access(dir, R_OK|W_OK|X_OK) != 0)
2376 {
2377 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2378 errno ? rrd_strerror(errno) : "");
2379 return 6;
2380 }
2382 journal_cur = malloc(PATH_MAX + 1);
2383 journal_old = malloc(PATH_MAX + 1);
2384 if (journal_cur == NULL || journal_old == NULL)
2385 {
2386 fprintf(stderr, "malloc failure for journal files\n");
2387 return 6;
2388 }
2389 else
2390 {
2391 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2392 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2393 }
2394 }
2395 break;
2397 case 'h':
2398 case '?':
2399 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2400 "\n"
2401 "Usage: rrdcached [options]\n"
2402 "\n"
2403 "Valid options are:\n"
2404 " -l <address> Socket address to listen to.\n"
2405 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2406 " -w <seconds> Interval in which to write data.\n"
2407 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2408 " -f <seconds> Interval in which to flush dead data.\n"
2409 " -p <file> Location of the PID-file.\n"
2410 " -b <dir> Base directory to change to.\n"
2411 " -B Restrict file access to paths within -b <dir>\n"
2412 " -g Do not fork and run in the foreground.\n"
2413 " -j <dir> Directory in which to create the journal files.\n"
2414 " -F Always flush all updates at shutdown\n"
2415 "\n"
2416 "For more information and a detailed description of all options "
2417 "please refer\n"
2418 "to the rrdcached(1) manual page.\n",
2419 VERSION);
2420 status = -1;
2421 break;
2422 } /* switch (option) */
2423 } /* while (getopt) */
2425 /* advise the user when values are not sane */
2426 if (config_flush_interval < 2 * config_write_interval)
2427 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2428 " 2x write interval (-w) !\n");
2429 if (config_write_jitter > config_write_interval)
2430 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2431 " write interval (-w) !\n");
2433 if (config_write_base_only && config_base_dir == NULL)
2434 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2435 " Consult the rrdcached documentation\n");
2437 if (journal_cur == NULL)
2438 config_flush_at_shutdown = 1;
2440 return (status);
2441 } /* }}} int read_options */
2443 int main (int argc, char **argv)
2444 {
2445 int status;
2447 status = read_options (argc, argv);
2448 if (status != 0)
2449 {
2450 if (status < 0)
2451 status = 0;
2452 return (status);
2453 }
2455 status = daemonize ();
2456 if (status == 1)
2457 {
2458 struct sigaction sigchld;
2460 memset (&sigchld, 0, sizeof (sigchld));
2461 sigchld.sa_handler = SIG_IGN;
2462 sigaction (SIGCHLD, &sigchld, NULL);
2464 return (0);
2465 }
2466 else if (status != 0)
2467 {
2468 fprintf (stderr, "daemonize failed, exiting.\n");
2469 return (1);
2470 }
2472 journal_init();
2474 /* start the queue thread */
2475 memset (&queue_thread, 0, sizeof (queue_thread));
2476 status = pthread_create (&queue_thread,
2477 NULL, /* attr */
2478 queue_thread_main,
2479 NULL); /* args */
2480 if (status != 0)
2481 {
2482 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2483 cleanup();
2484 return (1);
2485 }
2487 listen_thread_main (NULL);
2488 cleanup ();
2490 return (0);
2491 } /* int main */
2493 /*
2494 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2495 */