9c8847dd7e4a17ce157853a02ff2440d5354dce0
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 typedef enum
105 {
106 PRIV_LOW,
107 PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
113 {
114 int fd;
115 char addr[PATH_MAX + 1];
116 int family;
117 socket_privilege privilege;
119 /* state for BATCH processing */
120 int batch_mode;
121 int batch_cmd;
123 /* buffered IO */
124 char *rbuf;
125 off_t next_cmd;
126 off_t next_read;
128 char *wbuf;
129 ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137 char *file;
138 char **values;
139 int values_num;
140 time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143 int flags;
144 pthread_cond_t flushed;
145 cache_item_t *prev;
146 cache_item_t *next;
147 };
149 struct callback_flush_data_s
150 {
151 time_t now;
152 time_t abs_timeout;
153 char **keys;
154 size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
158 enum queue_side_e
159 {
160 HEAD,
161 TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
169 /*
170 * Variables
171 */
172 static int stay_foreground = 0;
173 static uid_t daemon_uid;
175 static listen_socket_t *listen_fds = NULL;
176 static size_t listen_fds_num = 0;
178 static int do_shutdown = 0;
180 static pthread_t queue_thread;
182 static pthread_t *connection_threads = NULL;
183 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
184 static int connection_threads_num = 0;
186 /* Cache stuff */
187 static GTree *cache_tree = NULL;
188 static cache_item_t *cache_queue_head = NULL;
189 static cache_item_t *cache_queue_tail = NULL;
190 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
193 static int config_write_interval = 300;
194 static int config_write_jitter = 0;
195 static int config_flush_interval = 3600;
196 static int config_flush_at_shutdown = 0;
197 static char *config_pid_file = NULL;
198 static char *config_base_dir = NULL;
199 static size_t _config_base_dir_len = 0;
200 static int config_write_base_only = 0;
202 static listen_socket_t **config_listen_address_list = NULL;
203 static int config_listen_address_list_len = 0;
205 static uint64_t stats_queue_length = 0;
206 static uint64_t stats_updates_received = 0;
207 static uint64_t stats_flush_received = 0;
208 static uint64_t stats_updates_written = 0;
209 static uint64_t stats_data_sets_written = 0;
210 static uint64_t stats_journal_bytes = 0;
211 static uint64_t stats_journal_rotate = 0;
212 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214 /* Journaled updates */
215 static char *journal_cur = NULL;
216 static char *journal_old = NULL;
217 static FILE *journal_fh = NULL;
218 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
219 static int journal_write(char *cmd, char *args);
220 static void journal_done(void);
221 static void journal_rotate(void);
223 /*
224 * Functions
225 */
226 static void sig_common (const char *sig) /* {{{ */
227 {
228 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
229 do_shutdown++;
230 pthread_cond_broadcast(&cache_cond);
231 } /* }}} void sig_common */
233 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
234 {
235 sig_common("INT");
236 } /* }}} void sig_int_handler */
238 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
239 {
240 sig_common("TERM");
241 } /* }}} void sig_term_handler */
243 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
244 {
245 config_flush_at_shutdown = 1;
246 sig_common("USR1");
247 } /* }}} void sig_usr1_handler */
249 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
250 {
251 config_flush_at_shutdown = 0;
252 sig_common("USR2");
253 } /* }}} void sig_usr2_handler */
255 static void install_signal_handlers(void) /* {{{ */
256 {
257 /* These structures are static, because `sigaction' behaves weird if the are
258 * overwritten.. */
259 static struct sigaction sa_int;
260 static struct sigaction sa_term;
261 static struct sigaction sa_pipe;
262 static struct sigaction sa_usr1;
263 static struct sigaction sa_usr2;
265 /* Install signal handlers */
266 memset (&sa_int, 0, sizeof (sa_int));
267 sa_int.sa_handler = sig_int_handler;
268 sigaction (SIGINT, &sa_int, NULL);
270 memset (&sa_term, 0, sizeof (sa_term));
271 sa_term.sa_handler = sig_term_handler;
272 sigaction (SIGTERM, &sa_term, NULL);
274 memset (&sa_pipe, 0, sizeof (sa_pipe));
275 sa_pipe.sa_handler = SIG_IGN;
276 sigaction (SIGPIPE, &sa_pipe, NULL);
278 memset (&sa_pipe, 0, sizeof (sa_usr1));
279 sa_usr1.sa_handler = sig_usr1_handler;
280 sigaction (SIGUSR1, &sa_usr1, NULL);
282 memset (&sa_usr2, 0, sizeof (sa_usr2));
283 sa_usr2.sa_handler = sig_usr2_handler;
284 sigaction (SIGUSR2, &sa_usr2, NULL);
286 } /* }}} void install_signal_handlers */
288 static int open_pidfile(void) /* {{{ */
289 {
290 int fd;
291 char *file;
293 file = (config_pid_file != NULL)
294 ? config_pid_file
295 : LOCALSTATEDIR "/run/rrdcached.pid";
297 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
298 if (fd < 0)
299 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
300 file, rrd_strerror(errno));
302 return(fd);
303 } /* }}} static int open_pidfile */
305 static int write_pidfile (int fd) /* {{{ */
306 {
307 pid_t pid;
308 FILE *fh;
310 pid = getpid ();
312 fh = fdopen (fd, "w");
313 if (fh == NULL)
314 {
315 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
316 close(fd);
317 return (-1);
318 }
320 fprintf (fh, "%i\n", (int) pid);
321 fclose (fh);
323 return (0);
324 } /* }}} int write_pidfile */
326 static int remove_pidfile (void) /* {{{ */
327 {
328 char *file;
329 int status;
331 file = (config_pid_file != NULL)
332 ? config_pid_file
333 : LOCALSTATEDIR "/run/rrdcached.pid";
335 status = unlink (file);
336 if (status == 0)
337 return (0);
338 return (errno);
339 } /* }}} int remove_pidfile */
341 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
342 {
343 char *eol;
345 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
346 sock->next_read - sock->next_cmd);
348 if (eol == NULL)
349 {
350 /* no commands left, move remainder back to front of rbuf */
351 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
352 sock->next_read - sock->next_cmd);
353 sock->next_read -= sock->next_cmd;
354 sock->next_cmd = 0;
355 *len = 0;
356 return NULL;
357 }
358 else
359 {
360 char *cmd = sock->rbuf + sock->next_cmd;
361 *eol = '\0';
363 sock->next_cmd = eol - sock->rbuf + 1;
365 if (eol > sock->rbuf && *(eol-1) == '\r')
366 *(--eol) = '\0'; /* handle "\r\n" EOL */
368 *len = eol - cmd;
370 return cmd;
371 }
373 /* NOTREACHED */
374 assert(1==0);
375 }
377 /* add the characters directly to the write buffer */
378 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
379 {
380 char *new_buf;
382 assert(sock != NULL);
384 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
385 if (new_buf == NULL)
386 {
387 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
388 return -1;
389 }
391 strncpy(new_buf + sock->wbuf_len, str, len + 1);
393 sock->wbuf = new_buf;
394 sock->wbuf_len += len;
396 return 0;
397 } /* }}} static int add_to_wbuf */
399 /* add the text to the "extra" info that's sent after the status line */
400 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
401 {
402 va_list argp;
403 char buffer[CMD_MAX];
404 int len;
406 if (sock == NULL) return 0; /* journal replay mode */
407 if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
409 va_start(argp, fmt);
410 #ifdef HAVE_VSNPRINTF
411 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
412 #else
413 len = vsprintf(buffer, fmt, argp);
414 #endif
415 va_end(argp);
416 if (len < 0)
417 {
418 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
419 return -1;
420 }
422 return add_to_wbuf(sock, buffer, len);
423 } /* }}} static int add_response_info */
425 static int count_lines(char *str) /* {{{ */
426 {
427 int lines = 0;
429 if (str != NULL)
430 {
431 while ((str = strchr(str, '\n')) != NULL)
432 {
433 ++lines;
434 ++str;
435 }
436 }
438 return lines;
439 } /* }}} static int count_lines */
441 /* send the response back to the user.
442 * returns 0 on success, -1 on error
443 * write buffer is always zeroed after this call */
444 static int send_response (listen_socket_t *sock, response_code rc,
445 char *fmt, ...) /* {{{ */
446 {
447 va_list argp;
448 char buffer[CMD_MAX];
449 int lines;
450 ssize_t wrote;
451 int rclen, len;
453 if (sock == NULL) return rc; /* journal replay mode */
455 if (sock->batch_mode)
456 {
457 if (rc == RESP_OK)
458 return rc; /* no response on success during BATCH */
459 lines = sock->batch_cmd;
460 }
461 else if (rc == RESP_OK)
462 lines = count_lines(sock->wbuf);
463 else
464 lines = -1;
466 rclen = sprintf(buffer, "%d ", lines);
467 va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
470 #else
471 len = vsprintf(buffer+rclen, fmt, argp);
472 #endif
473 va_end(argp);
474 if (len < 0)
475 return -1;
477 len += rclen;
479 /* append the result to the wbuf, don't write to the user */
480 if (sock->batch_mode)
481 return add_to_wbuf(sock, buffer, len);
483 /* first write must be complete */
484 if (len != write(sock->fd, buffer, len))
485 {
486 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
487 return -1;
488 }
490 if (sock->wbuf != NULL)
491 {
492 wrote = 0;
493 while (wrote < sock->wbuf_len)
494 {
495 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
496 if (wb <= 0)
497 {
498 RRDD_LOG(LOG_INFO, "send_response: could not write results");
499 return -1;
500 }
501 wrote += wb;
502 }
503 }
505 free(sock->wbuf); sock->wbuf = NULL;
506 sock->wbuf_len = 0;
508 return 0;
509 } /* }}} */
511 static void wipe_ci_values(cache_item_t *ci, time_t when)
512 {
513 ci->values = NULL;
514 ci->values_num = 0;
516 ci->last_flush_time = when;
517 if (config_write_jitter > 0)
518 ci->last_flush_time += (random() % config_write_jitter);
519 }
521 /* remove_from_queue
522 * remove a "cache_item_t" item from the queue.
523 * must hold 'cache_lock' when calling this
524 */
525 static void remove_from_queue(cache_item_t *ci) /* {{{ */
526 {
527 if (ci == NULL) return;
529 if (ci->prev == NULL)
530 cache_queue_head = ci->next; /* reset head */
531 else
532 ci->prev->next = ci->next;
534 if (ci->next == NULL)
535 cache_queue_tail = ci->prev; /* reset the tail */
536 else
537 ci->next->prev = ci->prev;
539 ci->next = ci->prev = NULL;
540 ci->flags &= ~CI_FLAGS_IN_QUEUE;
541 } /* }}} static void remove_from_queue */
543 /* remove an entry from the tree and free all its resources.
544 * must hold 'cache lock' while calling this.
545 * returns 0 on success, otherwise errno */
546 static int forget_file(const char *file)
547 {
548 cache_item_t *ci;
550 ci = g_tree_lookup(cache_tree, file);
551 if (ci == NULL)
552 return ENOENT;
554 g_tree_remove (cache_tree, file);
555 remove_from_queue(ci);
557 for (int i=0; i < ci->values_num; i++)
558 free(ci->values[i]);
560 free (ci->values);
561 free (ci->file);
563 /* in case anyone is waiting */
564 pthread_cond_broadcast(&ci->flushed);
566 free (ci);
568 return 0;
569 } /* }}} static int forget_file */
571 /*
572 * enqueue_cache_item:
573 * `cache_lock' must be acquired before calling this function!
574 */
575 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
576 queue_side_t side)
577 {
578 if (ci == NULL)
579 return (-1);
581 if (ci->values_num == 0)
582 return (0);
584 if (side == HEAD)
585 {
586 if (cache_queue_head == ci)
587 return 0;
589 /* remove from the double linked list */
590 if (ci->flags & CI_FLAGS_IN_QUEUE)
591 remove_from_queue(ci);
593 ci->prev = NULL;
594 ci->next = cache_queue_head;
595 if (ci->next != NULL)
596 ci->next->prev = ci;
597 cache_queue_head = ci;
599 if (cache_queue_tail == NULL)
600 cache_queue_tail = cache_queue_head;
601 }
602 else /* (side == TAIL) */
603 {
604 /* We don't move values back in the list.. */
605 if (ci->flags & CI_FLAGS_IN_QUEUE)
606 return (0);
608 assert (ci->next == NULL);
609 assert (ci->prev == NULL);
611 ci->prev = cache_queue_tail;
613 if (cache_queue_tail == NULL)
614 cache_queue_head = ci;
615 else
616 cache_queue_tail->next = ci;
618 cache_queue_tail = ci;
619 }
621 ci->flags |= CI_FLAGS_IN_QUEUE;
623 pthread_cond_broadcast(&cache_cond);
624 pthread_mutex_lock (&stats_lock);
625 stats_queue_length++;
626 pthread_mutex_unlock (&stats_lock);
628 return (0);
629 } /* }}} int enqueue_cache_item */
631 /*
632 * tree_callback_flush:
633 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
634 * while this is in progress.
635 */
636 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
637 gpointer data)
638 {
639 cache_item_t *ci;
640 callback_flush_data_t *cfd;
642 ci = (cache_item_t *) value;
643 cfd = (callback_flush_data_t *) data;
645 if ((ci->last_flush_time <= cfd->abs_timeout)
646 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
647 && (ci->values_num > 0))
648 {
649 enqueue_cache_item (ci, TAIL);
650 }
651 else if ((do_shutdown != 0)
652 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
653 && (ci->values_num > 0))
654 {
655 enqueue_cache_item (ci, TAIL);
656 }
657 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
658 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
659 && (ci->values_num <= 0))
660 {
661 char **temp;
663 temp = (char **) realloc (cfd->keys,
664 sizeof (char *) * (cfd->keys_num + 1));
665 if (temp == NULL)
666 {
667 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
668 return (FALSE);
669 }
670 cfd->keys = temp;
671 /* Make really sure this points to the _same_ place */
672 assert ((char *) key == ci->file);
673 cfd->keys[cfd->keys_num] = (char *) key;
674 cfd->keys_num++;
675 }
677 return (FALSE);
678 } /* }}} gboolean tree_callback_flush */
680 static int flush_old_values (int max_age)
681 {
682 callback_flush_data_t cfd;
683 size_t k;
685 memset (&cfd, 0, sizeof (cfd));
686 /* Pass the current time as user data so that we don't need to call
687 * `time' for each node. */
688 cfd.now = time (NULL);
689 cfd.keys = NULL;
690 cfd.keys_num = 0;
692 if (max_age > 0)
693 cfd.abs_timeout = cfd.now - max_age;
694 else
695 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
697 /* `tree_callback_flush' will return the keys of all values that haven't
698 * been touched in the last `config_flush_interval' seconds in `cfd'.
699 * The char*'s in this array point to the same memory as ci->file, so we
700 * don't need to free them separately. */
701 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
703 for (k = 0; k < cfd.keys_num; k++)
704 {
705 /* should never fail, since we have held the cache_lock
706 * the entire time */
707 assert( forget_file(cfd.keys[k]) == 0 );
708 }
710 if (cfd.keys != NULL)
711 {
712 free (cfd.keys);
713 cfd.keys = NULL;
714 }
716 return (0);
717 } /* int flush_old_values */
719 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
720 {
721 struct timeval now;
722 struct timespec next_flush;
723 int final_flush = 0; /* make sure we only flush once on shutdown */
725 gettimeofday (&now, NULL);
726 next_flush.tv_sec = now.tv_sec + config_flush_interval;
727 next_flush.tv_nsec = 1000 * now.tv_usec;
729 pthread_mutex_lock (&cache_lock);
730 while ((do_shutdown == 0) || (cache_queue_head != NULL))
731 {
732 cache_item_t *ci;
733 char *file;
734 char **values;
735 int values_num;
736 int status;
737 int i;
739 /* First, check if it's time to do the cache flush. */
740 gettimeofday (&now, NULL);
741 if ((now.tv_sec > next_flush.tv_sec)
742 || ((now.tv_sec == next_flush.tv_sec)
743 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
744 {
745 /* Flush all values that haven't been written in the last
746 * `config_write_interval' seconds. */
747 flush_old_values (config_write_interval);
749 /* Determine the time of the next cache flush. */
750 while (next_flush.tv_sec <= now.tv_sec)
751 next_flush.tv_sec += config_flush_interval;
753 /* unlock the cache while we rotate so we don't block incoming
754 * updates if the fsync() blocks on disk I/O */
755 pthread_mutex_unlock(&cache_lock);
756 journal_rotate();
757 pthread_mutex_lock(&cache_lock);
758 }
760 /* Now, check if there's something to store away. If not, wait until
761 * something comes in or it's time to do the cache flush. if we are
762 * shutting down, do not wait around. */
763 if (cache_queue_head == NULL && !do_shutdown)
764 {
765 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
766 if ((status != 0) && (status != ETIMEDOUT))
767 {
768 RRDD_LOG (LOG_ERR, "queue_thread_main: "
769 "pthread_cond_timedwait returned %i.", status);
770 }
771 }
773 /* We're about to shut down */
774 if (do_shutdown != 0 && !final_flush++)
775 {
776 if (config_flush_at_shutdown)
777 flush_old_values (-1); /* flush everything */
778 else
779 break;
780 }
782 /* Check if a value has arrived. This may be NULL if we timed out or there
783 * was an interrupt such as a signal. */
784 if (cache_queue_head == NULL)
785 continue;
787 ci = cache_queue_head;
789 /* copy the relevant parts */
790 file = strdup (ci->file);
791 if (file == NULL)
792 {
793 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
794 continue;
795 }
797 assert(ci->values != NULL);
798 assert(ci->values_num > 0);
800 values = ci->values;
801 values_num = ci->values_num;
803 wipe_ci_values(ci, time(NULL));
804 remove_from_queue(ci);
806 pthread_mutex_lock (&stats_lock);
807 assert (stats_queue_length > 0);
808 stats_queue_length--;
809 pthread_mutex_unlock (&stats_lock);
811 pthread_mutex_unlock (&cache_lock);
813 rrd_clear_error ();
814 status = rrd_update_r (file, NULL, values_num, (void *) values);
815 if (status != 0)
816 {
817 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
818 "rrd_update_r (%s) failed with status %i. (%s)",
819 file, status, rrd_get_error());
820 }
822 journal_write("wrote", file);
823 pthread_cond_broadcast(&ci->flushed);
825 for (i = 0; i < values_num; i++)
826 free (values[i]);
828 free(values);
829 free(file);
831 if (status == 0)
832 {
833 pthread_mutex_lock (&stats_lock);
834 stats_updates_written++;
835 stats_data_sets_written += values_num;
836 pthread_mutex_unlock (&stats_lock);
837 }
839 pthread_mutex_lock (&cache_lock);
841 /* We're about to shut down */
842 if (do_shutdown != 0 && !final_flush++)
843 {
844 if (config_flush_at_shutdown)
845 flush_old_values (-1); /* flush everything */
846 else
847 break;
848 }
849 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
850 pthread_mutex_unlock (&cache_lock);
852 if (config_flush_at_shutdown)
853 {
854 assert(cache_queue_head == NULL);
855 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
856 }
858 journal_done();
860 return (NULL);
861 } /* }}} void *queue_thread_main */
863 static int buffer_get_field (char **buffer_ret, /* {{{ */
864 size_t *buffer_size_ret, char **field_ret)
865 {
866 char *buffer;
867 size_t buffer_pos;
868 size_t buffer_size;
869 char *field;
870 size_t field_size;
871 int status;
873 buffer = *buffer_ret;
874 buffer_pos = 0;
875 buffer_size = *buffer_size_ret;
876 field = *buffer_ret;
877 field_size = 0;
879 if (buffer_size <= 0)
880 return (-1);
882 /* This is ensured by `handle_request'. */
883 assert (buffer[buffer_size - 1] == '\0');
885 status = -1;
886 while (buffer_pos < buffer_size)
887 {
888 /* Check for end-of-field or end-of-buffer */
889 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
890 {
891 field[field_size] = 0;
892 field_size++;
893 buffer_pos++;
894 status = 0;
895 break;
896 }
897 /* Handle escaped characters. */
898 else if (buffer[buffer_pos] == '\\')
899 {
900 if (buffer_pos >= (buffer_size - 1))
901 break;
902 buffer_pos++;
903 field[field_size] = buffer[buffer_pos];
904 field_size++;
905 buffer_pos++;
906 }
907 /* Normal operation */
908 else
909 {
910 field[field_size] = buffer[buffer_pos];
911 field_size++;
912 buffer_pos++;
913 }
914 } /* while (buffer_pos < buffer_size) */
916 if (status != 0)
917 return (status);
919 *buffer_ret = buffer + buffer_pos;
920 *buffer_size_ret = buffer_size - buffer_pos;
921 *field_ret = field;
923 return (0);
924 } /* }}} int buffer_get_field */
926 /* if we're restricting writes to the base directory,
927 * check whether the file falls within the dir
928 * returns 1 if OK, otherwise 0
929 */
930 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
931 {
932 assert(file != NULL);
934 if (!config_write_base_only
935 || sock == NULL /* journal replay */
936 || config_base_dir == NULL)
937 return 1;
939 if (strstr(file, "../") != NULL) goto err;
941 /* relative paths without "../" are ok */
942 if (*file != '/') return 1;
944 /* file must be of the format base + "/" + <1+ char filename> */
945 if (strlen(file) < _config_base_dir_len + 2) goto err;
946 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
947 if (*(file + _config_base_dir_len) != '/') goto err;
949 return 1;
951 err:
952 if (sock != NULL && sock->fd >= 0)
953 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
955 return 0;
956 } /* }}} static int check_file_access */
958 /* returns 1 if we have the required privilege level,
959 * otherwise issue an error to the user on sock */
960 static int has_privilege (listen_socket_t *sock, /* {{{ */
961 socket_privilege priv)
962 {
963 if (sock == NULL) /* journal replay */
964 return 1;
966 if (sock->privilege >= priv)
967 return 1;
969 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
970 } /* }}} static int has_privilege */
972 static int flush_file (const char *filename) /* {{{ */
973 {
974 cache_item_t *ci;
976 pthread_mutex_lock (&cache_lock);
978 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
979 if (ci == NULL)
980 {
981 pthread_mutex_unlock (&cache_lock);
982 return (ENOENT);
983 }
985 if (ci->values_num > 0)
986 {
987 /* Enqueue at head */
988 enqueue_cache_item (ci, HEAD);
989 pthread_cond_wait(&ci->flushed, &cache_lock);
990 }
992 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
993 * may have been purged during our cond_wait() */
995 pthread_mutex_unlock(&cache_lock);
997 return (0);
998 } /* }}} int flush_file */
1000 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1001 char *buffer, size_t buffer_size)
1002 {
1003 int status;
1004 char **help_text;
1005 char *command;
1007 char *help_help[2] =
1008 {
1009 "Command overview\n"
1010 ,
1011 "HELP [<command>]\n"
1012 "FLUSH <filename>\n"
1013 "FLUSHALL\n"
1014 "PENDING <filename>\n"
1015 "FORGET <filename>\n"
1016 "UPDATE <filename> <values> [<values> ...]\n"
1017 "BATCH\n"
1018 "STATS\n"
1019 };
1021 char *help_flush[2] =
1022 {
1023 "Help for FLUSH\n"
1024 ,
1025 "Usage: FLUSH <filename>\n"
1026 "\n"
1027 "Adds the given filename to the head of the update queue and returns\n"
1028 "after is has been dequeued.\n"
1029 };
1031 char *help_flushall[2] =
1032 {
1033 "Help for FLUSHALL\n"
1034 ,
1035 "Usage: FLUSHALL\n"
1036 "\n"
1037 "Triggers writing of all pending updates. Returns immediately.\n"
1038 };
1040 char *help_pending[2] =
1041 {
1042 "Help for PENDING\n"
1043 ,
1044 "Usage: PENDING <filename>\n"
1045 "\n"
1046 "Shows any 'pending' updates for a file, in order.\n"
1047 "The updates shown have not yet been written to the underlying RRD file.\n"
1048 };
1050 char *help_forget[2] =
1051 {
1052 "Help for FORGET\n"
1053 ,
1054 "Usage: FORGET <filename>\n"
1055 "\n"
1056 "Removes the file completely from the cache.\n"
1057 "Any pending updates for the file will be lost.\n"
1058 };
1060 char *help_update[2] =
1061 {
1062 "Help for UPDATE\n"
1063 ,
1064 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1065 "\n"
1066 "Adds the given file to the internal cache if it is not yet known and\n"
1067 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1068 "for details.\n"
1069 "\n"
1070 "Each <values> has the following form:\n"
1071 " <values> = <time>:<value>[:<value>[...]]\n"
1072 "See the rrdupdate(1) manpage for details.\n"
1073 };
1075 char *help_stats[2] =
1076 {
1077 "Help for STATS\n"
1078 ,
1079 "Usage: STATS\n"
1080 "\n"
1081 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1082 "a description of the values.\n"
1083 };
1085 char *help_batch[2] =
1086 {
1087 "Help for BATCH\n"
1088 ,
1089 "The 'BATCH' command permits the client to initiate a bulk load\n"
1090 " of commands to rrdcached.\n"
1091 "\n"
1092 "Usage:\n"
1093 "\n"
1094 " client: BATCH\n"
1095 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1096 " client: command #1\n"
1097 " client: command #2\n"
1098 " client: ... and so on\n"
1099 " client: .\n"
1100 " server: 2 errors\n"
1101 " server: 7 message for command #7\n"
1102 " server: 9 message for command #9\n"
1103 "\n"
1104 "For more information, consult the rrdcached(1) documentation.\n"
1105 };
1107 status = buffer_get_field (&buffer, &buffer_size, &command);
1108 if (status != 0)
1109 help_text = help_help;
1110 else
1111 {
1112 if (strcasecmp (command, "update") == 0)
1113 help_text = help_update;
1114 else if (strcasecmp (command, "flush") == 0)
1115 help_text = help_flush;
1116 else if (strcasecmp (command, "flushall") == 0)
1117 help_text = help_flushall;
1118 else if (strcasecmp (command, "pending") == 0)
1119 help_text = help_pending;
1120 else if (strcasecmp (command, "forget") == 0)
1121 help_text = help_forget;
1122 else if (strcasecmp (command, "stats") == 0)
1123 help_text = help_stats;
1124 else if (strcasecmp (command, "batch") == 0)
1125 help_text = help_batch;
1126 else
1127 help_text = help_help;
1128 }
1130 add_response_info(sock, help_text[1]);
1131 return send_response(sock, RESP_OK, help_text[0]);
1132 } /* }}} int handle_request_help */
1134 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1135 {
1136 uint64_t copy_queue_length;
1137 uint64_t copy_updates_received;
1138 uint64_t copy_flush_received;
1139 uint64_t copy_updates_written;
1140 uint64_t copy_data_sets_written;
1141 uint64_t copy_journal_bytes;
1142 uint64_t copy_journal_rotate;
1144 uint64_t tree_nodes_number;
1145 uint64_t tree_depth;
1147 pthread_mutex_lock (&stats_lock);
1148 copy_queue_length = stats_queue_length;
1149 copy_updates_received = stats_updates_received;
1150 copy_flush_received = stats_flush_received;
1151 copy_updates_written = stats_updates_written;
1152 copy_data_sets_written = stats_data_sets_written;
1153 copy_journal_bytes = stats_journal_bytes;
1154 copy_journal_rotate = stats_journal_rotate;
1155 pthread_mutex_unlock (&stats_lock);
1157 pthread_mutex_lock (&cache_lock);
1158 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1159 tree_depth = (uint64_t) g_tree_height (cache_tree);
1160 pthread_mutex_unlock (&cache_lock);
1162 add_response_info(sock,
1163 "QueueLength: %"PRIu64"\n", copy_queue_length);
1164 add_response_info(sock,
1165 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1166 add_response_info(sock,
1167 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1168 add_response_info(sock,
1169 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1170 add_response_info(sock,
1171 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1172 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1173 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1174 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1175 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1177 send_response(sock, RESP_OK, "Statistics follow\n");
1179 return (0);
1180 } /* }}} int handle_request_stats */
1182 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1183 char *buffer, size_t buffer_size)
1184 {
1185 char *file;
1186 int status;
1188 status = buffer_get_field (&buffer, &buffer_size, &file);
1189 if (status != 0)
1190 {
1191 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1192 }
1193 else
1194 {
1195 pthread_mutex_lock(&stats_lock);
1196 stats_flush_received++;
1197 pthread_mutex_unlock(&stats_lock);
1199 if (!check_file_access(file, sock)) return 0;
1201 status = flush_file (file);
1202 if (status == 0)
1203 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1204 else if (status == ENOENT)
1205 {
1206 /* no file in our tree; see whether it exists at all */
1207 struct stat statbuf;
1209 memset(&statbuf, 0, sizeof(statbuf));
1210 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1211 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1212 else
1213 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1214 }
1215 else if (status < 0)
1216 return send_response(sock, RESP_ERR, "Internal error.\n");
1217 else
1218 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1219 }
1221 /* NOTREACHED */
1222 assert(1==0);
1223 } /* }}} int handle_request_slurp */
1225 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1226 {
1227 int status;
1229 status = has_privilege(sock, PRIV_HIGH);
1230 if (status <= 0)
1231 return status;
1233 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1235 pthread_mutex_lock(&cache_lock);
1236 flush_old_values(-1);
1237 pthread_mutex_unlock(&cache_lock);
1239 return send_response(sock, RESP_OK, "Started flush.\n");
1240 } /* }}} static int handle_request_flushall */
1242 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1243 char *buffer, size_t buffer_size)
1244 {
1245 int status;
1246 char *file;
1247 cache_item_t *ci;
1249 status = buffer_get_field(&buffer, &buffer_size, &file);
1250 if (status != 0)
1251 return send_response(sock, RESP_ERR,
1252 "Usage: PENDING <filename>\n");
1254 status = has_privilege(sock, PRIV_HIGH);
1255 if (status <= 0)
1256 return status;
1258 pthread_mutex_lock(&cache_lock);
1259 ci = g_tree_lookup(cache_tree, file);
1260 if (ci == NULL)
1261 {
1262 pthread_mutex_unlock(&cache_lock);
1263 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1264 }
1266 for (int i=0; i < ci->values_num; i++)
1267 add_response_info(sock, "%s\n", ci->values[i]);
1269 pthread_mutex_unlock(&cache_lock);
1270 return send_response(sock, RESP_OK, "updates pending\n");
1271 } /* }}} static int handle_request_pending */
1273 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1274 char *buffer, size_t buffer_size)
1275 {
1276 int status;
1277 char *file;
1279 status = buffer_get_field(&buffer, &buffer_size, &file);
1280 if (status != 0)
1281 return send_response(sock, RESP_ERR,
1282 "Usage: FORGET <filename>\n");
1284 status = has_privilege(sock, PRIV_HIGH);
1285 if (status <= 0)
1286 return status;
1288 if (!check_file_access(file, sock)) return 0;
1290 pthread_mutex_lock(&cache_lock);
1291 status = forget_file(file);
1292 pthread_mutex_unlock(&cache_lock);
1294 if (status == 0)
1295 {
1296 if (sock != NULL)
1297 journal_write("forget", file);
1299 return send_response(sock, RESP_OK, "Gone!\n");
1300 }
1301 else
1302 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1303 status < 0 ? "Internal error" : rrd_strerror(status));
1305 /* NOTREACHED */
1306 assert(1==0);
1307 } /* }}} static int handle_request_forget */
1309 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1310 char *buffer, size_t buffer_size)
1311 {
1312 char *file;
1313 int values_num = 0;
1314 int status;
1315 char orig_buf[CMD_MAX];
1317 time_t now;
1318 cache_item_t *ci;
1320 now = time (NULL);
1322 status = has_privilege(sock, PRIV_HIGH);
1323 if (status <= 0)
1324 return status;
1326 /* save it for the journal later */
1327 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1329 status = buffer_get_field (&buffer, &buffer_size, &file);
1330 if (status != 0)
1331 return send_response(sock, RESP_ERR,
1332 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1334 pthread_mutex_lock(&stats_lock);
1335 stats_updates_received++;
1336 pthread_mutex_unlock(&stats_lock);
1338 if (!check_file_access(file, sock)) return 0;
1340 pthread_mutex_lock (&cache_lock);
1341 ci = g_tree_lookup (cache_tree, file);
1343 if (ci == NULL) /* {{{ */
1344 {
1345 struct stat statbuf;
1347 /* don't hold the lock while we setup; stat(2) might block */
1348 pthread_mutex_unlock(&cache_lock);
1350 memset (&statbuf, 0, sizeof (statbuf));
1351 status = stat (file, &statbuf);
1352 if (status != 0)
1353 {
1354 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1356 status = errno;
1357 if (status == ENOENT)
1358 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1359 else
1360 return send_response(sock, RESP_ERR,
1361 "stat failed with error %i.\n", status);
1362 }
1363 if (!S_ISREG (statbuf.st_mode))
1364 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1366 if (access(file, R_OK|W_OK) != 0)
1367 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1368 file, rrd_strerror(errno));
1370 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1371 if (ci == NULL)
1372 {
1373 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1375 return send_response(sock, RESP_ERR, "malloc failed.\n");
1376 }
1377 memset (ci, 0, sizeof (cache_item_t));
1379 ci->file = strdup (file);
1380 if (ci->file == NULL)
1381 {
1382 free (ci);
1383 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1385 return send_response(sock, RESP_ERR, "strdup failed.\n");
1386 }
1388 wipe_ci_values(ci, now);
1389 ci->flags = CI_FLAGS_IN_TREE;
1391 pthread_mutex_lock(&cache_lock);
1392 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1393 } /* }}} */
1394 assert (ci != NULL);
1396 /* don't re-write updates in replay mode */
1397 if (sock != NULL)
1398 journal_write("update", orig_buf);
1400 while (buffer_size > 0)
1401 {
1402 char **temp;
1403 char *value;
1405 status = buffer_get_field (&buffer, &buffer_size, &value);
1406 if (status != 0)
1407 {
1408 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1409 break;
1410 }
1412 temp = (char **) realloc (ci->values,
1413 sizeof (char *) * (ci->values_num + 1));
1414 if (temp == NULL)
1415 {
1416 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1417 continue;
1418 }
1419 ci->values = temp;
1421 ci->values[ci->values_num] = strdup (value);
1422 if (ci->values[ci->values_num] == NULL)
1423 {
1424 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1425 continue;
1426 }
1427 ci->values_num++;
1429 values_num++;
1430 }
1432 if (((now - ci->last_flush_time) >= config_write_interval)
1433 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1434 && (ci->values_num > 0))
1435 {
1436 enqueue_cache_item (ci, TAIL);
1437 }
1439 pthread_mutex_unlock (&cache_lock);
1441 if (values_num < 1)
1442 return send_response(sock, RESP_ERR, "No values updated.\n");
1443 else
1444 return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1446 /* NOTREACHED */
1447 assert(1==0);
1449 } /* }}} int handle_request_update */
1451 /* we came across a "WROTE" entry during journal replay.
1452 * throw away any values that we have accumulated for this file
1453 */
1454 static int handle_request_wrote (const char *buffer) /* {{{ */
1455 {
1456 int i;
1457 cache_item_t *ci;
1458 const char *file = buffer;
1460 pthread_mutex_lock(&cache_lock);
1462 ci = g_tree_lookup(cache_tree, file);
1463 if (ci == NULL)
1464 {
1465 pthread_mutex_unlock(&cache_lock);
1466 return (0);
1467 }
1469 if (ci->values)
1470 {
1471 for (i=0; i < ci->values_num; i++)
1472 free(ci->values[i]);
1474 free(ci->values);
1475 }
1477 wipe_ci_values(ci, time(NULL));
1478 remove_from_queue(ci);
1480 pthread_mutex_unlock(&cache_lock);
1481 return (0);
1482 } /* }}} int handle_request_wrote */
1484 /* start "BATCH" processing */
1485 static int batch_start (listen_socket_t *sock) /* {{{ */
1486 {
1487 int status;
1488 if (sock->batch_mode)
1489 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1491 status = send_response(sock, RESP_OK,
1492 "Go ahead. End with dot '.' on its own line.\n");
1493 sock->batch_mode = 1;
1494 sock->batch_cmd = 0;
1496 return status;
1497 } /* }}} static int batch_start */
1499 /* finish "BATCH" processing and return results to the client */
1500 static int batch_done (listen_socket_t *sock) /* {{{ */
1501 {
1502 assert(sock->batch_mode);
1503 sock->batch_mode = 0;
1504 sock->batch_cmd = 0;
1505 return send_response(sock, RESP_OK, "errors\n");
1506 } /* }}} static int batch_done */
1508 /* if sock==NULL, we are in journal replay mode */
1509 static int handle_request (listen_socket_t *sock, /* {{{ */
1510 char *buffer, size_t buffer_size)
1511 {
1512 char *buffer_ptr;
1513 char *command;
1514 int status;
1516 assert (buffer[buffer_size - 1] == '\0');
1518 buffer_ptr = buffer;
1519 command = NULL;
1520 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1521 if (status != 0)
1522 {
1523 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1524 return (-1);
1525 }
1527 if (sock != NULL && sock->batch_mode)
1528 sock->batch_cmd++;
1530 if (strcasecmp (command, "update") == 0)
1531 return (handle_request_update (sock, buffer_ptr, buffer_size));
1532 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1533 {
1534 /* this is only valid in replay mode */
1535 return (handle_request_wrote (buffer_ptr));
1536 }
1537 else if (strcasecmp (command, "flush") == 0)
1538 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1539 else if (strcasecmp (command, "flushall") == 0)
1540 return (handle_request_flushall(sock));
1541 else if (strcasecmp (command, "pending") == 0)
1542 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1543 else if (strcasecmp (command, "forget") == 0)
1544 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1545 else if (strcasecmp (command, "stats") == 0)
1546 return (handle_request_stats (sock));
1547 else if (strcasecmp (command, "help") == 0)
1548 return (handle_request_help (sock, buffer_ptr, buffer_size));
1549 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1550 return batch_start(sock);
1551 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1552 return batch_done(sock);
1553 else
1554 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1556 /* NOTREACHED */
1557 assert(1==0);
1558 } /* }}} int handle_request */
1560 /* MUST NOT hold journal_lock before calling this */
1561 static void journal_rotate(void) /* {{{ */
1562 {
1563 FILE *old_fh = NULL;
1564 int new_fd;
1566 if (journal_cur == NULL || journal_old == NULL)
1567 return;
1569 pthread_mutex_lock(&journal_lock);
1571 /* we rotate this way (rename before close) so that the we can release
1572 * the journal lock as fast as possible. Journal writes to the new
1573 * journal can proceed immediately after the new file is opened. The
1574 * fclose can then block without affecting new updates.
1575 */
1576 if (journal_fh != NULL)
1577 {
1578 old_fh = journal_fh;
1579 journal_fh = NULL;
1580 rename(journal_cur, journal_old);
1581 ++stats_journal_rotate;
1582 }
1584 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1585 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1586 if (new_fd >= 0)
1587 {
1588 journal_fh = fdopen(new_fd, "a");
1589 if (journal_fh == NULL)
1590 close(new_fd);
1591 }
1593 pthread_mutex_unlock(&journal_lock);
1595 if (old_fh != NULL)
1596 fclose(old_fh);
1598 if (journal_fh == NULL)
1599 {
1600 RRDD_LOG(LOG_CRIT,
1601 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1602 journal_cur, rrd_strerror(errno));
1604 RRDD_LOG(LOG_ERR,
1605 "JOURNALING DISABLED: All values will be flushed at shutdown");
1606 config_flush_at_shutdown = 1;
1607 }
1609 } /* }}} static void journal_rotate */
1611 static void journal_done(void) /* {{{ */
1612 {
1613 if (journal_cur == NULL)
1614 return;
1616 pthread_mutex_lock(&journal_lock);
1617 if (journal_fh != NULL)
1618 {
1619 fclose(journal_fh);
1620 journal_fh = NULL;
1621 }
1623 if (config_flush_at_shutdown)
1624 {
1625 RRDD_LOG(LOG_INFO, "removing journals");
1626 unlink(journal_old);
1627 unlink(journal_cur);
1628 }
1629 else
1630 {
1631 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1632 "journals will be used at next startup");
1633 }
1635 pthread_mutex_unlock(&journal_lock);
1637 } /* }}} static void journal_done */
1639 static int journal_write(char *cmd, char *args) /* {{{ */
1640 {
1641 int chars;
1643 if (journal_fh == NULL)
1644 return 0;
1646 pthread_mutex_lock(&journal_lock);
1647 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1648 pthread_mutex_unlock(&journal_lock);
1650 if (chars > 0)
1651 {
1652 pthread_mutex_lock(&stats_lock);
1653 stats_journal_bytes += chars;
1654 pthread_mutex_unlock(&stats_lock);
1655 }
1657 return chars;
1658 } /* }}} static int journal_write */
1660 static int journal_replay (const char *file) /* {{{ */
1661 {
1662 FILE *fh;
1663 int entry_cnt = 0;
1664 int fail_cnt = 0;
1665 uint64_t line = 0;
1666 char entry[CMD_MAX];
1668 if (file == NULL) return 0;
1670 {
1671 char *reason;
1672 int status = 0;
1673 struct stat statbuf;
1675 memset(&statbuf, 0, sizeof(statbuf));
1676 if (stat(file, &statbuf) != 0)
1677 {
1678 if (errno == ENOENT)
1679 return 0;
1681 reason = "stat error";
1682 status = errno;
1683 }
1684 else if (!S_ISREG(statbuf.st_mode))
1685 {
1686 reason = "not a regular file";
1687 status = EPERM;
1688 }
1689 if (statbuf.st_uid != daemon_uid)
1690 {
1691 reason = "not owned by daemon user";
1692 status = EACCES;
1693 }
1694 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1695 {
1696 reason = "must not be user/group writable";
1697 status = EACCES;
1698 }
1700 if (status != 0)
1701 {
1702 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1703 file, rrd_strerror(status), reason);
1704 return 0;
1705 }
1706 }
1708 fh = fopen(file, "r");
1709 if (fh == NULL)
1710 {
1711 if (errno != ENOENT)
1712 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1713 file, rrd_strerror(errno));
1714 return 0;
1715 }
1716 else
1717 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1719 while(!feof(fh))
1720 {
1721 size_t entry_len;
1723 ++line;
1724 if (fgets(entry, sizeof(entry), fh) == NULL)
1725 break;
1726 entry_len = strlen(entry);
1728 /* check \n termination in case journal writing crashed mid-line */
1729 if (entry_len == 0)
1730 continue;
1731 else if (entry[entry_len - 1] != '\n')
1732 {
1733 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1734 ++fail_cnt;
1735 continue;
1736 }
1738 entry[entry_len - 1] = '\0';
1740 if (handle_request(NULL, entry, entry_len) == 0)
1741 ++entry_cnt;
1742 else
1743 ++fail_cnt;
1744 }
1746 fclose(fh);
1748 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1749 entry_cnt, fail_cnt);
1751 return entry_cnt > 0 ? 1 : 0;
1752 } /* }}} static int journal_replay */
1754 static void journal_init(void) /* {{{ */
1755 {
1756 int had_journal = 0;
1758 if (journal_cur == NULL) return;
1760 pthread_mutex_lock(&journal_lock);
1762 RRDD_LOG(LOG_INFO, "checking for journal files");
1764 had_journal += journal_replay(journal_old);
1765 had_journal += journal_replay(journal_cur);
1767 /* it must have been a crash. start a flush */
1768 if (had_journal && config_flush_at_shutdown)
1769 flush_old_values(-1);
1771 pthread_mutex_unlock(&journal_lock);
1772 journal_rotate();
1774 RRDD_LOG(LOG_INFO, "journal processing complete");
1776 } /* }}} static void journal_init */
1778 static void close_connection(listen_socket_t *sock)
1779 {
1780 close(sock->fd) ; sock->fd = -1;
1781 free(sock->rbuf); sock->rbuf = NULL;
1782 free(sock->wbuf); sock->wbuf = NULL;
1784 free(sock);
1785 }
1787 static void *connection_thread_main (void *args) /* {{{ */
1788 {
1789 pthread_t self;
1790 listen_socket_t *sock;
1791 int i;
1792 int fd;
1794 sock = (listen_socket_t *) args;
1795 fd = sock->fd;
1797 /* init read buffers */
1798 sock->next_read = sock->next_cmd = 0;
1799 sock->rbuf = malloc(RBUF_SIZE);
1800 if (sock->rbuf == NULL)
1801 {
1802 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1803 close_connection(sock);
1804 return NULL;
1805 }
1807 pthread_mutex_lock (&connection_threads_lock);
1808 {
1809 pthread_t *temp;
1811 temp = (pthread_t *) realloc (connection_threads,
1812 sizeof (pthread_t) * (connection_threads_num + 1));
1813 if (temp == NULL)
1814 {
1815 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1816 }
1817 else
1818 {
1819 connection_threads = temp;
1820 connection_threads[connection_threads_num] = pthread_self ();
1821 connection_threads_num++;
1822 }
1823 }
1824 pthread_mutex_unlock (&connection_threads_lock);
1826 while (do_shutdown == 0)
1827 {
1828 char *cmd;
1829 ssize_t cmd_len;
1830 ssize_t rbytes;
1832 struct pollfd pollfd;
1833 int status;
1835 pollfd.fd = fd;
1836 pollfd.events = POLLIN | POLLPRI;
1837 pollfd.revents = 0;
1839 status = poll (&pollfd, 1, /* timeout = */ 500);
1840 if (do_shutdown)
1841 break;
1842 else if (status == 0) /* timeout */
1843 continue;
1844 else if (status < 0) /* error */
1845 {
1846 status = errno;
1847 if (status == EINTR)
1848 continue;
1849 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1850 continue;
1851 }
1853 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1854 {
1855 close_connection(sock);
1856 break;
1857 }
1858 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1859 {
1860 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1861 "poll(2) returned something unexpected: %#04hx",
1862 pollfd.revents);
1863 close_connection(sock);
1864 break;
1865 }
1867 rbytes = read(fd, sock->rbuf + sock->next_read,
1868 RBUF_SIZE - sock->next_read);
1869 if (rbytes < 0)
1870 {
1871 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1872 break;
1873 }
1874 else if (rbytes == 0)
1875 break; /* eof */
1877 sock->next_read += rbytes;
1879 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1880 {
1881 status = handle_request (sock, cmd, cmd_len+1);
1882 if (status != 0)
1883 goto out_close;
1884 }
1885 }
1887 out_close:
1888 close_connection(sock);
1890 self = pthread_self ();
1891 /* Remove this thread from the connection threads list */
1892 pthread_mutex_lock (&connection_threads_lock);
1893 /* Find out own index in the array */
1894 for (i = 0; i < connection_threads_num; i++)
1895 if (pthread_equal (connection_threads[i], self) != 0)
1896 break;
1897 assert (i < connection_threads_num);
1899 /* Move the trailing threads forward. */
1900 if (i < (connection_threads_num - 1))
1901 {
1902 memmove (connection_threads + i,
1903 connection_threads + i + 1,
1904 sizeof (pthread_t) * (connection_threads_num - i - 1));
1905 }
1907 connection_threads_num--;
1908 pthread_mutex_unlock (&connection_threads_lock);
1910 return (NULL);
1911 } /* }}} void *connection_thread_main */
1913 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1914 {
1915 int fd;
1916 struct sockaddr_un sa;
1917 listen_socket_t *temp;
1918 int status;
1919 const char *path;
1921 path = sock->addr;
1922 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1923 path += strlen("unix:");
1925 temp = (listen_socket_t *) realloc (listen_fds,
1926 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1927 if (temp == NULL)
1928 {
1929 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1930 return (-1);
1931 }
1932 listen_fds = temp;
1933 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1935 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1936 if (fd < 0)
1937 {
1938 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1939 return (-1);
1940 }
1942 memset (&sa, 0, sizeof (sa));
1943 sa.sun_family = AF_UNIX;
1944 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1946 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1947 if (status != 0)
1948 {
1949 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1950 close (fd);
1951 unlink (path);
1952 return (-1);
1953 }
1955 status = listen (fd, /* backlog = */ 10);
1956 if (status != 0)
1957 {
1958 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1959 close (fd);
1960 unlink (path);
1961 return (-1);
1962 }
1964 listen_fds[listen_fds_num].fd = fd;
1965 listen_fds[listen_fds_num].family = PF_UNIX;
1966 strncpy(listen_fds[listen_fds_num].addr, path,
1967 sizeof (listen_fds[listen_fds_num].addr) - 1);
1968 listen_fds_num++;
1970 return (0);
1971 } /* }}} int open_listen_socket_unix */
1973 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1974 {
1975 struct addrinfo ai_hints;
1976 struct addrinfo *ai_res;
1977 struct addrinfo *ai_ptr;
1978 char addr_copy[NI_MAXHOST];
1979 char *addr;
1980 char *port;
1981 int status;
1983 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1984 addr_copy[sizeof (addr_copy) - 1] = 0;
1985 addr = addr_copy;
1987 memset (&ai_hints, 0, sizeof (ai_hints));
1988 ai_hints.ai_flags = 0;
1989 #ifdef AI_ADDRCONFIG
1990 ai_hints.ai_flags |= AI_ADDRCONFIG;
1991 #endif
1992 ai_hints.ai_family = AF_UNSPEC;
1993 ai_hints.ai_socktype = SOCK_STREAM;
1995 port = NULL;
1996 if (*addr == '[') /* IPv6+port format */
1997 {
1998 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1999 addr++;
2001 port = strchr (addr, ']');
2002 if (port == NULL)
2003 {
2004 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2005 sock->addr);
2006 return (-1);
2007 }
2008 *port = 0;
2009 port++;
2011 if (*port == ':')
2012 port++;
2013 else if (*port == 0)
2014 port = NULL;
2015 else
2016 {
2017 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2018 port);
2019 return (-1);
2020 }
2021 } /* if (*addr = ']') */
2022 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2023 {
2024 port = rindex(addr, ':');
2025 if (port != NULL)
2026 {
2027 *port = 0;
2028 port++;
2029 }
2030 }
2031 ai_res = NULL;
2032 status = getaddrinfo (addr,
2033 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2034 &ai_hints, &ai_res);
2035 if (status != 0)
2036 {
2037 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2038 "%s", addr, gai_strerror (status));
2039 return (-1);
2040 }
2042 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2043 {
2044 int fd;
2045 listen_socket_t *temp;
2046 int one = 1;
2048 temp = (listen_socket_t *) realloc (listen_fds,
2049 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2050 if (temp == NULL)
2051 {
2052 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2053 continue;
2054 }
2055 listen_fds = temp;
2056 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2058 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2059 if (fd < 0)
2060 {
2061 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2062 continue;
2063 }
2065 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2067 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2068 if (status != 0)
2069 {
2070 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2071 close (fd);
2072 continue;
2073 }
2075 status = listen (fd, /* backlog = */ 10);
2076 if (status != 0)
2077 {
2078 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2079 close (fd);
2080 return (-1);
2081 }
2083 listen_fds[listen_fds_num].fd = fd;
2084 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2085 listen_fds_num++;
2086 } /* for (ai_ptr) */
2088 return (0);
2089 } /* }}} static int open_listen_socket_network */
2091 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2092 {
2093 assert(sock != NULL);
2094 assert(sock->addr != NULL);
2096 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2097 || sock->addr[0] == '/')
2098 return (open_listen_socket_unix(sock));
2099 else
2100 return (open_listen_socket_network(sock));
2101 } /* }}} int open_listen_socket */
2103 static int close_listen_sockets (void) /* {{{ */
2104 {
2105 size_t i;
2107 for (i = 0; i < listen_fds_num; i++)
2108 {
2109 close (listen_fds[i].fd);
2111 if (listen_fds[i].family == PF_UNIX)
2112 unlink(listen_fds[i].addr);
2113 }
2115 free (listen_fds);
2116 listen_fds = NULL;
2117 listen_fds_num = 0;
2119 return (0);
2120 } /* }}} int close_listen_sockets */
2122 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2123 {
2124 struct pollfd *pollfds;
2125 int pollfds_num;
2126 int status;
2127 int i;
2129 for (i = 0; i < config_listen_address_list_len; i++)
2130 open_listen_socket (config_listen_address_list[i]);
2132 if (config_listen_address_list_len < 1)
2133 {
2134 listen_socket_t sock;
2135 memset(&sock, 0, sizeof(sock));
2136 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2137 open_listen_socket (&sock);
2138 }
2140 if (listen_fds_num < 1)
2141 {
2142 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2143 "could be opened. Sorry.");
2144 return (NULL);
2145 }
2147 pollfds_num = listen_fds_num;
2148 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2149 if (pollfds == NULL)
2150 {
2151 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2152 return (NULL);
2153 }
2154 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2156 RRDD_LOG(LOG_INFO, "listening for connections");
2158 while (do_shutdown == 0)
2159 {
2160 assert (pollfds_num == ((int) listen_fds_num));
2161 for (i = 0; i < pollfds_num; i++)
2162 {
2163 pollfds[i].fd = listen_fds[i].fd;
2164 pollfds[i].events = POLLIN | POLLPRI;
2165 pollfds[i].revents = 0;
2166 }
2168 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2169 if (do_shutdown)
2170 break;
2171 else if (status == 0) /* timeout */
2172 continue;
2173 else if (status < 0) /* error */
2174 {
2175 status = errno;
2176 if (status != EINTR)
2177 {
2178 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2179 }
2180 continue;
2181 }
2183 for (i = 0; i < pollfds_num; i++)
2184 {
2185 listen_socket_t *client_sock;
2186 struct sockaddr_storage client_sa;
2187 socklen_t client_sa_size;
2188 pthread_t tid;
2189 pthread_attr_t attr;
2191 if (pollfds[i].revents == 0)
2192 continue;
2194 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2195 {
2196 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2197 "poll(2) returned something unexpected for listen FD #%i.",
2198 pollfds[i].fd);
2199 continue;
2200 }
2202 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2203 if (client_sock == NULL)
2204 {
2205 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2206 continue;
2207 }
2208 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2210 client_sa_size = sizeof (client_sa);
2211 client_sock->fd = accept (pollfds[i].fd,
2212 (struct sockaddr *) &client_sa, &client_sa_size);
2213 if (client_sock->fd < 0)
2214 {
2215 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2216 free(client_sock);
2217 continue;
2218 }
2220 pthread_attr_init (&attr);
2221 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2223 status = pthread_create (&tid, &attr, connection_thread_main,
2224 client_sock);
2225 if (status != 0)
2226 {
2227 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2228 close_connection(client_sock);
2229 continue;
2230 }
2231 } /* for (pollfds_num) */
2232 } /* while (do_shutdown == 0) */
2234 RRDD_LOG(LOG_INFO, "starting shutdown");
2236 close_listen_sockets ();
2238 pthread_mutex_lock (&connection_threads_lock);
2239 while (connection_threads_num > 0)
2240 {
2241 pthread_t wait_for;
2243 wait_for = connection_threads[0];
2245 pthread_mutex_unlock (&connection_threads_lock);
2246 pthread_join (wait_for, /* retval = */ NULL);
2247 pthread_mutex_lock (&connection_threads_lock);
2248 }
2249 pthread_mutex_unlock (&connection_threads_lock);
2251 return (NULL);
2252 } /* }}} void *listen_thread_main */
2254 static int daemonize (void) /* {{{ */
2255 {
2256 int status;
2257 int fd;
2258 char *base_dir;
2260 daemon_uid = geteuid();
2262 fd = open_pidfile();
2263 if (fd < 0) return fd;
2265 if (!stay_foreground)
2266 {
2267 pid_t child;
2269 child = fork ();
2270 if (child < 0)
2271 {
2272 fprintf (stderr, "daemonize: fork(2) failed.\n");
2273 return (-1);
2274 }
2275 else if (child > 0)
2276 {
2277 return (1);
2278 }
2280 /* Become session leader */
2281 setsid ();
2283 /* Open the first three file descriptors to /dev/null */
2284 close (2);
2285 close (1);
2286 close (0);
2288 open ("/dev/null", O_RDWR);
2289 dup (0);
2290 dup (0);
2291 } /* if (!stay_foreground) */
2293 /* Change into the /tmp directory. */
2294 base_dir = (config_base_dir != NULL)
2295 ? config_base_dir
2296 : "/tmp";
2297 status = chdir (base_dir);
2298 if (status != 0)
2299 {
2300 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2301 return (-1);
2302 }
2304 install_signal_handlers();
2306 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2307 RRDD_LOG(LOG_INFO, "starting up");
2309 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2310 if (cache_tree == NULL)
2311 {
2312 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2313 return (-1);
2314 }
2316 status = write_pidfile (fd);
2317 return status;
2318 } /* }}} int daemonize */
2320 static int cleanup (void) /* {{{ */
2321 {
2322 do_shutdown++;
2324 pthread_cond_signal (&cache_cond);
2325 pthread_join (queue_thread, /* return = */ NULL);
2327 remove_pidfile ();
2329 RRDD_LOG(LOG_INFO, "goodbye");
2330 closelog ();
2332 return (0);
2333 } /* }}} int cleanup */
2335 static int read_options (int argc, char **argv) /* {{{ */
2336 {
2337 int option;
2338 int status = 0;
2340 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2341 {
2342 switch (option)
2343 {
2344 case 'g':
2345 stay_foreground=1;
2346 break;
2348 case 'L':
2349 case 'l':
2350 {
2351 listen_socket_t **temp;
2352 listen_socket_t *new;
2354 new = malloc(sizeof(listen_socket_t));
2355 if (new == NULL)
2356 {
2357 fprintf(stderr, "read_options: malloc failed.\n");
2358 return(2);
2359 }
2360 memset(new, 0, sizeof(listen_socket_t));
2362 temp = (listen_socket_t **) realloc (config_listen_address_list,
2363 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2364 if (temp == NULL)
2365 {
2366 fprintf (stderr, "read_options: realloc failed.\n");
2367 return (2);
2368 }
2369 config_listen_address_list = temp;
2371 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2372 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2374 temp[config_listen_address_list_len] = new;
2375 config_listen_address_list_len++;
2376 }
2377 break;
2379 case 'f':
2380 {
2381 int temp;
2383 temp = atoi (optarg);
2384 if (temp > 0)
2385 config_flush_interval = temp;
2386 else
2387 {
2388 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2389 status = 3;
2390 }
2391 }
2392 break;
2394 case 'w':
2395 {
2396 int temp;
2398 temp = atoi (optarg);
2399 if (temp > 0)
2400 config_write_interval = temp;
2401 else
2402 {
2403 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2404 status = 2;
2405 }
2406 }
2407 break;
2409 case 'z':
2410 {
2411 int temp;
2413 temp = atoi(optarg);
2414 if (temp > 0)
2415 config_write_jitter = temp;
2416 else
2417 {
2418 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2419 status = 2;
2420 }
2422 break;
2423 }
2425 case 'B':
2426 config_write_base_only = 1;
2427 break;
2429 case 'b':
2430 {
2431 size_t len;
2433 if (config_base_dir != NULL)
2434 free (config_base_dir);
2435 config_base_dir = strdup (optarg);
2436 if (config_base_dir == NULL)
2437 {
2438 fprintf (stderr, "read_options: strdup failed.\n");
2439 return (3);
2440 }
2442 len = strlen (config_base_dir);
2443 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2444 {
2445 config_base_dir[len - 1] = 0;
2446 len--;
2447 }
2449 if (len < 1)
2450 {
2451 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2452 return (4);
2453 }
2455 _config_base_dir_len = len;
2456 }
2457 break;
2459 case 'p':
2460 {
2461 if (config_pid_file != NULL)
2462 free (config_pid_file);
2463 config_pid_file = strdup (optarg);
2464 if (config_pid_file == NULL)
2465 {
2466 fprintf (stderr, "read_options: strdup failed.\n");
2467 return (3);
2468 }
2469 }
2470 break;
2472 case 'F':
2473 config_flush_at_shutdown = 1;
2474 break;
2476 case 'j':
2477 {
2478 struct stat statbuf;
2479 const char *dir = optarg;
2481 status = stat(dir, &statbuf);
2482 if (status != 0)
2483 {
2484 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2485 return 6;
2486 }
2488 if (!S_ISDIR(statbuf.st_mode)
2489 || access(dir, R_OK|W_OK|X_OK) != 0)
2490 {
2491 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2492 errno ? rrd_strerror(errno) : "");
2493 return 6;
2494 }
2496 journal_cur = malloc(PATH_MAX + 1);
2497 journal_old = malloc(PATH_MAX + 1);
2498 if (journal_cur == NULL || journal_old == NULL)
2499 {
2500 fprintf(stderr, "malloc failure for journal files\n");
2501 return 6;
2502 }
2503 else
2504 {
2505 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2506 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2507 }
2508 }
2509 break;
2511 case 'h':
2512 case '?':
2513 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2514 "\n"
2515 "Usage: rrdcached [options]\n"
2516 "\n"
2517 "Valid options are:\n"
2518 " -l <address> Socket address to listen to.\n"
2519 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2520 " -w <seconds> Interval in which to write data.\n"
2521 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2522 " -f <seconds> Interval in which to flush dead data.\n"
2523 " -p <file> Location of the PID-file.\n"
2524 " -b <dir> Base directory to change to.\n"
2525 " -B Restrict file access to paths within -b <dir>\n"
2526 " -g Do not fork and run in the foreground.\n"
2527 " -j <dir> Directory in which to create the journal files.\n"
2528 " -F Always flush all updates at shutdown\n"
2529 "\n"
2530 "For more information and a detailed description of all options "
2531 "please refer\n"
2532 "to the rrdcached(1) manual page.\n",
2533 VERSION);
2534 status = -1;
2535 break;
2536 } /* switch (option) */
2537 } /* while (getopt) */
2539 /* advise the user when values are not sane */
2540 if (config_flush_interval < 2 * config_write_interval)
2541 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2542 " 2x write interval (-w) !\n");
2543 if (config_write_jitter > config_write_interval)
2544 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2545 " write interval (-w) !\n");
2547 if (config_write_base_only && config_base_dir == NULL)
2548 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2549 " Consult the rrdcached documentation\n");
2551 if (journal_cur == NULL)
2552 config_flush_at_shutdown = 1;
2554 return (status);
2555 } /* }}} int read_options */
2557 int main (int argc, char **argv)
2558 {
2559 int status;
2561 status = read_options (argc, argv);
2562 if (status != 0)
2563 {
2564 if (status < 0)
2565 status = 0;
2566 return (status);
2567 }
2569 status = daemonize ();
2570 if (status == 1)
2571 {
2572 struct sigaction sigchld;
2574 memset (&sigchld, 0, sizeof (sigchld));
2575 sigchld.sa_handler = SIG_IGN;
2576 sigaction (SIGCHLD, &sigchld, NULL);
2578 return (0);
2579 }
2580 else if (status != 0)
2581 {
2582 fprintf (stderr, "daemonize failed, exiting.\n");
2583 return (1);
2584 }
2586 journal_init();
2588 /* start the queue thread */
2589 memset (&queue_thread, 0, sizeof (queue_thread));
2590 status = pthread_create (&queue_thread,
2591 NULL, /* attr */
2592 queue_thread_main,
2593 NULL); /* args */
2594 if (status != 0)
2595 {
2596 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2597 cleanup();
2598 return (1);
2599 }
2601 listen_thread_main (NULL);
2602 cleanup ();
2604 return (0);
2605 } /* int main */
2607 /*
2608 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2609 */