1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 typedef enum
105 {
106 PRIV_LOW,
107 PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
113 {
114 int fd;
115 char addr[PATH_MAX + 1];
116 int family;
117 socket_privilege privilege;
119 /* state for BATCH processing */
120 int batch_mode;
121 int batch_cmd;
123 /* buffered IO */
124 char *rbuf;
125 off_t next_cmd;
126 off_t next_read;
128 char *wbuf;
129 ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137 char *file;
138 char **values;
139 int values_num;
140 time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143 int flags;
144 pthread_cond_t flushed;
145 cache_item_t *prev;
146 cache_item_t *next;
147 };
149 struct callback_flush_data_s
150 {
151 time_t now;
152 time_t abs_timeout;
153 char **keys;
154 size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
158 enum queue_side_e
159 {
160 HEAD,
161 TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
169 /*
170 * Variables
171 */
172 static int stay_foreground = 0;
173 static uid_t daemon_uid;
175 static listen_socket_t *listen_fds = NULL;
176 static size_t listen_fds_num = 0;
178 static int do_shutdown = 0;
180 static pthread_t queue_thread;
182 static pthread_t *connection_threads = NULL;
183 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
184 static int connection_threads_num = 0;
186 /* Cache stuff */
187 static GTree *cache_tree = NULL;
188 static cache_item_t *cache_queue_head = NULL;
189 static cache_item_t *cache_queue_tail = NULL;
190 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
193 static int config_write_interval = 300;
194 static int config_write_jitter = 0;
195 static int config_flush_interval = 3600;
196 static int config_flush_at_shutdown = 0;
197 static char *config_pid_file = NULL;
198 static char *config_base_dir = NULL;
199 static size_t _config_base_dir_len = 0;
200 static int config_write_base_only = 0;
202 static listen_socket_t **config_listen_address_list = NULL;
203 static int config_listen_address_list_len = 0;
205 static uint64_t stats_queue_length = 0;
206 static uint64_t stats_updates_received = 0;
207 static uint64_t stats_flush_received = 0;
208 static uint64_t stats_updates_written = 0;
209 static uint64_t stats_data_sets_written = 0;
210 static uint64_t stats_journal_bytes = 0;
211 static uint64_t stats_journal_rotate = 0;
212 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214 /* Journaled updates */
215 static char *journal_cur = NULL;
216 static char *journal_old = NULL;
217 static FILE *journal_fh = NULL;
218 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
219 static int journal_write(char *cmd, char *args);
220 static void journal_done(void);
221 static void journal_rotate(void);
223 /*
224 * Functions
225 */
226 static void sig_common (const char *sig) /* {{{ */
227 {
228 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
229 do_shutdown++;
230 pthread_cond_broadcast(&cache_cond);
231 } /* }}} void sig_common */
233 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
234 {
235 sig_common("INT");
236 } /* }}} void sig_int_handler */
238 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
239 {
240 sig_common("TERM");
241 } /* }}} void sig_term_handler */
243 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
244 {
245 config_flush_at_shutdown = 1;
246 sig_common("USR1");
247 } /* }}} void sig_usr1_handler */
249 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
250 {
251 config_flush_at_shutdown = 0;
252 sig_common("USR2");
253 } /* }}} void sig_usr2_handler */
255 static void install_signal_handlers(void) /* {{{ */
256 {
257 /* These structures are static, because `sigaction' behaves weird if the are
258 * overwritten.. */
259 static struct sigaction sa_int;
260 static struct sigaction sa_term;
261 static struct sigaction sa_pipe;
262 static struct sigaction sa_usr1;
263 static struct sigaction sa_usr2;
265 /* Install signal handlers */
266 memset (&sa_int, 0, sizeof (sa_int));
267 sa_int.sa_handler = sig_int_handler;
268 sigaction (SIGINT, &sa_int, NULL);
270 memset (&sa_term, 0, sizeof (sa_term));
271 sa_term.sa_handler = sig_term_handler;
272 sigaction (SIGTERM, &sa_term, NULL);
274 memset (&sa_pipe, 0, sizeof (sa_pipe));
275 sa_pipe.sa_handler = SIG_IGN;
276 sigaction (SIGPIPE, &sa_pipe, NULL);
278 memset (&sa_pipe, 0, sizeof (sa_usr1));
279 sa_usr1.sa_handler = sig_usr1_handler;
280 sigaction (SIGUSR1, &sa_usr1, NULL);
282 memset (&sa_usr2, 0, sizeof (sa_usr2));
283 sa_usr2.sa_handler = sig_usr2_handler;
284 sigaction (SIGUSR2, &sa_usr2, NULL);
286 } /* }}} void install_signal_handlers */
288 static int open_pidfile(void) /* {{{ */
289 {
290 int fd;
291 char *file;
293 file = (config_pid_file != NULL)
294 ? config_pid_file
295 : LOCALSTATEDIR "/run/rrdcached.pid";
297 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
298 if (fd < 0)
299 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
300 file, rrd_strerror(errno));
302 return(fd);
303 } /* }}} static int open_pidfile */
305 static int write_pidfile (int fd) /* {{{ */
306 {
307 pid_t pid;
308 FILE *fh;
310 pid = getpid ();
312 fh = fdopen (fd, "w");
313 if (fh == NULL)
314 {
315 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
316 close(fd);
317 return (-1);
318 }
320 fprintf (fh, "%i\n", (int) pid);
321 fclose (fh);
323 return (0);
324 } /* }}} int write_pidfile */
326 static int remove_pidfile (void) /* {{{ */
327 {
328 char *file;
329 int status;
331 file = (config_pid_file != NULL)
332 ? config_pid_file
333 : LOCALSTATEDIR "/run/rrdcached.pid";
335 status = unlink (file);
336 if (status == 0)
337 return (0);
338 return (errno);
339 } /* }}} int remove_pidfile */
341 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
342 {
343 char *eol;
345 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
346 sock->next_read - sock->next_cmd);
348 if (eol == NULL)
349 {
350 /* no commands left, move remainder back to front of rbuf */
351 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
352 sock->next_read - sock->next_cmd);
353 sock->next_read -= sock->next_cmd;
354 sock->next_cmd = 0;
355 *len = 0;
356 return NULL;
357 }
358 else
359 {
360 char *cmd = sock->rbuf + sock->next_cmd;
361 *eol = '\0';
363 sock->next_cmd = eol - sock->rbuf + 1;
365 if (eol > sock->rbuf && *(eol-1) == '\r')
366 *(--eol) = '\0'; /* handle "\r\n" EOL */
368 *len = eol - cmd;
370 return cmd;
371 }
373 /* NOTREACHED */
374 assert(1==0);
375 }
377 /* add the characters directly to the write buffer */
378 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
379 {
380 char *new_buf;
382 assert(sock != NULL);
384 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
385 if (new_buf == NULL)
386 {
387 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
388 return -1;
389 }
391 strncpy(new_buf + sock->wbuf_len, str, len + 1);
393 sock->wbuf = new_buf;
394 sock->wbuf_len += len;
396 return 0;
397 } /* }}} static int add_to_wbuf */
399 /* add the text to the "extra" info that's sent after the status line */
400 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
401 {
402 va_list argp;
403 char buffer[CMD_MAX];
404 int len;
406 if (sock == NULL) return 0; /* journal replay mode */
407 if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
409 va_start(argp, fmt);
410 #ifdef HAVE_VSNPRINTF
411 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
412 #else
413 len = vsprintf(buffer, fmt, argp);
414 #endif
415 va_end(argp);
416 if (len < 0)
417 {
418 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
419 return -1;
420 }
422 return add_to_wbuf(sock, buffer, len);
423 } /* }}} static int add_response_info */
425 static int count_lines(char *str) /* {{{ */
426 {
427 int lines = 0;
429 if (str != NULL)
430 {
431 while ((str = strchr(str, '\n')) != NULL)
432 {
433 ++lines;
434 ++str;
435 }
436 }
438 return lines;
439 } /* }}} static int count_lines */
441 /* send the response back to the user.
442 * returns 0 on success, -1 on error
443 * write buffer is always zeroed after this call */
444 static int send_response (listen_socket_t *sock, response_code rc,
445 char *fmt, ...) /* {{{ */
446 {
447 va_list argp;
448 char buffer[CMD_MAX];
449 int lines;
450 ssize_t wrote;
451 int rclen, len;
453 if (sock == NULL) return rc; /* journal replay mode */
455 if (sock->batch_mode)
456 {
457 if (rc == RESP_OK)
458 return rc; /* no response on success during BATCH */
459 lines = sock->batch_cmd;
460 }
461 else if (rc == RESP_OK)
462 lines = count_lines(sock->wbuf);
463 else
464 lines = -1;
466 rclen = sprintf(buffer, "%d ", lines);
467 va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
470 #else
471 len = vsprintf(buffer+rclen, fmt, argp);
472 #endif
473 va_end(argp);
474 if (len < 0)
475 return -1;
477 len += rclen;
479 /* append the result to the wbuf, don't write to the user */
480 if (sock->batch_mode)
481 return add_to_wbuf(sock, buffer, len);
483 /* first write must be complete */
484 if (len != write(sock->fd, buffer, len))
485 {
486 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
487 return -1;
488 }
490 if (sock->wbuf != NULL)
491 {
492 wrote = 0;
493 while (wrote < sock->wbuf_len)
494 {
495 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
496 if (wb <= 0)
497 {
498 RRDD_LOG(LOG_INFO, "send_response: could not write results");
499 return -1;
500 }
501 wrote += wb;
502 }
503 }
505 free(sock->wbuf); sock->wbuf = NULL;
506 sock->wbuf_len = 0;
508 return 0;
509 } /* }}} */
511 static void wipe_ci_values(cache_item_t *ci, time_t when)
512 {
513 ci->values = NULL;
514 ci->values_num = 0;
516 ci->last_flush_time = when;
517 if (config_write_jitter > 0)
518 ci->last_flush_time += (random() % config_write_jitter);
519 }
521 /* remove_from_queue
522 * remove a "cache_item_t" item from the queue.
523 * must hold 'cache_lock' when calling this
524 */
525 static void remove_from_queue(cache_item_t *ci) /* {{{ */
526 {
527 if (ci == NULL) return;
529 if (ci->prev == NULL)
530 cache_queue_head = ci->next; /* reset head */
531 else
532 ci->prev->next = ci->next;
534 if (ci->next == NULL)
535 cache_queue_tail = ci->prev; /* reset the tail */
536 else
537 ci->next->prev = ci->prev;
539 ci->next = ci->prev = NULL;
540 ci->flags &= ~CI_FLAGS_IN_QUEUE;
541 } /* }}} static void remove_from_queue */
543 /* remove an entry from the tree and free all its resources.
544 * must hold 'cache lock' while calling this.
545 * returns 0 on success, otherwise errno */
546 static int forget_file(const char *file)
547 {
548 cache_item_t *ci;
550 ci = g_tree_lookup(cache_tree, file);
551 if (ci == NULL)
552 return ENOENT;
554 g_tree_remove (cache_tree, file);
555 remove_from_queue(ci);
557 for (int i=0; i < ci->values_num; i++)
558 free(ci->values[i]);
560 free (ci->values);
561 free (ci->file);
563 /* in case anyone is waiting */
564 pthread_cond_broadcast(&ci->flushed);
566 free (ci);
568 return 0;
569 } /* }}} static int forget_file */
571 /*
572 * enqueue_cache_item:
573 * `cache_lock' must be acquired before calling this function!
574 */
575 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
576 queue_side_t side)
577 {
578 if (ci == NULL)
579 return (-1);
581 if (ci->values_num == 0)
582 return (0);
584 if (side == HEAD)
585 {
586 if (cache_queue_head == ci)
587 return 0;
589 /* remove from the double linked list */
590 if (ci->flags & CI_FLAGS_IN_QUEUE)
591 remove_from_queue(ci);
593 ci->prev = NULL;
594 ci->next = cache_queue_head;
595 if (ci->next != NULL)
596 ci->next->prev = ci;
597 cache_queue_head = ci;
599 if (cache_queue_tail == NULL)
600 cache_queue_tail = cache_queue_head;
601 }
602 else /* (side == TAIL) */
603 {
604 /* We don't move values back in the list.. */
605 if (ci->flags & CI_FLAGS_IN_QUEUE)
606 return (0);
608 assert (ci->next == NULL);
609 assert (ci->prev == NULL);
611 ci->prev = cache_queue_tail;
613 if (cache_queue_tail == NULL)
614 cache_queue_head = ci;
615 else
616 cache_queue_tail->next = ci;
618 cache_queue_tail = ci;
619 }
621 ci->flags |= CI_FLAGS_IN_QUEUE;
623 pthread_cond_broadcast(&cache_cond);
624 pthread_mutex_lock (&stats_lock);
625 stats_queue_length++;
626 pthread_mutex_unlock (&stats_lock);
628 return (0);
629 } /* }}} int enqueue_cache_item */
631 /*
632 * tree_callback_flush:
633 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
634 * while this is in progress.
635 */
636 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
637 gpointer data)
638 {
639 cache_item_t *ci;
640 callback_flush_data_t *cfd;
642 ci = (cache_item_t *) value;
643 cfd = (callback_flush_data_t *) data;
645 if ((ci->last_flush_time <= cfd->abs_timeout)
646 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
647 && (ci->values_num > 0))
648 {
649 enqueue_cache_item (ci, TAIL);
650 }
651 else if ((do_shutdown != 0)
652 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
653 && (ci->values_num > 0))
654 {
655 enqueue_cache_item (ci, TAIL);
656 }
657 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
658 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
659 && (ci->values_num <= 0))
660 {
661 char **temp;
663 temp = (char **) realloc (cfd->keys,
664 sizeof (char *) * (cfd->keys_num + 1));
665 if (temp == NULL)
666 {
667 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
668 return (FALSE);
669 }
670 cfd->keys = temp;
671 /* Make really sure this points to the _same_ place */
672 assert ((char *) key == ci->file);
673 cfd->keys[cfd->keys_num] = (char *) key;
674 cfd->keys_num++;
675 }
677 return (FALSE);
678 } /* }}} gboolean tree_callback_flush */
680 static int flush_old_values (int max_age)
681 {
682 callback_flush_data_t cfd;
683 size_t k;
685 memset (&cfd, 0, sizeof (cfd));
686 /* Pass the current time as user data so that we don't need to call
687 * `time' for each node. */
688 cfd.now = time (NULL);
689 cfd.keys = NULL;
690 cfd.keys_num = 0;
692 if (max_age > 0)
693 cfd.abs_timeout = cfd.now - max_age;
694 else
695 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
697 /* `tree_callback_flush' will return the keys of all values that haven't
698 * been touched in the last `config_flush_interval' seconds in `cfd'.
699 * The char*'s in this array point to the same memory as ci->file, so we
700 * don't need to free them separately. */
701 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
703 for (k = 0; k < cfd.keys_num; k++)
704 {
705 /* should never fail, since we have held the cache_lock
706 * the entire time */
707 assert( forget_file(cfd.keys[k]) == 0 );
708 }
710 if (cfd.keys != NULL)
711 {
712 free (cfd.keys);
713 cfd.keys = NULL;
714 }
716 return (0);
717 } /* int flush_old_values */
719 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
720 {
721 struct timeval now;
722 struct timespec next_flush;
723 int final_flush = 0; /* make sure we only flush once on shutdown */
725 gettimeofday (&now, NULL);
726 next_flush.tv_sec = now.tv_sec + config_flush_interval;
727 next_flush.tv_nsec = 1000 * now.tv_usec;
729 pthread_mutex_lock (&cache_lock);
730 while ((do_shutdown == 0) || (cache_queue_head != NULL))
731 {
732 cache_item_t *ci;
733 char *file;
734 char **values;
735 int values_num;
736 int status;
737 int i;
739 /* First, check if it's time to do the cache flush. */
740 gettimeofday (&now, NULL);
741 if ((now.tv_sec > next_flush.tv_sec)
742 || ((now.tv_sec == next_flush.tv_sec)
743 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
744 {
745 /* Flush all values that haven't been written in the last
746 * `config_write_interval' seconds. */
747 flush_old_values (config_write_interval);
749 /* Determine the time of the next cache flush. */
750 while (next_flush.tv_sec <= now.tv_sec)
751 next_flush.tv_sec += config_flush_interval;
753 /* unlock the cache while we rotate so we don't block incoming
754 * updates if the fsync() blocks on disk I/O */
755 pthread_mutex_unlock(&cache_lock);
756 journal_rotate();
757 pthread_mutex_lock(&cache_lock);
758 }
760 /* Now, check if there's something to store away. If not, wait until
761 * something comes in or it's time to do the cache flush. if we are
762 * shutting down, do not wait around. */
763 if (cache_queue_head == NULL && !do_shutdown)
764 {
765 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
766 if ((status != 0) && (status != ETIMEDOUT))
767 {
768 RRDD_LOG (LOG_ERR, "queue_thread_main: "
769 "pthread_cond_timedwait returned %i.", status);
770 }
771 }
773 /* We're about to shut down */
774 if (do_shutdown != 0 && !final_flush++)
775 {
776 if (config_flush_at_shutdown)
777 flush_old_values (-1); /* flush everything */
778 else
779 break;
780 }
782 /* Check if a value has arrived. This may be NULL if we timed out or there
783 * was an interrupt such as a signal. */
784 if (cache_queue_head == NULL)
785 continue;
787 ci = cache_queue_head;
789 /* copy the relevant parts */
790 file = strdup (ci->file);
791 if (file == NULL)
792 {
793 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
794 continue;
795 }
797 assert(ci->values != NULL);
798 assert(ci->values_num > 0);
800 values = ci->values;
801 values_num = ci->values_num;
803 wipe_ci_values(ci, time(NULL));
804 remove_from_queue(ci);
806 pthread_mutex_lock (&stats_lock);
807 assert (stats_queue_length > 0);
808 stats_queue_length--;
809 pthread_mutex_unlock (&stats_lock);
811 pthread_mutex_unlock (&cache_lock);
813 rrd_clear_error ();
814 status = rrd_update_r (file, NULL, values_num, (void *) values);
815 if (status != 0)
816 {
817 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
818 "rrd_update_r (%s) failed with status %i. (%s)",
819 file, status, rrd_get_error());
820 }
822 journal_write("wrote", file);
823 pthread_cond_broadcast(&ci->flushed);
825 for (i = 0; i < values_num; i++)
826 free (values[i]);
828 free(values);
829 free(file);
831 if (status == 0)
832 {
833 pthread_mutex_lock (&stats_lock);
834 stats_updates_written++;
835 stats_data_sets_written += values_num;
836 pthread_mutex_unlock (&stats_lock);
837 }
839 pthread_mutex_lock (&cache_lock);
841 /* We're about to shut down */
842 if (do_shutdown != 0 && !final_flush++)
843 {
844 if (config_flush_at_shutdown)
845 flush_old_values (-1); /* flush everything */
846 else
847 break;
848 }
849 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
850 pthread_mutex_unlock (&cache_lock);
852 if (config_flush_at_shutdown)
853 {
854 assert(cache_queue_head == NULL);
855 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
856 }
858 journal_done();
860 return (NULL);
861 } /* }}} void *queue_thread_main */
863 static int buffer_get_field (char **buffer_ret, /* {{{ */
864 size_t *buffer_size_ret, char **field_ret)
865 {
866 char *buffer;
867 size_t buffer_pos;
868 size_t buffer_size;
869 char *field;
870 size_t field_size;
871 int status;
873 buffer = *buffer_ret;
874 buffer_pos = 0;
875 buffer_size = *buffer_size_ret;
876 field = *buffer_ret;
877 field_size = 0;
879 if (buffer_size <= 0)
880 return (-1);
882 /* This is ensured by `handle_request'. */
883 assert (buffer[buffer_size - 1] == '\0');
885 status = -1;
886 while (buffer_pos < buffer_size)
887 {
888 /* Check for end-of-field or end-of-buffer */
889 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
890 {
891 field[field_size] = 0;
892 field_size++;
893 buffer_pos++;
894 status = 0;
895 break;
896 }
897 /* Handle escaped characters. */
898 else if (buffer[buffer_pos] == '\\')
899 {
900 if (buffer_pos >= (buffer_size - 1))
901 break;
902 buffer_pos++;
903 field[field_size] = buffer[buffer_pos];
904 field_size++;
905 buffer_pos++;
906 }
907 /* Normal operation */
908 else
909 {
910 field[field_size] = buffer[buffer_pos];
911 field_size++;
912 buffer_pos++;
913 }
914 } /* while (buffer_pos < buffer_size) */
916 if (status != 0)
917 return (status);
919 *buffer_ret = buffer + buffer_pos;
920 *buffer_size_ret = buffer_size - buffer_pos;
921 *field_ret = field;
923 return (0);
924 } /* }}} int buffer_get_field */
926 /* if we're restricting writes to the base directory,
927 * check whether the file falls within the dir
928 * returns 1 if OK, otherwise 0
929 */
930 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
931 {
932 assert(file != NULL);
934 if (!config_write_base_only
935 || sock == NULL /* journal replay */
936 || config_base_dir == NULL)
937 return 1;
939 if (strstr(file, "../") != NULL) goto err;
941 /* relative paths without "../" are ok */
942 if (*file != '/') return 1;
944 /* file must be of the format base + "/" + <1+ char filename> */
945 if (strlen(file) < _config_base_dir_len + 2) goto err;
946 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
947 if (*(file + _config_base_dir_len) != '/') goto err;
949 return 1;
951 err:
952 if (sock != NULL && sock->fd >= 0)
953 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
955 return 0;
956 } /* }}} static int check_file_access */
958 /* returns 1 if we have the required privilege level,
959 * otherwise issue an error to the user on sock */
960 static int has_privilege (listen_socket_t *sock, /* {{{ */
961 socket_privilege priv)
962 {
963 if (sock == NULL) /* journal replay */
964 return 1;
966 if (sock->privilege >= priv)
967 return 1;
969 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
970 } /* }}} static int has_privilege */
972 static int flush_file (const char *filename) /* {{{ */
973 {
974 cache_item_t *ci;
976 pthread_mutex_lock (&cache_lock);
978 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
979 if (ci == NULL)
980 {
981 pthread_mutex_unlock (&cache_lock);
982 return (ENOENT);
983 }
985 if (ci->values_num > 0)
986 {
987 /* Enqueue at head */
988 enqueue_cache_item (ci, HEAD);
989 pthread_cond_wait(&ci->flushed, &cache_lock);
990 }
992 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
993 * may have been purged during our cond_wait() */
995 pthread_mutex_unlock(&cache_lock);
997 return (0);
998 } /* }}} int flush_file */
1000 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1001 char *buffer, size_t buffer_size)
1002 {
1003 int status;
1004 char **help_text;
1005 char *command;
1007 char *help_help[2] =
1008 {
1009 "Command overview\n"
1010 ,
1011 "HELP [<command>]\n"
1012 "FLUSH <filename>\n"
1013 "FLUSHALL\n"
1014 "PENDING <filename>\n"
1015 "FORGET <filename>\n"
1016 "UPDATE <filename> <values> [<values> ...]\n"
1017 "BATCH\n"
1018 "STATS\n"
1019 };
1021 char *help_flush[2] =
1022 {
1023 "Help for FLUSH\n"
1024 ,
1025 "Usage: FLUSH <filename>\n"
1026 "\n"
1027 "Adds the given filename to the head of the update queue and returns\n"
1028 "after is has been dequeued.\n"
1029 };
1031 char *help_flushall[2] =
1032 {
1033 "Help for FLUSHALL\n"
1034 ,
1035 "Usage: FLUSHALL\n"
1036 "\n"
1037 "Triggers writing of all pending updates. Returns immediately.\n"
1038 };
1040 char *help_pending[2] =
1041 {
1042 "Help for PENDING\n"
1043 ,
1044 "Usage: PENDING <filename>\n"
1045 "\n"
1046 "Shows any 'pending' updates for a file, in order.\n"
1047 "The updates shown have not yet been written to the underlying RRD file.\n"
1048 };
1050 char *help_forget[2] =
1051 {
1052 "Help for FORGET\n"
1053 ,
1054 "Usage: FORGET <filename>\n"
1055 "\n"
1056 "Removes the file completely from the cache.\n"
1057 "Any pending updates for the file will be lost.\n"
1058 };
1060 char *help_update[2] =
1061 {
1062 "Help for UPDATE\n"
1063 ,
1064 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1065 "\n"
1066 "Adds the given file to the internal cache if it is not yet known and\n"
1067 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1068 "for details.\n"
1069 "\n"
1070 "Each <values> has the following form:\n"
1071 " <values> = <time>:<value>[:<value>[...]]\n"
1072 "See the rrdupdate(1) manpage for details.\n"
1073 };
1075 char *help_stats[2] =
1076 {
1077 "Help for STATS\n"
1078 ,
1079 "Usage: STATS\n"
1080 "\n"
1081 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1082 "a description of the values.\n"
1083 };
1085 char *help_batch[2] =
1086 {
1087 "Help for BATCH\n"
1088 ,
1089 "The 'BATCH' command permits the client to initiate a bulk load\n"
1090 " of commands to rrdcached.\n"
1091 "\n"
1092 "Usage:\n"
1093 "\n"
1094 " client: BATCH\n"
1095 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1096 " client: command #1\n"
1097 " client: command #2\n"
1098 " client: ... and so on\n"
1099 " client: .\n"
1100 " server: 2 errors\n"
1101 " server: 7 message for command #7\n"
1102 " server: 9 message for command #9\n"
1103 "\n"
1104 "For more information, consult the rrdcached(1) documentation.\n"
1105 };
1107 status = buffer_get_field (&buffer, &buffer_size, &command);
1108 if (status != 0)
1109 help_text = help_help;
1110 else
1111 {
1112 if (strcasecmp (command, "update") == 0)
1113 help_text = help_update;
1114 else if (strcasecmp (command, "flush") == 0)
1115 help_text = help_flush;
1116 else if (strcasecmp (command, "flushall") == 0)
1117 help_text = help_flushall;
1118 else if (strcasecmp (command, "pending") == 0)
1119 help_text = help_pending;
1120 else if (strcasecmp (command, "forget") == 0)
1121 help_text = help_forget;
1122 else if (strcasecmp (command, "stats") == 0)
1123 help_text = help_stats;
1124 else if (strcasecmp (command, "batch") == 0)
1125 help_text = help_batch;
1126 else
1127 help_text = help_help;
1128 }
1130 add_response_info(sock, help_text[1]);
1131 return send_response(sock, RESP_OK, help_text[0]);
1132 } /* }}} int handle_request_help */
1134 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1135 {
1136 uint64_t copy_queue_length;
1137 uint64_t copy_updates_received;
1138 uint64_t copy_flush_received;
1139 uint64_t copy_updates_written;
1140 uint64_t copy_data_sets_written;
1141 uint64_t copy_journal_bytes;
1142 uint64_t copy_journal_rotate;
1144 uint64_t tree_nodes_number;
1145 uint64_t tree_depth;
1147 pthread_mutex_lock (&stats_lock);
1148 copy_queue_length = stats_queue_length;
1149 copy_updates_received = stats_updates_received;
1150 copy_flush_received = stats_flush_received;
1151 copy_updates_written = stats_updates_written;
1152 copy_data_sets_written = stats_data_sets_written;
1153 copy_journal_bytes = stats_journal_bytes;
1154 copy_journal_rotate = stats_journal_rotate;
1155 pthread_mutex_unlock (&stats_lock);
1157 pthread_mutex_lock (&cache_lock);
1158 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1159 tree_depth = (uint64_t) g_tree_height (cache_tree);
1160 pthread_mutex_unlock (&cache_lock);
1162 add_response_info(sock,
1163 "QueueLength: %"PRIu64"\n", copy_queue_length);
1164 add_response_info(sock,
1165 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1166 add_response_info(sock,
1167 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1168 add_response_info(sock,
1169 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1170 add_response_info(sock,
1171 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1172 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1173 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1174 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1175 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1177 send_response(sock, RESP_OK, "Statistics follow\n");
1179 return (0);
1180 } /* }}} int handle_request_stats */
1182 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1183 char *buffer, size_t buffer_size)
1184 {
1185 char *file;
1186 int status;
1188 status = buffer_get_field (&buffer, &buffer_size, &file);
1189 if (status != 0)
1190 {
1191 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1192 }
1193 else
1194 {
1195 pthread_mutex_lock(&stats_lock);
1196 stats_flush_received++;
1197 pthread_mutex_unlock(&stats_lock);
1199 if (!check_file_access(file, sock)) return 0;
1201 status = flush_file (file);
1202 if (status == 0)
1203 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1204 else if (status == ENOENT)
1205 {
1206 /* no file in our tree; see whether it exists at all */
1207 struct stat statbuf;
1209 memset(&statbuf, 0, sizeof(statbuf));
1210 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1211 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1212 else
1213 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1214 }
1215 else if (status < 0)
1216 return send_response(sock, RESP_ERR, "Internal error.\n");
1217 else
1218 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1219 }
1221 /* NOTREACHED */
1222 assert(1==0);
1223 } /* }}} int handle_request_slurp */
1225 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1226 {
1227 int status;
1229 status = has_privilege(sock, PRIV_HIGH);
1230 if (status <= 0)
1231 return status;
1233 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1235 pthread_mutex_lock(&cache_lock);
1236 flush_old_values(-1);
1237 pthread_mutex_unlock(&cache_lock);
1239 return send_response(sock, RESP_OK, "Started flush.\n");
1240 } /* }}} static int handle_request_flushall */
1242 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1243 char *buffer, size_t buffer_size)
1244 {
1245 int status;
1246 char *file;
1247 cache_item_t *ci;
1249 status = buffer_get_field(&buffer, &buffer_size, &file);
1250 if (status != 0)
1251 return send_response(sock, RESP_ERR,
1252 "Usage: PENDING <filename>\n");
1254 status = has_privilege(sock, PRIV_HIGH);
1255 if (status <= 0)
1256 return status;
1258 pthread_mutex_lock(&cache_lock);
1259 ci = g_tree_lookup(cache_tree, file);
1260 if (ci == NULL)
1261 {
1262 pthread_mutex_unlock(&cache_lock);
1263 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1264 }
1266 for (int i=0; i < ci->values_num; i++)
1267 add_response_info(sock, "%s\n", ci->values[i]);
1269 pthread_mutex_unlock(&cache_lock);
1270 return send_response(sock, RESP_OK, "updates pending\n");
1271 } /* }}} static int handle_request_pending */
1273 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1274 char *buffer, size_t buffer_size)
1275 {
1276 int status;
1277 char *file;
1279 status = buffer_get_field(&buffer, &buffer_size, &file);
1280 if (status != 0)
1281 return send_response(sock, RESP_ERR,
1282 "Usage: FORGET <filename>\n");
1284 status = has_privilege(sock, PRIV_HIGH);
1285 if (status <= 0)
1286 return status;
1288 if (!check_file_access(file, sock)) return 0;
1290 pthread_mutex_lock(&cache_lock);
1291 status = forget_file(file);
1292 pthread_mutex_unlock(&cache_lock);
1294 if (status == 0)
1295 {
1296 if (sock != NULL)
1297 journal_write("forget", file);
1299 return send_response(sock, RESP_OK, "Gone!\n");
1300 }
1301 else
1302 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1303 status < 0 ? "Internal error" : rrd_strerror(status));
1305 /* NOTREACHED */
1306 assert(1==0);
1307 } /* }}} static int handle_request_forget */
1309 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1310 char *buffer, size_t buffer_size)
1311 {
1312 char *file;
1313 int values_num = 0;
1314 int status;
1315 char orig_buf[CMD_MAX];
1317 time_t now;
1318 cache_item_t *ci;
1320 now = time (NULL);
1322 status = has_privilege(sock, PRIV_HIGH);
1323 if (status <= 0)
1324 return status;
1326 /* save it for the journal later */
1327 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1329 status = buffer_get_field (&buffer, &buffer_size, &file);
1330 if (status != 0)
1331 return send_response(sock, RESP_ERR,
1332 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1334 pthread_mutex_lock(&stats_lock);
1335 stats_updates_received++;
1336 pthread_mutex_unlock(&stats_lock);
1338 if (!check_file_access(file, sock)) return 0;
1340 pthread_mutex_lock (&cache_lock);
1341 ci = g_tree_lookup (cache_tree, file);
1343 if (ci == NULL) /* {{{ */
1344 {
1345 struct stat statbuf;
1347 /* don't hold the lock while we setup; stat(2) might block */
1348 pthread_mutex_unlock(&cache_lock);
1350 memset (&statbuf, 0, sizeof (statbuf));
1351 status = stat (file, &statbuf);
1352 if (status != 0)
1353 {
1354 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1356 status = errno;
1357 if (status == ENOENT)
1358 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1359 else
1360 return send_response(sock, RESP_ERR,
1361 "stat failed with error %i.\n", status);
1362 }
1363 if (!S_ISREG (statbuf.st_mode))
1364 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1366 if (access(file, R_OK|W_OK) != 0)
1367 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1368 file, rrd_strerror(errno));
1370 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1371 if (ci == NULL)
1372 {
1373 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1375 return send_response(sock, RESP_ERR, "malloc failed.\n");
1376 }
1377 memset (ci, 0, sizeof (cache_item_t));
1379 ci->file = strdup (file);
1380 if (ci->file == NULL)
1381 {
1382 free (ci);
1383 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1385 return send_response(sock, RESP_ERR, "strdup failed.\n");
1386 }
1388 wipe_ci_values(ci, now);
1389 ci->flags = CI_FLAGS_IN_TREE;
1391 pthread_mutex_lock(&cache_lock);
1392 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1393 } /* }}} */
1394 assert (ci != NULL);
1396 /* don't re-write updates in replay mode */
1397 if (sock != NULL)
1398 journal_write("update", orig_buf);
1400 while (buffer_size > 0)
1401 {
1402 char **temp;
1403 char *value;
1405 status = buffer_get_field (&buffer, &buffer_size, &value);
1406 if (status != 0)
1407 {
1408 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1409 break;
1410 }
1412 temp = (char **) realloc (ci->values,
1413 sizeof (char *) * (ci->values_num + 1));
1414 if (temp == NULL)
1415 {
1416 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1417 continue;
1418 }
1419 ci->values = temp;
1421 ci->values[ci->values_num] = strdup (value);
1422 if (ci->values[ci->values_num] == NULL)
1423 {
1424 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1425 continue;
1426 }
1427 ci->values_num++;
1429 values_num++;
1430 }
1432 if (((now - ci->last_flush_time) >= config_write_interval)
1433 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1434 && (ci->values_num > 0))
1435 {
1436 enqueue_cache_item (ci, TAIL);
1437 }
1439 pthread_mutex_unlock (&cache_lock);
1441 if (values_num < 1)
1442 return send_response(sock, RESP_ERR, "No values updated.\n");
1443 else
1444 return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1446 /* NOTREACHED */
1447 assert(1==0);
1449 } /* }}} int handle_request_update */
1451 /* we came across a "WROTE" entry during journal replay.
1452 * throw away any values that we have accumulated for this file
1453 */
1454 static int handle_request_wrote (const char *buffer) /* {{{ */
1455 {
1456 int i;
1457 cache_item_t *ci;
1458 const char *file = buffer;
1460 pthread_mutex_lock(&cache_lock);
1462 ci = g_tree_lookup(cache_tree, file);
1463 if (ci == NULL)
1464 {
1465 pthread_mutex_unlock(&cache_lock);
1466 return (0);
1467 }
1469 if (ci->values)
1470 {
1471 for (i=0; i < ci->values_num; i++)
1472 free(ci->values[i]);
1474 free(ci->values);
1475 }
1477 wipe_ci_values(ci, time(NULL));
1478 remove_from_queue(ci);
1480 pthread_mutex_unlock(&cache_lock);
1481 return (0);
1482 } /* }}} int handle_request_wrote */
1484 /* start "BATCH" processing */
1485 static int batch_start (listen_socket_t *sock) /* {{{ */
1486 {
1487 int status;
1488 if (sock->batch_mode)
1489 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1491 status = send_response(sock, RESP_OK,
1492 "Go ahead. End with dot '.' on its own line.\n");
1493 sock->batch_mode = 1;
1494 sock->batch_cmd = 0;
1496 return status;
1497 } /* }}} static int batch_start */
1499 /* finish "BATCH" processing and return results to the client */
1500 static int batch_done (listen_socket_t *sock) /* {{{ */
1501 {
1502 assert(sock->batch_mode);
1503 sock->batch_mode = 0;
1504 sock->batch_cmd = 0;
1505 return send_response(sock, RESP_OK, "errors\n");
1506 } /* }}} static int batch_done */
1508 /* if sock==NULL, we are in journal replay mode */
1509 static int handle_request (listen_socket_t *sock, /* {{{ */
1510 char *buffer, size_t buffer_size)
1511 {
1512 char *buffer_ptr;
1513 char *command;
1514 int status;
1516 assert (buffer[buffer_size - 1] == '\0');
1518 buffer_ptr = buffer;
1519 command = NULL;
1520 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1521 if (status != 0)
1522 {
1523 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1524 return (-1);
1525 }
1527 if (sock != NULL && sock->batch_mode)
1528 sock->batch_cmd++;
1530 if (strcasecmp (command, "update") == 0)
1531 return (handle_request_update (sock, buffer_ptr, buffer_size));
1532 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1533 {
1534 /* this is only valid in replay mode */
1535 return (handle_request_wrote (buffer_ptr));
1536 }
1537 else if (strcasecmp (command, "flush") == 0)
1538 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1539 else if (strcasecmp (command, "flushall") == 0)
1540 return (handle_request_flushall(sock));
1541 else if (strcasecmp (command, "pending") == 0)
1542 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1543 else if (strcasecmp (command, "forget") == 0)
1544 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1545 else if (strcasecmp (command, "stats") == 0)
1546 return (handle_request_stats (sock));
1547 else if (strcasecmp (command, "help") == 0)
1548 return (handle_request_help (sock, buffer_ptr, buffer_size));
1549 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1550 return batch_start(sock);
1551 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1552 return batch_done(sock);
1553 else
1554 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1556 /* NOTREACHED */
1557 assert(1==0);
1558 } /* }}} int handle_request */
1560 /* MUST NOT hold journal_lock before calling this */
1561 static void journal_rotate(void) /* {{{ */
1562 {
1563 FILE *old_fh = NULL;
1564 int new_fd;
1566 if (journal_cur == NULL || journal_old == NULL)
1567 return;
1569 pthread_mutex_lock(&journal_lock);
1571 /* we rotate this way (rename before close) so that the we can release
1572 * the journal lock as fast as possible. Journal writes to the new
1573 * journal can proceed immediately after the new file is opened. The
1574 * fclose can then block without affecting new updates.
1575 */
1576 if (journal_fh != NULL)
1577 {
1578 old_fh = journal_fh;
1579 journal_fh = NULL;
1580 rename(journal_cur, journal_old);
1581 ++stats_journal_rotate;
1582 }
1584 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1585 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1586 if (new_fd >= 0)
1587 {
1588 journal_fh = fdopen(new_fd, "a");
1589 if (journal_fh == NULL)
1590 close(new_fd);
1591 }
1593 pthread_mutex_unlock(&journal_lock);
1595 if (old_fh != NULL)
1596 fclose(old_fh);
1598 if (journal_fh == NULL)
1599 {
1600 RRDD_LOG(LOG_CRIT,
1601 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1602 journal_cur, rrd_strerror(errno));
1604 RRDD_LOG(LOG_ERR,
1605 "JOURNALING DISABLED: All values will be flushed at shutdown");
1606 config_flush_at_shutdown = 1;
1607 }
1609 } /* }}} static void journal_rotate */
1611 static void journal_done(void) /* {{{ */
1612 {
1613 if (journal_cur == NULL)
1614 return;
1616 pthread_mutex_lock(&journal_lock);
1617 if (journal_fh != NULL)
1618 {
1619 fclose(journal_fh);
1620 journal_fh = NULL;
1621 }
1623 if (config_flush_at_shutdown)
1624 {
1625 RRDD_LOG(LOG_INFO, "removing journals");
1626 unlink(journal_old);
1627 unlink(journal_cur);
1628 }
1629 else
1630 {
1631 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1632 "journals will be used at next startup");
1633 }
1635 pthread_mutex_unlock(&journal_lock);
1637 } /* }}} static void journal_done */
1639 static int journal_write(char *cmd, char *args) /* {{{ */
1640 {
1641 int chars;
1643 if (journal_fh == NULL)
1644 return 0;
1646 pthread_mutex_lock(&journal_lock);
1647 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1648 pthread_mutex_unlock(&journal_lock);
1650 if (chars > 0)
1651 {
1652 pthread_mutex_lock(&stats_lock);
1653 stats_journal_bytes += chars;
1654 pthread_mutex_unlock(&stats_lock);
1655 }
1657 return chars;
1658 } /* }}} static int journal_write */
1660 static int journal_replay (const char *file) /* {{{ */
1661 {
1662 FILE *fh;
1663 int entry_cnt = 0;
1664 int fail_cnt = 0;
1665 uint64_t line = 0;
1666 char entry[CMD_MAX];
1668 if (file == NULL) return 0;
1670 {
1671 char *reason;
1672 int status = 0;
1673 struct stat statbuf;
1675 memset(&statbuf, 0, sizeof(statbuf));
1676 if (stat(file, &statbuf) != 0)
1677 {
1678 if (errno == ENOENT)
1679 return 0;
1681 reason = "stat error";
1682 status = errno;
1683 }
1684 else if (!S_ISREG(statbuf.st_mode))
1685 {
1686 reason = "not a regular file";
1687 status = EPERM;
1688 }
1689 if (statbuf.st_uid != daemon_uid)
1690 {
1691 reason = "not owned by daemon user";
1692 status = EACCES;
1693 }
1694 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1695 {
1696 reason = "must not be user/group writable";
1697 status = EACCES;
1698 }
1700 if (status != 0)
1701 {
1702 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1703 file, rrd_strerror(status), reason);
1704 return 0;
1705 }
1706 }
1708 fh = fopen(file, "r");
1709 if (fh == NULL)
1710 {
1711 if (errno != ENOENT)
1712 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1713 file, rrd_strerror(errno));
1714 return 0;
1715 }
1716 else
1717 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1719 while(!feof(fh))
1720 {
1721 size_t entry_len;
1723 ++line;
1724 if (fgets(entry, sizeof(entry), fh) == NULL)
1725 break;
1726 entry_len = strlen(entry);
1728 /* check \n termination in case journal writing crashed mid-line */
1729 if (entry_len == 0)
1730 continue;
1731 else if (entry[entry_len - 1] != '\n')
1732 {
1733 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1734 ++fail_cnt;
1735 continue;
1736 }
1738 entry[entry_len - 1] = '\0';
1740 if (handle_request(NULL, entry, entry_len) == 0)
1741 ++entry_cnt;
1742 else
1743 ++fail_cnt;
1744 }
1746 fclose(fh);
1748 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1749 entry_cnt, fail_cnt);
1751 return entry_cnt > 0 ? 1 : 0;
1752 } /* }}} static int journal_replay */
1754 static void journal_init(void) /* {{{ */
1755 {
1756 int had_journal = 0;
1758 if (journal_cur == NULL) return;
1760 pthread_mutex_lock(&journal_lock);
1762 RRDD_LOG(LOG_INFO, "checking for journal files");
1764 had_journal += journal_replay(journal_old);
1765 had_journal += journal_replay(journal_cur);
1767 /* it must have been a crash. start a flush */
1768 if (had_journal && config_flush_at_shutdown)
1769 flush_old_values(-1);
1771 pthread_mutex_unlock(&journal_lock);
1772 journal_rotate();
1774 RRDD_LOG(LOG_INFO, "journal processing complete");
1776 } /* }}} static void journal_init */
1778 static void close_connection(listen_socket_t *sock)
1779 {
1780 close(sock->fd) ; sock->fd = -1;
1781 free(sock->rbuf); sock->rbuf = NULL;
1782 free(sock->wbuf); sock->wbuf = NULL;
1784 free(sock);
1785 }
1787 static void *connection_thread_main (void *args) /* {{{ */
1788 {
1789 pthread_t self;
1790 listen_socket_t *sock;
1791 int i;
1792 int fd;
1794 sock = (listen_socket_t *) args;
1795 fd = sock->fd;
1797 /* init read buffers */
1798 sock->next_read = sock->next_cmd = 0;
1799 sock->rbuf = malloc(RBUF_SIZE);
1800 if (sock->rbuf == NULL)
1801 {
1802 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1803 close_connection(sock);
1804 return NULL;
1805 }
1807 pthread_mutex_lock (&connection_threads_lock);
1808 {
1809 pthread_t *temp;
1811 temp = (pthread_t *) realloc (connection_threads,
1812 sizeof (pthread_t) * (connection_threads_num + 1));
1813 if (temp == NULL)
1814 {
1815 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1816 }
1817 else
1818 {
1819 connection_threads = temp;
1820 connection_threads[connection_threads_num] = pthread_self ();
1821 connection_threads_num++;
1822 }
1823 }
1824 pthread_mutex_unlock (&connection_threads_lock);
1826 while (do_shutdown == 0)
1827 {
1828 char *cmd;
1829 ssize_t cmd_len;
1830 ssize_t rbytes;
1832 struct pollfd pollfd;
1833 int status;
1835 pollfd.fd = fd;
1836 pollfd.events = POLLIN | POLLPRI;
1837 pollfd.revents = 0;
1839 status = poll (&pollfd, 1, /* timeout = */ 500);
1840 if (do_shutdown)
1841 break;
1842 else if (status == 0) /* timeout */
1843 continue;
1844 else if (status < 0) /* error */
1845 {
1846 status = errno;
1847 if (status != EINTR)
1848 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1849 continue;
1850 }
1852 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1853 break;
1854 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1855 {
1856 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1857 "poll(2) returned something unexpected: %#04hx",
1858 pollfd.revents);
1859 break;
1860 }
1862 rbytes = read(fd, sock->rbuf + sock->next_read,
1863 RBUF_SIZE - sock->next_read);
1864 if (rbytes < 0)
1865 {
1866 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1867 break;
1868 }
1869 else if (rbytes == 0)
1870 break; /* eof */
1872 sock->next_read += rbytes;
1874 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1875 {
1876 status = handle_request (sock, cmd, cmd_len+1);
1877 if (status != 0)
1878 goto out_close;
1879 }
1880 }
1882 out_close:
1883 close_connection(sock);
1885 self = pthread_self ();
1886 /* Remove this thread from the connection threads list */
1887 pthread_mutex_lock (&connection_threads_lock);
1888 /* Find out own index in the array */
1889 for (i = 0; i < connection_threads_num; i++)
1890 if (pthread_equal (connection_threads[i], self) != 0)
1891 break;
1892 assert (i < connection_threads_num);
1894 /* Move the trailing threads forward. */
1895 if (i < (connection_threads_num - 1))
1896 {
1897 memmove (connection_threads + i,
1898 connection_threads + i + 1,
1899 sizeof (pthread_t) * (connection_threads_num - i - 1));
1900 }
1902 connection_threads_num--;
1903 pthread_mutex_unlock (&connection_threads_lock);
1905 return (NULL);
1906 } /* }}} void *connection_thread_main */
1908 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1909 {
1910 int fd;
1911 struct sockaddr_un sa;
1912 listen_socket_t *temp;
1913 int status;
1914 const char *path;
1916 path = sock->addr;
1917 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1918 path += strlen("unix:");
1920 temp = (listen_socket_t *) realloc (listen_fds,
1921 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1922 if (temp == NULL)
1923 {
1924 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1925 return (-1);
1926 }
1927 listen_fds = temp;
1928 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1930 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1931 if (fd < 0)
1932 {
1933 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1934 return (-1);
1935 }
1937 memset (&sa, 0, sizeof (sa));
1938 sa.sun_family = AF_UNIX;
1939 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1941 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1942 if (status != 0)
1943 {
1944 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1945 close (fd);
1946 unlink (path);
1947 return (-1);
1948 }
1950 status = listen (fd, /* backlog = */ 10);
1951 if (status != 0)
1952 {
1953 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1954 close (fd);
1955 unlink (path);
1956 return (-1);
1957 }
1959 listen_fds[listen_fds_num].fd = fd;
1960 listen_fds[listen_fds_num].family = PF_UNIX;
1961 strncpy(listen_fds[listen_fds_num].addr, path,
1962 sizeof (listen_fds[listen_fds_num].addr) - 1);
1963 listen_fds_num++;
1965 return (0);
1966 } /* }}} int open_listen_socket_unix */
1968 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1969 {
1970 struct addrinfo ai_hints;
1971 struct addrinfo *ai_res;
1972 struct addrinfo *ai_ptr;
1973 char addr_copy[NI_MAXHOST];
1974 char *addr;
1975 char *port;
1976 int status;
1978 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1979 addr_copy[sizeof (addr_copy) - 1] = 0;
1980 addr = addr_copy;
1982 memset (&ai_hints, 0, sizeof (ai_hints));
1983 ai_hints.ai_flags = 0;
1984 #ifdef AI_ADDRCONFIG
1985 ai_hints.ai_flags |= AI_ADDRCONFIG;
1986 #endif
1987 ai_hints.ai_family = AF_UNSPEC;
1988 ai_hints.ai_socktype = SOCK_STREAM;
1990 port = NULL;
1991 if (*addr == '[') /* IPv6+port format */
1992 {
1993 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1994 addr++;
1996 port = strchr (addr, ']');
1997 if (port == NULL)
1998 {
1999 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2000 sock->addr);
2001 return (-1);
2002 }
2003 *port = 0;
2004 port++;
2006 if (*port == ':')
2007 port++;
2008 else if (*port == 0)
2009 port = NULL;
2010 else
2011 {
2012 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2013 port);
2014 return (-1);
2015 }
2016 } /* if (*addr = ']') */
2017 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2018 {
2019 port = rindex(addr, ':');
2020 if (port != NULL)
2021 {
2022 *port = 0;
2023 port++;
2024 }
2025 }
2026 ai_res = NULL;
2027 status = getaddrinfo (addr,
2028 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2029 &ai_hints, &ai_res);
2030 if (status != 0)
2031 {
2032 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2033 "%s", addr, gai_strerror (status));
2034 return (-1);
2035 }
2037 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2038 {
2039 int fd;
2040 listen_socket_t *temp;
2041 int one = 1;
2043 temp = (listen_socket_t *) realloc (listen_fds,
2044 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2045 if (temp == NULL)
2046 {
2047 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2048 continue;
2049 }
2050 listen_fds = temp;
2051 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2053 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2054 if (fd < 0)
2055 {
2056 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2057 continue;
2058 }
2060 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2062 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2063 if (status != 0)
2064 {
2065 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2066 close (fd);
2067 continue;
2068 }
2070 status = listen (fd, /* backlog = */ 10);
2071 if (status != 0)
2072 {
2073 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2074 close (fd);
2075 return (-1);
2076 }
2078 listen_fds[listen_fds_num].fd = fd;
2079 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2080 listen_fds_num++;
2081 } /* for (ai_ptr) */
2083 return (0);
2084 } /* }}} static int open_listen_socket_network */
2086 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2087 {
2088 assert(sock != NULL);
2089 assert(sock->addr != NULL);
2091 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2092 || sock->addr[0] == '/')
2093 return (open_listen_socket_unix(sock));
2094 else
2095 return (open_listen_socket_network(sock));
2096 } /* }}} int open_listen_socket */
2098 static int close_listen_sockets (void) /* {{{ */
2099 {
2100 size_t i;
2102 for (i = 0; i < listen_fds_num; i++)
2103 {
2104 close (listen_fds[i].fd);
2106 if (listen_fds[i].family == PF_UNIX)
2107 unlink(listen_fds[i].addr);
2108 }
2110 free (listen_fds);
2111 listen_fds = NULL;
2112 listen_fds_num = 0;
2114 return (0);
2115 } /* }}} int close_listen_sockets */
2117 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2118 {
2119 struct pollfd *pollfds;
2120 int pollfds_num;
2121 int status;
2122 int i;
2124 for (i = 0; i < config_listen_address_list_len; i++)
2125 open_listen_socket (config_listen_address_list[i]);
2127 if (config_listen_address_list_len < 1)
2128 {
2129 listen_socket_t sock;
2130 memset(&sock, 0, sizeof(sock));
2131 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2132 open_listen_socket (&sock);
2133 }
2135 if (listen_fds_num < 1)
2136 {
2137 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2138 "could be opened. Sorry.");
2139 return (NULL);
2140 }
2142 pollfds_num = listen_fds_num;
2143 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2144 if (pollfds == NULL)
2145 {
2146 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2147 return (NULL);
2148 }
2149 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2151 RRDD_LOG(LOG_INFO, "listening for connections");
2153 while (do_shutdown == 0)
2154 {
2155 assert (pollfds_num == ((int) listen_fds_num));
2156 for (i = 0; i < pollfds_num; i++)
2157 {
2158 pollfds[i].fd = listen_fds[i].fd;
2159 pollfds[i].events = POLLIN | POLLPRI;
2160 pollfds[i].revents = 0;
2161 }
2163 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2164 if (do_shutdown)
2165 break;
2166 else if (status == 0) /* timeout */
2167 continue;
2168 else if (status < 0) /* error */
2169 {
2170 status = errno;
2171 if (status != EINTR)
2172 {
2173 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2174 }
2175 continue;
2176 }
2178 for (i = 0; i < pollfds_num; i++)
2179 {
2180 listen_socket_t *client_sock;
2181 struct sockaddr_storage client_sa;
2182 socklen_t client_sa_size;
2183 pthread_t tid;
2184 pthread_attr_t attr;
2186 if (pollfds[i].revents == 0)
2187 continue;
2189 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2190 {
2191 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2192 "poll(2) returned something unexpected for listen FD #%i.",
2193 pollfds[i].fd);
2194 continue;
2195 }
2197 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2198 if (client_sock == NULL)
2199 {
2200 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2201 continue;
2202 }
2203 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2205 client_sa_size = sizeof (client_sa);
2206 client_sock->fd = accept (pollfds[i].fd,
2207 (struct sockaddr *) &client_sa, &client_sa_size);
2208 if (client_sock->fd < 0)
2209 {
2210 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2211 free(client_sock);
2212 continue;
2213 }
2215 pthread_attr_init (&attr);
2216 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2218 status = pthread_create (&tid, &attr, connection_thread_main,
2219 client_sock);
2220 if (status != 0)
2221 {
2222 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2223 close_connection(client_sock);
2224 continue;
2225 }
2226 } /* for (pollfds_num) */
2227 } /* while (do_shutdown == 0) */
2229 RRDD_LOG(LOG_INFO, "starting shutdown");
2231 close_listen_sockets ();
2233 pthread_mutex_lock (&connection_threads_lock);
2234 while (connection_threads_num > 0)
2235 {
2236 pthread_t wait_for;
2238 wait_for = connection_threads[0];
2240 pthread_mutex_unlock (&connection_threads_lock);
2241 pthread_join (wait_for, /* retval = */ NULL);
2242 pthread_mutex_lock (&connection_threads_lock);
2243 }
2244 pthread_mutex_unlock (&connection_threads_lock);
2246 return (NULL);
2247 } /* }}} void *listen_thread_main */
2249 static int daemonize (void) /* {{{ */
2250 {
2251 int status;
2252 int fd;
2253 char *base_dir;
2255 daemon_uid = geteuid();
2257 fd = open_pidfile();
2258 if (fd < 0) return fd;
2260 if (!stay_foreground)
2261 {
2262 pid_t child;
2264 child = fork ();
2265 if (child < 0)
2266 {
2267 fprintf (stderr, "daemonize: fork(2) failed.\n");
2268 return (-1);
2269 }
2270 else if (child > 0)
2271 {
2272 return (1);
2273 }
2275 /* Become session leader */
2276 setsid ();
2278 /* Open the first three file descriptors to /dev/null */
2279 close (2);
2280 close (1);
2281 close (0);
2283 open ("/dev/null", O_RDWR);
2284 dup (0);
2285 dup (0);
2286 } /* if (!stay_foreground) */
2288 /* Change into the /tmp directory. */
2289 base_dir = (config_base_dir != NULL)
2290 ? config_base_dir
2291 : "/tmp";
2292 status = chdir (base_dir);
2293 if (status != 0)
2294 {
2295 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2296 return (-1);
2297 }
2299 install_signal_handlers();
2301 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2302 RRDD_LOG(LOG_INFO, "starting up");
2304 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2305 if (cache_tree == NULL)
2306 {
2307 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2308 return (-1);
2309 }
2311 status = write_pidfile (fd);
2312 return status;
2313 } /* }}} int daemonize */
2315 static int cleanup (void) /* {{{ */
2316 {
2317 do_shutdown++;
2319 pthread_cond_signal (&cache_cond);
2320 pthread_join (queue_thread, /* return = */ NULL);
2322 remove_pidfile ();
2324 RRDD_LOG(LOG_INFO, "goodbye");
2325 closelog ();
2327 return (0);
2328 } /* }}} int cleanup */
2330 static int read_options (int argc, char **argv) /* {{{ */
2331 {
2332 int option;
2333 int status = 0;
2335 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2336 {
2337 switch (option)
2338 {
2339 case 'g':
2340 stay_foreground=1;
2341 break;
2343 case 'L':
2344 case 'l':
2345 {
2346 listen_socket_t **temp;
2347 listen_socket_t *new;
2349 new = malloc(sizeof(listen_socket_t));
2350 if (new == NULL)
2351 {
2352 fprintf(stderr, "read_options: malloc failed.\n");
2353 return(2);
2354 }
2355 memset(new, 0, sizeof(listen_socket_t));
2357 temp = (listen_socket_t **) realloc (config_listen_address_list,
2358 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2359 if (temp == NULL)
2360 {
2361 fprintf (stderr, "read_options: realloc failed.\n");
2362 return (2);
2363 }
2364 config_listen_address_list = temp;
2366 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2367 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2369 temp[config_listen_address_list_len] = new;
2370 config_listen_address_list_len++;
2371 }
2372 break;
2374 case 'f':
2375 {
2376 int temp;
2378 temp = atoi (optarg);
2379 if (temp > 0)
2380 config_flush_interval = temp;
2381 else
2382 {
2383 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2384 status = 3;
2385 }
2386 }
2387 break;
2389 case 'w':
2390 {
2391 int temp;
2393 temp = atoi (optarg);
2394 if (temp > 0)
2395 config_write_interval = temp;
2396 else
2397 {
2398 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2399 status = 2;
2400 }
2401 }
2402 break;
2404 case 'z':
2405 {
2406 int temp;
2408 temp = atoi(optarg);
2409 if (temp > 0)
2410 config_write_jitter = temp;
2411 else
2412 {
2413 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2414 status = 2;
2415 }
2417 break;
2418 }
2420 case 'B':
2421 config_write_base_only = 1;
2422 break;
2424 case 'b':
2425 {
2426 size_t len;
2428 if (config_base_dir != NULL)
2429 free (config_base_dir);
2430 config_base_dir = strdup (optarg);
2431 if (config_base_dir == NULL)
2432 {
2433 fprintf (stderr, "read_options: strdup failed.\n");
2434 return (3);
2435 }
2437 len = strlen (config_base_dir);
2438 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2439 {
2440 config_base_dir[len - 1] = 0;
2441 len--;
2442 }
2444 if (len < 1)
2445 {
2446 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2447 return (4);
2448 }
2450 _config_base_dir_len = len;
2451 }
2452 break;
2454 case 'p':
2455 {
2456 if (config_pid_file != NULL)
2457 free (config_pid_file);
2458 config_pid_file = strdup (optarg);
2459 if (config_pid_file == NULL)
2460 {
2461 fprintf (stderr, "read_options: strdup failed.\n");
2462 return (3);
2463 }
2464 }
2465 break;
2467 case 'F':
2468 config_flush_at_shutdown = 1;
2469 break;
2471 case 'j':
2472 {
2473 struct stat statbuf;
2474 const char *dir = optarg;
2476 status = stat(dir, &statbuf);
2477 if (status != 0)
2478 {
2479 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2480 return 6;
2481 }
2483 if (!S_ISDIR(statbuf.st_mode)
2484 || access(dir, R_OK|W_OK|X_OK) != 0)
2485 {
2486 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2487 errno ? rrd_strerror(errno) : "");
2488 return 6;
2489 }
2491 journal_cur = malloc(PATH_MAX + 1);
2492 journal_old = malloc(PATH_MAX + 1);
2493 if (journal_cur == NULL || journal_old == NULL)
2494 {
2495 fprintf(stderr, "malloc failure for journal files\n");
2496 return 6;
2497 }
2498 else
2499 {
2500 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2501 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2502 }
2503 }
2504 break;
2506 case 'h':
2507 case '?':
2508 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2509 "\n"
2510 "Usage: rrdcached [options]\n"
2511 "\n"
2512 "Valid options are:\n"
2513 " -l <address> Socket address to listen to.\n"
2514 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2515 " -w <seconds> Interval in which to write data.\n"
2516 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2517 " -f <seconds> Interval in which to flush dead data.\n"
2518 " -p <file> Location of the PID-file.\n"
2519 " -b <dir> Base directory to change to.\n"
2520 " -B Restrict file access to paths within -b <dir>\n"
2521 " -g Do not fork and run in the foreground.\n"
2522 " -j <dir> Directory in which to create the journal files.\n"
2523 " -F Always flush all updates at shutdown\n"
2524 "\n"
2525 "For more information and a detailed description of all options "
2526 "please refer\n"
2527 "to the rrdcached(1) manual page.\n",
2528 VERSION);
2529 status = -1;
2530 break;
2531 } /* switch (option) */
2532 } /* while (getopt) */
2534 /* advise the user when values are not sane */
2535 if (config_flush_interval < 2 * config_write_interval)
2536 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2537 " 2x write interval (-w) !\n");
2538 if (config_write_jitter > config_write_interval)
2539 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2540 " write interval (-w) !\n");
2542 if (config_write_base_only && config_base_dir == NULL)
2543 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2544 " Consult the rrdcached documentation\n");
2546 if (journal_cur == NULL)
2547 config_flush_at_shutdown = 1;
2549 return (status);
2550 } /* }}} int read_options */
2552 int main (int argc, char **argv)
2553 {
2554 int status;
2556 status = read_options (argc, argv);
2557 if (status != 0)
2558 {
2559 if (status < 0)
2560 status = 0;
2561 return (status);
2562 }
2564 status = daemonize ();
2565 if (status == 1)
2566 {
2567 struct sigaction sigchld;
2569 memset (&sigchld, 0, sizeof (sigchld));
2570 sigchld.sa_handler = SIG_IGN;
2571 sigaction (SIGCHLD, &sigchld, NULL);
2573 return (0);
2574 }
2575 else if (status != 0)
2576 {
2577 fprintf (stderr, "daemonize failed, exiting.\n");
2578 return (1);
2579 }
2581 journal_init();
2583 /* start the queue thread */
2584 memset (&queue_thread, 0, sizeof (queue_thread));
2585 status = pthread_create (&queue_thread,
2586 NULL, /* attr */
2587 queue_thread_main,
2588 NULL); /* args */
2589 if (status != 0)
2590 {
2591 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2592 cleanup();
2593 return (1);
2594 }
2596 listen_thread_main (NULL);
2597 cleanup ();
2599 return (0);
2600 } /* int main */
2602 /*
2603 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2604 */