b2cee95b6d7d588e502e809027abb510370d9514
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 typedef enum
105 {
106 PRIV_LOW,
107 PRIV_HIGH
108 } socket_privilege;
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
113 {
114 int fd;
115 char addr[PATH_MAX + 1];
116 int family;
117 socket_privilege privilege;
119 /* state for BATCH processing */
120 int batch_mode;
121 int batch_cmd;
123 /* buffered IO */
124 char *rbuf;
125 off_t next_cmd;
126 off_t next_read;
128 char *wbuf;
129 ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137 char *file;
138 char **values;
139 int values_num;
140 time_t last_flush_time;
141 time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144 int flags;
145 pthread_cond_t flushed;
146 cache_item_t *prev;
147 cache_item_t *next;
148 };
150 struct callback_flush_data_s
151 {
152 time_t now;
153 time_t abs_timeout;
154 char **keys;
155 size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
159 enum queue_side_e
160 {
161 HEAD,
162 TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
170 /*
171 * Variables
172 */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
187 /* Cache stuff */
188 static GTree *cache_tree = NULL;
189 static cache_item_t *cache_queue_head = NULL;
190 static cache_item_t *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
224 /*
225 * Functions
226 */
227 static void sig_common (const char *sig) /* {{{ */
228 {
229 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230 do_shutdown++;
231 pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
235 {
236 sig_common("INT");
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
240 {
241 sig_common("TERM");
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
245 {
246 config_flush_at_shutdown = 1;
247 sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
251 {
252 config_flush_at_shutdown = 0;
253 sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
257 {
258 /* These structures are static, because `sigaction' behaves weird if the are
259 * overwritten.. */
260 static struct sigaction sa_int;
261 static struct sigaction sa_term;
262 static struct sigaction sa_pipe;
263 static struct sigaction sa_usr1;
264 static struct sigaction sa_usr2;
266 /* Install signal handlers */
267 memset (&sa_int, 0, sizeof (sa_int));
268 sa_int.sa_handler = sig_int_handler;
269 sigaction (SIGINT, &sa_int, NULL);
271 memset (&sa_term, 0, sizeof (sa_term));
272 sa_term.sa_handler = sig_term_handler;
273 sigaction (SIGTERM, &sa_term, NULL);
275 memset (&sa_pipe, 0, sizeof (sa_pipe));
276 sa_pipe.sa_handler = SIG_IGN;
277 sigaction (SIGPIPE, &sa_pipe, NULL);
279 memset (&sa_pipe, 0, sizeof (sa_usr1));
280 sa_usr1.sa_handler = sig_usr1_handler;
281 sigaction (SIGUSR1, &sa_usr1, NULL);
283 memset (&sa_usr2, 0, sizeof (sa_usr2));
284 sa_usr2.sa_handler = sig_usr2_handler;
285 sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(void) /* {{{ */
290 {
291 int fd;
292 char *file;
294 file = (config_pid_file != NULL)
295 ? config_pid_file
296 : LOCALSTATEDIR "/run/rrdcached.pid";
298 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
299 if (fd < 0)
300 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301 file, rrd_strerror(errno));
303 return(fd);
304 } /* }}} static int open_pidfile */
306 static int write_pidfile (int fd) /* {{{ */
307 {
308 pid_t pid;
309 FILE *fh;
311 pid = getpid ();
313 fh = fdopen (fd, "w");
314 if (fh == NULL)
315 {
316 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
317 close(fd);
318 return (-1);
319 }
321 fprintf (fh, "%i\n", (int) pid);
322 fclose (fh);
324 return (0);
325 } /* }}} int write_pidfile */
327 static int remove_pidfile (void) /* {{{ */
328 {
329 char *file;
330 int status;
332 file = (config_pid_file != NULL)
333 ? config_pid_file
334 : LOCALSTATEDIR "/run/rrdcached.pid";
336 status = unlink (file);
337 if (status == 0)
338 return (0);
339 return (errno);
340 } /* }}} int remove_pidfile */
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
343 {
344 char *eol;
346 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347 sock->next_read - sock->next_cmd);
349 if (eol == NULL)
350 {
351 /* no commands left, move remainder back to front of rbuf */
352 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353 sock->next_read - sock->next_cmd);
354 sock->next_read -= sock->next_cmd;
355 sock->next_cmd = 0;
356 *len = 0;
357 return NULL;
358 }
359 else
360 {
361 char *cmd = sock->rbuf + sock->next_cmd;
362 *eol = '\0';
364 sock->next_cmd = eol - sock->rbuf + 1;
366 if (eol > sock->rbuf && *(eol-1) == '\r')
367 *(--eol) = '\0'; /* handle "\r\n" EOL */
369 *len = eol - cmd;
371 return cmd;
372 }
374 /* NOTREACHED */
375 assert(1==0);
376 }
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
380 {
381 char *new_buf;
383 assert(sock != NULL);
385 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
386 if (new_buf == NULL)
387 {
388 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
389 return -1;
390 }
392 strncpy(new_buf + sock->wbuf_len, str, len + 1);
394 sock->wbuf = new_buf;
395 sock->wbuf_len += len;
397 return 0;
398 } /* }}} static int add_to_wbuf */
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
402 {
403 va_list argp;
404 char buffer[CMD_MAX];
405 int len;
407 if (sock == NULL) return 0; /* journal replay mode */
408 if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
410 va_start(argp, fmt);
411 #ifdef HAVE_VSNPRINTF
412 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
413 #else
414 len = vsprintf(buffer, fmt, argp);
415 #endif
416 va_end(argp);
417 if (len < 0)
418 {
419 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
420 return -1;
421 }
423 return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
426 static int count_lines(char *str) /* {{{ */
427 {
428 int lines = 0;
430 if (str != NULL)
431 {
432 while ((str = strchr(str, '\n')) != NULL)
433 {
434 ++lines;
435 ++str;
436 }
437 }
439 return lines;
440 } /* }}} static int count_lines */
442 /* send the response back to the user.
443 * returns 0 on success, -1 on error
444 * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446 char *fmt, ...) /* {{{ */
447 {
448 va_list argp;
449 char buffer[CMD_MAX];
450 int lines;
451 ssize_t wrote;
452 int rclen, len;
454 if (sock == NULL) return rc; /* journal replay mode */
456 if (sock->batch_mode)
457 {
458 if (rc == RESP_OK)
459 return rc; /* no response on success during BATCH */
460 lines = sock->batch_cmd;
461 }
462 else if (rc == RESP_OK)
463 lines = count_lines(sock->wbuf);
464 else
465 lines = -1;
467 rclen = sprintf(buffer, "%d ", lines);
468 va_start(argp, fmt);
469 #ifdef HAVE_VSNPRINTF
470 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
471 #else
472 len = vsprintf(buffer+rclen, fmt, argp);
473 #endif
474 va_end(argp);
475 if (len < 0)
476 return -1;
478 len += rclen;
480 /* append the result to the wbuf, don't write to the user */
481 if (sock->batch_mode)
482 return add_to_wbuf(sock, buffer, len);
484 /* first write must be complete */
485 if (len != write(sock->fd, buffer, len))
486 {
487 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
488 return -1;
489 }
491 if (sock->wbuf != NULL)
492 {
493 wrote = 0;
494 while (wrote < sock->wbuf_len)
495 {
496 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
497 if (wb <= 0)
498 {
499 RRDD_LOG(LOG_INFO, "send_response: could not write results");
500 return -1;
501 }
502 wrote += wb;
503 }
504 }
506 free(sock->wbuf); sock->wbuf = NULL;
507 sock->wbuf_len = 0;
509 return 0;
510 } /* }}} */
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
513 {
514 ci->values = NULL;
515 ci->values_num = 0;
517 ci->last_flush_time = when;
518 if (config_write_jitter > 0)
519 ci->last_flush_time += (random() % config_write_jitter);
520 }
522 /* remove_from_queue
523 * remove a "cache_item_t" item from the queue.
524 * must hold 'cache_lock' when calling this
525 */
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
527 {
528 if (ci == NULL) return;
530 if (ci->prev == NULL)
531 cache_queue_head = ci->next; /* reset head */
532 else
533 ci->prev->next = ci->next;
535 if (ci->next == NULL)
536 cache_queue_tail = ci->prev; /* reset the tail */
537 else
538 ci->next->prev = ci->prev;
540 ci->next = ci->prev = NULL;
541 ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
544 /* remove an entry from the tree and free all its resources.
545 * must hold 'cache lock' while calling this.
546 * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
548 {
549 cache_item_t *ci;
551 ci = g_tree_lookup(cache_tree, file);
552 if (ci == NULL)
553 return ENOENT;
555 g_tree_remove (cache_tree, file);
556 remove_from_queue(ci);
558 for (int i=0; i < ci->values_num; i++)
559 free(ci->values[i]);
561 free (ci->values);
562 free (ci->file);
564 /* in case anyone is waiting */
565 pthread_cond_broadcast(&ci->flushed);
567 free (ci);
569 return 0;
570 } /* }}} static int forget_file */
572 /*
573 * enqueue_cache_item:
574 * `cache_lock' must be acquired before calling this function!
575 */
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
577 queue_side_t side)
578 {
579 if (ci == NULL)
580 return (-1);
582 if (ci->values_num == 0)
583 return (0);
585 if (side == HEAD)
586 {
587 if (cache_queue_head == ci)
588 return 0;
590 /* remove from the double linked list */
591 if (ci->flags & CI_FLAGS_IN_QUEUE)
592 remove_from_queue(ci);
594 ci->prev = NULL;
595 ci->next = cache_queue_head;
596 if (ci->next != NULL)
597 ci->next->prev = ci;
598 cache_queue_head = ci;
600 if (cache_queue_tail == NULL)
601 cache_queue_tail = cache_queue_head;
602 }
603 else /* (side == TAIL) */
604 {
605 /* We don't move values back in the list.. */
606 if (ci->flags & CI_FLAGS_IN_QUEUE)
607 return (0);
609 assert (ci->next == NULL);
610 assert (ci->prev == NULL);
612 ci->prev = cache_queue_tail;
614 if (cache_queue_tail == NULL)
615 cache_queue_head = ci;
616 else
617 cache_queue_tail->next = ci;
619 cache_queue_tail = ci;
620 }
622 ci->flags |= CI_FLAGS_IN_QUEUE;
624 pthread_cond_broadcast(&cache_cond);
625 pthread_mutex_lock (&stats_lock);
626 stats_queue_length++;
627 pthread_mutex_unlock (&stats_lock);
629 return (0);
630 } /* }}} int enqueue_cache_item */
632 /*
633 * tree_callback_flush:
634 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635 * while this is in progress.
636 */
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
638 gpointer data)
639 {
640 cache_item_t *ci;
641 callback_flush_data_t *cfd;
643 ci = (cache_item_t *) value;
644 cfd = (callback_flush_data_t *) data;
646 if ((ci->last_flush_time <= cfd->abs_timeout)
647 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648 && (ci->values_num > 0))
649 {
650 enqueue_cache_item (ci, TAIL);
651 }
652 else if ((do_shutdown != 0)
653 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654 && (ci->values_num > 0))
655 {
656 enqueue_cache_item (ci, TAIL);
657 }
658 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660 && (ci->values_num <= 0))
661 {
662 char **temp;
664 temp = (char **) realloc (cfd->keys,
665 sizeof (char *) * (cfd->keys_num + 1));
666 if (temp == NULL)
667 {
668 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
669 return (FALSE);
670 }
671 cfd->keys = temp;
672 /* Make really sure this points to the _same_ place */
673 assert ((char *) key == ci->file);
674 cfd->keys[cfd->keys_num] = (char *) key;
675 cfd->keys_num++;
676 }
678 return (FALSE);
679 } /* }}} gboolean tree_callback_flush */
681 static int flush_old_values (int max_age)
682 {
683 callback_flush_data_t cfd;
684 size_t k;
686 memset (&cfd, 0, sizeof (cfd));
687 /* Pass the current time as user data so that we don't need to call
688 * `time' for each node. */
689 cfd.now = time (NULL);
690 cfd.keys = NULL;
691 cfd.keys_num = 0;
693 if (max_age > 0)
694 cfd.abs_timeout = cfd.now - max_age;
695 else
696 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
698 /* `tree_callback_flush' will return the keys of all values that haven't
699 * been touched in the last `config_flush_interval' seconds in `cfd'.
700 * The char*'s in this array point to the same memory as ci->file, so we
701 * don't need to free them separately. */
702 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
704 for (k = 0; k < cfd.keys_num; k++)
705 {
706 /* should never fail, since we have held the cache_lock
707 * the entire time */
708 assert( forget_file(cfd.keys[k]) == 0 );
709 }
711 if (cfd.keys != NULL)
712 {
713 free (cfd.keys);
714 cfd.keys = NULL;
715 }
717 return (0);
718 } /* int flush_old_values */
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
721 {
722 struct timeval now;
723 struct timespec next_flush;
724 int final_flush = 0; /* make sure we only flush once on shutdown */
726 gettimeofday (&now, NULL);
727 next_flush.tv_sec = now.tv_sec + config_flush_interval;
728 next_flush.tv_nsec = 1000 * now.tv_usec;
730 pthread_mutex_lock (&cache_lock);
731 while ((do_shutdown == 0) || (cache_queue_head != NULL))
732 {
733 cache_item_t *ci;
734 char *file;
735 char **values;
736 int values_num;
737 int status;
738 int i;
740 /* First, check if it's time to do the cache flush. */
741 gettimeofday (&now, NULL);
742 if ((now.tv_sec > next_flush.tv_sec)
743 || ((now.tv_sec == next_flush.tv_sec)
744 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
745 {
746 /* Flush all values that haven't been written in the last
747 * `config_write_interval' seconds. */
748 flush_old_values (config_write_interval);
750 /* Determine the time of the next cache flush. */
751 while (next_flush.tv_sec <= now.tv_sec)
752 next_flush.tv_sec += config_flush_interval;
754 /* unlock the cache while we rotate so we don't block incoming
755 * updates if the fsync() blocks on disk I/O */
756 pthread_mutex_unlock(&cache_lock);
757 journal_rotate();
758 pthread_mutex_lock(&cache_lock);
759 }
761 /* Now, check if there's something to store away. If not, wait until
762 * something comes in or it's time to do the cache flush. if we are
763 * shutting down, do not wait around. */
764 if (cache_queue_head == NULL && !do_shutdown)
765 {
766 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767 if ((status != 0) && (status != ETIMEDOUT))
768 {
769 RRDD_LOG (LOG_ERR, "queue_thread_main: "
770 "pthread_cond_timedwait returned %i.", status);
771 }
772 }
774 /* We're about to shut down */
775 if (do_shutdown != 0 && !final_flush++)
776 {
777 if (config_flush_at_shutdown)
778 flush_old_values (-1); /* flush everything */
779 else
780 break;
781 }
783 /* Check if a value has arrived. This may be NULL if we timed out or there
784 * was an interrupt such as a signal. */
785 if (cache_queue_head == NULL)
786 continue;
788 ci = cache_queue_head;
790 /* copy the relevant parts */
791 file = strdup (ci->file);
792 if (file == NULL)
793 {
794 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
795 continue;
796 }
798 assert(ci->values != NULL);
799 assert(ci->values_num > 0);
801 values = ci->values;
802 values_num = ci->values_num;
804 wipe_ci_values(ci, time(NULL));
805 remove_from_queue(ci);
807 pthread_mutex_lock (&stats_lock);
808 assert (stats_queue_length > 0);
809 stats_queue_length--;
810 pthread_mutex_unlock (&stats_lock);
812 pthread_mutex_unlock (&cache_lock);
814 rrd_clear_error ();
815 status = rrd_update_r (file, NULL, values_num, (void *) values);
816 if (status != 0)
817 {
818 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819 "rrd_update_r (%s) failed with status %i. (%s)",
820 file, status, rrd_get_error());
821 }
823 journal_write("wrote", file);
824 pthread_cond_broadcast(&ci->flushed);
826 for (i = 0; i < values_num; i++)
827 free (values[i]);
829 free(values);
830 free(file);
832 if (status == 0)
833 {
834 pthread_mutex_lock (&stats_lock);
835 stats_updates_written++;
836 stats_data_sets_written += values_num;
837 pthread_mutex_unlock (&stats_lock);
838 }
840 pthread_mutex_lock (&cache_lock);
842 /* We're about to shut down */
843 if (do_shutdown != 0 && !final_flush++)
844 {
845 if (config_flush_at_shutdown)
846 flush_old_values (-1); /* flush everything */
847 else
848 break;
849 }
850 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851 pthread_mutex_unlock (&cache_lock);
853 if (config_flush_at_shutdown)
854 {
855 assert(cache_queue_head == NULL);
856 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
857 }
859 journal_done();
861 return (NULL);
862 } /* }}} void *queue_thread_main */
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865 size_t *buffer_size_ret, char **field_ret)
866 {
867 char *buffer;
868 size_t buffer_pos;
869 size_t buffer_size;
870 char *field;
871 size_t field_size;
872 int status;
874 buffer = *buffer_ret;
875 buffer_pos = 0;
876 buffer_size = *buffer_size_ret;
877 field = *buffer_ret;
878 field_size = 0;
880 if (buffer_size <= 0)
881 return (-1);
883 /* This is ensured by `handle_request'. */
884 assert (buffer[buffer_size - 1] == '\0');
886 status = -1;
887 while (buffer_pos < buffer_size)
888 {
889 /* Check for end-of-field or end-of-buffer */
890 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
891 {
892 field[field_size] = 0;
893 field_size++;
894 buffer_pos++;
895 status = 0;
896 break;
897 }
898 /* Handle escaped characters. */
899 else if (buffer[buffer_pos] == '\\')
900 {
901 if (buffer_pos >= (buffer_size - 1))
902 break;
903 buffer_pos++;
904 field[field_size] = buffer[buffer_pos];
905 field_size++;
906 buffer_pos++;
907 }
908 /* Normal operation */
909 else
910 {
911 field[field_size] = buffer[buffer_pos];
912 field_size++;
913 buffer_pos++;
914 }
915 } /* while (buffer_pos < buffer_size) */
917 if (status != 0)
918 return (status);
920 *buffer_ret = buffer + buffer_pos;
921 *buffer_size_ret = buffer_size - buffer_pos;
922 *field_ret = field;
924 return (0);
925 } /* }}} int buffer_get_field */
927 /* if we're restricting writes to the base directory,
928 * check whether the file falls within the dir
929 * returns 1 if OK, otherwise 0
930 */
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
932 {
933 assert(file != NULL);
935 if (!config_write_base_only
936 || sock == NULL /* journal replay */
937 || config_base_dir == NULL)
938 return 1;
940 if (strstr(file, "../") != NULL) goto err;
942 /* relative paths without "../" are ok */
943 if (*file != '/') return 1;
945 /* file must be of the format base + "/" + <1+ char filename> */
946 if (strlen(file) < _config_base_dir_len + 2) goto err;
947 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948 if (*(file + _config_base_dir_len) != '/') goto err;
950 return 1;
952 err:
953 if (sock != NULL && sock->fd >= 0)
954 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
956 return 0;
957 } /* }}} static int check_file_access */
959 /* returns 1 if we have the required privilege level,
960 * otherwise issue an error to the user on sock */
961 static int has_privilege (listen_socket_t *sock, /* {{{ */
962 socket_privilege priv)
963 {
964 if (sock == NULL) /* journal replay */
965 return 1;
967 if (sock->privilege >= priv)
968 return 1;
970 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
971 } /* }}} static int has_privilege */
973 static int flush_file (const char *filename) /* {{{ */
974 {
975 cache_item_t *ci;
977 pthread_mutex_lock (&cache_lock);
979 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
980 if (ci == NULL)
981 {
982 pthread_mutex_unlock (&cache_lock);
983 return (ENOENT);
984 }
986 if (ci->values_num > 0)
987 {
988 /* Enqueue at head */
989 enqueue_cache_item (ci, HEAD);
990 pthread_cond_wait(&ci->flushed, &cache_lock);
991 }
993 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
994 * may have been purged during our cond_wait() */
996 pthread_mutex_unlock(&cache_lock);
998 return (0);
999 } /* }}} int flush_file */
1001 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1002 char *buffer, size_t buffer_size)
1003 {
1004 int status;
1005 char **help_text;
1006 char *command;
1008 char *help_help[2] =
1009 {
1010 "Command overview\n"
1011 ,
1012 "HELP [<command>]\n"
1013 "FLUSH <filename>\n"
1014 "FLUSHALL\n"
1015 "PENDING <filename>\n"
1016 "FORGET <filename>\n"
1017 "UPDATE <filename> <values> [<values> ...]\n"
1018 "BATCH\n"
1019 "STATS\n"
1020 };
1022 char *help_flush[2] =
1023 {
1024 "Help for FLUSH\n"
1025 ,
1026 "Usage: FLUSH <filename>\n"
1027 "\n"
1028 "Adds the given filename to the head of the update queue and returns\n"
1029 "after is has been dequeued.\n"
1030 };
1032 char *help_flushall[2] =
1033 {
1034 "Help for FLUSHALL\n"
1035 ,
1036 "Usage: FLUSHALL\n"
1037 "\n"
1038 "Triggers writing of all pending updates. Returns immediately.\n"
1039 };
1041 char *help_pending[2] =
1042 {
1043 "Help for PENDING\n"
1044 ,
1045 "Usage: PENDING <filename>\n"
1046 "\n"
1047 "Shows any 'pending' updates for a file, in order.\n"
1048 "The updates shown have not yet been written to the underlying RRD file.\n"
1049 };
1051 char *help_forget[2] =
1052 {
1053 "Help for FORGET\n"
1054 ,
1055 "Usage: FORGET <filename>\n"
1056 "\n"
1057 "Removes the file completely from the cache.\n"
1058 "Any pending updates for the file will be lost.\n"
1059 };
1061 char *help_update[2] =
1062 {
1063 "Help for UPDATE\n"
1064 ,
1065 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1066 "\n"
1067 "Adds the given file to the internal cache if it is not yet known and\n"
1068 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1069 "for details.\n"
1070 "\n"
1071 "Each <values> has the following form:\n"
1072 " <values> = <time>:<value>[:<value>[...]]\n"
1073 "See the rrdupdate(1) manpage for details.\n"
1074 };
1076 char *help_stats[2] =
1077 {
1078 "Help for STATS\n"
1079 ,
1080 "Usage: STATS\n"
1081 "\n"
1082 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1083 "a description of the values.\n"
1084 };
1086 char *help_batch[2] =
1087 {
1088 "Help for BATCH\n"
1089 ,
1090 "The 'BATCH' command permits the client to initiate a bulk load\n"
1091 " of commands to rrdcached.\n"
1092 "\n"
1093 "Usage:\n"
1094 "\n"
1095 " client: BATCH\n"
1096 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1097 " client: command #1\n"
1098 " client: command #2\n"
1099 " client: ... and so on\n"
1100 " client: .\n"
1101 " server: 2 errors\n"
1102 " server: 7 message for command #7\n"
1103 " server: 9 message for command #9\n"
1104 "\n"
1105 "For more information, consult the rrdcached(1) documentation.\n"
1106 };
1108 status = buffer_get_field (&buffer, &buffer_size, &command);
1109 if (status != 0)
1110 help_text = help_help;
1111 else
1112 {
1113 if (strcasecmp (command, "update") == 0)
1114 help_text = help_update;
1115 else if (strcasecmp (command, "flush") == 0)
1116 help_text = help_flush;
1117 else if (strcasecmp (command, "flushall") == 0)
1118 help_text = help_flushall;
1119 else if (strcasecmp (command, "pending") == 0)
1120 help_text = help_pending;
1121 else if (strcasecmp (command, "forget") == 0)
1122 help_text = help_forget;
1123 else if (strcasecmp (command, "stats") == 0)
1124 help_text = help_stats;
1125 else if (strcasecmp (command, "batch") == 0)
1126 help_text = help_batch;
1127 else
1128 help_text = help_help;
1129 }
1131 add_response_info(sock, help_text[1]);
1132 return send_response(sock, RESP_OK, help_text[0]);
1133 } /* }}} int handle_request_help */
1135 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1136 {
1137 uint64_t copy_queue_length;
1138 uint64_t copy_updates_received;
1139 uint64_t copy_flush_received;
1140 uint64_t copy_updates_written;
1141 uint64_t copy_data_sets_written;
1142 uint64_t copy_journal_bytes;
1143 uint64_t copy_journal_rotate;
1145 uint64_t tree_nodes_number;
1146 uint64_t tree_depth;
1148 pthread_mutex_lock (&stats_lock);
1149 copy_queue_length = stats_queue_length;
1150 copy_updates_received = stats_updates_received;
1151 copy_flush_received = stats_flush_received;
1152 copy_updates_written = stats_updates_written;
1153 copy_data_sets_written = stats_data_sets_written;
1154 copy_journal_bytes = stats_journal_bytes;
1155 copy_journal_rotate = stats_journal_rotate;
1156 pthread_mutex_unlock (&stats_lock);
1158 pthread_mutex_lock (&cache_lock);
1159 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160 tree_depth = (uint64_t) g_tree_height (cache_tree);
1161 pthread_mutex_unlock (&cache_lock);
1163 add_response_info(sock,
1164 "QueueLength: %"PRIu64"\n", copy_queue_length);
1165 add_response_info(sock,
1166 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167 add_response_info(sock,
1168 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169 add_response_info(sock,
1170 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171 add_response_info(sock,
1172 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1178 send_response(sock, RESP_OK, "Statistics follow\n");
1180 return (0);
1181 } /* }}} int handle_request_stats */
1183 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1184 char *buffer, size_t buffer_size)
1185 {
1186 char *file;
1187 int status;
1189 status = buffer_get_field (&buffer, &buffer_size, &file);
1190 if (status != 0)
1191 {
1192 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1193 }
1194 else
1195 {
1196 pthread_mutex_lock(&stats_lock);
1197 stats_flush_received++;
1198 pthread_mutex_unlock(&stats_lock);
1200 if (!check_file_access(file, sock)) return 0;
1202 status = flush_file (file);
1203 if (status == 0)
1204 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205 else if (status == ENOENT)
1206 {
1207 /* no file in our tree; see whether it exists at all */
1208 struct stat statbuf;
1210 memset(&statbuf, 0, sizeof(statbuf));
1211 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1213 else
1214 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1215 }
1216 else if (status < 0)
1217 return send_response(sock, RESP_ERR, "Internal error.\n");
1218 else
1219 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1220 }
1222 /* NOTREACHED */
1223 assert(1==0);
1224 } /* }}} int handle_request_slurp */
1226 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1227 {
1228 int status;
1230 status = has_privilege(sock, PRIV_HIGH);
1231 if (status <= 0)
1232 return status;
1234 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236 pthread_mutex_lock(&cache_lock);
1237 flush_old_values(-1);
1238 pthread_mutex_unlock(&cache_lock);
1240 return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1244 char *buffer, size_t buffer_size)
1245 {
1246 int status;
1247 char *file;
1248 cache_item_t *ci;
1250 status = buffer_get_field(&buffer, &buffer_size, &file);
1251 if (status != 0)
1252 return send_response(sock, RESP_ERR,
1253 "Usage: PENDING <filename>\n");
1255 status = has_privilege(sock, PRIV_HIGH);
1256 if (status <= 0)
1257 return status;
1259 pthread_mutex_lock(&cache_lock);
1260 ci = g_tree_lookup(cache_tree, file);
1261 if (ci == NULL)
1262 {
1263 pthread_mutex_unlock(&cache_lock);
1264 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265 }
1267 for (int i=0; i < ci->values_num; i++)
1268 add_response_info(sock, "%s\n", ci->values[i]);
1270 pthread_mutex_unlock(&cache_lock);
1271 return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1274 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1275 char *buffer, size_t buffer_size)
1276 {
1277 int status;
1278 char *file;
1280 status = buffer_get_field(&buffer, &buffer_size, &file);
1281 if (status != 0)
1282 return send_response(sock, RESP_ERR,
1283 "Usage: FORGET <filename>\n");
1285 status = has_privilege(sock, PRIV_HIGH);
1286 if (status <= 0)
1287 return status;
1289 if (!check_file_access(file, sock)) return 0;
1291 pthread_mutex_lock(&cache_lock);
1292 status = forget_file(file);
1293 pthread_mutex_unlock(&cache_lock);
1295 if (status == 0)
1296 {
1297 if (sock != NULL)
1298 journal_write("forget", file);
1300 return send_response(sock, RESP_OK, "Gone!\n");
1301 }
1302 else
1303 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1304 status < 0 ? "Internal error" : rrd_strerror(status));
1306 /* NOTREACHED */
1307 assert(1==0);
1308 } /* }}} static int handle_request_forget */
1310 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1311 char *buffer, size_t buffer_size)
1312 {
1313 char *file;
1314 int values_num = 0;
1315 int bad_timestamps = 0;
1316 int status;
1317 char orig_buf[CMD_MAX];
1319 time_t now;
1320 cache_item_t *ci;
1322 now = time (NULL);
1324 status = has_privilege(sock, PRIV_HIGH);
1325 if (status <= 0)
1326 return status;
1328 /* save it for the journal later */
1329 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1331 status = buffer_get_field (&buffer, &buffer_size, &file);
1332 if (status != 0)
1333 return send_response(sock, RESP_ERR,
1334 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1336 pthread_mutex_lock(&stats_lock);
1337 stats_updates_received++;
1338 pthread_mutex_unlock(&stats_lock);
1340 if (!check_file_access(file, sock)) return 0;
1342 pthread_mutex_lock (&cache_lock);
1343 ci = g_tree_lookup (cache_tree, file);
1345 if (ci == NULL) /* {{{ */
1346 {
1347 struct stat statbuf;
1349 /* don't hold the lock while we setup; stat(2) might block */
1350 pthread_mutex_unlock(&cache_lock);
1352 memset (&statbuf, 0, sizeof (statbuf));
1353 status = stat (file, &statbuf);
1354 if (status != 0)
1355 {
1356 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1358 status = errno;
1359 if (status == ENOENT)
1360 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1361 else
1362 return send_response(sock, RESP_ERR,
1363 "stat failed with error %i.\n", status);
1364 }
1365 if (!S_ISREG (statbuf.st_mode))
1366 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1368 if (access(file, R_OK|W_OK) != 0)
1369 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1370 file, rrd_strerror(errno));
1372 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1373 if (ci == NULL)
1374 {
1375 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1377 return send_response(sock, RESP_ERR, "malloc failed.\n");
1378 }
1379 memset (ci, 0, sizeof (cache_item_t));
1381 ci->file = strdup (file);
1382 if (ci->file == NULL)
1383 {
1384 free (ci);
1385 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1387 return send_response(sock, RESP_ERR, "strdup failed.\n");
1388 }
1390 wipe_ci_values(ci, now);
1391 ci->flags = CI_FLAGS_IN_TREE;
1393 pthread_mutex_lock(&cache_lock);
1394 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1395 } /* }}} */
1396 assert (ci != NULL);
1398 /* don't re-write updates in replay mode */
1399 if (sock != NULL)
1400 journal_write("update", orig_buf);
1402 while (buffer_size > 0)
1403 {
1404 char **temp;
1405 char *value;
1406 time_t stamp;
1407 char *eostamp;
1409 status = buffer_get_field (&buffer, &buffer_size, &value);
1410 if (status != 0)
1411 {
1412 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1413 break;
1414 }
1416 /* make sure update time is always moving forward */
1417 stamp = strtol(value, &eostamp, 10);
1418 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1419 {
1420 ++bad_timestamps;
1421 add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1422 continue;
1423 }
1424 else if (stamp <= ci->last_update_stamp)
1425 {
1426 ++bad_timestamps;
1427 add_response_info(sock,
1428 "illegal attempt to update using time %ld when"
1429 " last update time is %ld (minimum one second step)\n",
1430 stamp, ci->last_update_stamp);
1431 continue;
1432 }
1433 else
1434 ci->last_update_stamp = stamp;
1436 temp = (char **) realloc (ci->values,
1437 sizeof (char *) * (ci->values_num + 1));
1438 if (temp == NULL)
1439 {
1440 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1441 continue;
1442 }
1443 ci->values = temp;
1445 ci->values[ci->values_num] = strdup (value);
1446 if (ci->values[ci->values_num] == NULL)
1447 {
1448 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1449 continue;
1450 }
1451 ci->values_num++;
1453 values_num++;
1454 }
1456 if (((now - ci->last_flush_time) >= config_write_interval)
1457 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1458 && (ci->values_num > 0))
1459 {
1460 enqueue_cache_item (ci, TAIL);
1461 }
1463 pthread_mutex_unlock (&cache_lock);
1465 if (values_num < 1)
1466 {
1467 /* if we had only one update attempt, then return the full
1468 error message... try to get the most information out
1469 of the limited error space allowed by the protocol
1470 */
1471 if (bad_timestamps == 1)
1472 return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1473 else
1474 return send_response(sock, RESP_ERR,
1475 "No values updated (%d bad timestamps).\n",
1476 bad_timestamps);
1477 }
1478 else
1479 return send_response(sock, RESP_OK,
1480 "errors, enqueued %i value(s).\n", values_num);
1482 /* NOTREACHED */
1483 assert(1==0);
1485 } /* }}} int handle_request_update */
1487 /* we came across a "WROTE" entry during journal replay.
1488 * throw away any values that we have accumulated for this file
1489 */
1490 static int handle_request_wrote (const char *buffer) /* {{{ */
1491 {
1492 int i;
1493 cache_item_t *ci;
1494 const char *file = buffer;
1496 pthread_mutex_lock(&cache_lock);
1498 ci = g_tree_lookup(cache_tree, file);
1499 if (ci == NULL)
1500 {
1501 pthread_mutex_unlock(&cache_lock);
1502 return (0);
1503 }
1505 if (ci->values)
1506 {
1507 for (i=0; i < ci->values_num; i++)
1508 free(ci->values[i]);
1510 free(ci->values);
1511 }
1513 wipe_ci_values(ci, time(NULL));
1514 remove_from_queue(ci);
1516 pthread_mutex_unlock(&cache_lock);
1517 return (0);
1518 } /* }}} int handle_request_wrote */
1520 /* start "BATCH" processing */
1521 static int batch_start (listen_socket_t *sock) /* {{{ */
1522 {
1523 int status;
1524 if (sock->batch_mode)
1525 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1527 status = send_response(sock, RESP_OK,
1528 "Go ahead. End with dot '.' on its own line.\n");
1529 sock->batch_mode = 1;
1530 sock->batch_cmd = 0;
1532 return status;
1533 } /* }}} static int batch_start */
1535 /* finish "BATCH" processing and return results to the client */
1536 static int batch_done (listen_socket_t *sock) /* {{{ */
1537 {
1538 assert(sock->batch_mode);
1539 sock->batch_mode = 0;
1540 sock->batch_cmd = 0;
1541 return send_response(sock, RESP_OK, "errors\n");
1542 } /* }}} static int batch_done */
1544 /* if sock==NULL, we are in journal replay mode */
1545 static int handle_request (listen_socket_t *sock, /* {{{ */
1546 char *buffer, size_t buffer_size)
1547 {
1548 char *buffer_ptr;
1549 char *command;
1550 int status;
1552 assert (buffer[buffer_size - 1] == '\0');
1554 buffer_ptr = buffer;
1555 command = NULL;
1556 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1557 if (status != 0)
1558 {
1559 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1560 return (-1);
1561 }
1563 if (sock != NULL && sock->batch_mode)
1564 sock->batch_cmd++;
1566 if (strcasecmp (command, "update") == 0)
1567 return (handle_request_update (sock, buffer_ptr, buffer_size));
1568 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1569 {
1570 /* this is only valid in replay mode */
1571 return (handle_request_wrote (buffer_ptr));
1572 }
1573 else if (strcasecmp (command, "flush") == 0)
1574 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1575 else if (strcasecmp (command, "flushall") == 0)
1576 return (handle_request_flushall(sock));
1577 else if (strcasecmp (command, "pending") == 0)
1578 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1579 else if (strcasecmp (command, "forget") == 0)
1580 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1581 else if (strcasecmp (command, "stats") == 0)
1582 return (handle_request_stats (sock));
1583 else if (strcasecmp (command, "help") == 0)
1584 return (handle_request_help (sock, buffer_ptr, buffer_size));
1585 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1586 return batch_start(sock);
1587 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1588 return batch_done(sock);
1589 else
1590 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1592 /* NOTREACHED */
1593 assert(1==0);
1594 } /* }}} int handle_request */
1596 /* MUST NOT hold journal_lock before calling this */
1597 static void journal_rotate(void) /* {{{ */
1598 {
1599 FILE *old_fh = NULL;
1600 int new_fd;
1602 if (journal_cur == NULL || journal_old == NULL)
1603 return;
1605 pthread_mutex_lock(&journal_lock);
1607 /* we rotate this way (rename before close) so that the we can release
1608 * the journal lock as fast as possible. Journal writes to the new
1609 * journal can proceed immediately after the new file is opened. The
1610 * fclose can then block without affecting new updates.
1611 */
1612 if (journal_fh != NULL)
1613 {
1614 old_fh = journal_fh;
1615 journal_fh = NULL;
1616 rename(journal_cur, journal_old);
1617 ++stats_journal_rotate;
1618 }
1620 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1621 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1622 if (new_fd >= 0)
1623 {
1624 journal_fh = fdopen(new_fd, "a");
1625 if (journal_fh == NULL)
1626 close(new_fd);
1627 }
1629 pthread_mutex_unlock(&journal_lock);
1631 if (old_fh != NULL)
1632 fclose(old_fh);
1634 if (journal_fh == NULL)
1635 {
1636 RRDD_LOG(LOG_CRIT,
1637 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1638 journal_cur, rrd_strerror(errno));
1640 RRDD_LOG(LOG_ERR,
1641 "JOURNALING DISABLED: All values will be flushed at shutdown");
1642 config_flush_at_shutdown = 1;
1643 }
1645 } /* }}} static void journal_rotate */
1647 static void journal_done(void) /* {{{ */
1648 {
1649 if (journal_cur == NULL)
1650 return;
1652 pthread_mutex_lock(&journal_lock);
1653 if (journal_fh != NULL)
1654 {
1655 fclose(journal_fh);
1656 journal_fh = NULL;
1657 }
1659 if (config_flush_at_shutdown)
1660 {
1661 RRDD_LOG(LOG_INFO, "removing journals");
1662 unlink(journal_old);
1663 unlink(journal_cur);
1664 }
1665 else
1666 {
1667 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1668 "journals will be used at next startup");
1669 }
1671 pthread_mutex_unlock(&journal_lock);
1673 } /* }}} static void journal_done */
1675 static int journal_write(char *cmd, char *args) /* {{{ */
1676 {
1677 int chars;
1679 if (journal_fh == NULL)
1680 return 0;
1682 pthread_mutex_lock(&journal_lock);
1683 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1684 pthread_mutex_unlock(&journal_lock);
1686 if (chars > 0)
1687 {
1688 pthread_mutex_lock(&stats_lock);
1689 stats_journal_bytes += chars;
1690 pthread_mutex_unlock(&stats_lock);
1691 }
1693 return chars;
1694 } /* }}} static int journal_write */
1696 static int journal_replay (const char *file) /* {{{ */
1697 {
1698 FILE *fh;
1699 int entry_cnt = 0;
1700 int fail_cnt = 0;
1701 uint64_t line = 0;
1702 char entry[CMD_MAX];
1704 if (file == NULL) return 0;
1706 {
1707 char *reason;
1708 int status = 0;
1709 struct stat statbuf;
1711 memset(&statbuf, 0, sizeof(statbuf));
1712 if (stat(file, &statbuf) != 0)
1713 {
1714 if (errno == ENOENT)
1715 return 0;
1717 reason = "stat error";
1718 status = errno;
1719 }
1720 else if (!S_ISREG(statbuf.st_mode))
1721 {
1722 reason = "not a regular file";
1723 status = EPERM;
1724 }
1725 if (statbuf.st_uid != daemon_uid)
1726 {
1727 reason = "not owned by daemon user";
1728 status = EACCES;
1729 }
1730 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1731 {
1732 reason = "must not be user/group writable";
1733 status = EACCES;
1734 }
1736 if (status != 0)
1737 {
1738 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1739 file, rrd_strerror(status), reason);
1740 return 0;
1741 }
1742 }
1744 fh = fopen(file, "r");
1745 if (fh == NULL)
1746 {
1747 if (errno != ENOENT)
1748 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1749 file, rrd_strerror(errno));
1750 return 0;
1751 }
1752 else
1753 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1755 while(!feof(fh))
1756 {
1757 size_t entry_len;
1759 ++line;
1760 if (fgets(entry, sizeof(entry), fh) == NULL)
1761 break;
1762 entry_len = strlen(entry);
1764 /* check \n termination in case journal writing crashed mid-line */
1765 if (entry_len == 0)
1766 continue;
1767 else if (entry[entry_len - 1] != '\n')
1768 {
1769 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1770 ++fail_cnt;
1771 continue;
1772 }
1774 entry[entry_len - 1] = '\0';
1776 if (handle_request(NULL, entry, entry_len) == 0)
1777 ++entry_cnt;
1778 else
1779 ++fail_cnt;
1780 }
1782 fclose(fh);
1784 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1785 entry_cnt, fail_cnt);
1787 return entry_cnt > 0 ? 1 : 0;
1788 } /* }}} static int journal_replay */
1790 static void journal_init(void) /* {{{ */
1791 {
1792 int had_journal = 0;
1794 if (journal_cur == NULL) return;
1796 pthread_mutex_lock(&journal_lock);
1798 RRDD_LOG(LOG_INFO, "checking for journal files");
1800 had_journal += journal_replay(journal_old);
1801 had_journal += journal_replay(journal_cur);
1803 /* it must have been a crash. start a flush */
1804 if (had_journal && config_flush_at_shutdown)
1805 flush_old_values(-1);
1807 pthread_mutex_unlock(&journal_lock);
1808 journal_rotate();
1810 RRDD_LOG(LOG_INFO, "journal processing complete");
1812 } /* }}} static void journal_init */
1814 static void close_connection(listen_socket_t *sock)
1815 {
1816 close(sock->fd) ; sock->fd = -1;
1817 free(sock->rbuf); sock->rbuf = NULL;
1818 free(sock->wbuf); sock->wbuf = NULL;
1820 free(sock);
1821 }
1823 static void *connection_thread_main (void *args) /* {{{ */
1824 {
1825 pthread_t self;
1826 listen_socket_t *sock;
1827 int i;
1828 int fd;
1830 sock = (listen_socket_t *) args;
1831 fd = sock->fd;
1833 /* init read buffers */
1834 sock->next_read = sock->next_cmd = 0;
1835 sock->rbuf = malloc(RBUF_SIZE);
1836 if (sock->rbuf == NULL)
1837 {
1838 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1839 close_connection(sock);
1840 return NULL;
1841 }
1843 pthread_mutex_lock (&connection_threads_lock);
1844 {
1845 pthread_t *temp;
1847 temp = (pthread_t *) realloc (connection_threads,
1848 sizeof (pthread_t) * (connection_threads_num + 1));
1849 if (temp == NULL)
1850 {
1851 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1852 }
1853 else
1854 {
1855 connection_threads = temp;
1856 connection_threads[connection_threads_num] = pthread_self ();
1857 connection_threads_num++;
1858 }
1859 }
1860 pthread_mutex_unlock (&connection_threads_lock);
1862 while (do_shutdown == 0)
1863 {
1864 char *cmd;
1865 ssize_t cmd_len;
1866 ssize_t rbytes;
1868 struct pollfd pollfd;
1869 int status;
1871 pollfd.fd = fd;
1872 pollfd.events = POLLIN | POLLPRI;
1873 pollfd.revents = 0;
1875 status = poll (&pollfd, 1, /* timeout = */ 500);
1876 if (do_shutdown)
1877 break;
1878 else if (status == 0) /* timeout */
1879 continue;
1880 else if (status < 0) /* error */
1881 {
1882 status = errno;
1883 if (status != EINTR)
1884 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1885 continue;
1886 }
1888 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1889 break;
1890 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1891 {
1892 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1893 "poll(2) returned something unexpected: %#04hx",
1894 pollfd.revents);
1895 break;
1896 }
1898 rbytes = read(fd, sock->rbuf + sock->next_read,
1899 RBUF_SIZE - sock->next_read);
1900 if (rbytes < 0)
1901 {
1902 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1903 break;
1904 }
1905 else if (rbytes == 0)
1906 break; /* eof */
1908 sock->next_read += rbytes;
1910 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1911 {
1912 status = handle_request (sock, cmd, cmd_len+1);
1913 if (status != 0)
1914 goto out_close;
1915 }
1916 }
1918 out_close:
1919 close_connection(sock);
1921 self = pthread_self ();
1922 /* Remove this thread from the connection threads list */
1923 pthread_mutex_lock (&connection_threads_lock);
1924 /* Find out own index in the array */
1925 for (i = 0; i < connection_threads_num; i++)
1926 if (pthread_equal (connection_threads[i], self) != 0)
1927 break;
1928 assert (i < connection_threads_num);
1930 /* Move the trailing threads forward. */
1931 if (i < (connection_threads_num - 1))
1932 {
1933 memmove (connection_threads + i,
1934 connection_threads + i + 1,
1935 sizeof (pthread_t) * (connection_threads_num - i - 1));
1936 }
1938 connection_threads_num--;
1939 pthread_mutex_unlock (&connection_threads_lock);
1941 return (NULL);
1942 } /* }}} void *connection_thread_main */
1944 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1945 {
1946 int fd;
1947 struct sockaddr_un sa;
1948 listen_socket_t *temp;
1949 int status;
1950 const char *path;
1952 path = sock->addr;
1953 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1954 path += strlen("unix:");
1956 temp = (listen_socket_t *) realloc (listen_fds,
1957 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1958 if (temp == NULL)
1959 {
1960 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1961 return (-1);
1962 }
1963 listen_fds = temp;
1964 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1966 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1967 if (fd < 0)
1968 {
1969 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1970 return (-1);
1971 }
1973 memset (&sa, 0, sizeof (sa));
1974 sa.sun_family = AF_UNIX;
1975 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1977 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1978 if (status != 0)
1979 {
1980 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1981 close (fd);
1982 unlink (path);
1983 return (-1);
1984 }
1986 status = listen (fd, /* backlog = */ 10);
1987 if (status != 0)
1988 {
1989 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1990 close (fd);
1991 unlink (path);
1992 return (-1);
1993 }
1995 listen_fds[listen_fds_num].fd = fd;
1996 listen_fds[listen_fds_num].family = PF_UNIX;
1997 strncpy(listen_fds[listen_fds_num].addr, path,
1998 sizeof (listen_fds[listen_fds_num].addr) - 1);
1999 listen_fds_num++;
2001 return (0);
2002 } /* }}} int open_listen_socket_unix */
2004 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2005 {
2006 struct addrinfo ai_hints;
2007 struct addrinfo *ai_res;
2008 struct addrinfo *ai_ptr;
2009 char addr_copy[NI_MAXHOST];
2010 char *addr;
2011 char *port;
2012 int status;
2014 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2015 addr_copy[sizeof (addr_copy) - 1] = 0;
2016 addr = addr_copy;
2018 memset (&ai_hints, 0, sizeof (ai_hints));
2019 ai_hints.ai_flags = 0;
2020 #ifdef AI_ADDRCONFIG
2021 ai_hints.ai_flags |= AI_ADDRCONFIG;
2022 #endif
2023 ai_hints.ai_family = AF_UNSPEC;
2024 ai_hints.ai_socktype = SOCK_STREAM;
2026 port = NULL;
2027 if (*addr == '[') /* IPv6+port format */
2028 {
2029 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2030 addr++;
2032 port = strchr (addr, ']');
2033 if (port == NULL)
2034 {
2035 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2036 sock->addr);
2037 return (-1);
2038 }
2039 *port = 0;
2040 port++;
2042 if (*port == ':')
2043 port++;
2044 else if (*port == 0)
2045 port = NULL;
2046 else
2047 {
2048 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2049 port);
2050 return (-1);
2051 }
2052 } /* if (*addr = ']') */
2053 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2054 {
2055 port = rindex(addr, ':');
2056 if (port != NULL)
2057 {
2058 *port = 0;
2059 port++;
2060 }
2061 }
2062 ai_res = NULL;
2063 status = getaddrinfo (addr,
2064 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2065 &ai_hints, &ai_res);
2066 if (status != 0)
2067 {
2068 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2069 "%s", addr, gai_strerror (status));
2070 return (-1);
2071 }
2073 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2074 {
2075 int fd;
2076 listen_socket_t *temp;
2077 int one = 1;
2079 temp = (listen_socket_t *) realloc (listen_fds,
2080 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2081 if (temp == NULL)
2082 {
2083 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2084 continue;
2085 }
2086 listen_fds = temp;
2087 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2089 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2090 if (fd < 0)
2091 {
2092 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2093 continue;
2094 }
2096 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2098 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2099 if (status != 0)
2100 {
2101 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2102 close (fd);
2103 continue;
2104 }
2106 status = listen (fd, /* backlog = */ 10);
2107 if (status != 0)
2108 {
2109 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2110 close (fd);
2111 return (-1);
2112 }
2114 listen_fds[listen_fds_num].fd = fd;
2115 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2116 listen_fds_num++;
2117 } /* for (ai_ptr) */
2119 return (0);
2120 } /* }}} static int open_listen_socket_network */
2122 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2123 {
2124 assert(sock != NULL);
2125 assert(sock->addr != NULL);
2127 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2128 || sock->addr[0] == '/')
2129 return (open_listen_socket_unix(sock));
2130 else
2131 return (open_listen_socket_network(sock));
2132 } /* }}} int open_listen_socket */
2134 static int close_listen_sockets (void) /* {{{ */
2135 {
2136 size_t i;
2138 for (i = 0; i < listen_fds_num; i++)
2139 {
2140 close (listen_fds[i].fd);
2142 if (listen_fds[i].family == PF_UNIX)
2143 unlink(listen_fds[i].addr);
2144 }
2146 free (listen_fds);
2147 listen_fds = NULL;
2148 listen_fds_num = 0;
2150 return (0);
2151 } /* }}} int close_listen_sockets */
2153 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2154 {
2155 struct pollfd *pollfds;
2156 int pollfds_num;
2157 int status;
2158 int i;
2160 for (i = 0; i < config_listen_address_list_len; i++)
2161 open_listen_socket (config_listen_address_list[i]);
2163 if (config_listen_address_list_len < 1)
2164 {
2165 listen_socket_t sock;
2166 memset(&sock, 0, sizeof(sock));
2167 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2168 open_listen_socket (&sock);
2169 }
2171 if (listen_fds_num < 1)
2172 {
2173 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2174 "could be opened. Sorry.");
2175 return (NULL);
2176 }
2178 pollfds_num = listen_fds_num;
2179 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2180 if (pollfds == NULL)
2181 {
2182 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2183 return (NULL);
2184 }
2185 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2187 RRDD_LOG(LOG_INFO, "listening for connections");
2189 while (do_shutdown == 0)
2190 {
2191 assert (pollfds_num == ((int) listen_fds_num));
2192 for (i = 0; i < pollfds_num; i++)
2193 {
2194 pollfds[i].fd = listen_fds[i].fd;
2195 pollfds[i].events = POLLIN | POLLPRI;
2196 pollfds[i].revents = 0;
2197 }
2199 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2200 if (do_shutdown)
2201 break;
2202 else if (status == 0) /* timeout */
2203 continue;
2204 else if (status < 0) /* error */
2205 {
2206 status = errno;
2207 if (status != EINTR)
2208 {
2209 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2210 }
2211 continue;
2212 }
2214 for (i = 0; i < pollfds_num; i++)
2215 {
2216 listen_socket_t *client_sock;
2217 struct sockaddr_storage client_sa;
2218 socklen_t client_sa_size;
2219 pthread_t tid;
2220 pthread_attr_t attr;
2222 if (pollfds[i].revents == 0)
2223 continue;
2225 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2226 {
2227 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2228 "poll(2) returned something unexpected for listen FD #%i.",
2229 pollfds[i].fd);
2230 continue;
2231 }
2233 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2234 if (client_sock == NULL)
2235 {
2236 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2237 continue;
2238 }
2239 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2241 client_sa_size = sizeof (client_sa);
2242 client_sock->fd = accept (pollfds[i].fd,
2243 (struct sockaddr *) &client_sa, &client_sa_size);
2244 if (client_sock->fd < 0)
2245 {
2246 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2247 free(client_sock);
2248 continue;
2249 }
2251 pthread_attr_init (&attr);
2252 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2254 status = pthread_create (&tid, &attr, connection_thread_main,
2255 client_sock);
2256 if (status != 0)
2257 {
2258 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2259 close_connection(client_sock);
2260 continue;
2261 }
2262 } /* for (pollfds_num) */
2263 } /* while (do_shutdown == 0) */
2265 RRDD_LOG(LOG_INFO, "starting shutdown");
2267 close_listen_sockets ();
2269 pthread_mutex_lock (&connection_threads_lock);
2270 while (connection_threads_num > 0)
2271 {
2272 pthread_t wait_for;
2274 wait_for = connection_threads[0];
2276 pthread_mutex_unlock (&connection_threads_lock);
2277 pthread_join (wait_for, /* retval = */ NULL);
2278 pthread_mutex_lock (&connection_threads_lock);
2279 }
2280 pthread_mutex_unlock (&connection_threads_lock);
2282 return (NULL);
2283 } /* }}} void *listen_thread_main */
2285 static int daemonize (void) /* {{{ */
2286 {
2287 int status;
2288 int fd;
2289 char *base_dir;
2291 daemon_uid = geteuid();
2293 fd = open_pidfile();
2294 if (fd < 0) return fd;
2296 if (!stay_foreground)
2297 {
2298 pid_t child;
2300 child = fork ();
2301 if (child < 0)
2302 {
2303 fprintf (stderr, "daemonize: fork(2) failed.\n");
2304 return (-1);
2305 }
2306 else if (child > 0)
2307 {
2308 return (1);
2309 }
2311 /* Become session leader */
2312 setsid ();
2314 /* Open the first three file descriptors to /dev/null */
2315 close (2);
2316 close (1);
2317 close (0);
2319 open ("/dev/null", O_RDWR);
2320 dup (0);
2321 dup (0);
2322 } /* if (!stay_foreground) */
2324 /* Change into the /tmp directory. */
2325 base_dir = (config_base_dir != NULL)
2326 ? config_base_dir
2327 : "/tmp";
2328 status = chdir (base_dir);
2329 if (status != 0)
2330 {
2331 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2332 return (-1);
2333 }
2335 install_signal_handlers();
2337 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2338 RRDD_LOG(LOG_INFO, "starting up");
2340 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2341 if (cache_tree == NULL)
2342 {
2343 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2344 return (-1);
2345 }
2347 status = write_pidfile (fd);
2348 return status;
2349 } /* }}} int daemonize */
2351 static int cleanup (void) /* {{{ */
2352 {
2353 do_shutdown++;
2355 pthread_cond_signal (&cache_cond);
2356 pthread_join (queue_thread, /* return = */ NULL);
2358 remove_pidfile ();
2360 RRDD_LOG(LOG_INFO, "goodbye");
2361 closelog ();
2363 return (0);
2364 } /* }}} int cleanup */
2366 static int read_options (int argc, char **argv) /* {{{ */
2367 {
2368 int option;
2369 int status = 0;
2371 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2372 {
2373 switch (option)
2374 {
2375 case 'g':
2376 stay_foreground=1;
2377 break;
2379 case 'L':
2380 case 'l':
2381 {
2382 listen_socket_t **temp;
2383 listen_socket_t *new;
2385 new = malloc(sizeof(listen_socket_t));
2386 if (new == NULL)
2387 {
2388 fprintf(stderr, "read_options: malloc failed.\n");
2389 return(2);
2390 }
2391 memset(new, 0, sizeof(listen_socket_t));
2393 temp = (listen_socket_t **) realloc (config_listen_address_list,
2394 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2395 if (temp == NULL)
2396 {
2397 fprintf (stderr, "read_options: realloc failed.\n");
2398 return (2);
2399 }
2400 config_listen_address_list = temp;
2402 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2403 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2405 temp[config_listen_address_list_len] = new;
2406 config_listen_address_list_len++;
2407 }
2408 break;
2410 case 'f':
2411 {
2412 int temp;
2414 temp = atoi (optarg);
2415 if (temp > 0)
2416 config_flush_interval = temp;
2417 else
2418 {
2419 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2420 status = 3;
2421 }
2422 }
2423 break;
2425 case 'w':
2426 {
2427 int temp;
2429 temp = atoi (optarg);
2430 if (temp > 0)
2431 config_write_interval = temp;
2432 else
2433 {
2434 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2435 status = 2;
2436 }
2437 }
2438 break;
2440 case 'z':
2441 {
2442 int temp;
2444 temp = atoi(optarg);
2445 if (temp > 0)
2446 config_write_jitter = temp;
2447 else
2448 {
2449 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2450 status = 2;
2451 }
2453 break;
2454 }
2456 case 'B':
2457 config_write_base_only = 1;
2458 break;
2460 case 'b':
2461 {
2462 size_t len;
2464 if (config_base_dir != NULL)
2465 free (config_base_dir);
2466 config_base_dir = strdup (optarg);
2467 if (config_base_dir == NULL)
2468 {
2469 fprintf (stderr, "read_options: strdup failed.\n");
2470 return (3);
2471 }
2473 len = strlen (config_base_dir);
2474 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2475 {
2476 config_base_dir[len - 1] = 0;
2477 len--;
2478 }
2480 if (len < 1)
2481 {
2482 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2483 return (4);
2484 }
2486 _config_base_dir_len = len;
2487 }
2488 break;
2490 case 'p':
2491 {
2492 if (config_pid_file != NULL)
2493 free (config_pid_file);
2494 config_pid_file = strdup (optarg);
2495 if (config_pid_file == NULL)
2496 {
2497 fprintf (stderr, "read_options: strdup failed.\n");
2498 return (3);
2499 }
2500 }
2501 break;
2503 case 'F':
2504 config_flush_at_shutdown = 1;
2505 break;
2507 case 'j':
2508 {
2509 struct stat statbuf;
2510 const char *dir = optarg;
2512 status = stat(dir, &statbuf);
2513 if (status != 0)
2514 {
2515 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2516 return 6;
2517 }
2519 if (!S_ISDIR(statbuf.st_mode)
2520 || access(dir, R_OK|W_OK|X_OK) != 0)
2521 {
2522 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2523 errno ? rrd_strerror(errno) : "");
2524 return 6;
2525 }
2527 journal_cur = malloc(PATH_MAX + 1);
2528 journal_old = malloc(PATH_MAX + 1);
2529 if (journal_cur == NULL || journal_old == NULL)
2530 {
2531 fprintf(stderr, "malloc failure for journal files\n");
2532 return 6;
2533 }
2534 else
2535 {
2536 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2537 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2538 }
2539 }
2540 break;
2542 case 'h':
2543 case '?':
2544 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2545 "\n"
2546 "Usage: rrdcached [options]\n"
2547 "\n"
2548 "Valid options are:\n"
2549 " -l <address> Socket address to listen to.\n"
2550 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2551 " -w <seconds> Interval in which to write data.\n"
2552 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2553 " -f <seconds> Interval in which to flush dead data.\n"
2554 " -p <file> Location of the PID-file.\n"
2555 " -b <dir> Base directory to change to.\n"
2556 " -B Restrict file access to paths within -b <dir>\n"
2557 " -g Do not fork and run in the foreground.\n"
2558 " -j <dir> Directory in which to create the journal files.\n"
2559 " -F Always flush all updates at shutdown\n"
2560 "\n"
2561 "For more information and a detailed description of all options "
2562 "please refer\n"
2563 "to the rrdcached(1) manual page.\n",
2564 VERSION);
2565 status = -1;
2566 break;
2567 } /* switch (option) */
2568 } /* while (getopt) */
2570 /* advise the user when values are not sane */
2571 if (config_flush_interval < 2 * config_write_interval)
2572 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2573 " 2x write interval (-w) !\n");
2574 if (config_write_jitter > config_write_interval)
2575 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2576 " write interval (-w) !\n");
2578 if (config_write_base_only && config_base_dir == NULL)
2579 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2580 " Consult the rrdcached documentation\n");
2582 if (journal_cur == NULL)
2583 config_flush_at_shutdown = 1;
2585 return (status);
2586 } /* }}} int read_options */
2588 int main (int argc, char **argv)
2589 {
2590 int status;
2592 status = read_options (argc, argv);
2593 if (status != 0)
2594 {
2595 if (status < 0)
2596 status = 0;
2597 return (status);
2598 }
2600 status = daemonize ();
2601 if (status == 1)
2602 {
2603 struct sigaction sigchld;
2605 memset (&sigchld, 0, sizeof (sigchld));
2606 sigchld.sa_handler = SIG_IGN;
2607 sigaction (SIGCHLD, &sigchld, NULL);
2609 return (0);
2610 }
2611 else if (status != 0)
2612 {
2613 fprintf (stderr, "daemonize failed, exiting.\n");
2614 return (1);
2615 }
2617 journal_init();
2619 /* start the queue thread */
2620 memset (&queue_thread, 0, sizeof (queue_thread));
2621 status = pthread_create (&queue_thread,
2622 NULL, /* attr */
2623 queue_thread_main,
2624 NULL); /* args */
2625 if (status != 0)
2626 {
2627 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2628 cleanup();
2629 return (1);
2630 }
2632 listen_thread_main (NULL);
2633 cleanup ();
2635 return (0);
2636 } /* int main */
2638 /*
2639 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2640 */