1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 struct listen_socket_s
105 {
106 int fd;
107 char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115 char *file;
116 char **values;
117 int values_num;
118 time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121 int flags;
122 pthread_cond_t flushed;
123 cache_item_t *next;
124 };
126 struct callback_flush_data_s
127 {
128 time_t now;
129 time_t abs_timeout;
130 char **keys;
131 size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
135 enum queue_side_e
136 {
137 HEAD,
138 TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146 * Variables
147 */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree *cache_tree = NULL;
163 static cache_item_t *cache_queue_head = NULL;
164 static cache_item_t *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
168 static int config_write_interval = 300;
169 static int config_write_jitter = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
195 /*
196 * Functions
197 */
198 static void sig_common (const char *sig) /* {{{ */
199 {
200 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
201 do_shutdown++;
202 pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_common */
205 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
206 {
207 sig_common("INT");
208 } /* }}} void sig_int_handler */
210 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
211 {
212 sig_common("TERM");
213 } /* }}} void sig_term_handler */
215 static void install_signal_handlers(void) /* {{{ */
216 {
217 /* These structures are static, because `sigaction' behaves weird if the are
218 * overwritten.. */
219 static struct sigaction sa_int;
220 static struct sigaction sa_term;
221 static struct sigaction sa_pipe;
223 /* Install signal handlers */
224 memset (&sa_int, 0, sizeof (sa_int));
225 sa_int.sa_handler = sig_int_handler;
226 sigaction (SIGINT, &sa_int, NULL);
228 memset (&sa_term, 0, sizeof (sa_term));
229 sa_term.sa_handler = sig_term_handler;
230 sigaction (SIGTERM, &sa_term, NULL);
232 memset (&sa_pipe, 0, sizeof (sa_pipe));
233 sa_pipe.sa_handler = SIG_IGN;
234 sigaction (SIGPIPE, &sa_pipe, NULL);
236 } /* }}} void install_signal_handlers */
238 static int open_pidfile(void) /* {{{ */
239 {
240 int fd;
241 char *file;
243 file = (config_pid_file != NULL)
244 ? config_pid_file
245 : LOCALSTATEDIR "/run/rrdcached.pid";
247 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
248 if (fd < 0)
249 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
250 file, rrd_strerror(errno));
252 return(fd);
253 }
255 static int write_pidfile (int fd) /* {{{ */
256 {
257 pid_t pid;
258 FILE *fh;
260 pid = getpid ();
262 fh = fdopen (fd, "w");
263 if (fh == NULL)
264 {
265 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
266 close(fd);
267 return (-1);
268 }
270 fprintf (fh, "%i\n", (int) pid);
271 fclose (fh);
273 return (0);
274 } /* }}} int write_pidfile */
276 static int remove_pidfile (void) /* {{{ */
277 {
278 char *file;
279 int status;
281 file = (config_pid_file != NULL)
282 ? config_pid_file
283 : LOCALSTATEDIR "/run/rrdcached.pid";
285 status = unlink (file);
286 if (status == 0)
287 return (0);
288 return (errno);
289 } /* }}} int remove_pidfile */
291 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
292 {
293 char *buffer;
294 size_t buffer_used;
295 size_t buffer_free;
296 ssize_t status;
298 buffer = (char *) buffer_void;
299 buffer_used = 0;
300 buffer_free = buffer_size;
302 while (buffer_free > 0)
303 {
304 status = read (fd, buffer + buffer_used, buffer_free);
305 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
306 continue;
308 if (status < 0)
309 return (-1);
311 if (status == 0)
312 return (0);
314 assert ((0 > status) || (buffer_free >= (size_t) status));
316 buffer_free = buffer_free - status;
317 buffer_used = buffer_used + status;
319 if (buffer[buffer_used - 1] == '\n')
320 break;
321 }
323 assert (buffer_used > 0);
325 if (buffer[buffer_used - 1] != '\n')
326 {
327 errno = ENOBUFS;
328 return (-1);
329 }
331 buffer[buffer_used - 1] = 0;
333 /* Fix network line endings. */
334 if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
335 {
336 buffer_used--;
337 buffer[buffer_used - 1] = 0;
338 }
340 return (buffer_used);
341 } /* }}} ssize_t sread */
343 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
344 {
345 const char *ptr;
346 size_t nleft;
347 ssize_t status;
349 /* special case for journal replay */
350 if (fd < 0) return 0;
352 ptr = (const char *) buf;
353 nleft = count;
355 while (nleft > 0)
356 {
357 status = write (fd, (const void *) ptr, nleft);
359 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
360 continue;
362 if (status < 0)
363 return (status);
365 nleft -= status;
366 ptr += status;
367 }
369 return (0);
370 } /* }}} ssize_t swrite */
372 static void _wipe_ci_values(cache_item_t *ci, time_t when)
373 {
374 ci->values = NULL;
375 ci->values_num = 0;
377 ci->last_flush_time = when;
378 if (config_write_jitter > 0)
379 ci->last_flush_time += (random() % config_write_jitter);
381 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
382 }
384 /*
385 * enqueue_cache_item:
386 * `cache_lock' must be acquired before calling this function!
387 */
388 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
389 queue_side_t side)
390 {
391 int did_insert = 0;
393 if (ci == NULL)
394 return (-1);
396 if (ci->values_num == 0)
397 return (0);
399 if (side == HEAD)
400 {
401 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
402 {
403 assert (ci->next == NULL);
404 ci->next = cache_queue_head;
405 cache_queue_head = ci;
407 if (cache_queue_tail == NULL)
408 cache_queue_tail = cache_queue_head;
410 did_insert = 1;
411 }
412 else if (cache_queue_head == ci)
413 {
414 /* do nothing */
415 }
416 else /* enqueued, but not first entry */
417 {
418 cache_item_t *prev;
420 /* find previous entry */
421 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
422 if (prev->next == ci)
423 break;
424 assert (prev != NULL);
426 /* move to the front */
427 prev->next = ci->next;
428 ci->next = cache_queue_head;
429 cache_queue_head = ci;
431 /* check if we need to adapt the tail */
432 if (cache_queue_tail == ci)
433 cache_queue_tail = prev;
434 }
435 }
436 else /* (side == TAIL) */
437 {
438 /* We don't move values back in the list.. */
439 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
440 return (0);
442 assert (ci->next == NULL);
444 if (cache_queue_tail == NULL)
445 cache_queue_head = ci;
446 else
447 cache_queue_tail->next = ci;
448 cache_queue_tail = ci;
450 did_insert = 1;
451 }
453 ci->flags |= CI_FLAGS_IN_QUEUE;
455 if (did_insert)
456 {
457 pthread_cond_broadcast(&cache_cond);
458 pthread_mutex_lock (&stats_lock);
459 stats_queue_length++;
460 pthread_mutex_unlock (&stats_lock);
461 }
463 return (0);
464 } /* }}} int enqueue_cache_item */
466 /*
467 * tree_callback_flush:
468 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
469 * while this is in progress.
470 */
471 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
472 gpointer data)
473 {
474 cache_item_t *ci;
475 callback_flush_data_t *cfd;
477 ci = (cache_item_t *) value;
478 cfd = (callback_flush_data_t *) data;
480 if ((ci->last_flush_time <= cfd->abs_timeout)
481 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
482 && (ci->values_num > 0))
483 {
484 enqueue_cache_item (ci, TAIL);
485 }
486 else if ((do_shutdown != 0)
487 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
488 && (ci->values_num > 0))
489 {
490 enqueue_cache_item (ci, TAIL);
491 }
492 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
493 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
494 && (ci->values_num <= 0))
495 {
496 char **temp;
498 temp = (char **) realloc (cfd->keys,
499 sizeof (char *) * (cfd->keys_num + 1));
500 if (temp == NULL)
501 {
502 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
503 return (FALSE);
504 }
505 cfd->keys = temp;
506 /* Make really sure this points to the _same_ place */
507 assert ((char *) key == ci->file);
508 cfd->keys[cfd->keys_num] = (char *) key;
509 cfd->keys_num++;
510 }
512 return (FALSE);
513 } /* }}} gboolean tree_callback_flush */
515 static int flush_old_values (int max_age)
516 {
517 callback_flush_data_t cfd;
518 size_t k;
520 memset (&cfd, 0, sizeof (cfd));
521 /* Pass the current time as user data so that we don't need to call
522 * `time' for each node. */
523 cfd.now = time (NULL);
524 cfd.keys = NULL;
525 cfd.keys_num = 0;
527 if (max_age > 0)
528 cfd.abs_timeout = cfd.now - max_age;
529 else
530 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
532 /* `tree_callback_flush' will return the keys of all values that haven't
533 * been touched in the last `config_flush_interval' seconds in `cfd'.
534 * The char*'s in this array point to the same memory as ci->file, so we
535 * don't need to free them separately. */
536 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
538 for (k = 0; k < cfd.keys_num; k++)
539 {
540 cache_item_t *ci;
542 /* This must not fail. */
543 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
544 assert (ci != NULL);
546 /* If we end up here with values available, something's seriously
547 * messed up. */
548 assert (ci->values_num == 0);
550 /* Remove the node from the tree */
551 g_tree_remove (cache_tree, cfd.keys[k]);
552 cfd.keys[k] = NULL;
554 /* Now free and clean up `ci'. */
555 free (ci->file);
556 ci->file = NULL;
557 free (ci);
558 ci = NULL;
559 } /* for (k = 0; k < cfd.keys_num; k++) */
561 if (cfd.keys != NULL)
562 {
563 free (cfd.keys);
564 cfd.keys = NULL;
565 }
567 return (0);
568 } /* int flush_old_values */
570 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
571 {
572 struct timeval now;
573 struct timespec next_flush;
575 gettimeofday (&now, NULL);
576 next_flush.tv_sec = now.tv_sec + config_flush_interval;
577 next_flush.tv_nsec = 1000 * now.tv_usec;
579 pthread_mutex_lock (&cache_lock);
580 while ((do_shutdown == 0) || (cache_queue_head != NULL))
581 {
582 cache_item_t *ci;
583 char *file;
584 char **values;
585 int values_num;
586 int status;
587 int i;
589 /* First, check if it's time to do the cache flush. */
590 gettimeofday (&now, NULL);
591 if ((now.tv_sec > next_flush.tv_sec)
592 || ((now.tv_sec == next_flush.tv_sec)
593 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
594 {
595 /* Flush all values that haven't been written in the last
596 * `config_write_interval' seconds. */
597 flush_old_values (config_write_interval);
599 /* Determine the time of the next cache flush. */
600 while (next_flush.tv_sec <= now.tv_sec)
601 next_flush.tv_sec += config_flush_interval;
603 /* unlock the cache while we rotate so we don't block incoming
604 * updates if the fsync() blocks on disk I/O */
605 pthread_mutex_unlock(&cache_lock);
606 journal_rotate();
607 pthread_mutex_lock(&cache_lock);
608 }
610 /* Now, check if there's something to store away. If not, wait until
611 * something comes in or it's time to do the cache flush. */
612 if (cache_queue_head == NULL)
613 {
614 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
615 if ((status != 0) && (status != ETIMEDOUT))
616 {
617 RRDD_LOG (LOG_ERR, "queue_thread_main: "
618 "pthread_cond_timedwait returned %i.", status);
619 }
620 }
622 /* We're about to shut down, so lets flush the entire tree. */
623 if ((do_shutdown != 0) && (cache_queue_head == NULL))
624 flush_old_values (/* max age = */ -1);
626 /* Check if a value has arrived. This may be NULL if we timed out or there
627 * was an interrupt such as a signal. */
628 if (cache_queue_head == NULL)
629 continue;
631 ci = cache_queue_head;
633 /* copy the relevant parts */
634 file = strdup (ci->file);
635 if (file == NULL)
636 {
637 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
638 continue;
639 }
641 assert(ci->values != NULL);
642 assert(ci->values_num > 0);
644 values = ci->values;
645 values_num = ci->values_num;
647 _wipe_ci_values(ci, time(NULL));
649 cache_queue_head = ci->next;
650 if (cache_queue_head == NULL)
651 cache_queue_tail = NULL;
652 ci->next = NULL;
654 pthread_mutex_lock (&stats_lock);
655 assert (stats_queue_length > 0);
656 stats_queue_length--;
657 pthread_mutex_unlock (&stats_lock);
659 pthread_mutex_unlock (&cache_lock);
661 rrd_clear_error ();
662 status = rrd_update_r (file, NULL, values_num, (void *) values);
663 if (status != 0)
664 {
665 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
666 "rrd_update_r (%s) failed with status %i. (%s)",
667 file, status, rrd_get_error());
668 }
670 journal_write("wrote", file);
671 pthread_cond_broadcast(&ci->flushed);
673 for (i = 0; i < values_num; i++)
674 free (values[i]);
676 free(values);
677 free(file);
679 if (status == 0)
680 {
681 pthread_mutex_lock (&stats_lock);
682 stats_updates_written++;
683 stats_data_sets_written += values_num;
684 pthread_mutex_unlock (&stats_lock);
685 }
687 pthread_mutex_lock (&cache_lock);
689 /* We're about to shut down, so lets flush the entire tree. */
690 if ((do_shutdown != 0) && (cache_queue_head == NULL))
691 flush_old_values (/* max age = */ -1);
692 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
693 pthread_mutex_unlock (&cache_lock);
695 assert(cache_queue_head == NULL);
696 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
697 journal_done();
699 return (NULL);
700 } /* }}} void *queue_thread_main */
702 static int buffer_get_field (char **buffer_ret, /* {{{ */
703 size_t *buffer_size_ret, char **field_ret)
704 {
705 char *buffer;
706 size_t buffer_pos;
707 size_t buffer_size;
708 char *field;
709 size_t field_size;
710 int status;
712 buffer = *buffer_ret;
713 buffer_pos = 0;
714 buffer_size = *buffer_size_ret;
715 field = *buffer_ret;
716 field_size = 0;
718 if (buffer_size <= 0)
719 return (-1);
721 /* This is ensured by `handle_request'. */
722 assert (buffer[buffer_size - 1] == '\0');
724 status = -1;
725 while (buffer_pos < buffer_size)
726 {
727 /* Check for end-of-field or end-of-buffer */
728 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
729 {
730 field[field_size] = 0;
731 field_size++;
732 buffer_pos++;
733 status = 0;
734 break;
735 }
736 /* Handle escaped characters. */
737 else if (buffer[buffer_pos] == '\\')
738 {
739 if (buffer_pos >= (buffer_size - 1))
740 break;
741 buffer_pos++;
742 field[field_size] = buffer[buffer_pos];
743 field_size++;
744 buffer_pos++;
745 }
746 /* Normal operation */
747 else
748 {
749 field[field_size] = buffer[buffer_pos];
750 field_size++;
751 buffer_pos++;
752 }
753 } /* while (buffer_pos < buffer_size) */
755 if (status != 0)
756 return (status);
758 *buffer_ret = buffer + buffer_pos;
759 *buffer_size_ret = buffer_size - buffer_pos;
760 *field_ret = field;
762 return (0);
763 } /* }}} int buffer_get_field */
765 static int flush_file (const char *filename) /* {{{ */
766 {
767 cache_item_t *ci;
769 pthread_mutex_lock (&cache_lock);
771 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
772 if (ci == NULL)
773 {
774 pthread_mutex_unlock (&cache_lock);
775 return (ENOENT);
776 }
778 /* Enqueue at head */
779 enqueue_cache_item (ci, HEAD);
781 pthread_cond_wait(&ci->flushed, &cache_lock);
782 pthread_mutex_unlock(&cache_lock);
784 return (0);
785 } /* }}} int flush_file */
787 static int handle_request_help (int fd, /* {{{ */
788 char *buffer, size_t buffer_size)
789 {
790 int status;
791 char **help_text;
792 size_t help_text_len;
793 char *command;
794 size_t i;
796 char *help_help[] =
797 {
798 "5 Command overview\n",
799 "FLUSH <filename>\n",
800 "FLUSHALL\n",
801 "HELP [<command>]\n",
802 "UPDATE <filename> <values> [<values> ...]\n",
803 "STATS\n"
804 };
805 size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
807 char *help_flush[] =
808 {
809 "4 Help for FLUSH\n",
810 "Usage: FLUSH <filename>\n",
811 "\n",
812 "Adds the given filename to the head of the update queue and returns\n",
813 "after is has been dequeued.\n"
814 };
815 size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
817 char *help_flushall[] =
818 {
819 "3 Help for FLUSHALL\n",
820 "Usage: FLUSHALL\n",
821 "\n",
822 "Triggers writing of all pending updates. Returns immediately.\n"
823 };
824 size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
826 char *help_update[] =
827 {
828 "9 Help for UPDATE\n",
829 "Usage: UPDATE <filename> <values> [<values> ...]\n"
830 "\n",
831 "Adds the given file to the internal cache if it is not yet known and\n",
832 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
833 "for details.\n",
834 "\n",
835 "Each <values> has the following form:\n",
836 " <values> = <time>:<value>[:<value>[...]]\n",
837 "See the rrdupdate(1) manpage for details.\n"
838 };
839 size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
841 char *help_stats[] =
842 {
843 "4 Help for STATS\n",
844 "Usage: STATS\n",
845 "\n",
846 "Returns some performance counters, see the rrdcached(1) manpage for\n",
847 "a description of the values.\n"
848 };
849 size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
851 status = buffer_get_field (&buffer, &buffer_size, &command);
852 if (status != 0)
853 {
854 help_text = help_help;
855 help_text_len = help_help_len;
856 }
857 else
858 {
859 if (strcasecmp (command, "update") == 0)
860 {
861 help_text = help_update;
862 help_text_len = help_update_len;
863 }
864 else if (strcasecmp (command, "flush") == 0)
865 {
866 help_text = help_flush;
867 help_text_len = help_flush_len;
868 }
869 else if (strcasecmp (command, "flushall") == 0)
870 {
871 help_text = help_flushall;
872 help_text_len = help_flushall_len;
873 }
874 else if (strcasecmp (command, "stats") == 0)
875 {
876 help_text = help_stats;
877 help_text_len = help_stats_len;
878 }
879 else
880 {
881 help_text = help_help;
882 help_text_len = help_help_len;
883 }
884 }
886 for (i = 0; i < help_text_len; i++)
887 {
888 status = swrite (fd, help_text[i], strlen (help_text[i]));
889 if (status < 0)
890 {
891 status = errno;
892 RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
893 return (status);
894 }
895 }
897 return (0);
898 } /* }}} int handle_request_help */
900 static int handle_request_stats (int fd, /* {{{ */
901 char *buffer __attribute__((unused)),
902 size_t buffer_size __attribute__((unused)))
903 {
904 int status;
905 char outbuf[CMD_MAX];
907 uint64_t copy_queue_length;
908 uint64_t copy_updates_received;
909 uint64_t copy_flush_received;
910 uint64_t copy_updates_written;
911 uint64_t copy_data_sets_written;
912 uint64_t copy_journal_bytes;
913 uint64_t copy_journal_rotate;
915 uint64_t tree_nodes_number;
916 uint64_t tree_depth;
918 pthread_mutex_lock (&stats_lock);
919 copy_queue_length = stats_queue_length;
920 copy_updates_received = stats_updates_received;
921 copy_flush_received = stats_flush_received;
922 copy_updates_written = stats_updates_written;
923 copy_data_sets_written = stats_data_sets_written;
924 copy_journal_bytes = stats_journal_bytes;
925 copy_journal_rotate = stats_journal_rotate;
926 pthread_mutex_unlock (&stats_lock);
928 pthread_mutex_lock (&cache_lock);
929 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
930 tree_depth = (uint64_t) g_tree_height (cache_tree);
931 pthread_mutex_unlock (&cache_lock);
933 #define RRDD_STATS_SEND \
934 outbuf[sizeof (outbuf) - 1] = 0; \
935 status = swrite (fd, outbuf, strlen (outbuf)); \
936 if (status < 0) \
937 { \
938 status = errno; \
939 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
940 return (status); \
941 }
943 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
944 RRDD_STATS_SEND;
946 snprintf (outbuf, sizeof (outbuf),
947 "QueueLength: %"PRIu64"\n", copy_queue_length);
948 RRDD_STATS_SEND;
950 snprintf (outbuf, sizeof (outbuf),
951 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
952 RRDD_STATS_SEND;
954 snprintf (outbuf, sizeof (outbuf),
955 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
956 RRDD_STATS_SEND;
958 snprintf (outbuf, sizeof (outbuf),
959 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
960 RRDD_STATS_SEND;
962 snprintf (outbuf, sizeof (outbuf),
963 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
964 RRDD_STATS_SEND;
966 snprintf (outbuf, sizeof (outbuf),
967 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
968 RRDD_STATS_SEND;
970 snprintf (outbuf, sizeof (outbuf),
971 "TreeDepth: %"PRIu64"\n", tree_depth);
972 RRDD_STATS_SEND;
974 snprintf (outbuf, sizeof(outbuf),
975 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
976 RRDD_STATS_SEND;
978 snprintf (outbuf, sizeof(outbuf),
979 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
980 RRDD_STATS_SEND;
982 return (0);
983 #undef RRDD_STATS_SEND
984 } /* }}} int handle_request_stats */
986 static int handle_request_flush (int fd, /* {{{ */
987 char *buffer, size_t buffer_size)
988 {
989 char *file;
990 int status;
991 char result[CMD_MAX];
993 status = buffer_get_field (&buffer, &buffer_size, &file);
994 if (status != 0)
995 {
996 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
997 }
998 else
999 {
1000 pthread_mutex_lock(&stats_lock);
1001 stats_flush_received++;
1002 pthread_mutex_unlock(&stats_lock);
1004 status = flush_file (file);
1005 if (status == 0)
1006 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1007 else if (status == ENOENT)
1008 {
1009 /* no file in our tree; see whether it exists at all */
1010 struct stat statbuf;
1012 memset(&statbuf, 0, sizeof(statbuf));
1013 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1014 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1015 else
1016 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1017 }
1018 else if (status < 0)
1019 strncpy (result, "-1 Internal error.\n", sizeof (result));
1020 else
1021 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1022 }
1023 result[sizeof (result) - 1] = 0;
1025 status = swrite (fd, result, strlen (result));
1026 if (status < 0)
1027 {
1028 status = errno;
1029 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1030 return (status);
1031 }
1033 return (0);
1034 } /* }}} int handle_request_flush */
1036 static int handle_request_flushall(int fd) /* {{{ */
1037 {
1038 int status;
1039 char answer[] ="0 Started flush.\n";
1041 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1043 pthread_mutex_lock(&cache_lock);
1044 flush_old_values(-1);
1045 pthread_mutex_unlock(&cache_lock);
1047 status = swrite(fd, answer, strlen(answer));
1048 if (status < 0)
1049 {
1050 status = errno;
1051 RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1052 }
1054 return (status);
1055 }
1057 static int handle_request_update (int fd, /* {{{ */
1058 char *buffer, size_t buffer_size)
1059 {
1060 char *file;
1061 int values_num = 0;
1062 int status;
1064 time_t now;
1066 cache_item_t *ci;
1067 char answer[CMD_MAX];
1069 #define RRDD_UPDATE_SEND \
1070 answer[sizeof (answer) - 1] = 0; \
1071 status = swrite (fd, answer, strlen (answer)); \
1072 if (status < 0) \
1073 { \
1074 status = errno; \
1075 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1076 return (status); \
1077 }
1079 now = time (NULL);
1081 status = buffer_get_field (&buffer, &buffer_size, &file);
1082 if (status != 0)
1083 {
1084 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1085 sizeof (answer));
1086 RRDD_UPDATE_SEND;
1087 return (0);
1088 }
1090 pthread_mutex_lock(&stats_lock);
1091 stats_updates_received++;
1092 pthread_mutex_unlock(&stats_lock);
1094 pthread_mutex_lock (&cache_lock);
1095 ci = g_tree_lookup (cache_tree, file);
1097 if (ci == NULL) /* {{{ */
1098 {
1099 struct stat statbuf;
1101 /* don't hold the lock while we setup; stat(2) might block */
1102 pthread_mutex_unlock(&cache_lock);
1104 memset (&statbuf, 0, sizeof (statbuf));
1105 status = stat (file, &statbuf);
1106 if (status != 0)
1107 {
1108 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1110 status = errno;
1111 if (status == ENOENT)
1112 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1113 else
1114 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1115 status);
1116 RRDD_UPDATE_SEND;
1117 return (0);
1118 }
1119 if (!S_ISREG (statbuf.st_mode))
1120 {
1121 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1122 RRDD_UPDATE_SEND;
1123 return (0);
1124 }
1125 if (access(file, R_OK|W_OK) != 0)
1126 {
1127 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1128 file, rrd_strerror(errno));
1129 RRDD_UPDATE_SEND;
1130 return (0);
1131 }
1133 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1134 if (ci == NULL)
1135 {
1136 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1138 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1139 RRDD_UPDATE_SEND;
1140 return (0);
1141 }
1142 memset (ci, 0, sizeof (cache_item_t));
1144 ci->file = strdup (file);
1145 if (ci->file == NULL)
1146 {
1147 free (ci);
1148 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1150 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1151 RRDD_UPDATE_SEND;
1152 return (0);
1153 }
1155 _wipe_ci_values(ci, now);
1156 ci->flags = CI_FLAGS_IN_TREE;
1158 pthread_mutex_lock(&cache_lock);
1159 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1160 } /* }}} */
1161 assert (ci != NULL);
1163 while (buffer_size > 0)
1164 {
1165 char **temp;
1166 char *value;
1168 status = buffer_get_field (&buffer, &buffer_size, &value);
1169 if (status != 0)
1170 {
1171 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1172 break;
1173 }
1175 temp = (char **) realloc (ci->values,
1176 sizeof (char *) * (ci->values_num + 1));
1177 if (temp == NULL)
1178 {
1179 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1180 continue;
1181 }
1182 ci->values = temp;
1184 ci->values[ci->values_num] = strdup (value);
1185 if (ci->values[ci->values_num] == NULL)
1186 {
1187 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1188 continue;
1189 }
1190 ci->values_num++;
1192 values_num++;
1193 }
1195 if (((now - ci->last_flush_time) >= config_write_interval)
1196 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1197 && (ci->values_num > 0))
1198 {
1199 enqueue_cache_item (ci, TAIL);
1200 }
1202 pthread_mutex_unlock (&cache_lock);
1204 if (values_num < 1)
1205 {
1206 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1207 }
1208 else
1209 {
1210 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1211 (values_num == 1) ? "" : "s");
1212 }
1213 RRDD_UPDATE_SEND;
1214 return (0);
1215 #undef RRDD_UPDATE_SEND
1216 } /* }}} int handle_request_update */
1218 /* we came across a "WROTE" entry during journal replay.
1219 * throw away any values that we have accumulated for this file
1220 */
1221 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1222 const char *buffer,
1223 size_t buffer_size __attribute__((unused)))
1224 {
1225 int i;
1226 cache_item_t *ci;
1227 const char *file = buffer;
1229 pthread_mutex_lock(&cache_lock);
1231 ci = g_tree_lookup(cache_tree, file);
1232 if (ci == NULL)
1233 {
1234 pthread_mutex_unlock(&cache_lock);
1235 return (0);
1236 }
1238 if (ci->values)
1239 {
1240 for (i=0; i < ci->values_num; i++)
1241 free(ci->values[i]);
1243 free(ci->values);
1244 }
1246 _wipe_ci_values(ci, time(NULL));
1248 pthread_mutex_unlock(&cache_lock);
1249 return (0);
1250 } /* }}} int handle_request_wrote */
1252 /* if fd < 0, we are in journal replay mode */
1253 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1254 {
1255 char *buffer_ptr;
1256 char *command;
1257 int status;
1259 assert (buffer[buffer_size - 1] == '\0');
1261 buffer_ptr = buffer;
1262 command = NULL;
1263 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1264 if (status != 0)
1265 {
1266 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1267 return (-1);
1268 }
1270 if (strcasecmp (command, "update") == 0)
1271 {
1272 /* don't re-write updates in replay mode */
1273 if (fd >= 0)
1274 journal_write(command, buffer_ptr);
1276 return (handle_request_update (fd, buffer_ptr, buffer_size));
1277 }
1278 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1279 {
1280 /* this is only valid in replay mode */
1281 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1282 }
1283 else if (strcasecmp (command, "flush") == 0)
1284 {
1285 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1286 }
1287 else if (strcasecmp (command, "flushall") == 0)
1288 {
1289 return (handle_request_flushall(fd));
1290 }
1291 else if (strcasecmp (command, "stats") == 0)
1292 {
1293 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1294 }
1295 else if (strcasecmp (command, "help") == 0)
1296 {
1297 return (handle_request_help (fd, buffer_ptr, buffer_size));
1298 }
1299 else
1300 {
1301 char result[CMD_MAX];
1303 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1304 result[sizeof (result) - 1] = 0;
1306 status = swrite (fd, result, strlen (result));
1307 if (status < 0)
1308 {
1309 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1310 return (-1);
1311 }
1312 }
1314 return (0);
1315 } /* }}} int handle_request */
1317 /* MUST NOT hold journal_lock before calling this */
1318 static void journal_rotate(void) /* {{{ */
1319 {
1320 FILE *old_fh = NULL;
1322 if (journal_cur == NULL || journal_old == NULL)
1323 return;
1325 pthread_mutex_lock(&journal_lock);
1327 /* we rotate this way (rename before close) so that the we can release
1328 * the journal lock as fast as possible. Journal writes to the new
1329 * journal can proceed immediately after the new file is opened. The
1330 * fclose can then block without affecting new updates.
1331 */
1332 if (journal_fh != NULL)
1333 {
1334 old_fh = journal_fh;
1335 rename(journal_cur, journal_old);
1336 ++stats_journal_rotate;
1337 }
1339 journal_fh = fopen(journal_cur, "a");
1340 pthread_mutex_unlock(&journal_lock);
1342 if (old_fh != NULL)
1343 fclose(old_fh);
1345 if (journal_fh == NULL)
1346 RRDD_LOG(LOG_CRIT,
1347 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1348 journal_cur, rrd_strerror(errno));
1350 } /* }}} static void journal_rotate */
1352 static void journal_done(void) /* {{{ */
1353 {
1354 if (journal_cur == NULL)
1355 return;
1357 pthread_mutex_lock(&journal_lock);
1358 if (journal_fh != NULL)
1359 {
1360 fclose(journal_fh);
1361 journal_fh = NULL;
1362 }
1364 RRDD_LOG(LOG_INFO, "removing journals");
1366 unlink(journal_old);
1367 unlink(journal_cur);
1368 pthread_mutex_unlock(&journal_lock);
1370 } /* }}} static void journal_done */
1372 static int journal_write(char *cmd, char *args) /* {{{ */
1373 {
1374 int chars;
1376 if (journal_fh == NULL)
1377 return 0;
1379 pthread_mutex_lock(&journal_lock);
1380 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1381 pthread_mutex_unlock(&journal_lock);
1383 if (chars > 0)
1384 {
1385 pthread_mutex_lock(&stats_lock);
1386 stats_journal_bytes += chars;
1387 pthread_mutex_unlock(&stats_lock);
1388 }
1390 return chars;
1391 } /* }}} static int journal_write */
1393 static int journal_replay (const char *file) /* {{{ */
1394 {
1395 FILE *fh;
1396 int entry_cnt = 0;
1397 int fail_cnt = 0;
1398 uint64_t line = 0;
1399 char entry[CMD_MAX];
1401 if (file == NULL) return 0;
1403 fh = fopen(file, "r");
1404 if (fh == NULL)
1405 {
1406 if (errno != ENOENT)
1407 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1408 file, rrd_strerror(errno));
1409 return 0;
1410 }
1411 else
1412 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1414 while(!feof(fh))
1415 {
1416 size_t entry_len;
1418 ++line;
1419 fgets(entry, sizeof(entry), fh);
1420 entry_len = strlen(entry);
1422 /* check \n termination in case journal writing crashed mid-line */
1423 if (entry_len == 0)
1424 continue;
1425 else if (entry[entry_len - 1] != '\n')
1426 {
1427 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1428 ++fail_cnt;
1429 continue;
1430 }
1432 entry[entry_len - 1] = '\0';
1434 if (handle_request(-1, entry, entry_len) == 0)
1435 ++entry_cnt;
1436 else
1437 ++fail_cnt;
1438 }
1440 fclose(fh);
1442 if (entry_cnt > 0)
1443 {
1444 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1445 entry_cnt, fail_cnt);
1446 return 1;
1447 }
1448 else
1449 return 0;
1451 } /* }}} static int journal_replay */
1453 static void *connection_thread_main (void *args) /* {{{ */
1454 {
1455 pthread_t self;
1456 int i;
1457 int fd;
1459 fd = *((int *) args);
1460 free (args);
1462 pthread_mutex_lock (&connection_threads_lock);
1463 {
1464 pthread_t *temp;
1466 temp = (pthread_t *) realloc (connection_threads,
1467 sizeof (pthread_t) * (connection_threads_num + 1));
1468 if (temp == NULL)
1469 {
1470 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1471 }
1472 else
1473 {
1474 connection_threads = temp;
1475 connection_threads[connection_threads_num] = pthread_self ();
1476 connection_threads_num++;
1477 }
1478 }
1479 pthread_mutex_unlock (&connection_threads_lock);
1481 while (do_shutdown == 0)
1482 {
1483 char buffer[CMD_MAX];
1485 struct pollfd pollfd;
1486 int status;
1488 pollfd.fd = fd;
1489 pollfd.events = POLLIN | POLLPRI;
1490 pollfd.revents = 0;
1492 status = poll (&pollfd, 1, /* timeout = */ 500);
1493 if (status == 0) /* timeout */
1494 continue;
1495 else if (status < 0) /* error */
1496 {
1497 status = errno;
1498 if (status == EINTR)
1499 continue;
1500 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1501 continue;
1502 }
1504 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1505 {
1506 close (fd);
1507 break;
1508 }
1509 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1510 {
1511 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1512 "poll(2) returned something unexpected: %#04hx",
1513 pollfd.revents);
1514 close (fd);
1515 break;
1516 }
1518 status = (int) sread (fd, buffer, sizeof (buffer));
1519 if (status <= 0)
1520 {
1521 close (fd);
1523 if (status < 0)
1524 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1526 break;
1527 }
1529 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1530 if (status != 0)
1531 break;
1532 }
1534 close(fd);
1536 self = pthread_self ();
1537 /* Remove this thread from the connection threads list */
1538 pthread_mutex_lock (&connection_threads_lock);
1539 /* Find out own index in the array */
1540 for (i = 0; i < connection_threads_num; i++)
1541 if (pthread_equal (connection_threads[i], self) != 0)
1542 break;
1543 assert (i < connection_threads_num);
1545 /* Move the trailing threads forward. */
1546 if (i < (connection_threads_num - 1))
1547 {
1548 memmove (connection_threads + i,
1549 connection_threads + i + 1,
1550 sizeof (pthread_t) * (connection_threads_num - i - 1));
1551 }
1553 connection_threads_num--;
1554 pthread_mutex_unlock (&connection_threads_lock);
1556 return (NULL);
1557 } /* }}} void *connection_thread_main */
1559 static int open_listen_socket_unix (const char *path) /* {{{ */
1560 {
1561 int fd;
1562 struct sockaddr_un sa;
1563 listen_socket_t *temp;
1564 int status;
1566 temp = (listen_socket_t *) realloc (listen_fds,
1567 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1568 if (temp == NULL)
1569 {
1570 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1571 return (-1);
1572 }
1573 listen_fds = temp;
1574 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1576 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1577 if (fd < 0)
1578 {
1579 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1580 return (-1);
1581 }
1583 memset (&sa, 0, sizeof (sa));
1584 sa.sun_family = AF_UNIX;
1585 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1587 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1588 if (status != 0)
1589 {
1590 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1591 close (fd);
1592 unlink (path);
1593 return (-1);
1594 }
1596 status = listen (fd, /* backlog = */ 10);
1597 if (status != 0)
1598 {
1599 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1600 close (fd);
1601 unlink (path);
1602 return (-1);
1603 }
1605 listen_fds[listen_fds_num].fd = fd;
1606 snprintf (listen_fds[listen_fds_num].path,
1607 sizeof (listen_fds[listen_fds_num].path) - 1,
1608 "unix:%s", path);
1609 listen_fds_num++;
1611 return (0);
1612 } /* }}} int open_listen_socket_unix */
/* Open listening socket(s) for ADDR_ORIG and append them to listen_fds.
 *
 * Accepted address forms:
 *   "unix:/path" or "/path"        UNIX domain socket
 *   "[v6addr]" or "[v6addr]:port"  bracketed IPv6 address
 *   "host" or "host:port"          hostname or IPv4 address
 *   "v6addr"                       bare IPv6 address (several colons, no port)
 * RRDCACHED_DEFAULT_PORT is used when no port is given.  One socket is
 * opened for every address returned by getaddrinfo(); addresses that fail
 * to bind or listen are logged and skipped.
 *
 * Returns 0 if at least one socket was opened, -1 otherwise.
 */
static int open_listen_socket (const char *addr_orig) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;
  int opened = 0;

  assert (addr_orig != NULL);

  /* snprintf always NUL-terminates (truncating over-long input). */
  snprintf (addr_copy, sizeof (addr_copy), "%s", addr_orig);
  addr = addr_copy;

  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
    return (open_listen_socket_unix (addr + strlen ("unix:")));
  else if (addr[0] == '/')
    return (open_listen_socket_unix (addr));

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
          addr_orig);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Exactly one colon separates host and port.  Several colons mean a
     * bare IPv6 address, which cannot carry a port without brackets. */
    char *colon = strchr (addr, ':');

    if ((colon != NULL) && (colon == strrchr (addr, ':')))
    {
      *colon = 0;
      port = colon + 1;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
      close (fd);
      continue;
    }

    listen_fds[listen_fds_num].fd = fd;
    snprintf (listen_fds[listen_fds_num].path,
        sizeof (listen_fds[listen_fds_num].path), "%s", addr);
    listen_fds_num++;
    opened++;
  } /* for (ai_ptr) */

  freeaddrinfo (ai_res);

  return ((opened > 0) ? 0 : -1);
} /* }}} int open_listen_socket */
1740 static int close_listen_sockets (void) /* {{{ */
1741 {
1742 size_t i;
1744 for (i = 0; i < listen_fds_num; i++)
1745 {
1746 close (listen_fds[i].fd);
1747 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1748 unlink (listen_fds[i].path + strlen ("unix:"));
1749 }
1751 free (listen_fds);
1752 listen_fds = NULL;
1753 listen_fds_num = 0;
1755 return (0);
1756 } /* }}} int close_listen_sockets */
1758 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1759 {
1760 struct pollfd *pollfds;
1761 int pollfds_num;
1762 int status;
1763 int i;
1765 for (i = 0; i < config_listen_address_list_len; i++)
1766 open_listen_socket (config_listen_address_list[i]);
1768 if (config_listen_address_list_len < 1)
1769 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1771 if (listen_fds_num < 1)
1772 {
1773 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1774 "could be opened. Sorry.");
1775 return (NULL);
1776 }
1778 pollfds_num = listen_fds_num;
1779 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1780 if (pollfds == NULL)
1781 {
1782 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1783 return (NULL);
1784 }
1785 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1787 RRDD_LOG(LOG_INFO, "listening for connections");
1789 while (do_shutdown == 0)
1790 {
1791 assert (pollfds_num == ((int) listen_fds_num));
1792 for (i = 0; i < pollfds_num; i++)
1793 {
1794 pollfds[i].fd = listen_fds[i].fd;
1795 pollfds[i].events = POLLIN | POLLPRI;
1796 pollfds[i].revents = 0;
1797 }
1799 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1800 if (status == 0)
1801 {
1802 continue; /* timeout */
1803 }
1804 else if (status < 0)
1805 {
1806 status = errno;
1807 if (status != EINTR)
1808 {
1809 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1810 }
1811 continue;
1812 }
1814 for (i = 0; i < pollfds_num; i++)
1815 {
1816 int *client_sd;
1817 struct sockaddr_storage client_sa;
1818 socklen_t client_sa_size;
1819 pthread_t tid;
1820 pthread_attr_t attr;
1822 if (pollfds[i].revents == 0)
1823 continue;
1825 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1826 {
1827 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1828 "poll(2) returned something unexpected for listen FD #%i.",
1829 pollfds[i].fd);
1830 continue;
1831 }
1833 client_sd = (int *) malloc (sizeof (int));
1834 if (client_sd == NULL)
1835 {
1836 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1837 continue;
1838 }
1840 client_sa_size = sizeof (client_sa);
1841 *client_sd = accept (pollfds[i].fd,
1842 (struct sockaddr *) &client_sa, &client_sa_size);
1843 if (*client_sd < 0)
1844 {
1845 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1846 continue;
1847 }
1849 pthread_attr_init (&attr);
1850 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1852 status = pthread_create (&tid, &attr, connection_thread_main,
1853 /* args = */ (void *) client_sd);
1854 if (status != 0)
1855 {
1856 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1857 close (*client_sd);
1858 free (client_sd);
1859 continue;
1860 }
1861 } /* for (pollfds_num) */
1862 } /* while (do_shutdown == 0) */
1864 RRDD_LOG(LOG_INFO, "starting shutdown");
1866 close_listen_sockets ();
1868 pthread_mutex_lock (&connection_threads_lock);
1869 while (connection_threads_num > 0)
1870 {
1871 pthread_t wait_for;
1873 wait_for = connection_threads[0];
1875 pthread_mutex_unlock (&connection_threads_lock);
1876 pthread_join (wait_for, /* retval = */ NULL);
1877 pthread_mutex_lock (&connection_threads_lock);
1878 }
1879 pthread_mutex_unlock (&connection_threads_lock);
1881 return (NULL);
1882 } /* }}} void *listen_thread_main */
1884 static int daemonize (void) /* {{{ */
1885 {
1886 int status;
1887 int fd;
1889 fd = open_pidfile();
1890 if (fd < 0) return fd;
1892 if (!stay_foreground)
1893 {
1894 pid_t child;
1895 char *base_dir;
1897 child = fork ();
1898 if (child < 0)
1899 {
1900 fprintf (stderr, "daemonize: fork(2) failed.\n");
1901 return (-1);
1902 }
1903 else if (child > 0)
1904 {
1905 return (1);
1906 }
1908 /* Change into the /tmp directory. */
1909 base_dir = (config_base_dir != NULL)
1910 ? config_base_dir
1911 : "/tmp";
1912 status = chdir (base_dir);
1913 if (status != 0)
1914 {
1915 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1916 return (-1);
1917 }
1919 /* Become session leader */
1920 setsid ();
1922 /* Open the first three file descriptors to /dev/null */
1923 close (2);
1924 close (1);
1925 close (0);
1927 open ("/dev/null", O_RDWR);
1928 dup (0);
1929 dup (0);
1930 } /* if (!stay_foreground) */
1932 install_signal_handlers();
1934 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1935 RRDD_LOG(LOG_INFO, "starting up");
1937 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1938 if (cache_tree == NULL)
1939 {
1940 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1941 return (-1);
1942 }
1944 status = write_pidfile (fd);
1945 return status;
1946 } /* }}} int daemonize */
1948 static int cleanup (void) /* {{{ */
1949 {
1950 do_shutdown++;
1952 pthread_cond_signal (&cache_cond);
1953 pthread_join (queue_thread, /* return = */ NULL);
1955 remove_pidfile ();
1957 RRDD_LOG(LOG_INFO, "goodbye");
1958 closelog ();
1960 return (0);
1961 } /* }}} int cleanup */
1963 static int read_options (int argc, char **argv) /* {{{ */
1964 {
1965 int option;
1966 int status = 0;
1968 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1969 {
1970 switch (option)
1971 {
1972 case 'g':
1973 stay_foreground=1;
1974 break;
1976 case 'l':
1977 {
1978 char **temp;
1980 temp = (char **) realloc (config_listen_address_list,
1981 sizeof (char *) * (config_listen_address_list_len + 1));
1982 if (temp == NULL)
1983 {
1984 fprintf (stderr, "read_options: realloc failed.\n");
1985 return (2);
1986 }
1987 config_listen_address_list = temp;
1989 temp[config_listen_address_list_len] = strdup (optarg);
1990 if (temp[config_listen_address_list_len] == NULL)
1991 {
1992 fprintf (stderr, "read_options: strdup failed.\n");
1993 return (2);
1994 }
1995 config_listen_address_list_len++;
1996 }
1997 break;
1999 case 'f':
2000 {
2001 int temp;
2003 temp = atoi (optarg);
2004 if (temp > 0)
2005 config_flush_interval = temp;
2006 else
2007 {
2008 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2009 status = 3;
2010 }
2011 }
2012 break;
2014 case 'w':
2015 {
2016 int temp;
2018 temp = atoi (optarg);
2019 if (temp > 0)
2020 config_write_interval = temp;
2021 else
2022 {
2023 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2024 status = 2;
2025 }
2026 }
2027 break;
2029 case 'z':
2030 {
2031 int temp;
2033 temp = atoi(optarg);
2034 if (temp > 0)
2035 config_write_jitter = temp;
2036 else
2037 {
2038 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2039 status = 2;
2040 }
2042 break;
2043 }
2045 case 'b':
2046 {
2047 size_t len;
2049 if (config_base_dir != NULL)
2050 free (config_base_dir);
2051 config_base_dir = strdup (optarg);
2052 if (config_base_dir == NULL)
2053 {
2054 fprintf (stderr, "read_options: strdup failed.\n");
2055 return (3);
2056 }
2058 len = strlen (config_base_dir);
2059 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2060 {
2061 config_base_dir[len - 1] = 0;
2062 len--;
2063 }
2065 if (len < 1)
2066 {
2067 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2068 return (4);
2069 }
2070 }
2071 break;
2073 case 'p':
2074 {
2075 if (config_pid_file != NULL)
2076 free (config_pid_file);
2077 config_pid_file = strdup (optarg);
2078 if (config_pid_file == NULL)
2079 {
2080 fprintf (stderr, "read_options: strdup failed.\n");
2081 return (3);
2082 }
2083 }
2084 break;
2086 case 'j':
2087 {
2088 struct stat statbuf;
2089 const char *dir = optarg;
2091 status = stat(dir, &statbuf);
2092 if (status != 0)
2093 {
2094 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2095 return 6;
2096 }
2098 if (!S_ISDIR(statbuf.st_mode)
2099 || access(dir, R_OK|W_OK|X_OK) != 0)
2100 {
2101 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2102 errno ? rrd_strerror(errno) : "");
2103 return 6;
2104 }
2106 journal_cur = malloc(PATH_MAX + 1);
2107 journal_old = malloc(PATH_MAX + 1);
2108 if (journal_cur == NULL || journal_old == NULL)
2109 {
2110 fprintf(stderr, "malloc failure for journal files\n");
2111 return 6;
2112 }
2113 else
2114 {
2115 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2116 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2117 }
2118 }
2119 break;
2121 case 'h':
2122 case '?':
2123 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2124 "\n"
2125 "Usage: rrdcached [options]\n"
2126 "\n"
2127 "Valid options are:\n"
2128 " -l <address> Socket address to listen to.\n"
2129 " -w <seconds> Interval in which to write data.\n"
2130 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2131 " -f <seconds> Interval in which to flush dead data.\n"
2132 " -p <file> Location of the PID-file.\n"
2133 " -b <dir> Base directory to change to.\n"
2134 " -g Do not fork and run in the foreground.\n"
2135 " -j <dir> Directory in which to create the journal files.\n"
2136 "\n"
2137 "For more information and a detailed description of all options "
2138 "please refer\n"
2139 "to the rrdcached(1) manual page.\n",
2140 VERSION);
2141 status = -1;
2142 break;
2143 } /* switch (option) */
2144 } /* while (getopt) */
2146 /* advise the user when values are not sane */
2147 if (config_flush_interval < 2 * config_write_interval)
2148 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2149 " 2x write interval (-w) !\n");
2150 if (config_write_jitter > config_write_interval)
2151 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2152 " write interval (-w) !\n");
2154 return (status);
2155 } /* }}} int read_options */
2157 int main (int argc, char **argv)
2158 {
2159 int status;
2161 status = read_options (argc, argv);
2162 if (status != 0)
2163 {
2164 if (status < 0)
2165 status = 0;
2166 return (status);
2167 }
2169 status = daemonize ();
2170 if (status == 1)
2171 {
2172 struct sigaction sigchld;
2174 memset (&sigchld, 0, sizeof (sigchld));
2175 sigchld.sa_handler = SIG_IGN;
2176 sigaction (SIGCHLD, &sigchld, NULL);
2178 return (0);
2179 }
2180 else if (status != 0)
2181 {
2182 fprintf (stderr, "daemonize failed, exiting.\n");
2183 return (1);
2184 }
2186 if (journal_cur != NULL)
2187 {
2188 int had_journal = 0;
2190 pthread_mutex_lock(&journal_lock);
2192 RRDD_LOG(LOG_INFO, "checking for journal files");
2194 had_journal += journal_replay(journal_old);
2195 had_journal += journal_replay(journal_cur);
2197 if (had_journal)
2198 flush_old_values(-1);
2200 pthread_mutex_unlock(&journal_lock);
2201 journal_rotate();
2203 RRDD_LOG(LOG_INFO, "journal processing complete");
2204 }
2206 /* start the queue thread */
2207 memset (&queue_thread, 0, sizeof (queue_thread));
2208 status = pthread_create (&queue_thread,
2209 NULL, /* attr */
2210 queue_thread_main,
2211 NULL); /* args */
2212 if (status != 0)
2213 {
2214 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2215 cleanup();
2216 return (1);
2217 }
2219 listen_thread_main (NULL);
2220 cleanup ();
2222 return (0);
2223 } /* int main */
2225 /*
2226 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2227 */