1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
 * First tell the compiler to stick to the C99 and POSIX standards as closely
 * as possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* Log a message via syslog(3). `severity' is passed through as the syslog
 * priority; the remaining arguments are the printf-style format string and
 * its arguments. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers that do not define __GNUC__ get `__attribute__' defined away, so
 * annotations such as `__attribute__((unused))' expand to nothing. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
101 /*
102 * Types
103 */
/* Descriptor/path pair for one entry of the `listen_fds' array. */
struct listen_socket_s
{
  int fd; /* file descriptor */
  char path[PATH_MAX + 1]; /* NOTE(review): presumably the socket path for UNIX sockets -- confirm against the code that fills it */
};
typedef struct listen_socket_s listen_socket_t;
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/* Cached state for one file: the update strings that have not been written
 * yet, plus the bookkeeping that ties the item into `cache_tree' and into the
 * singly linked write queue. */
struct cache_item_s
{
  char *file; /* heap-allocated file name; also used as the key in `cache_tree' */
  char **values; /* array of `values_num' heap-allocated update strings */
  int values_num; /* number of entries in `values' */
  time_t last_flush_time; /* set by _wipe_ci_values(), possibly with jitter added */
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags; /* bitwise OR of the CI_FLAGS_* values above */
  pthread_cond_t flushed; /* broadcast by the queue thread after it has processed this item */
  cache_item_t *next; /* next item in the write queue, NULL if last or not queued */
};
/* User data handed to `tree_callback_flush' while walking `cache_tree'. */
struct callback_flush_data_s
{
  time_t now; /* time at which the tree walk was started */
  time_t abs_timeout; /* items last flushed at or before this time get enqueued */
  char **keys; /* keys of items to be removed from the tree (they alias ci->file) */
  size_t keys_num; /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Selects the end of the write queue at which `enqueue_cache_item' inserts. */
enum queue_side_e
{
  HEAD, /* insert at (or move to) the front of the queue */
  TAIL /* append at the end; already queued items are left where they are */
};
typedef enum queue_side_e queue_side_t;
/* Maximum length of a socket command or response; this is also the size of
 * the on-stack buffers in which the request handlers build their answers. */
#define CMD_MAX 4096
145 /*
146 * Variables
147 */
static int stay_foreground = 0;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Incremented by the signal handlers (via sig_common); the queue thread
 * leaves its main loop once this is non-zero and the queue has drained. */
static int do_shutdown = 0;

static pthread_t queue_thread;

static pthread_t *connection_threads = NULL;
static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
/* `cache_tree' maps file names to cache_item_t; `cache_queue_head' and
 * `cache_queue_tail' delimit the singly linked list of items waiting to be
 * written. The code in this file accesses them with `cache_lock' held;
 * `cache_cond' is broadcast when an item is enqueued or a shutdown signal
 * arrives. */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;

/* Configuration; the intervals and the jitter are compared against and added
 * to time_t values, i.e. they are in seconds. */
static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0; /* set by SIGUSR1, cleared by SIGUSR2 */
static char *config_pid_file = NULL; /* NULL selects LOCALSTATEDIR "/run/rrdcached.pid" */
static char *config_base_dir = NULL;

static char **config_listen_address_list = NULL;
static int config_listen_address_list_len = 0;

/* Performance counters reported by the STATS command; protected by
 * `stats_lock'. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
/* Forward declarations; the definitions are not part of this section. */
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);
196 /*
197 * Functions
198 */
199 static void sig_common (const char *sig) /* {{{ */
200 {
201 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
202 do_shutdown++;
203 pthread_cond_broadcast(&cache_cond);
204 } /* }}} void sig_common */
/* SIGINT: request a shutdown. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char signal_name[] = "INT";

  sig_common (signal_name);
} /* }}} void sig_int_handler */
/* SIGTERM: request a shutdown. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char signal_name[] = "TERM";

  sig_common (signal_name);
} /* }}} void sig_term_handler */
216 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
217 {
218 config_flush_at_shutdown = 1;
219 sig_common("USR1");
220 } /* }}} void sig_usr1_handler */
222 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
223 {
224 config_flush_at_shutdown = 0;
225 sig_common("USR2");
226 } /* }}} void sig_usr2_handler */
228 static void install_signal_handlers(void) /* {{{ */
229 {
230 /* These structures are static, because `sigaction' behaves weird if the are
231 * overwritten.. */
232 static struct sigaction sa_int;
233 static struct sigaction sa_term;
234 static struct sigaction sa_pipe;
235 static struct sigaction sa_usr1;
236 static struct sigaction sa_usr2;
238 /* Install signal handlers */
239 memset (&sa_int, 0, sizeof (sa_int));
240 sa_int.sa_handler = sig_int_handler;
241 sigaction (SIGINT, &sa_int, NULL);
243 memset (&sa_term, 0, sizeof (sa_term));
244 sa_term.sa_handler = sig_term_handler;
245 sigaction (SIGTERM, &sa_term, NULL);
247 memset (&sa_pipe, 0, sizeof (sa_pipe));
248 sa_pipe.sa_handler = SIG_IGN;
249 sigaction (SIGPIPE, &sa_pipe, NULL);
251 memset (&sa_pipe, 0, sizeof (sa_usr1));
252 sa_usr1.sa_handler = sig_usr1_handler;
253 sigaction (SIGUSR1, &sa_usr1, NULL);
255 memset (&sa_usr2, 0, sizeof (sa_usr2));
256 sa_usr2.sa_handler = sig_usr2_handler;
257 sigaction (SIGUSR2, &sa_usr2, NULL);
259 } /* }}} void install_signal_handlers */
261 static int open_pidfile(void) /* {{{ */
262 {
263 int fd;
264 char *file;
266 file = (config_pid_file != NULL)
267 ? config_pid_file
268 : LOCALSTATEDIR "/run/rrdcached.pid";
270 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
271 if (fd < 0)
272 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
273 file, rrd_strerror(errno));
275 return(fd);
276 }
/*
 * write_pidfile:
 * Writes the PID of the calling process, followed by a newline, to the file
 * descriptor `fd' and closes it. `fd' is consumed in every case.
 * Returns 0 on success and -1 if the stream could not be set up or the PID
 * could not be written completely.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *fh;
  int status = 0;

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) getpid ()) < 0)
    status = -1;

  /* fclose() flushes the buffered data, so a failure here means that the PID
   * did not make it into the file either. */
  if (fclose (fh) != 0)
    status = -1;

  if (status != 0)
    RRDD_LOG (LOG_ERR, "write_pidfile: writing the PID file failed.");

  return (status);
} /* }}} int write_pidfile */
299 static int remove_pidfile (void) /* {{{ */
300 {
301 char *file;
302 int status;
304 file = (config_pid_file != NULL)
305 ? config_pid_file
306 : LOCALSTATEDIR "/run/rrdcached.pid";
308 status = unlink (file);
309 if (status == 0)
310 return (0);
311 return (errno);
312 } /* }}} int remove_pidfile */
/*
 * sread:
 * Reads from `fd' into `buffer_void' (capacity `buffer_size' bytes) until the
 * data received so far ends in a newline. The trailing "\n" (or "\r\n") is
 * replaced by a terminating NUL byte.
 *
 * Returns the number of bytes in the buffer including the terminating NUL,
 * 0 if read() reported end-of-file, or -1 on error with `errno' set:
 *   EINVAL   the buffer is NULL or has no capacity,
 *   ENOBUFS  the buffer filled up before a newline was seen,
 *   other    as set by read().
 */
static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
{
  char *buffer;
  size_t buffer_used;
  size_t buffer_free;
  ssize_t status;

  buffer = (char *) buffer_void;
  buffer_used = 0;
  buffer_free = buffer_size;

  /* Without this guard the loop below would be skipped and the code after it
   * would inspect `buffer[-1]'. */
  if ((buffer == NULL) || (buffer_size == 0))
  {
    errno = EINVAL;
    return (-1);
  }

  while (buffer_free > 0)
  {
    status = read (fd, buffer + buffer_used, buffer_free);
    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
      continue;

    if (status < 0)
      return (-1);

    if (status == 0)
      return (0);

    /* read() never returns more than it was asked for. */
    assert (buffer_free >= (size_t) status);

    buffer_free = buffer_free - status;
    buffer_used = buffer_used + status;

    if (buffer[buffer_used - 1] == '\n')
      break;
  }

  assert (buffer_used > 0);

  /* The buffer is full, but the line is not complete. */
  if (buffer[buffer_used - 1] != '\n')
  {
    errno = ENOBUFS;
    return (-1);
  }

  buffer[buffer_used - 1] = 0;

  /* Fix network line endings. */
  if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
  {
    buffer_used--;
    buffer[buffer_used - 1] = 0;
  }

  return (buffer_used);
} /* }}} ssize_t sread */
/*
 * swrite:
 * Writes all `count' bytes of `buf' to `fd', retrying when write() is
 * interrupted (EINTR) or would block (EAGAIN).
 * Returns 0 once everything has been written, or the negative result of the
 * failing write() call. A negative `fd' (journal replay) is accepted and
 * nothing is written in that case.
 */
static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
{
  const char *cursor = (const char *) buf;
  size_t remaining = count;

  /* special case for journal replay */
  if (fd < 0)
    return 0;

  while (remaining > 0)
  {
    ssize_t written = write (fd, (const void *) cursor, remaining);

    if (written < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (written);
    }

    cursor += written;
    remaining -= written;
  }

  return (0);
} /* }}} ssize_t swrite */
395 static void _wipe_ci_values(cache_item_t *ci, time_t when)
396 {
397 ci->values = NULL;
398 ci->values_num = 0;
400 ci->last_flush_time = when;
401 if (config_write_jitter > 0)
402 ci->last_flush_time += (random() % config_write_jitter);
404 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
405 }
407 /*
408 * enqueue_cache_item:
409 * `cache_lock' must be acquired before calling this function!
410 */
411 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
412 queue_side_t side)
413 {
414 int did_insert = 0;
416 if (ci == NULL)
417 return (-1);
419 if (ci->values_num == 0)
420 return (0);
422 if (side == HEAD)
423 {
424 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
425 {
426 assert (ci->next == NULL);
427 ci->next = cache_queue_head;
428 cache_queue_head = ci;
430 if (cache_queue_tail == NULL)
431 cache_queue_tail = cache_queue_head;
433 did_insert = 1;
434 }
435 else if (cache_queue_head == ci)
436 {
437 /* do nothing */
438 }
439 else /* enqueued, but not first entry */
440 {
441 cache_item_t *prev;
443 /* find previous entry */
444 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
445 if (prev->next == ci)
446 break;
447 assert (prev != NULL);
449 /* move to the front */
450 prev->next = ci->next;
451 ci->next = cache_queue_head;
452 cache_queue_head = ci;
454 /* check if we need to adapt the tail */
455 if (cache_queue_tail == ci)
456 cache_queue_tail = prev;
457 }
458 }
459 else /* (side == TAIL) */
460 {
461 /* We don't move values back in the list.. */
462 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
463 return (0);
465 assert (ci->next == NULL);
467 if (cache_queue_tail == NULL)
468 cache_queue_head = ci;
469 else
470 cache_queue_tail->next = ci;
471 cache_queue_tail = ci;
473 did_insert = 1;
474 }
476 ci->flags |= CI_FLAGS_IN_QUEUE;
478 if (did_insert)
479 {
480 pthread_cond_broadcast(&cache_cond);
481 pthread_mutex_lock (&stats_lock);
482 stats_queue_length++;
483 pthread_mutex_unlock (&stats_lock);
484 }
486 return (0);
487 } /* }}} int enqueue_cache_item */
489 /*
490 * tree_callback_flush:
491 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
492 * while this is in progress.
493 */
494 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
495 gpointer data)
496 {
497 cache_item_t *ci;
498 callback_flush_data_t *cfd;
500 ci = (cache_item_t *) value;
501 cfd = (callback_flush_data_t *) data;
503 if ((ci->last_flush_time <= cfd->abs_timeout)
504 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
505 && (ci->values_num > 0))
506 {
507 enqueue_cache_item (ci, TAIL);
508 }
509 else if ((do_shutdown != 0)
510 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
511 && (ci->values_num > 0))
512 {
513 enqueue_cache_item (ci, TAIL);
514 }
515 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
516 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
517 && (ci->values_num <= 0))
518 {
519 char **temp;
521 temp = (char **) realloc (cfd->keys,
522 sizeof (char *) * (cfd->keys_num + 1));
523 if (temp == NULL)
524 {
525 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
526 return (FALSE);
527 }
528 cfd->keys = temp;
529 /* Make really sure this points to the _same_ place */
530 assert ((char *) key == ci->file);
531 cfd->keys[cfd->keys_num] = (char *) key;
532 cfd->keys_num++;
533 }
535 return (FALSE);
536 } /* }}} gboolean tree_callback_flush */
538 static int flush_old_values (int max_age)
539 {
540 callback_flush_data_t cfd;
541 size_t k;
543 memset (&cfd, 0, sizeof (cfd));
544 /* Pass the current time as user data so that we don't need to call
545 * `time' for each node. */
546 cfd.now = time (NULL);
547 cfd.keys = NULL;
548 cfd.keys_num = 0;
550 if (max_age > 0)
551 cfd.abs_timeout = cfd.now - max_age;
552 else
553 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
555 /* `tree_callback_flush' will return the keys of all values that haven't
556 * been touched in the last `config_flush_interval' seconds in `cfd'.
557 * The char*'s in this array point to the same memory as ci->file, so we
558 * don't need to free them separately. */
559 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
561 for (k = 0; k < cfd.keys_num; k++)
562 {
563 cache_item_t *ci;
565 /* This must not fail. */
566 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
567 assert (ci != NULL);
569 /* If we end up here with values available, something's seriously
570 * messed up. */
571 assert (ci->values_num == 0);
573 /* Remove the node from the tree */
574 g_tree_remove (cache_tree, cfd.keys[k]);
575 cfd.keys[k] = NULL;
577 /* Now free and clean up `ci'. */
578 free (ci->file);
579 ci->file = NULL;
580 free (ci);
581 ci = NULL;
582 } /* for (k = 0; k < cfd.keys_num; k++) */
584 if (cfd.keys != NULL)
585 {
586 free (cfd.keys);
587 cfd.keys = NULL;
588 }
590 return (0);
591 } /* int flush_old_values */
/*
 * queue_thread_main:
 * Body of the queue thread. It repeatedly takes the first item off the write
 * queue, hands its pending values to `rrd_update_r' and broadcasts the item's
 * `flushed' condition. Every `config_flush_interval' seconds it additionally
 * runs `flush_old_values' and rotates the journal. The loop ends once a
 * shutdown has been requested and -- if a flush at shutdown was asked for --
 * the queue is empty. `args' is unused; the function always returns NULL.
 *
 * `cache_lock' is held while the queue and the items are inspected and is
 * released around the journal rotation and around the actual write.
 */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int final_flush = 0; /* make sure we only flush once on shutdown */

  /* The first periodic flush is due one flush interval from now. */
  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock (&cache_lock);
  while ((do_shutdown == 0) || (cache_queue_head != NULL))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    int values_num;
    int status;
    int i;

    /* First, check if it's time to do the cache flush. */
    gettimeofday (&now, NULL);
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* Determine the time of the next cache flush. */
      while (next_flush.tv_sec <= now.tv_sec)
        next_flush.tv_sec += config_flush_interval;

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* Now, check if there's something to store away. If not, wait until
     * something comes in or it's time to do the cache flush. if we are
     * shutting down, do not wait around. */
    if (cache_queue_head == NULL && !do_shutdown)
    {
      status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_timedwait returned %i.", status);
      }
    }

    /* We're about to shut down. `final_flush' makes sure this block (and its
     * twin at the end of the loop) runs only once in total. */
    if (do_shutdown != 0 && !final_flush++)
    {
      if (config_flush_at_shutdown)
        flush_old_values (-1); /* flush everything */
      else
        break;
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* Take ownership of the pending values; the item itself stays in the
     * tree with an empty value list. */
    values = ci->values;
    values_num = ci->values_num;

    _wipe_ci_values(ci, time(NULL));

    /* Unlink the item from the front of the queue. */
    cache_queue_head = ci->next;
    if (cache_queue_head == NULL)
      cache_queue_tail = NULL;
    ci->next = NULL;

    pthread_mutex_lock (&stats_lock);
    assert (stats_queue_length > 0);
    stats_queue_length--;
    pthread_mutex_unlock (&stats_lock);

    /* The write below happens without the cache lock. */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    /* A "wrote" journal entry is added whether or not the update succeeded;
     * then everybody waiting for this file in `flush_file' is woken up. */
    journal_write("wrote", file);
    pthread_cond_broadcast(&ci->flushed);

    for (i = 0; i < values_num; i++)
      free (values[i]);

    free(values);
    free(file);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    pthread_mutex_lock (&cache_lock);

    /* We're about to shut down */
    if (do_shutdown != 0 && !final_flush++)
    {
      if (config_flush_at_shutdown)
        flush_old_values (-1); /* flush everything */
      else
        break;
    }
  } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
  pthread_mutex_unlock (&cache_lock);

  if (config_flush_at_shutdown)
  {
    assert(cache_queue_head == NULL);
    RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
  }

  journal_done();

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Splits the next space-separated field off the NUL-terminated buffer
 * `*buffer_ret' of `*buffer_size_ret' bytes (terminator included). A
 * backslash makes the following character part of the field. The field is
 * unescaped in place, i.e. the buffer is modified.
 *
 * On success 0 is returned, `*field_ret' points to the NUL-terminated field
 * and `*buffer_ret' / `*buffer_size_ret' describe the remainder of the
 * buffer. On failure (empty buffer, or a backslash right before the
 * terminator) -1 is returned and the three output arguments are left
 * untouched.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const base = *buffer_ret;
  const size_t total = *buffer_size_ret;
  size_t rd = 0; /* read position */
  size_t wr = 0; /* write position; never ahead of `rd' */
  int terminated = 0;

  if (total == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (base[total - 1] == '\0');

  while (!terminated && (rd < total))
  {
    const char c = base[rd];

    if ((c == ' ') || (c == '\0'))
    {
      /* end-of-field or end-of-buffer */
      base[wr++] = 0;
      rd++;
      terminated = 1;
    }
    else if (c == '\\')
    {
      /* escaped character; a backslash in the last position has nothing to
       * escape */
      if (rd >= (total - 1))
        break;
      base[wr++] = base[rd + 1];
      rd += 2;
    }
    else
    {
      base[wr++] = c;
      rd++;
    }
  }

  if (!terminated)
    return (-1);

  *buffer_ret = base + rd;
  *buffer_size_ret = total - rd;
  *field_ret = base;

  return (0);
} /* }}} int buffer_get_field */
804 static int flush_file (const char *filename) /* {{{ */
805 {
806 cache_item_t *ci;
808 pthread_mutex_lock (&cache_lock);
810 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
811 if (ci == NULL)
812 {
813 pthread_mutex_unlock (&cache_lock);
814 return (ENOENT);
815 }
817 /* Enqueue at head */
818 enqueue_cache_item (ci, HEAD);
820 pthread_cond_wait(&ci->flushed, &cache_lock);
821 pthread_mutex_unlock(&cache_lock);
823 return (0);
824 } /* }}} int flush_file */
/*
 * handle_request_help:
 * Answers the HELP command. If `buffer' names a known command (UPDATE,
 * FLUSH, FLUSHALL or STATS, case-insensitive) the help text for that command
 * is sent to `fd', otherwise the command overview. The number at the start
 * of each text is the count of lines that follow the first one.
 * Returns 0 on success or the `errno' value of a failed write.
 */
static int handle_request_help (int fd, /* {{{ */
    char *buffer, size_t buffer_size)
{
  int status;
  const char *const *help_text;
  size_t help_text_len;
  char *command;
  size_t i;

  static const char *const help_help[] =
  {
    "5 Command overview\n",
    "FLUSH <filename>\n",
    "FLUSHALL\n",
    "HELP [<command>]\n",
    "UPDATE <filename> <values> [<values> ...]\n",
    "STATS\n"
  };
  const size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);

  static const char *const help_flush[] =
  {
    "4 Help for FLUSH\n",
    "Usage: FLUSH <filename>\n",
    "\n",
    "Adds the given filename to the head of the update queue and returns\n",
    "after it has been dequeued.\n"
  };
  const size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);

  static const char *const help_flushall[] =
  {
    "3 Help for FLUSHALL\n",
    "Usage: FLUSHALL\n",
    "\n",
    "Triggers writing of all pending updates. Returns immediately.\n"
  };
  const size_t help_flushall_len =
    sizeof (help_flushall) / sizeof (help_flushall[0]);

  /* The usage line and the blank line after it used to be merged into one
   * array element by a missing comma; the bytes sent are the same. */
  static const char *const help_update[] =
  {
    "9 Help for UPDATE\n",
    "Usage: UPDATE <filename> <values> [<values> ...]\n",
    "\n",
    "Adds the given file to the internal cache if it is not yet known and\n",
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
    "for details.\n",
    "\n",
    "Each <values> has the following form:\n",
    " <values> = <time>:<value>[:<value>[...]]\n",
    "See the rrdupdate(1) manpage for details.\n"
  };
  const size_t help_update_len =
    sizeof (help_update) / sizeof (help_update[0]);

  static const char *const help_stats[] =
  {
    "4 Help for STATS\n",
    "Usage: STATS\n",
    "\n",
    "Returns some performance counters, see the rrdcached(1) manpage for\n",
    "a description of the values.\n"
  };
  const size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);

  /* Unknown or missing commands fall back to the overview. */
  help_text = help_help;
  help_text_len = help_help_len;

  status = buffer_get_field (&buffer, &buffer_size, &command);
  if (status == 0)
  {
    if (strcasecmp (command, "update") == 0)
    {
      help_text = help_update;
      help_text_len = help_update_len;
    }
    else if (strcasecmp (command, "flush") == 0)
    {
      help_text = help_flush;
      help_text_len = help_flush_len;
    }
    else if (strcasecmp (command, "flushall") == 0)
    {
      help_text = help_flushall;
      help_text_len = help_flushall_len;
    }
    else if (strcasecmp (command, "stats") == 0)
    {
      help_text = help_stats;
      help_text_len = help_stats_len;
    }
  }

  for (i = 0; i < help_text_len; i++)
  {
    status = swrite (fd, help_text[i], strlen (help_text[i]));
    if (status < 0)
    {
      status = errno;
      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
      return (status);
    }
  }

  return (0);
} /* }}} int handle_request_help */
939 static int handle_request_stats (int fd, /* {{{ */
940 char *buffer __attribute__((unused)),
941 size_t buffer_size __attribute__((unused)))
942 {
943 int status;
944 char outbuf[CMD_MAX];
946 uint64_t copy_queue_length;
947 uint64_t copy_updates_received;
948 uint64_t copy_flush_received;
949 uint64_t copy_updates_written;
950 uint64_t copy_data_sets_written;
951 uint64_t copy_journal_bytes;
952 uint64_t copy_journal_rotate;
954 uint64_t tree_nodes_number;
955 uint64_t tree_depth;
957 pthread_mutex_lock (&stats_lock);
958 copy_queue_length = stats_queue_length;
959 copy_updates_received = stats_updates_received;
960 copy_flush_received = stats_flush_received;
961 copy_updates_written = stats_updates_written;
962 copy_data_sets_written = stats_data_sets_written;
963 copy_journal_bytes = stats_journal_bytes;
964 copy_journal_rotate = stats_journal_rotate;
965 pthread_mutex_unlock (&stats_lock);
967 pthread_mutex_lock (&cache_lock);
968 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
969 tree_depth = (uint64_t) g_tree_height (cache_tree);
970 pthread_mutex_unlock (&cache_lock);
972 #define RRDD_STATS_SEND \
973 outbuf[sizeof (outbuf) - 1] = 0; \
974 status = swrite (fd, outbuf, strlen (outbuf)); \
975 if (status < 0) \
976 { \
977 status = errno; \
978 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
979 return (status); \
980 }
982 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
983 RRDD_STATS_SEND;
985 snprintf (outbuf, sizeof (outbuf),
986 "QueueLength: %"PRIu64"\n", copy_queue_length);
987 RRDD_STATS_SEND;
989 snprintf (outbuf, sizeof (outbuf),
990 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
991 RRDD_STATS_SEND;
993 snprintf (outbuf, sizeof (outbuf),
994 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
995 RRDD_STATS_SEND;
997 snprintf (outbuf, sizeof (outbuf),
998 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
999 RRDD_STATS_SEND;
1001 snprintf (outbuf, sizeof (outbuf),
1002 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1003 RRDD_STATS_SEND;
1005 snprintf (outbuf, sizeof (outbuf),
1006 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1007 RRDD_STATS_SEND;
1009 snprintf (outbuf, sizeof (outbuf),
1010 "TreeDepth: %"PRIu64"\n", tree_depth);
1011 RRDD_STATS_SEND;
1013 snprintf (outbuf, sizeof(outbuf),
1014 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1015 RRDD_STATS_SEND;
1017 snprintf (outbuf, sizeof(outbuf),
1018 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1019 RRDD_STATS_SEND;
1021 return (0);
1022 #undef RRDD_STATS_SEND
1023 } /* }}} int handle_request_stats */
1025 static int handle_request_flush (int fd, /* {{{ */
1026 char *buffer, size_t buffer_size)
1027 {
1028 char *file;
1029 int status;
1030 char result[CMD_MAX];
1032 status = buffer_get_field (&buffer, &buffer_size, &file);
1033 if (status != 0)
1034 {
1035 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1036 }
1037 else
1038 {
1039 pthread_mutex_lock(&stats_lock);
1040 stats_flush_received++;
1041 pthread_mutex_unlock(&stats_lock);
1043 status = flush_file (file);
1044 if (status == 0)
1045 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1046 else if (status == ENOENT)
1047 {
1048 /* no file in our tree; see whether it exists at all */
1049 struct stat statbuf;
1051 memset(&statbuf, 0, sizeof(statbuf));
1052 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1053 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1054 else
1055 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1056 }
1057 else if (status < 0)
1058 strncpy (result, "-1 Internal error.\n", sizeof (result));
1059 else
1060 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1061 }
1062 result[sizeof (result) - 1] = 0;
1064 status = swrite (fd, result, strlen (result));
1065 if (status < 0)
1066 {
1067 status = errno;
1068 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1069 return (status);
1070 }
1072 return (0);
1073 } /* }}} int handle_request_flush */
1075 static int handle_request_flushall(int fd) /* {{{ */
1076 {
1077 int status;
1078 char answer[] ="0 Started flush.\n";
1080 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1082 pthread_mutex_lock(&cache_lock);
1083 flush_old_values(-1);
1084 pthread_mutex_unlock(&cache_lock);
1086 status = swrite(fd, answer, strlen(answer));
1087 if (status < 0)
1088 {
1089 status = errno;
1090 RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1091 }
1093 return (status);
1094 }
1096 static int handle_request_update (int fd, /* {{{ */
1097 char *buffer, size_t buffer_size)
1098 {
1099 char *file;
1100 int values_num = 0;
1101 int status;
1103 time_t now;
1105 cache_item_t *ci;
1106 char answer[CMD_MAX];
1108 #define RRDD_UPDATE_SEND \
1109 answer[sizeof (answer) - 1] = 0; \
1110 status = swrite (fd, answer, strlen (answer)); \
1111 if (status < 0) \
1112 { \
1113 status = errno; \
1114 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1115 return (status); \
1116 }
1118 now = time (NULL);
1120 status = buffer_get_field (&buffer, &buffer_size, &file);
1121 if (status != 0)
1122 {
1123 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1124 sizeof (answer));
1125 RRDD_UPDATE_SEND;
1126 return (0);
1127 }
1129 pthread_mutex_lock(&stats_lock);
1130 stats_updates_received++;
1131 pthread_mutex_unlock(&stats_lock);
1133 pthread_mutex_lock (&cache_lock);
1134 ci = g_tree_lookup (cache_tree, file);
1136 if (ci == NULL) /* {{{ */
1137 {
1138 struct stat statbuf;
1140 /* don't hold the lock while we setup; stat(2) might block */
1141 pthread_mutex_unlock(&cache_lock);
1143 memset (&statbuf, 0, sizeof (statbuf));
1144 status = stat (file, &statbuf);
1145 if (status != 0)
1146 {
1147 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1149 status = errno;
1150 if (status == ENOENT)
1151 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1152 else
1153 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1154 status);
1155 RRDD_UPDATE_SEND;
1156 return (0);
1157 }
1158 if (!S_ISREG (statbuf.st_mode))
1159 {
1160 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1161 RRDD_UPDATE_SEND;
1162 return (0);
1163 }
1164 if (access(file, R_OK|W_OK) != 0)
1165 {
1166 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1167 file, rrd_strerror(errno));
1168 RRDD_UPDATE_SEND;
1169 return (0);
1170 }
1172 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1173 if (ci == NULL)
1174 {
1175 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1177 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1178 RRDD_UPDATE_SEND;
1179 return (0);
1180 }
1181 memset (ci, 0, sizeof (cache_item_t));
1183 ci->file = strdup (file);
1184 if (ci->file == NULL)
1185 {
1186 free (ci);
1187 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1189 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1190 RRDD_UPDATE_SEND;
1191 return (0);
1192 }
1194 _wipe_ci_values(ci, now);
1195 ci->flags = CI_FLAGS_IN_TREE;
1197 pthread_mutex_lock(&cache_lock);
1198 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1199 } /* }}} */
1200 assert (ci != NULL);
1202 while (buffer_size > 0)
1203 {
1204 char **temp;
1205 char *value;
1207 status = buffer_get_field (&buffer, &buffer_size, &value);
1208 if (status != 0)
1209 {
1210 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1211 break;
1212 }
1214 temp = (char **) realloc (ci->values,
1215 sizeof (char *) * (ci->values_num + 1));
1216 if (temp == NULL)
1217 {
1218 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1219 continue;
1220 }
1221 ci->values = temp;
1223 ci->values[ci->values_num] = strdup (value);
1224 if (ci->values[ci->values_num] == NULL)
1225 {
1226 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1227 continue;
1228 }
1229 ci->values_num++;
1231 values_num++;
1232 }
1234 if (((now - ci->last_flush_time) >= config_write_interval)
1235 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1236 && (ci->values_num > 0))
1237 {
1238 enqueue_cache_item (ci, TAIL);
1239 }
1241 pthread_mutex_unlock (&cache_lock);
1243 if (values_num < 1)
1244 {
1245 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1246 }
1247 else
1248 {
1249 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1250 (values_num == 1) ? "" : "s");
1251 }
1252 RRDD_UPDATE_SEND;
1253 return (0);
1254 #undef RRDD_UPDATE_SEND
1255 } /* }}} int handle_request_update */
1257 /* we came across a "WROTE" entry during journal replay.
1258 * throw away any values that we have accumulated for this file
1259 */
1260 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1261 const char *buffer,
1262 size_t buffer_size __attribute__((unused)))
1263 {
1264 int i;
1265 cache_item_t *ci;
1266 const char *file = buffer;
1268 pthread_mutex_lock(&cache_lock);
1270 ci = g_tree_lookup(cache_tree, file);
1271 if (ci == NULL)
1272 {
1273 pthread_mutex_unlock(&cache_lock);
1274 return (0);
1275 }
1277 if (ci->values)
1278 {
1279 for (i=0; i < ci->values_num; i++)
1280 free(ci->values[i]);
1282 free(ci->values);
1283 }
1285 _wipe_ci_values(ci, time(NULL));
1287 pthread_mutex_unlock(&cache_lock);
1288 return (0);
1289 } /* }}} int handle_request_wrote */
1291 /* if fd < 0, we are in journal replay mode */
1292 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1293 {
1294 char *buffer_ptr;
1295 char *command;
1296 int status;
1298 assert (buffer[buffer_size - 1] == '\0');
1300 buffer_ptr = buffer;
1301 command = NULL;
1302 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1303 if (status != 0)
1304 {
1305 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1306 return (-1);
1307 }
1309 if (strcasecmp (command, "update") == 0)
1310 {
1311 /* don't re-write updates in replay mode */
1312 if (fd >= 0)
1313 journal_write(command, buffer_ptr);
1315 return (handle_request_update (fd, buffer_ptr, buffer_size));
1316 }
1317 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1318 {
1319 /* this is only valid in replay mode */
1320 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1321 }
1322 else if (strcasecmp (command, "flush") == 0)
1323 {
1324 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1325 }
1326 else if (strcasecmp (command, "flushall") == 0)
1327 {
1328 return (handle_request_flushall(fd));
1329 }
1330 else if (strcasecmp (command, "stats") == 0)
1331 {
1332 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1333 }
1334 else if (strcasecmp (command, "help") == 0)
1335 {
1336 return (handle_request_help (fd, buffer_ptr, buffer_size));
1337 }
1338 else
1339 {
1340 char result[CMD_MAX];
1342 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1343 result[sizeof (result) - 1] = 0;
1345 status = swrite (fd, result, strlen (result));
1346 if (status < 0)
1347 {
1348 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1349 return (-1);
1350 }
1351 }
1353 return (0);
1354 } /* }}} int handle_request */
1356 /* MUST NOT hold journal_lock before calling this */
1357 static void journal_rotate(void) /* {{{ */
1358 {
1359 FILE *old_fh = NULL;
1361 if (journal_cur == NULL || journal_old == NULL)
1362 return;
1364 pthread_mutex_lock(&journal_lock);
1366 /* we rotate this way (rename before close) so that the we can release
1367 * the journal lock as fast as possible. Journal writes to the new
1368 * journal can proceed immediately after the new file is opened. The
1369 * fclose can then block without affecting new updates.
1370 */
1371 if (journal_fh != NULL)
1372 {
1373 old_fh = journal_fh;
1374 rename(journal_cur, journal_old);
1375 ++stats_journal_rotate;
1376 }
1378 journal_fh = fopen(journal_cur, "a");
1379 pthread_mutex_unlock(&journal_lock);
1381 if (old_fh != NULL)
1382 fclose(old_fh);
1384 if (journal_fh == NULL)
1385 {
1386 RRDD_LOG(LOG_CRIT,
1387 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1388 journal_cur, rrd_strerror(errno));
1390 RRDD_LOG(LOG_ERR,
1391 "JOURNALING DISABLED: All values will be flushed at shutdown");
1392 config_flush_at_shutdown = 1;
1393 }
1395 } /* }}} static void journal_rotate */
1397 static void journal_done(void) /* {{{ */
1398 {
1399 if (journal_cur == NULL)
1400 return;
1402 pthread_mutex_lock(&journal_lock);
1403 if (journal_fh != NULL)
1404 {
1405 fclose(journal_fh);
1406 journal_fh = NULL;
1407 }
1409 if (config_flush_at_shutdown)
1410 {
1411 RRDD_LOG(LOG_INFO, "removing journals");
1412 unlink(journal_old);
1413 unlink(journal_cur);
1414 }
1415 else
1416 {
1417 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1418 "journals will be used at next startup");
1419 }
1421 pthread_mutex_unlock(&journal_lock);
1423 } /* }}} static void journal_done */
1425 static int journal_write(char *cmd, char *args) /* {{{ */
1426 {
1427 int chars;
1429 if (journal_fh == NULL)
1430 return 0;
1432 pthread_mutex_lock(&journal_lock);
1433 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1434 pthread_mutex_unlock(&journal_lock);
1436 if (chars > 0)
1437 {
1438 pthread_mutex_lock(&stats_lock);
1439 stats_journal_bytes += chars;
1440 pthread_mutex_unlock(&stats_lock);
1441 }
1443 return chars;
1444 } /* }}} static int journal_write */
1446 static int journal_replay (const char *file) /* {{{ */
1447 {
1448 FILE *fh;
1449 int entry_cnt = 0;
1450 int fail_cnt = 0;
1451 uint64_t line = 0;
1452 char entry[CMD_MAX];
1454 if (file == NULL) return 0;
1456 fh = fopen(file, "r");
1457 if (fh == NULL)
1458 {
1459 if (errno != ENOENT)
1460 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1461 file, rrd_strerror(errno));
1462 return 0;
1463 }
1464 else
1465 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1467 while(!feof(fh))
1468 {
1469 size_t entry_len;
1471 ++line;
1472 fgets(entry, sizeof(entry), fh);
1473 entry_len = strlen(entry);
1475 /* check \n termination in case journal writing crashed mid-line */
1476 if (entry_len == 0)
1477 continue;
1478 else if (entry[entry_len - 1] != '\n')
1479 {
1480 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1481 ++fail_cnt;
1482 continue;
1483 }
1485 entry[entry_len - 1] = '\0';
1487 if (handle_request(-1, entry, entry_len) == 0)
1488 ++entry_cnt;
1489 else
1490 ++fail_cnt;
1491 }
1493 fclose(fh);
1495 if (entry_cnt > 0)
1496 {
1497 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1498 entry_cnt, fail_cnt);
1499 return 1;
1500 }
1501 else
1502 return 0;
1504 } /* }}} static int journal_replay */
1506 static void *connection_thread_main (void *args) /* {{{ */
1507 {
1508 pthread_t self;
1509 int i;
1510 int fd;
1512 fd = *((int *) args);
1513 free (args);
1515 pthread_mutex_lock (&connection_threads_lock);
1516 {
1517 pthread_t *temp;
1519 temp = (pthread_t *) realloc (connection_threads,
1520 sizeof (pthread_t) * (connection_threads_num + 1));
1521 if (temp == NULL)
1522 {
1523 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1524 }
1525 else
1526 {
1527 connection_threads = temp;
1528 connection_threads[connection_threads_num] = pthread_self ();
1529 connection_threads_num++;
1530 }
1531 }
1532 pthread_mutex_unlock (&connection_threads_lock);
1534 while (do_shutdown == 0)
1535 {
1536 char buffer[CMD_MAX];
1538 struct pollfd pollfd;
1539 int status;
1541 pollfd.fd = fd;
1542 pollfd.events = POLLIN | POLLPRI;
1543 pollfd.revents = 0;
1545 status = poll (&pollfd, 1, /* timeout = */ 500);
1546 if (do_shutdown)
1547 break;
1548 else if (status == 0) /* timeout */
1549 continue;
1550 else if (status < 0) /* error */
1551 {
1552 status = errno;
1553 if (status == EINTR)
1554 continue;
1555 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1556 continue;
1557 }
1559 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1560 {
1561 close (fd);
1562 break;
1563 }
1564 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1565 {
1566 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1567 "poll(2) returned something unexpected: %#04hx",
1568 pollfd.revents);
1569 close (fd);
1570 break;
1571 }
1573 status = (int) sread (fd, buffer, sizeof (buffer));
1574 if (status <= 0)
1575 {
1576 close (fd);
1578 if (status < 0)
1579 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1581 break;
1582 }
1584 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1585 if (status != 0)
1586 break;
1587 }
1589 close(fd);
1591 self = pthread_self ();
1592 /* Remove this thread from the connection threads list */
1593 pthread_mutex_lock (&connection_threads_lock);
1594 /* Find out own index in the array */
1595 for (i = 0; i < connection_threads_num; i++)
1596 if (pthread_equal (connection_threads[i], self) != 0)
1597 break;
1598 assert (i < connection_threads_num);
1600 /* Move the trailing threads forward. */
1601 if (i < (connection_threads_num - 1))
1602 {
1603 memmove (connection_threads + i,
1604 connection_threads + i + 1,
1605 sizeof (pthread_t) * (connection_threads_num - i - 1));
1606 }
1608 connection_threads_num--;
1609 pthread_mutex_unlock (&connection_threads_lock);
1611 return (NULL);
1612 } /* }}} void *connection_thread_main */
1614 static int open_listen_socket_unix (const char *path) /* {{{ */
1615 {
1616 int fd;
1617 struct sockaddr_un sa;
1618 listen_socket_t *temp;
1619 int status;
1621 temp = (listen_socket_t *) realloc (listen_fds,
1622 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1623 if (temp == NULL)
1624 {
1625 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1626 return (-1);
1627 }
1628 listen_fds = temp;
1629 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1631 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1632 if (fd < 0)
1633 {
1634 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1635 return (-1);
1636 }
1638 memset (&sa, 0, sizeof (sa));
1639 sa.sun_family = AF_UNIX;
1640 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1642 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1643 if (status != 0)
1644 {
1645 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1646 close (fd);
1647 unlink (path);
1648 return (-1);
1649 }
1651 status = listen (fd, /* backlog = */ 10);
1652 if (status != 0)
1653 {
1654 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1655 close (fd);
1656 unlink (path);
1657 return (-1);
1658 }
1660 listen_fds[listen_fds_num].fd = fd;
1661 snprintf (listen_fds[listen_fds_num].path,
1662 sizeof (listen_fds[listen_fds_num].path) - 1,
1663 "unix:%s", path);
1664 listen_fds_num++;
1666 return (0);
1667 } /* }}} int open_listen_socket_unix */
/*
 * Open listening socket(s) for the address specification `addr_orig'.
 *
 * Accepted forms:
 *   "unix:<path>" or "/<path>"  - handed to open_listen_socket_unix()
 *   "[<ipv6>]" or "[<ipv6>]:<port>"
 *   "<host-or-ipv4>" or "<host-or-ipv4>:<port>" (the port is only split
 *                                 off when the address contains a '.')
 * RRDCACHED_DEFAULT_PORT is used when no port is given.
 *
 * One socket is opened per getaddrinfo() result; results for which
 * socket(2) or bind(2) fails are skipped.
 *
 * Returns -1 on a malformed address, a getaddrinfo() failure or a listen(2)
 * failure; 0 otherwise (even if no socket could be opened). For UNIX
 * sockets the result of open_listen_socket_unix() is returned.
 */
static int open_listen_socket (const char *addr_orig) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;
  int result = 0;

  assert (addr_orig != NULL);

  strncpy (addr_copy, addr_orig, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
    return (open_listen_socket_unix (addr + strlen ("unix:")));
  else if (addr[0] == '/')
    return (open_listen_socket_unix (addr));

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
          addr_orig);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
      continue;
    }
    listen_fds = temp;
    /* zero the new slot; the strncpy() below relies on this for the
     * terminating NUL */
    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
      close (fd);
      /* leave through the common exit so the address list is released */
      result = -1;
      break;
    }

    listen_fds[listen_fds_num].fd = fd;
    strncpy (listen_fds[listen_fds_num].path, addr,
        sizeof (listen_fds[listen_fds_num].path) - 1);
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* the list returned by getaddrinfo() is owned by the caller */
  freeaddrinfo (ai_res);

  return (result);
} /* }}} int open_listen_socket */
1795 static int close_listen_sockets (void) /* {{{ */
1796 {
1797 size_t i;
1799 for (i = 0; i < listen_fds_num; i++)
1800 {
1801 close (listen_fds[i].fd);
1802 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1803 unlink (listen_fds[i].path + strlen ("unix:"));
1804 }
1806 free (listen_fds);
1807 listen_fds = NULL;
1808 listen_fds_num = 0;
1810 return (0);
1811 } /* }}} int close_listen_sockets */
1813 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1814 {
1815 struct pollfd *pollfds;
1816 int pollfds_num;
1817 int status;
1818 int i;
1820 for (i = 0; i < config_listen_address_list_len; i++)
1821 open_listen_socket (config_listen_address_list[i]);
1823 if (config_listen_address_list_len < 1)
1824 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1826 if (listen_fds_num < 1)
1827 {
1828 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1829 "could be opened. Sorry.");
1830 return (NULL);
1831 }
1833 pollfds_num = listen_fds_num;
1834 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1835 if (pollfds == NULL)
1836 {
1837 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1838 return (NULL);
1839 }
1840 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1842 RRDD_LOG(LOG_INFO, "listening for connections");
1844 while (do_shutdown == 0)
1845 {
1846 assert (pollfds_num == ((int) listen_fds_num));
1847 for (i = 0; i < pollfds_num; i++)
1848 {
1849 pollfds[i].fd = listen_fds[i].fd;
1850 pollfds[i].events = POLLIN | POLLPRI;
1851 pollfds[i].revents = 0;
1852 }
1854 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1855 if (do_shutdown)
1856 break;
1857 else if (status == 0) /* timeout */
1858 continue;
1859 else if (status < 0) /* error */
1860 {
1861 status = errno;
1862 if (status != EINTR)
1863 {
1864 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1865 }
1866 continue;
1867 }
1869 for (i = 0; i < pollfds_num; i++)
1870 {
1871 int *client_sd;
1872 struct sockaddr_storage client_sa;
1873 socklen_t client_sa_size;
1874 pthread_t tid;
1875 pthread_attr_t attr;
1877 if (pollfds[i].revents == 0)
1878 continue;
1880 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1881 {
1882 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1883 "poll(2) returned something unexpected for listen FD #%i.",
1884 pollfds[i].fd);
1885 continue;
1886 }
1888 client_sd = (int *) malloc (sizeof (int));
1889 if (client_sd == NULL)
1890 {
1891 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1892 continue;
1893 }
1895 client_sa_size = sizeof (client_sa);
1896 *client_sd = accept (pollfds[i].fd,
1897 (struct sockaddr *) &client_sa, &client_sa_size);
1898 if (*client_sd < 0)
1899 {
1900 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1901 continue;
1902 }
1904 pthread_attr_init (&attr);
1905 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1907 status = pthread_create (&tid, &attr, connection_thread_main,
1908 /* args = */ (void *) client_sd);
1909 if (status != 0)
1910 {
1911 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1912 close (*client_sd);
1913 free (client_sd);
1914 continue;
1915 }
1916 } /* for (pollfds_num) */
1917 } /* while (do_shutdown == 0) */
1919 RRDD_LOG(LOG_INFO, "starting shutdown");
1921 close_listen_sockets ();
1923 pthread_mutex_lock (&connection_threads_lock);
1924 while (connection_threads_num > 0)
1925 {
1926 pthread_t wait_for;
1928 wait_for = connection_threads[0];
1930 pthread_mutex_unlock (&connection_threads_lock);
1931 pthread_join (wait_for, /* retval = */ NULL);
1932 pthread_mutex_lock (&connection_threads_lock);
1933 }
1934 pthread_mutex_unlock (&connection_threads_lock);
1936 return (NULL);
1937 } /* }}} void *listen_thread_main */
1939 static int daemonize (void) /* {{{ */
1940 {
1941 int status;
1942 int fd;
1944 fd = open_pidfile();
1945 if (fd < 0) return fd;
1947 if (!stay_foreground)
1948 {
1949 pid_t child;
1950 char *base_dir;
1952 child = fork ();
1953 if (child < 0)
1954 {
1955 fprintf (stderr, "daemonize: fork(2) failed.\n");
1956 return (-1);
1957 }
1958 else if (child > 0)
1959 {
1960 return (1);
1961 }
1963 /* Change into the /tmp directory. */
1964 base_dir = (config_base_dir != NULL)
1965 ? config_base_dir
1966 : "/tmp";
1967 status = chdir (base_dir);
1968 if (status != 0)
1969 {
1970 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1971 return (-1);
1972 }
1974 /* Become session leader */
1975 setsid ();
1977 /* Open the first three file descriptors to /dev/null */
1978 close (2);
1979 close (1);
1980 close (0);
1982 open ("/dev/null", O_RDWR);
1983 dup (0);
1984 dup (0);
1985 } /* if (!stay_foreground) */
1987 install_signal_handlers();
1989 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1990 RRDD_LOG(LOG_INFO, "starting up");
1992 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1993 if (cache_tree == NULL)
1994 {
1995 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1996 return (-1);
1997 }
1999 status = write_pidfile (fd);
2000 return status;
2001 } /* }}} int daemonize */
2003 static int cleanup (void) /* {{{ */
2004 {
2005 do_shutdown++;
2007 pthread_cond_signal (&cache_cond);
2008 pthread_join (queue_thread, /* return = */ NULL);
2010 remove_pidfile ();
2012 RRDD_LOG(LOG_INFO, "goodbye");
2013 closelog ();
2015 return (0);
2016 } /* }}} int cleanup */
2018 static int read_options (int argc, char **argv) /* {{{ */
2019 {
2020 int option;
2021 int status = 0;
2023 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?F")) != -1)
2024 {
2025 switch (option)
2026 {
2027 case 'g':
2028 stay_foreground=1;
2029 break;
2031 case 'l':
2032 {
2033 char **temp;
2035 temp = (char **) realloc (config_listen_address_list,
2036 sizeof (char *) * (config_listen_address_list_len + 1));
2037 if (temp == NULL)
2038 {
2039 fprintf (stderr, "read_options: realloc failed.\n");
2040 return (2);
2041 }
2042 config_listen_address_list = temp;
2044 temp[config_listen_address_list_len] = strdup (optarg);
2045 if (temp[config_listen_address_list_len] == NULL)
2046 {
2047 fprintf (stderr, "read_options: strdup failed.\n");
2048 return (2);
2049 }
2050 config_listen_address_list_len++;
2051 }
2052 break;
2054 case 'f':
2055 {
2056 int temp;
2058 temp = atoi (optarg);
2059 if (temp > 0)
2060 config_flush_interval = temp;
2061 else
2062 {
2063 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2064 status = 3;
2065 }
2066 }
2067 break;
2069 case 'w':
2070 {
2071 int temp;
2073 temp = atoi (optarg);
2074 if (temp > 0)
2075 config_write_interval = temp;
2076 else
2077 {
2078 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2079 status = 2;
2080 }
2081 }
2082 break;
2084 case 'z':
2085 {
2086 int temp;
2088 temp = atoi(optarg);
2089 if (temp > 0)
2090 config_write_jitter = temp;
2091 else
2092 {
2093 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2094 status = 2;
2095 }
2097 break;
2098 }
2100 case 'b':
2101 {
2102 size_t len;
2104 if (config_base_dir != NULL)
2105 free (config_base_dir);
2106 config_base_dir = strdup (optarg);
2107 if (config_base_dir == NULL)
2108 {
2109 fprintf (stderr, "read_options: strdup failed.\n");
2110 return (3);
2111 }
2113 len = strlen (config_base_dir);
2114 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2115 {
2116 config_base_dir[len - 1] = 0;
2117 len--;
2118 }
2120 if (len < 1)
2121 {
2122 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2123 return (4);
2124 }
2125 }
2126 break;
2128 case 'p':
2129 {
2130 if (config_pid_file != NULL)
2131 free (config_pid_file);
2132 config_pid_file = strdup (optarg);
2133 if (config_pid_file == NULL)
2134 {
2135 fprintf (stderr, "read_options: strdup failed.\n");
2136 return (3);
2137 }
2138 }
2139 break;
2141 case 'F':
2142 config_flush_at_shutdown = 1;
2143 break;
2145 case 'j':
2146 {
2147 struct stat statbuf;
2148 const char *dir = optarg;
2150 status = stat(dir, &statbuf);
2151 if (status != 0)
2152 {
2153 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2154 return 6;
2155 }
2157 if (!S_ISDIR(statbuf.st_mode)
2158 || access(dir, R_OK|W_OK|X_OK) != 0)
2159 {
2160 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2161 errno ? rrd_strerror(errno) : "");
2162 return 6;
2163 }
2165 journal_cur = malloc(PATH_MAX + 1);
2166 journal_old = malloc(PATH_MAX + 1);
2167 if (journal_cur == NULL || journal_old == NULL)
2168 {
2169 fprintf(stderr, "malloc failure for journal files\n");
2170 return 6;
2171 }
2172 else
2173 {
2174 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2175 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2176 }
2177 }
2178 break;
2180 case 'h':
2181 case '?':
2182 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2183 "\n"
2184 "Usage: rrdcached [options]\n"
2185 "\n"
2186 "Valid options are:\n"
2187 " -l <address> Socket address to listen to.\n"
2188 " -w <seconds> Interval in which to write data.\n"
2189 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2190 " -f <seconds> Interval in which to flush dead data.\n"
2191 " -p <file> Location of the PID-file.\n"
2192 " -b <dir> Base directory to change to.\n"
2193 " -g Do not fork and run in the foreground.\n"
2194 " -j <dir> Directory in which to create the journal files.\n"
2195 " -F Always flush all updates at shutdown\n"
2196 "\n"
2197 "For more information and a detailed description of all options "
2198 "please refer\n"
2199 "to the rrdcached(1) manual page.\n",
2200 VERSION);
2201 status = -1;
2202 break;
2203 } /* switch (option) */
2204 } /* while (getopt) */
2206 /* advise the user when values are not sane */
2207 if (config_flush_interval < 2 * config_write_interval)
2208 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2209 " 2x write interval (-w) !\n");
2210 if (config_write_jitter > config_write_interval)
2211 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2212 " write interval (-w) !\n");
2214 if (journal_cur == NULL)
2215 config_flush_at_shutdown = 1;
2217 return (status);
2218 } /* }}} int read_options */
2220 int main (int argc, char **argv)
2221 {
2222 int status;
2224 status = read_options (argc, argv);
2225 if (status != 0)
2226 {
2227 if (status < 0)
2228 status = 0;
2229 return (status);
2230 }
2232 status = daemonize ();
2233 if (status == 1)
2234 {
2235 struct sigaction sigchld;
2237 memset (&sigchld, 0, sizeof (sigchld));
2238 sigchld.sa_handler = SIG_IGN;
2239 sigaction (SIGCHLD, &sigchld, NULL);
2241 return (0);
2242 }
2243 else if (status != 0)
2244 {
2245 fprintf (stderr, "daemonize failed, exiting.\n");
2246 return (1);
2247 }
2249 if (journal_cur != NULL)
2250 {
2251 int had_journal = 0;
2253 pthread_mutex_lock(&journal_lock);
2255 RRDD_LOG(LOG_INFO, "checking for journal files");
2257 had_journal += journal_replay(journal_old);
2258 had_journal += journal_replay(journal_cur);
2260 if (had_journal)
2261 flush_old_values(-1);
2263 pthread_mutex_unlock(&journal_lock);
2264 journal_rotate();
2266 RRDD_LOG(LOG_INFO, "journal processing complete");
2267 }
2269 /* start the queue thread */
2270 memset (&queue_thread, 0, sizeof (queue_thread));
2271 status = pthread_create (&queue_thread,
2272 NULL, /* attr */
2273 queue_thread_main,
2274 NULL); /* args */
2275 if (status != 0)
2276 {
2277 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2278 cleanup();
2279 return (1);
2280 }
2282 listen_thread_main (NULL);
2283 cleanup ();
2285 return (0);
2286 } /* int main */
2288 /*
2289 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2290 */