1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* Logging wrapper: every daemon log message goes to syslog(3) with the
 * given severity (LOG_ERR, LOG_NOTICE, ...) and a printf-style format. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* On non-GCC compilers make `__attribute__((...))' annotations (used below
 * for unused parameters) expand to nothing. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
/*
 * Types
 */

/* A listening socket: its descriptor and path (presumably the filesystem
 * path of a UNIX-domain socket -- TODO confirm against the setup code). */
struct listen_socket_s
{
  int fd;
  char path[PATH_MAX + 1];
};
typedef struct listen_socket_s listen_socket_t;

/* One cached RRD file. Items live in `cache_tree' (keyed by `file') and,
 * while they have values waiting to be written, in the singly linked write
 * queue `cache_queue_head' .. `cache_queue_tail'. */
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;               /* RRD path; the same pointer is the tree key */
  char **values;            /* strdup()ed update strings not yet written */
  int values_num;           /* number of entries in `values' */
  time_t last_flush_time;   /* when the values were last handed to the writer */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;                /* CI_FLAGS_* bits */
  pthread_cond_t  flushed;  /* broadcast by the queue thread after a write */
  cache_item_t *next;       /* next item in the write queue, NULL at the end */
};

/* State passed from flush_old_values() to tree_callback_flush(). */
struct callback_flush_data_s
{
  time_t now;               /* time stamp taken once for the whole walk */
  time_t abs_timeout;       /* items with last_flush_time <= this get queued */
  char **keys;              /* idle, value-less entries to remove afterwards */
  size_t keys_num;          /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;

/* Which end of the write queue enqueue_cache_item() inserts at. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;

/* max length of socket command or response */
#define CMD_MAX 4096
145 /*
146 * Variables
147 */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree *cache_tree = NULL;
163 static cache_item_t *cache_queue_head = NULL;
164 static cache_item_t *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
168 static int config_write_interval = 300;
169 static int config_write_jitter = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
195 /*
196 * Functions
197 */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
199 {
200 RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201 do_shutdown++;
202 pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
206 {
207 RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208 do_shutdown++;
209 pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
212 static int write_pidfile (void) /* {{{ */
213 {
214 pid_t pid;
215 char *file;
216 int fd;
217 FILE *fh;
219 pid = getpid ();
221 file = (config_pid_file != NULL)
222 ? config_pid_file
223 : LOCALSTATEDIR "/run/rrdcached.pid";
225 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
226 if (fd < 0)
227 {
228 RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
229 file, rrd_strerror(errno));
230 return (-1);
231 }
233 fh = fdopen (fd, "w");
234 if (fh == NULL)
235 {
236 RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
237 close(fd);
238 return (-1);
239 }
241 fprintf (fh, "%i\n", (int) pid);
242 fclose (fh);
244 return (0);
245 } /* }}} int write_pidfile */
247 static int remove_pidfile (void) /* {{{ */
248 {
249 char *file;
250 int status;
252 file = (config_pid_file != NULL)
253 ? config_pid_file
254 : LOCALSTATEDIR "/run/rrdcached.pid";
256 status = unlink (file);
257 if (status == 0)
258 return (0);
259 return (errno);
260 } /* }}} int remove_pidfile */
/*
 * sread:
 *   Read one newline-terminated line from `fd' into `buffer_void' (at most
 *   `buffer_size' bytes), retrying on EAGAIN/EINTR. The trailing "\n" (or
 *   "\r\n") is replaced by a NUL terminator.
 *   Returns the number of bytes in the buffer including that terminating NUL
 *   (so buffer[ret - 1] == '\0'), 0 if the peer closed the connection before
 *   a complete line arrived, or -1 on error:
 *     errno == ENOBUFS  the buffer filled up without a newline,
 *     errno == EINVAL   `buffer_size' is 0,
 *     otherwise         errno from read(2).
 *   NOTE(review): bytes following the first newline of a read are not kept
 *   for the next call; pipelined requests in a single read are not split.
 */
static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
{
  char *buffer;
  size_t buffer_used;
  size_t buffer_free;
  ssize_t status;

  /* A zero-sized buffer cannot hold even the terminating NUL; without this
   * guard the code below would index buffer[-1]. */
  if (buffer_size == 0)
  {
    errno = EINVAL;
    return (-1);
  }

  buffer = (char *) buffer_void;
  buffer_used = 0;
  buffer_free = buffer_size;

  while (buffer_free > 0)
  {
    status = read (fd, buffer + buffer_used, buffer_free);
    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
      continue;

    if (status < 0)
      return (-1);

    if (status == 0)
      return (0);

    assert ((0 > status) || (buffer_free >= (size_t) status));

    buffer_free = buffer_free - status;
    buffer_used = buffer_used + status;

    if (buffer[buffer_used - 1] == '\n')
      break;
  }

  assert (buffer_used > 0);

  if (buffer[buffer_used - 1] != '\n')
  {
    errno = ENOBUFS;
    return (-1);
  }

  buffer[buffer_used - 1] = 0;

  /* Fix network line endings. */
  if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
  {
    buffer_used--;
    buffer[buffer_used - 1] = 0;
  }

  return (buffer_used);
} /* }}} ssize_t sread */
/*
 * swrite:
 *   Write all `count' bytes of `buf' to `fd', resuming after short writes
 *   and retrying on EAGAIN/EINTR. A negative `fd' (journal replay, where
 *   there is no client to answer) makes the call a successful no-op.
 *   Returns 0 on success, or the negative value returned by write(2) on
 *   error (errno as set by write).
 */
static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
{
  const char *cursor = (const char *) buf;
  size_t remaining = count;

  /* special case for journal replay */
  if (fd < 0)
    return 0;

  while (remaining != 0)
  {
    ssize_t written = write (fd, (const void *) cursor, remaining);

    if (written < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (written);
    }

    cursor += written;
    remaining -= (size_t) written;
  }

  return (0);
} /* }}} ssize_t swrite */
343 static void _wipe_ci_values(cache_item_t *ci, time_t when)
344 {
345 ci->values = NULL;
346 ci->values_num = 0;
348 ci->last_flush_time = when;
349 if (config_write_jitter > 0)
350 ci->last_flush_time += (random() % config_write_jitter);
352 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
353 }
/*
 * enqueue_cache_item:
 *   Put `ci' on the write queue, a singly linked list running from
 *   `cache_queue_head' to `cache_queue_tail' through `ci->next'.
 *     side == HEAD: insert at the front; an item that is already queued
 *                   further back is moved to the front.
 *     side == TAIL: append at the end; an already queued item stays where
 *                   it is.
 *   Items without values (values_num == 0) are never queued.
 *   CI_FLAGS_IN_QUEUE is set on the item. `stats_queue_length' is bumped
 *   only for new insertions, not for moves.
 *   Returns -1 if `ci' is NULL, 0 otherwise.
 *   `cache_lock' must be acquired before calling this function!
 */
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
    queue_side_t side)
{
  int did_insert = 0;

  if (ci == NULL)
    return (-1);

  /* Nothing to write, nothing to queue. */
  if (ci->values_num == 0)
    return (0);

  if (side == HEAD)
  {
    if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
    {
      /* Items outside the queue must not carry a stale link. */
      assert (ci->next == NULL);
      ci->next = cache_queue_head;
      cache_queue_head = ci;

      if (cache_queue_tail == NULL)
        cache_queue_tail = cache_queue_head;

      did_insert = 1;
    }
    else if (cache_queue_head == ci)
    {
      /* do nothing */
    }
    else /* enqueued, but not first entry */
    {
      cache_item_t *prev;

      /* find previous entry */
      for (prev = cache_queue_head; prev != NULL; prev = prev->next)
        if (prev->next == ci)
          break;
      assert (prev != NULL);

      /* move to the front */
      prev->next = ci->next;
      ci->next = cache_queue_head;
      cache_queue_head = ci;

      /* check if we need to adapt the tail */
      if (cache_queue_tail == ci)
        cache_queue_tail = prev;
    }
  }
  else /* (side == TAIL) */
  {
    /* We don't move values back in the list.. */
    if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
      return (0);

    assert (ci->next == NULL);

    if (cache_queue_tail == NULL)
      cache_queue_head = ci;
    else
      cache_queue_tail->next = ci;
    cache_queue_tail = ci;

    did_insert = 1;
  }

  ci->flags |= CI_FLAGS_IN_QUEUE;

  /* Only count real insertions; moving to the front keeps the length. */
  if (did_insert)
  {
    pthread_mutex_lock (&stats_lock);
    stats_queue_length++;
    pthread_mutex_unlock (&stats_lock);
  }

  return (0);
} /* }}} int enqueue_cache_item */
436 /*
437 * tree_callback_flush:
438 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
439 * while this is in progress.
440 */
441 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
442 gpointer data)
443 {
444 cache_item_t *ci;
445 callback_flush_data_t *cfd;
447 ci = (cache_item_t *) value;
448 cfd = (callback_flush_data_t *) data;
450 if ((ci->last_flush_time <= cfd->abs_timeout)
451 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
452 && (ci->values_num > 0))
453 {
454 enqueue_cache_item (ci, TAIL);
455 }
456 else if ((do_shutdown != 0)
457 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
458 && (ci->values_num > 0))
459 {
460 enqueue_cache_item (ci, TAIL);
461 }
462 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
463 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
464 && (ci->values_num <= 0))
465 {
466 char **temp;
468 temp = (char **) realloc (cfd->keys,
469 sizeof (char *) * (cfd->keys_num + 1));
470 if (temp == NULL)
471 {
472 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
473 return (FALSE);
474 }
475 cfd->keys = temp;
476 /* Make really sure this points to the _same_ place */
477 assert ((char *) key == ci->file);
478 cfd->keys[cfd->keys_num] = (char *) key;
479 cfd->keys_num++;
480 }
482 return (FALSE);
483 } /* }}} gboolean tree_callback_flush */
485 static int flush_old_values (int max_age)
486 {
487 callback_flush_data_t cfd;
488 size_t k;
490 memset (&cfd, 0, sizeof (cfd));
491 /* Pass the current time as user data so that we don't need to call
492 * `time' for each node. */
493 cfd.now = time (NULL);
494 cfd.keys = NULL;
495 cfd.keys_num = 0;
497 if (max_age > 0)
498 cfd.abs_timeout = cfd.now - max_age;
499 else
500 cfd.abs_timeout = cfd.now + 1;
502 /* `tree_callback_flush' will return the keys of all values that haven't
503 * been touched in the last `config_flush_interval' seconds in `cfd'.
504 * The char*'s in this array point to the same memory as ci->file, so we
505 * don't need to free them separately. */
506 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
508 for (k = 0; k < cfd.keys_num; k++)
509 {
510 cache_item_t *ci;
512 /* This must not fail. */
513 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
514 assert (ci != NULL);
516 /* If we end up here with values available, something's seriously
517 * messed up. */
518 assert (ci->values_num == 0);
520 /* Remove the node from the tree */
521 g_tree_remove (cache_tree, cfd.keys[k]);
522 cfd.keys[k] = NULL;
524 /* Now free and clean up `ci'. */
525 free (ci->file);
526 ci->file = NULL;
527 free (ci);
528 ci = NULL;
529 } /* for (k = 0; k < cfd.keys_num; k++) */
531 if (cfd.keys != NULL)
532 {
533 free (cfd.keys);
534 cfd.keys = NULL;
535 }
537 return (0);
538 } /* int flush_old_values */
/*
 * queue_thread_main:
 *   Body of the writer thread. With `cache_lock' held it repeatedly:
 *     1. every `config_flush_interval' seconds, queues values older than
 *        `config_write_interval' (flush_old_values) and rotates the journal
 *        (with the lock released);
 *     2. waits on `cache_cond' (until the next flush time) if the queue is
 *        empty;
 *     3. takes the head item, detaches its values, and writes them with
 *        rrd_update_r() with the lock released; then appends a "wrote"
 *        journal entry and broadcasts `ci->flushed' (waking FLUSH callers).
 *   After a shutdown request the whole tree is queued (max age -1) and the
 *   loop ends once the queue is empty; the journals are then removed via
 *   journal_done(). Always returns NULL.
 */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;

  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock (&cache_lock);
  while ((do_shutdown == 0) || (cache_queue_head != NULL))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    int values_num;
    int status;
    int i;

    /* First, check if it's time to do the cache flush. */
    gettimeofday (&now, NULL);
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* Determine the time of the next cache flush.
       * NOTE(review): loops forever if config_flush_interval <= 0; confirm
       * the option parser rejects such values. */
      while (next_flush.tv_sec <= now.tv_sec)
        next_flush.tv_sec += config_flush_interval;

      /* unlock the cache while we rotate so we don't block incoming
       * updates if closing the old journal blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* Now, check if there's something to store away. If not, wait until
     * something comes in or it's time to do the cache flush. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_timedwait returned %i.", status);
      }
    }

    /* We're about to shut down, so lets flush the entire tree. */
    if ((do_shutdown != 0) && (cache_queue_head == NULL))
      flush_old_values (/* max age = */ -1);

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      /* NOTE(review): the item stays at the head, so this retries
       * immediately (busy loop) until strdup succeeds. */
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* Take ownership of the value array; _wipe_ci_values only drops the
     * pointer from `ci', the strings are freed below after the write. */
    values = ci->values;
    values_num = ci->values_num;

    _wipe_ci_values(ci, time(NULL));

    /* Unlink the head item from the queue. */
    cache_queue_head = ci->next;
    if (cache_queue_head == NULL)
      cache_queue_tail = NULL;
    ci->next = NULL;

    pthread_mutex_lock (&stats_lock);
    assert (stats_queue_length > 0);
    stats_queue_length--;
    pthread_mutex_unlock (&stats_lock);

    /* Do the (slow) disk write without holding the cache lock. */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    /* Note: the "wrote" entry is journaled even if rrd_update_r failed. */
    journal_write("wrote", file);
    pthread_cond_broadcast(&ci->flushed);

    for (i = 0; i < values_num; i++)
      free (values[i]);

    free(values);
    free(file);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    pthread_mutex_lock (&cache_lock);

    /* We're about to shut down, so lets flush the entire tree. */
    if ((do_shutdown != 0) && (cache_queue_head == NULL))
      flush_old_values (/* max age = */ -1);
  } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
  pthread_mutex_unlock (&cache_lock);

  assert(cache_queue_head == NULL);
  RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
  journal_done();

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 *   Split the next field off `*buffer_ret' (`*buffer_size_ret' bytes whose
 *   last byte must be '\0'). Fields end at a space or at the NUL;
 *   a backslash makes the following character literal (e.g. "\ " embeds a
 *   space). The field is unescaped in place and NUL-terminated.
 *   On success returns 0, stores the field start in `*field_ret' and
 *   advances `*buffer_ret' / `*buffer_size_ret' past the delimiter.
 *   Returns -1 if the buffer is empty or no delimiter is reached (e.g. a
 *   trailing backslash); the bookkeeping pointers are then left unchanged.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t len = *buffer_size_ret;
  char *dst = src;   /* unescaped field is written over the input */
  size_t in = 0;
  size_t out = 0;

  if (len == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[len - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= len)
      return (-1);

    c = src[in];
    if ((c == ' ') || (c == '\0'))
    {
      dst[out] = 0;
      in++;
      break;
    }

    if (c == '\\')
    {
      if (in >= (len - 1))
        return (-1);
      in++;   /* skip the backslash, copy what follows verbatim */
    }

    dst[out++] = src[in++];
  }

  *buffer_ret = src + in;
  *buffer_size_ret = len - in;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
735 static int flush_file (const char *filename) /* {{{ */
736 {
737 cache_item_t *ci;
739 pthread_mutex_lock (&cache_lock);
741 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
742 if (ci == NULL)
743 {
744 pthread_mutex_unlock (&cache_lock);
745 return (ENOENT);
746 }
748 /* Enqueue at head */
749 enqueue_cache_item (ci, HEAD);
750 pthread_cond_signal (&cache_cond);
752 pthread_cond_wait(&ci->flushed, &cache_lock);
753 pthread_mutex_unlock(&cache_lock);
755 return (0);
756 } /* }}} int flush_file */
/*
 * handle_request_help:
 *   Answer "HELP [<command>]" by sending the matching help text to `fd'.
 *   The first line of every text starts with the number of lines that
 *   follow it. Unknown or missing commands get the command overview.
 *   Returns 0 on success, or errno if a write to the client fails.
 */
static int handle_request_help (int fd, /* {{{ */
    char *buffer, size_t buffer_size)
{
  /* Static read-only tables: no per-call rebuild. Every entry is one output
   * line, so the line counts in the header lines stay easy to verify. */
  static const char *const help_help[] =
  {
    "4 Command overview\n",
    "FLUSH <filename>\n",
    "HELP [<command>]\n",
    "UPDATE <filename> <values> [<values> ...]\n",
    "STATS\n"
  };

  static const char *const help_flush[] =
  {
    "4 Help for FLUSH\n",
    "Usage: FLUSH <filename>\n",
    "\n",
    "Adds the given filename to the head of the update queue and returns\n",
    "after it has been dequeued.\n"
  };

  static const char *const help_update[] =
  {
    "9 Help for UPDATE\n",
    "Usage: UPDATE <filename> <values> [<values> ...]\n",
    "\n",
    "Adds the given file to the internal cache if it is not yet known and\n",
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
    "for details.\n",
    "\n",
    "Each <values> has the following form:\n",
    "  <values> = <time>:<value>[:<value>[...]]\n",
    "See the rrdupdate(1) manpage for details.\n"
  };

  static const char *const help_stats[] =
  {
    "4 Help for STATS\n",
    "Usage: STATS\n",
    "\n",
    "Returns some performance counters, see the rrdcached(1) manpage for\n",
    "a description of the values.\n"
  };

  const char *const *help_text;
  size_t help_text_len;
  char *command;
  size_t i;
  int status;

  /* Default: command overview. */
  help_text = help_help;
  help_text_len = sizeof (help_help) / sizeof (help_help[0]);

  status = buffer_get_field (&buffer, &buffer_size, &command);
  if (status == 0)
  {
    if (strcasecmp (command, "update") == 0)
    {
      help_text = help_update;
      help_text_len = sizeof (help_update) / sizeof (help_update[0]);
    }
    else if (strcasecmp (command, "flush") == 0)
    {
      help_text = help_flush;
      help_text_len = sizeof (help_flush) / sizeof (help_flush[0]);
    }
    else if (strcasecmp (command, "stats") == 0)
    {
      help_text = help_stats;
      help_text_len = sizeof (help_stats) / sizeof (help_stats[0]);
    }
  }

  for (i = 0; i < help_text_len; i++)
  {
    status = swrite (fd, help_text[i], strlen (help_text[i]));
    if (status < 0)
    {
      status = errno;
      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
      return (status);
    }
  }

  return (0);
} /* }}} int handle_request_help */
856 static int handle_request_stats (int fd, /* {{{ */
857 char *buffer __attribute__((unused)),
858 size_t buffer_size __attribute__((unused)))
859 {
860 int status;
861 char outbuf[CMD_MAX];
863 uint64_t copy_queue_length;
864 uint64_t copy_updates_received;
865 uint64_t copy_flush_received;
866 uint64_t copy_updates_written;
867 uint64_t copy_data_sets_written;
868 uint64_t copy_journal_bytes;
869 uint64_t copy_journal_rotate;
871 uint64_t tree_nodes_number;
872 uint64_t tree_depth;
874 pthread_mutex_lock (&stats_lock);
875 copy_queue_length = stats_queue_length;
876 copy_updates_received = stats_updates_received;
877 copy_flush_received = stats_flush_received;
878 copy_updates_written = stats_updates_written;
879 copy_data_sets_written = stats_data_sets_written;
880 copy_journal_bytes = stats_journal_bytes;
881 copy_journal_rotate = stats_journal_rotate;
882 pthread_mutex_unlock (&stats_lock);
884 pthread_mutex_lock (&cache_lock);
885 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
886 tree_depth = (uint64_t) g_tree_height (cache_tree);
887 pthread_mutex_unlock (&cache_lock);
889 #define RRDD_STATS_SEND \
890 outbuf[sizeof (outbuf) - 1] = 0; \
891 status = swrite (fd, outbuf, strlen (outbuf)); \
892 if (status < 0) \
893 { \
894 status = errno; \
895 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
896 return (status); \
897 }
899 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
900 RRDD_STATS_SEND;
902 snprintf (outbuf, sizeof (outbuf),
903 "QueueLength: %"PRIu64"\n", copy_queue_length);
904 RRDD_STATS_SEND;
906 snprintf (outbuf, sizeof (outbuf),
907 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
908 RRDD_STATS_SEND;
910 snprintf (outbuf, sizeof (outbuf),
911 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
912 RRDD_STATS_SEND;
914 snprintf (outbuf, sizeof (outbuf),
915 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
916 RRDD_STATS_SEND;
918 snprintf (outbuf, sizeof (outbuf),
919 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
920 RRDD_STATS_SEND;
922 snprintf (outbuf, sizeof (outbuf),
923 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
924 RRDD_STATS_SEND;
926 snprintf (outbuf, sizeof (outbuf),
927 "TreeDepth: %"PRIu64"\n", tree_depth);
928 RRDD_STATS_SEND;
930 snprintf (outbuf, sizeof(outbuf),
931 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
932 RRDD_STATS_SEND;
934 snprintf (outbuf, sizeof(outbuf),
935 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
936 RRDD_STATS_SEND;
938 return (0);
939 #undef RRDD_STATS_SEND
940 } /* }}} int handle_request_stats */
942 static int handle_request_flush (int fd, /* {{{ */
943 char *buffer, size_t buffer_size)
944 {
945 char *file;
946 int status;
947 char result[CMD_MAX];
949 status = buffer_get_field (&buffer, &buffer_size, &file);
950 if (status != 0)
951 {
952 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
953 }
954 else
955 {
956 pthread_mutex_lock(&stats_lock);
957 stats_flush_received++;
958 pthread_mutex_unlock(&stats_lock);
960 status = flush_file (file);
961 if (status == 0)
962 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
963 else if (status == ENOENT)
964 {
965 /* no file in our tree; see whether it exists at all */
966 struct stat statbuf;
968 memset(&statbuf, 0, sizeof(statbuf));
969 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
970 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
971 else
972 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
973 }
974 else if (status < 0)
975 strncpy (result, "-1 Internal error.\n", sizeof (result));
976 else
977 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
978 }
979 result[sizeof (result) - 1] = 0;
981 status = swrite (fd, result, strlen (result));
982 if (status < 0)
983 {
984 status = errno;
985 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
986 return (status);
987 }
989 return (0);
990 } /* }}} int handle_request_flush */
992 static int handle_request_update (int fd, /* {{{ */
993 char *buffer, size_t buffer_size)
994 {
995 char *file;
996 int values_num = 0;
997 int status;
999 time_t now;
1001 cache_item_t *ci;
1002 char answer[CMD_MAX];
1004 #define RRDD_UPDATE_SEND \
1005 answer[sizeof (answer) - 1] = 0; \
1006 status = swrite (fd, answer, strlen (answer)); \
1007 if (status < 0) \
1008 { \
1009 status = errno; \
1010 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1011 return (status); \
1012 }
1014 now = time (NULL);
1016 status = buffer_get_field (&buffer, &buffer_size, &file);
1017 if (status != 0)
1018 {
1019 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1020 sizeof (answer));
1021 RRDD_UPDATE_SEND;
1022 return (0);
1023 }
1025 pthread_mutex_lock(&stats_lock);
1026 stats_updates_received++;
1027 pthread_mutex_unlock(&stats_lock);
1029 pthread_mutex_lock (&cache_lock);
1030 ci = g_tree_lookup (cache_tree, file);
1032 if (ci == NULL) /* {{{ */
1033 {
1034 struct stat statbuf;
1036 /* don't hold the lock while we setup; stat(2) might block */
1037 pthread_mutex_unlock(&cache_lock);
1039 memset (&statbuf, 0, sizeof (statbuf));
1040 status = stat (file, &statbuf);
1041 if (status != 0)
1042 {
1043 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1045 status = errno;
1046 if (status == ENOENT)
1047 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1048 else
1049 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1050 status);
1051 RRDD_UPDATE_SEND;
1052 return (0);
1053 }
1054 if (!S_ISREG (statbuf.st_mode))
1055 {
1056 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1057 RRDD_UPDATE_SEND;
1058 return (0);
1059 }
1060 if (access(file, R_OK|W_OK) != 0)
1061 {
1062 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1063 file, rrd_strerror(errno));
1064 RRDD_UPDATE_SEND;
1065 return (0);
1066 }
1068 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1069 if (ci == NULL)
1070 {
1071 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1073 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1074 RRDD_UPDATE_SEND;
1075 return (0);
1076 }
1077 memset (ci, 0, sizeof (cache_item_t));
1079 ci->file = strdup (file);
1080 if (ci->file == NULL)
1081 {
1082 free (ci);
1083 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1085 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1086 RRDD_UPDATE_SEND;
1087 return (0);
1088 }
1090 _wipe_ci_values(ci, now);
1091 ci->flags = CI_FLAGS_IN_TREE;
1093 pthread_mutex_lock(&cache_lock);
1094 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1095 } /* }}} */
1096 assert (ci != NULL);
1098 while (buffer_size > 0)
1099 {
1100 char **temp;
1101 char *value;
1103 status = buffer_get_field (&buffer, &buffer_size, &value);
1104 if (status != 0)
1105 {
1106 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1107 break;
1108 }
1110 temp = (char **) realloc (ci->values,
1111 sizeof (char *) * (ci->values_num + 1));
1112 if (temp == NULL)
1113 {
1114 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1115 continue;
1116 }
1117 ci->values = temp;
1119 ci->values[ci->values_num] = strdup (value);
1120 if (ci->values[ci->values_num] == NULL)
1121 {
1122 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1123 continue;
1124 }
1125 ci->values_num++;
1127 values_num++;
1128 }
1130 if (((now - ci->last_flush_time) >= config_write_interval)
1131 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1132 && (ci->values_num > 0))
1133 {
1134 enqueue_cache_item (ci, TAIL);
1135 pthread_cond_signal (&cache_cond);
1136 }
1138 pthread_mutex_unlock (&cache_lock);
1140 if (values_num < 1)
1141 {
1142 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1143 }
1144 else
1145 {
1146 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1147 (values_num == 1) ? "" : "s");
1148 }
1149 RRDD_UPDATE_SEND;
1150 return (0);
1151 #undef RRDD_UPDATE_SEND
1152 } /* }}} int handle_request_update */
1154 /* we came across a "WROTE" entry during journal replay.
1155 * throw away any values that we have accumulated for this file
1156 */
1157 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1158 const char *buffer,
1159 size_t buffer_size __attribute__((unused)))
1160 {
1161 int i;
1162 cache_item_t *ci;
1163 const char *file = buffer;
1165 pthread_mutex_lock(&cache_lock);
1167 ci = g_tree_lookup(cache_tree, file);
1168 if (ci == NULL)
1169 {
1170 pthread_mutex_unlock(&cache_lock);
1171 return (0);
1172 }
1174 if (ci->values)
1175 {
1176 for (i=0; i < ci->values_num; i++)
1177 free(ci->values[i]);
1179 free(ci->values);
1180 }
1182 _wipe_ci_values(ci, time(NULL));
1184 pthread_mutex_unlock(&cache_lock);
1185 return (0);
1186 } /* }}} int handle_request_wrote */
1188 /* if fd < 0, we are in journal replay mode */
1189 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1190 {
1191 char *buffer_ptr;
1192 char *command;
1193 int status;
1195 assert (buffer[buffer_size - 1] == '\0');
1197 buffer_ptr = buffer;
1198 command = NULL;
1199 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1200 if (status != 0)
1201 {
1202 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1203 return (-1);
1204 }
1206 if (strcasecmp (command, "update") == 0)
1207 {
1208 /* don't re-write updates in replay mode */
1209 if (fd >= 0)
1210 journal_write(command, buffer_ptr);
1212 return (handle_request_update (fd, buffer_ptr, buffer_size));
1213 }
1214 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1215 {
1216 /* this is only valid in replay mode */
1217 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1218 }
1219 else if (strcasecmp (command, "flush") == 0)
1220 {
1221 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1222 }
1223 else if (strcasecmp (command, "stats") == 0)
1224 {
1225 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1226 }
1227 else if (strcasecmp (command, "help") == 0)
1228 {
1229 return (handle_request_help (fd, buffer_ptr, buffer_size));
1230 }
1231 else
1232 {
1233 char result[CMD_MAX];
1235 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1236 result[sizeof (result) - 1] = 0;
1238 status = swrite (fd, result, strlen (result));
1239 if (status < 0)
1240 {
1241 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1242 return (-1);
1243 }
1244 }
1246 return (0);
1247 } /* }}} int handle_request */
1249 /* MUST NOT hold journal_lock before calling this */
1250 static void journal_rotate(void) /* {{{ */
1251 {
1252 FILE *old_fh = NULL;
1254 if (journal_cur == NULL || journal_old == NULL)
1255 return;
1257 pthread_mutex_lock(&journal_lock);
1259 /* we rotate this way (rename before close) so that the we can release
1260 * the journal lock as fast as possible. Journal writes to the new
1261 * journal can proceed immediately after the new file is opened. The
1262 * fclose can then block without affecting new updates.
1263 */
1264 if (journal_fh != NULL)
1265 {
1266 old_fh = journal_fh;
1267 rename(journal_cur, journal_old);
1268 ++stats_journal_rotate;
1269 }
1271 journal_fh = fopen(journal_cur, "a");
1272 pthread_mutex_unlock(&journal_lock);
1274 if (old_fh != NULL)
1275 fclose(old_fh);
1277 if (journal_fh == NULL)
1278 RRDD_LOG(LOG_CRIT,
1279 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1280 journal_cur, rrd_strerror(errno));
1282 } /* }}} static void journal_rotate */
1284 static void journal_done(void) /* {{{ */
1285 {
1286 if (journal_cur == NULL)
1287 return;
1289 pthread_mutex_lock(&journal_lock);
1290 if (journal_fh != NULL)
1291 {
1292 fclose(journal_fh);
1293 journal_fh = NULL;
1294 }
1296 RRDD_LOG(LOG_INFO, "removing journals");
1298 unlink(journal_old);
1299 unlink(journal_cur);
1300 pthread_mutex_unlock(&journal_lock);
1302 } /* }}} static void journal_done */
1304 static int journal_write(char *cmd, char *args) /* {{{ */
1305 {
1306 int chars;
1308 if (journal_fh == NULL)
1309 return 0;
1311 pthread_mutex_lock(&journal_lock);
1312 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1313 pthread_mutex_unlock(&journal_lock);
1315 if (chars > 0)
1316 {
1317 pthread_mutex_lock(&stats_lock);
1318 stats_journal_bytes += chars;
1319 pthread_mutex_unlock(&stats_lock);
1320 }
1322 return chars;
1323 } /* }}} static int journal_write */
1325 static int journal_replay (const char *file) /* {{{ */
1326 {
1327 FILE *fh;
1328 int entry_cnt = 0;
1329 int fail_cnt = 0;
1330 uint64_t line = 0;
1331 char entry[CMD_MAX];
1333 if (file == NULL) return 0;
1335 fh = fopen(file, "r");
1336 if (fh == NULL)
1337 {
1338 if (errno != ENOENT)
1339 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1340 file, rrd_strerror(errno));
1341 return 0;
1342 }
1343 else
1344 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1346 while(!feof(fh))
1347 {
1348 size_t entry_len;
1350 ++line;
1351 fgets(entry, sizeof(entry), fh);
1352 entry_len = strlen(entry);
1354 /* check \n termination in case journal writing crashed mid-line */
1355 if (entry_len == 0)
1356 continue;
1357 else if (entry[entry_len - 1] != '\n')
1358 {
1359 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1360 ++fail_cnt;
1361 continue;
1362 }
1364 entry[entry_len - 1] = '\0';
1366 if (handle_request(-1, entry, entry_len) == 0)
1367 ++entry_cnt;
1368 else
1369 ++fail_cnt;
1370 }
1372 fclose(fh);
1374 if (entry_cnt > 0)
1375 {
1376 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1377 entry_cnt, fail_cnt);
1378 return 1;
1379 }
1380 else
1381 return 0;
1383 } /* }}} static int journal_replay */
1385 static void *connection_thread_main (void *args) /* {{{ */
1386 {
1387 pthread_t self;
1388 int i;
1389 int fd;
1391 fd = *((int *) args);
1392 free (args);
1394 pthread_mutex_lock (&connection_threads_lock);
1395 {
1396 pthread_t *temp;
1398 temp = (pthread_t *) realloc (connection_threads,
1399 sizeof (pthread_t) * (connection_threads_num + 1));
1400 if (temp == NULL)
1401 {
1402 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1403 }
1404 else
1405 {
1406 connection_threads = temp;
1407 connection_threads[connection_threads_num] = pthread_self ();
1408 connection_threads_num++;
1409 }
1410 }
1411 pthread_mutex_unlock (&connection_threads_lock);
1413 while (do_shutdown == 0)
1414 {
1415 char buffer[CMD_MAX];
1417 struct pollfd pollfd;
1418 int status;
1420 pollfd.fd = fd;
1421 pollfd.events = POLLIN | POLLPRI;
1422 pollfd.revents = 0;
1424 status = poll (&pollfd, 1, /* timeout = */ 500);
1425 if (status == 0) /* timeout */
1426 continue;
1427 else if (status < 0) /* error */
1428 {
1429 status = errno;
1430 if (status == EINTR)
1431 continue;
1432 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1433 continue;
1434 }
1436 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1437 {
1438 close (fd);
1439 break;
1440 }
1441 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1442 {
1443 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1444 "poll(2) returned something unexpected: %#04hx",
1445 pollfd.revents);
1446 close (fd);
1447 break;
1448 }
1450 status = (int) sread (fd, buffer, sizeof (buffer));
1451 if (status <= 0)
1452 {
1453 close (fd);
1455 if (status < 0)
1456 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1458 break;
1459 }
1461 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1462 if (status != 0)
1463 break;
1464 }
1466 close(fd);
1468 self = pthread_self ();
1469 /* Remove this thread from the connection threads list */
1470 pthread_mutex_lock (&connection_threads_lock);
1471 /* Find out own index in the array */
1472 for (i = 0; i < connection_threads_num; i++)
1473 if (pthread_equal (connection_threads[i], self) != 0)
1474 break;
1475 assert (i < connection_threads_num);
1477 /* Move the trailing threads forward. */
1478 if (i < (connection_threads_num - 1))
1479 {
1480 memmove (connection_threads + i,
1481 connection_threads + i + 1,
1482 sizeof (pthread_t) * (connection_threads_num - i - 1));
1483 }
1485 connection_threads_num--;
1486 pthread_mutex_unlock (&connection_threads_lock);
1488 return (NULL);
1489 } /* }}} void *connection_thread_main */
1491 static int open_listen_socket_unix (const char *path) /* {{{ */
1492 {
1493 int fd;
1494 struct sockaddr_un sa;
1495 listen_socket_t *temp;
1496 int status;
1498 temp = (listen_socket_t *) realloc (listen_fds,
1499 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1500 if (temp == NULL)
1501 {
1502 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1503 return (-1);
1504 }
1505 listen_fds = temp;
1506 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1508 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1509 if (fd < 0)
1510 {
1511 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1512 return (-1);
1513 }
1515 memset (&sa, 0, sizeof (sa));
1516 sa.sun_family = AF_UNIX;
1517 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1519 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1520 if (status != 0)
1521 {
1522 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1523 close (fd);
1524 unlink (path);
1525 return (-1);
1526 }
1528 status = listen (fd, /* backlog = */ 10);
1529 if (status != 0)
1530 {
1531 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1532 close (fd);
1533 unlink (path);
1534 return (-1);
1535 }
1537 listen_fds[listen_fds_num].fd = fd;
1538 snprintf (listen_fds[listen_fds_num].path,
1539 sizeof (listen_fds[listen_fds_num].path) - 1,
1540 "unix:%s", path);
1541 listen_fds_num++;
1543 return (0);
1544 } /* }}} int open_listen_socket_unix */
/*
 * Open listening socket(s) for the address specification `addr_orig' and
 * append them to listen_fds.
 *
 * Accepted forms:
 *   "unix:/path" or "/path"         UNIX domain socket
 *   "[ipv6-addr]" / "[ipv6-addr]:port"
 *   "host" / "host:port"           hostname or IPv4 address (at most one ':')
 *   "ipv6-addr"                    bare IPv6 address (two or more ':')
 * Without an explicit port, RRDCACHED_DEFAULT_PORT is used.  One socket is
 * opened per address returned by getaddrinfo(3).  Addresses that cannot be
 * bound or listened on are logged and skipped.
 *
 * Returns -1 if the specification is malformed or cannot be resolved.  For
 * UNIX sockets it returns open_listen_socket_unix's result.  Otherwise it
 * returns 0, even if no socket could be opened; the caller checks
 * listen_fds_num.
 */
static int open_listen_socket (const char *addr_orig) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  assert (addr_orig != NULL);

  /* Work on a NUL-terminated (possibly truncated) private copy; the parsing
   * below writes terminators into it. */
  snprintf (addr_copy, sizeof (addr_copy), "%s", addr_orig);
  addr = addr_copy;

  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
    return (open_listen_socket_unix (addr + strlen ("unix:")));
  else if (addr[0] == '/')
    return (open_listen_socket_unix (addr));

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
          addr_orig);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Hostname or IPv4 address, optionally followed by ":port".  A bare
     * (unbracketed) IPv6 address contains at least two colons; it is passed
     * to getaddrinfo(3) unchanged and gets the default port. */
    char *colon = strchr (addr, ':');

    if ((colon != NULL) && (strchr (colon + 1, ':') == NULL))
    {
      *colon = 0;
      port = colon + 1;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      /* Skip this address like a bind failure.  Returning here would leak
       * ai_res and would not try the remaining addresses. */
      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
      close (fd);
      continue;
    }

    listen_fds[listen_fds_num].fd = fd;
    snprintf (listen_fds[listen_fds_num].path,
        sizeof (listen_fds[listen_fds_num].path), "%s", addr);
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo (ai_res);

  return (0);
} /* }}} int open_listen_socket */
1672 static int close_listen_sockets (void) /* {{{ */
1673 {
1674 size_t i;
1676 for (i = 0; i < listen_fds_num; i++)
1677 {
1678 close (listen_fds[i].fd);
1679 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1680 unlink (listen_fds[i].path + strlen ("unix:"));
1681 }
1683 free (listen_fds);
1684 listen_fds = NULL;
1685 listen_fds_num = 0;
1687 return (0);
1688 } /* }}} int close_listen_sockets */
1690 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1691 {
1692 struct pollfd *pollfds;
1693 int pollfds_num;
1694 int status;
1695 int i;
1697 for (i = 0; i < config_listen_address_list_len; i++)
1698 open_listen_socket (config_listen_address_list[i]);
1700 if (config_listen_address_list_len < 1)
1701 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1703 if (listen_fds_num < 1)
1704 {
1705 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1706 "could be opened. Sorry.");
1707 return (NULL);
1708 }
1710 pollfds_num = listen_fds_num;
1711 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1712 if (pollfds == NULL)
1713 {
1714 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1715 return (NULL);
1716 }
1717 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1719 RRDD_LOG(LOG_INFO, "listening for connections");
1721 while (do_shutdown == 0)
1722 {
1723 assert (pollfds_num == ((int) listen_fds_num));
1724 for (i = 0; i < pollfds_num; i++)
1725 {
1726 pollfds[i].fd = listen_fds[i].fd;
1727 pollfds[i].events = POLLIN | POLLPRI;
1728 pollfds[i].revents = 0;
1729 }
1731 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1732 if (status == 0)
1733 {
1734 continue; /* timeout */
1735 }
1736 else if (status < 0)
1737 {
1738 status = errno;
1739 if (status != EINTR)
1740 {
1741 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1742 }
1743 continue;
1744 }
1746 for (i = 0; i < pollfds_num; i++)
1747 {
1748 int *client_sd;
1749 struct sockaddr_storage client_sa;
1750 socklen_t client_sa_size;
1751 pthread_t tid;
1752 pthread_attr_t attr;
1754 if (pollfds[i].revents == 0)
1755 continue;
1757 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1758 {
1759 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1760 "poll(2) returned something unexpected for listen FD #%i.",
1761 pollfds[i].fd);
1762 continue;
1763 }
1765 client_sd = (int *) malloc (sizeof (int));
1766 if (client_sd == NULL)
1767 {
1768 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1769 continue;
1770 }
1772 client_sa_size = sizeof (client_sa);
1773 *client_sd = accept (pollfds[i].fd,
1774 (struct sockaddr *) &client_sa, &client_sa_size);
1775 if (*client_sd < 0)
1776 {
1777 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1778 continue;
1779 }
1781 pthread_attr_init (&attr);
1782 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1784 status = pthread_create (&tid, &attr, connection_thread_main,
1785 /* args = */ (void *) client_sd);
1786 if (status != 0)
1787 {
1788 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1789 close (*client_sd);
1790 free (client_sd);
1791 continue;
1792 }
1793 } /* for (pollfds_num) */
1794 } /* while (do_shutdown == 0) */
1796 RRDD_LOG(LOG_INFO, "starting shutdown");
1798 close_listen_sockets ();
1800 pthread_mutex_lock (&connection_threads_lock);
1801 while (connection_threads_num > 0)
1802 {
1803 pthread_t wait_for;
1805 wait_for = connection_threads[0];
1807 pthread_mutex_unlock (&connection_threads_lock);
1808 pthread_join (wait_for, /* retval = */ NULL);
1809 pthread_mutex_lock (&connection_threads_lock);
1810 }
1811 pthread_mutex_unlock (&connection_threads_lock);
1813 return (NULL);
1814 } /* }}} void *listen_thread_main */
1816 static int daemonize (void) /* {{{ */
1817 {
1818 int status;
1820 /* These structures are static, because `sigaction' behaves weird if the are
1821 * overwritten.. */
1822 static struct sigaction sa_int;
1823 static struct sigaction sa_term;
1824 static struct sigaction sa_pipe;
1826 if (!stay_foreground)
1827 {
1828 pid_t child;
1829 char *base_dir;
1831 child = fork ();
1832 if (child < 0)
1833 {
1834 fprintf (stderr, "daemonize: fork(2) failed.\n");
1835 return (-1);
1836 }
1837 else if (child > 0)
1838 {
1839 return (1);
1840 }
1842 /* Change into the /tmp directory. */
1843 base_dir = (config_base_dir != NULL)
1844 ? config_base_dir
1845 : "/tmp";
1846 status = chdir (base_dir);
1847 if (status != 0)
1848 {
1849 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1850 return (-1);
1851 }
1853 /* Become session leader */
1854 setsid ();
1856 /* Open the first three file descriptors to /dev/null */
1857 close (2);
1858 close (1);
1859 close (0);
1861 open ("/dev/null", O_RDWR);
1862 dup (0);
1863 dup (0);
1864 } /* if (!stay_foreground) */
1866 /* Install signal handlers */
1867 memset (&sa_int, 0, sizeof (sa_int));
1868 sa_int.sa_handler = sig_int_handler;
1869 sigaction (SIGINT, &sa_int, NULL);
1871 memset (&sa_term, 0, sizeof (sa_term));
1872 sa_term.sa_handler = sig_term_handler;
1873 sigaction (SIGTERM, &sa_term, NULL);
1875 memset (&sa_pipe, 0, sizeof (sa_pipe));
1876 sa_pipe.sa_handler = SIG_IGN;
1877 sigaction (SIGPIPE, &sa_pipe, NULL);
1879 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1880 RRDD_LOG(LOG_INFO, "starting up");
1882 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1883 if (cache_tree == NULL)
1884 {
1885 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1886 return (-1);
1887 }
1889 status = write_pidfile ();
1890 return status;
1891 } /* }}} int daemonize */
1893 static int cleanup (void) /* {{{ */
1894 {
1895 do_shutdown++;
1897 pthread_cond_signal (&cache_cond);
1898 pthread_join (queue_thread, /* return = */ NULL);
1900 remove_pidfile ();
1902 RRDD_LOG(LOG_INFO, "goodbye");
1903 closelog ();
1905 return (0);
1906 } /* }}} int cleanup */
1908 static int read_options (int argc, char **argv) /* {{{ */
1909 {
1910 int option;
1911 int status = 0;
1913 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1914 {
1915 switch (option)
1916 {
1917 case 'g':
1918 stay_foreground=1;
1919 break;
1921 case 'l':
1922 {
1923 char **temp;
1925 temp = (char **) realloc (config_listen_address_list,
1926 sizeof (char *) * (config_listen_address_list_len + 1));
1927 if (temp == NULL)
1928 {
1929 fprintf (stderr, "read_options: realloc failed.\n");
1930 return (2);
1931 }
1932 config_listen_address_list = temp;
1934 temp[config_listen_address_list_len] = strdup (optarg);
1935 if (temp[config_listen_address_list_len] == NULL)
1936 {
1937 fprintf (stderr, "read_options: strdup failed.\n");
1938 return (2);
1939 }
1940 config_listen_address_list_len++;
1941 }
1942 break;
1944 case 'f':
1945 {
1946 int temp;
1948 temp = atoi (optarg);
1949 if (temp > 0)
1950 config_flush_interval = temp;
1951 else
1952 {
1953 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1954 status = 3;
1955 }
1956 }
1957 break;
1959 case 'w':
1960 {
1961 int temp;
1963 temp = atoi (optarg);
1964 if (temp > 0)
1965 config_write_interval = temp;
1966 else
1967 {
1968 fprintf (stderr, "Invalid write interval: %s\n", optarg);
1969 status = 2;
1970 }
1971 }
1972 break;
1974 case 'z':
1975 {
1976 int temp;
1978 temp = atoi(optarg);
1979 if (temp > 0)
1980 config_write_jitter = temp;
1981 else
1982 {
1983 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1984 status = 2;
1985 }
1987 break;
1988 }
1990 case 'b':
1991 {
1992 size_t len;
1994 if (config_base_dir != NULL)
1995 free (config_base_dir);
1996 config_base_dir = strdup (optarg);
1997 if (config_base_dir == NULL)
1998 {
1999 fprintf (stderr, "read_options: strdup failed.\n");
2000 return (3);
2001 }
2003 len = strlen (config_base_dir);
2004 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2005 {
2006 config_base_dir[len - 1] = 0;
2007 len--;
2008 }
2010 if (len < 1)
2011 {
2012 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2013 return (4);
2014 }
2015 }
2016 break;
2018 case 'p':
2019 {
2020 if (config_pid_file != NULL)
2021 free (config_pid_file);
2022 config_pid_file = strdup (optarg);
2023 if (config_pid_file == NULL)
2024 {
2025 fprintf (stderr, "read_options: strdup failed.\n");
2026 return (3);
2027 }
2028 }
2029 break;
2031 case 'j':
2032 {
2033 struct stat statbuf;
2034 const char *dir = optarg;
2036 status = stat(dir, &statbuf);
2037 if (status != 0)
2038 {
2039 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2040 return 6;
2041 }
2043 if (!S_ISDIR(statbuf.st_mode)
2044 || access(dir, R_OK|W_OK|X_OK) != 0)
2045 {
2046 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2047 errno ? rrd_strerror(errno) : "");
2048 return 6;
2049 }
2051 journal_cur = malloc(PATH_MAX + 1);
2052 journal_old = malloc(PATH_MAX + 1);
2053 if (journal_cur == NULL || journal_old == NULL)
2054 {
2055 fprintf(stderr, "malloc failure for journal files\n");
2056 return 6;
2057 }
2058 else
2059 {
2060 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2061 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2062 }
2063 }
2064 break;
2066 case 'h':
2067 case '?':
2068 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2069 "\n"
2070 "Usage: rrdcached [options]\n"
2071 "\n"
2072 "Valid options are:\n"
2073 " -l <address> Socket address to listen to.\n"
2074 " -w <seconds> Interval in which to write data.\n"
2075 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2076 " -f <seconds> Interval in which to flush dead data.\n"
2077 " -p <file> Location of the PID-file.\n"
2078 " -b <dir> Base directory to change to.\n"
2079 " -g Do not fork and run in the foreground.\n"
2080 " -j <dir> Directory in which to create the journal files.\n"
2081 "\n"
2082 "For more information and a detailed description of all options "
2083 "please refer\n"
2084 "to the rrdcached(1) manual page.\n",
2085 VERSION);
2086 status = -1;
2087 break;
2088 } /* switch (option) */
2089 } /* while (getopt) */
2091 /* advise the user when values are not sane */
2092 if (config_flush_interval < 2 * config_write_interval)
2093 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2094 " 2x write interval (-w) !\n");
2095 if (config_write_jitter > config_write_interval)
2096 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2097 " write interval (-w) !\n");
2099 return (status);
2100 } /* }}} int read_options */
2102 int main (int argc, char **argv)
2103 {
2104 int status;
2106 status = read_options (argc, argv);
2107 if (status != 0)
2108 {
2109 if (status < 0)
2110 status = 0;
2111 return (status);
2112 }
2114 status = daemonize ();
2115 if (status == 1)
2116 {
2117 struct sigaction sigchld;
2119 memset (&sigchld, 0, sizeof (sigchld));
2120 sigchld.sa_handler = SIG_IGN;
2121 sigaction (SIGCHLD, &sigchld, NULL);
2123 return (0);
2124 }
2125 else if (status != 0)
2126 {
2127 fprintf (stderr, "daemonize failed, exiting.\n");
2128 return (1);
2129 }
2131 if (journal_cur != NULL)
2132 {
2133 int had_journal = 0;
2135 pthread_mutex_lock(&journal_lock);
2137 RRDD_LOG(LOG_INFO, "checking for journal files");
2139 had_journal += journal_replay(journal_old);
2140 had_journal += journal_replay(journal_cur);
2142 if (had_journal)
2143 flush_old_values(-1);
2145 pthread_mutex_unlock(&journal_lock);
2146 journal_rotate();
2148 RRDD_LOG(LOG_INFO, "journal processing complete");
2149 }
2151 /* start the queue thread */
2152 memset (&queue_thread, 0, sizeof (queue_thread));
2153 status = pthread_create (&queue_thread,
2154 NULL, /* attr */
2155 queue_thread_main,
2156 NULL); /* args */
2157 if (status != 0)
2158 {
2159 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2160 cleanup();
2161 return (1);
2162 }
2164 listen_thread_main (NULL);
2165 cleanup ();
2167 return (0);
2168 } /* int main */
2170 /*
2171 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2172 */