1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
#if 0
/*
 * Feature-test macros that would pin the compiler and libc to C99 / POSIX.
 * The surrounding `#if 0' compiles the whole section out.
 */
/* {{{ */
#if !defined(__STRICT_ANSI__)
# define __STRICT_ANSI__
#endif

#if !defined(_ISOC99_SOURCE)
# define _ISOC99_SOURCE
#endif

/* Drop any earlier definition, then request POSIX.1-2001. */
#undef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L

/* Single UNIX needed for strdup. */
#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500

#if !defined(_REENTRANT)
# define _REENTRANT
#endif

#if !defined(_THREAD_SAFE)
# define _THREAD_SAFE
#endif

/* GNU extensions are explicitly switched off. */
#undef _GNU_SOURCE
/* }}} */
#endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* All daemon log output is forwarded to syslog(3). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* `__attribute__' is only understood by GNU-compatible compilers; make it
 * expand to nothing everywhere else. */
#if !defined(__GNUC__)
# define __attribute__(x) /**/
#endif
101 /*
102 * Types
103 */
104 struct listen_socket_s
105 {
106 int fd;
107 char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
/* Bits used in cache_item_s.flags. */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/* One cached RRD file: the pending update strings, bookkeeping for the
 * flush logic and the link used by the singly linked write queue. */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;              /* key in `cache_tree' (same pointer) */
  char **values;           /* pending update strings */
  int values_num;          /* number of entries in `values' */
  time_t last_flush_time;  /* set by `_wipe_ci_values' */
  int flags;               /* CI_FLAGS_* */
  pthread_cond_t flushed;  /* broadcast by the queue thread after a write */
  cache_item_t *next;      /* next item in the write queue */
};
/* User data handed to `tree_callback_flush' while walking the cache tree. */
typedef struct callback_flush_data_s
{
  time_t now;          /* time at which the tree walk started */
  time_t abs_timeout;  /* entries last flushed at or before this are queued */
  char **keys;         /* keys of idle entries to drop after the walk */
  size_t keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* Selects the end of the write queue that `enqueue_cache_item' works on. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146 * Variables
147 */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree *cache_tree = NULL;
163 static cache_item_t *cache_queue_head = NULL;
164 static cache_item_t *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
168 static int config_write_interval = 300;
169 static int config_write_jitter = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
195 /*
196 * Functions
197 */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
199 {
200 RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201 do_shutdown++;
202 pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
206 {
207 RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208 do_shutdown++;
209 pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
212 static int open_pidfile(void) /* {{{ */
213 {
214 int fd;
215 char *file;
217 file = (config_pid_file != NULL)
218 ? config_pid_file
219 : LOCALSTATEDIR "/run/rrdcached.pid";
221 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
222 if (fd < 0)
223 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
224 file, rrd_strerror(errno));
226 return(fd);
227 }
/*
 * write_pidfile:
 *   Writes the PID of the calling process, followed by a newline, to `fd'
 *   and closes it. Returns 0 on success; if the descriptor cannot be wrapped
 *   in a stdio stream it is closed and -1 is returned.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream); /* also closes `fd' */

  return (0);
} /* }}} int write_pidfile */
250 static int remove_pidfile (void) /* {{{ */
251 {
252 char *file;
253 int status;
255 file = (config_pid_file != NULL)
256 ? config_pid_file
257 : LOCALSTATEDIR "/run/rrdcached.pid";
259 status = unlink (file);
260 if (status == 0)
261 return (0);
262 return (errno);
263 } /* }}} int remove_pidfile */
265 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
266 {
267 char *buffer;
268 size_t buffer_used;
269 size_t buffer_free;
270 ssize_t status;
272 buffer = (char *) buffer_void;
273 buffer_used = 0;
274 buffer_free = buffer_size;
276 while (buffer_free > 0)
277 {
278 status = read (fd, buffer + buffer_used, buffer_free);
279 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
280 continue;
282 if (status < 0)
283 return (-1);
285 if (status == 0)
286 return (0);
288 assert ((0 > status) || (buffer_free >= (size_t) status));
290 buffer_free = buffer_free - status;
291 buffer_used = buffer_used + status;
293 if (buffer[buffer_used - 1] == '\n')
294 break;
295 }
297 assert (buffer_used > 0);
299 if (buffer[buffer_used - 1] != '\n')
300 {
301 errno = ENOBUFS;
302 return (-1);
303 }
305 buffer[buffer_used - 1] = 0;
307 /* Fix network line endings. */
308 if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
309 {
310 buffer_used--;
311 buffer[buffer_used - 1] = 0;
312 }
314 return (buffer_used);
315 } /* }}} ssize_t sread */
/*
 * swrite:
 *   Writes all `count' bytes of `buf' to `fd', restarting write(2) after
 *   EAGAIN / EINTR and after short writes. A negative `fd' (journal replay)
 *   is accepted and silently ignored.
 *
 *   Returns 0 on success, or the negative result of write(2) on error.
 */
static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
{
  const char *cursor = (const char *) buf;
  size_t remaining = count;

  /* special case for journal replay */
  if (fd < 0)
    return (0);

  while (remaining > 0)
  {
    ssize_t written = write (fd, (const void *) cursor, remaining);

    if (written < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (written);
    }

    remaining -= written;
    cursor += written;
  }

  return (0);
} /* }}} ssize_t swrite */
346 static void _wipe_ci_values(cache_item_t *ci, time_t when)
347 {
348 ci->values = NULL;
349 ci->values_num = 0;
351 ci->last_flush_time = when;
352 if (config_write_jitter > 0)
353 ci->last_flush_time += (random() % config_write_jitter);
355 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
356 }
358 /*
359 * enqueue_cache_item:
360 * `cache_lock' must be acquired before calling this function!
361 */
362 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
363 queue_side_t side)
364 {
365 int did_insert = 0;
367 if (ci == NULL)
368 return (-1);
370 if (ci->values_num == 0)
371 return (0);
373 if (side == HEAD)
374 {
375 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
376 {
377 assert (ci->next == NULL);
378 ci->next = cache_queue_head;
379 cache_queue_head = ci;
381 if (cache_queue_tail == NULL)
382 cache_queue_tail = cache_queue_head;
384 did_insert = 1;
385 }
386 else if (cache_queue_head == ci)
387 {
388 /* do nothing */
389 }
390 else /* enqueued, but not first entry */
391 {
392 cache_item_t *prev;
394 /* find previous entry */
395 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
396 if (prev->next == ci)
397 break;
398 assert (prev != NULL);
400 /* move to the front */
401 prev->next = ci->next;
402 ci->next = cache_queue_head;
403 cache_queue_head = ci;
405 /* check if we need to adapt the tail */
406 if (cache_queue_tail == ci)
407 cache_queue_tail = prev;
408 }
409 }
410 else /* (side == TAIL) */
411 {
412 /* We don't move values back in the list.. */
413 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
414 return (0);
416 assert (ci->next == NULL);
418 if (cache_queue_tail == NULL)
419 cache_queue_head = ci;
420 else
421 cache_queue_tail->next = ci;
422 cache_queue_tail = ci;
424 did_insert = 1;
425 }
427 ci->flags |= CI_FLAGS_IN_QUEUE;
429 if (did_insert)
430 {
431 pthread_mutex_lock (&stats_lock);
432 stats_queue_length++;
433 pthread_mutex_unlock (&stats_lock);
434 }
436 return (0);
437 } /* }}} int enqueue_cache_item */
439 /*
440 * tree_callback_flush:
441 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
442 * while this is in progress.
443 */
444 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
445 gpointer data)
446 {
447 cache_item_t *ci;
448 callback_flush_data_t *cfd;
450 ci = (cache_item_t *) value;
451 cfd = (callback_flush_data_t *) data;
453 if ((ci->last_flush_time <= cfd->abs_timeout)
454 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
455 && (ci->values_num > 0))
456 {
457 enqueue_cache_item (ci, TAIL);
458 }
459 else if ((do_shutdown != 0)
460 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
461 && (ci->values_num > 0))
462 {
463 enqueue_cache_item (ci, TAIL);
464 }
465 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
466 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
467 && (ci->values_num <= 0))
468 {
469 char **temp;
471 temp = (char **) realloc (cfd->keys,
472 sizeof (char *) * (cfd->keys_num + 1));
473 if (temp == NULL)
474 {
475 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
476 return (FALSE);
477 }
478 cfd->keys = temp;
479 /* Make really sure this points to the _same_ place */
480 assert ((char *) key == ci->file);
481 cfd->keys[cfd->keys_num] = (char *) key;
482 cfd->keys_num++;
483 }
485 return (FALSE);
486 } /* }}} gboolean tree_callback_flush */
488 static int flush_old_values (int max_age)
489 {
490 callback_flush_data_t cfd;
491 size_t k;
493 memset (&cfd, 0, sizeof (cfd));
494 /* Pass the current time as user data so that we don't need to call
495 * `time' for each node. */
496 cfd.now = time (NULL);
497 cfd.keys = NULL;
498 cfd.keys_num = 0;
500 if (max_age > 0)
501 cfd.abs_timeout = cfd.now - max_age;
502 else
503 cfd.abs_timeout = cfd.now + 1;
505 /* `tree_callback_flush' will return the keys of all values that haven't
506 * been touched in the last `config_flush_interval' seconds in `cfd'.
507 * The char*'s in this array point to the same memory as ci->file, so we
508 * don't need to free them separately. */
509 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
511 for (k = 0; k < cfd.keys_num; k++)
512 {
513 cache_item_t *ci;
515 /* This must not fail. */
516 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
517 assert (ci != NULL);
519 /* If we end up here with values available, something's seriously
520 * messed up. */
521 assert (ci->values_num == 0);
523 /* Remove the node from the tree */
524 g_tree_remove (cache_tree, cfd.keys[k]);
525 cfd.keys[k] = NULL;
527 /* Now free and clean up `ci'. */
528 free (ci->file);
529 ci->file = NULL;
530 free (ci);
531 ci = NULL;
532 } /* for (k = 0; k < cfd.keys_num; k++) */
534 if (cfd.keys != NULL)
535 {
536 free (cfd.keys);
537 cfd.keys = NULL;
538 }
540 return (0);
541 } /* int flush_old_values */
543 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
544 {
545 struct timeval now;
546 struct timespec next_flush;
548 gettimeofday (&now, NULL);
549 next_flush.tv_sec = now.tv_sec + config_flush_interval;
550 next_flush.tv_nsec = 1000 * now.tv_usec;
552 pthread_mutex_lock (&cache_lock);
553 while ((do_shutdown == 0) || (cache_queue_head != NULL))
554 {
555 cache_item_t *ci;
556 char *file;
557 char **values;
558 int values_num;
559 int status;
560 int i;
562 /* First, check if it's time to do the cache flush. */
563 gettimeofday (&now, NULL);
564 if ((now.tv_sec > next_flush.tv_sec)
565 || ((now.tv_sec == next_flush.tv_sec)
566 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
567 {
568 /* Flush all values that haven't been written in the last
569 * `config_write_interval' seconds. */
570 flush_old_values (config_write_interval);
572 /* Determine the time of the next cache flush. */
573 while (next_flush.tv_sec <= now.tv_sec)
574 next_flush.tv_sec += config_flush_interval;
576 /* unlock the cache while we rotate so we don't block incoming
577 * updates if the fsync() blocks on disk I/O */
578 pthread_mutex_unlock(&cache_lock);
579 journal_rotate();
580 pthread_mutex_lock(&cache_lock);
581 }
583 /* Now, check if there's something to store away. If not, wait until
584 * something comes in or it's time to do the cache flush. */
585 if (cache_queue_head == NULL)
586 {
587 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
588 if ((status != 0) && (status != ETIMEDOUT))
589 {
590 RRDD_LOG (LOG_ERR, "queue_thread_main: "
591 "pthread_cond_timedwait returned %i.", status);
592 }
593 }
595 /* We're about to shut down, so lets flush the entire tree. */
596 if ((do_shutdown != 0) && (cache_queue_head == NULL))
597 flush_old_values (/* max age = */ -1);
599 /* Check if a value has arrived. This may be NULL if we timed out or there
600 * was an interrupt such as a signal. */
601 if (cache_queue_head == NULL)
602 continue;
604 ci = cache_queue_head;
606 /* copy the relevant parts */
607 file = strdup (ci->file);
608 if (file == NULL)
609 {
610 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
611 continue;
612 }
614 assert(ci->values != NULL);
615 assert(ci->values_num > 0);
617 values = ci->values;
618 values_num = ci->values_num;
620 _wipe_ci_values(ci, time(NULL));
622 cache_queue_head = ci->next;
623 if (cache_queue_head == NULL)
624 cache_queue_tail = NULL;
625 ci->next = NULL;
627 pthread_mutex_lock (&stats_lock);
628 assert (stats_queue_length > 0);
629 stats_queue_length--;
630 pthread_mutex_unlock (&stats_lock);
632 pthread_mutex_unlock (&cache_lock);
634 rrd_clear_error ();
635 status = rrd_update_r (file, NULL, values_num, (void *) values);
636 if (status != 0)
637 {
638 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
639 "rrd_update_r (%s) failed with status %i. (%s)",
640 file, status, rrd_get_error());
641 }
643 journal_write("wrote", file);
644 pthread_cond_broadcast(&ci->flushed);
646 for (i = 0; i < values_num; i++)
647 free (values[i]);
649 free(values);
650 free(file);
652 if (status == 0)
653 {
654 pthread_mutex_lock (&stats_lock);
655 stats_updates_written++;
656 stats_data_sets_written += values_num;
657 pthread_mutex_unlock (&stats_lock);
658 }
660 pthread_mutex_lock (&cache_lock);
662 /* We're about to shut down, so lets flush the entire tree. */
663 if ((do_shutdown != 0) && (cache_queue_head == NULL))
664 flush_old_values (/* max age = */ -1);
665 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
666 pthread_mutex_unlock (&cache_lock);
668 assert(cache_queue_head == NULL);
669 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
670 journal_done();
672 return (NULL);
673 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 *   Splits the next field off the NUL-terminated command buffer.
 *
 *   Fields are separated by a single space; a backslash makes the following
 *   character literal. Unescaping is done in place, i.e. the field is
 *   written over the start of the input buffer.
 *
 *   On success `*field_ret' points to the NUL-terminated field while
 *   `*buffer_ret' / `*buffer_size_ret' are advanced past it, and 0 is
 *   returned. Returns -1, leaving the arguments untouched, if the buffer is
 *   empty or ends before the field is terminated.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  char *dst = *buffer_ret;
  size_t src_len = *buffer_size_ret;
  size_t rd = 0;
  size_t wr = 0;
  int terminated = 0;

  if (src_len == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[src_len - 1] == '\0');

  while (!terminated && (rd < src_len))
  {
    char c = src[rd];

    /* End-of-field or end-of-buffer: close the field and stop. */
    if ((c == ' ') || (c == '\0'))
    {
      dst[wr] = 0;
      wr++;
      rd++;
      terminated = 1;
      continue;
    }

    /* Escaped character: skip the backslash, take what follows as is. */
    if (c == '\\')
    {
      if (rd >= (src_len - 1))
        break;
      rd++;
      c = src[rd];
    }

    dst[wr] = c;
    wr++;
    rd++;
  }

  if (!terminated)
    return (-1);

  *buffer_ret = src + rd;
  *buffer_size_ret = src_len - rd;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
738 static int flush_file (const char *filename) /* {{{ */
739 {
740 cache_item_t *ci;
742 pthread_mutex_lock (&cache_lock);
744 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
745 if (ci == NULL)
746 {
747 pthread_mutex_unlock (&cache_lock);
748 return (ENOENT);
749 }
751 /* Enqueue at head */
752 enqueue_cache_item (ci, HEAD);
753 pthread_cond_signal (&cache_cond);
755 pthread_cond_wait(&ci->flushed, &cache_lock);
756 pthread_mutex_unlock(&cache_lock);
758 return (0);
759 } /* }}} int flush_file */
/*
 * handle_request_help:
 *   Implements the HELP command. The first field of `buffer' selects the
 *   topic ("update", "flush" or "stats", case-insensitive); anything else,
 *   or no field at all, yields the command overview.
 *
 *   The first line of every text is the status line; its leading number is
 *   the count of text lines that follow.
 *
 *   Returns 0 on success or the `errno' value of a failed write.
 */
static int handle_request_help (int fd, /* {{{ */
    char *buffer, size_t buffer_size)
{
  /* The help texts never change, so they are built only once. */
  static const char *const help_help[] =
  {
    "4 Command overview\n",
    "FLUSH <filename>\n",
    "HELP [<command>]\n",
    "UPDATE <filename> <values> [<values> ...]\n",
    "STATS\n"
  };

  static const char *const help_flush[] =
  {
    "4 Help for FLUSH\n",
    "Usage: FLUSH <filename>\n",
    "\n",
    "Adds the given filename to the head of the update queue and returns\n",
    "after it has been dequeued.\n"
  };

  static const char *const help_update[] =
  {
    "9 Help for UPDATE\n",
    "Usage: UPDATE <filename> <values> [<values> ...]\n",
    "\n",
    "Adds the given file to the internal cache if it is not yet known and\n",
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
    "for details.\n",
    "\n",
    "Each <values> has the following form:\n",
    "  <values> = <time>:<value>[:<value>[...]]\n",
    "See the rrdupdate(1) manpage for details.\n"
  };

  static const char *const help_stats[] =
  {
    "4 Help for STATS\n",
    "Usage: STATS\n",
    "\n",
    "Returns some performance counters, see the rrdcached(1) manpage for\n",
    "a description of the values.\n"
  };

  const char *const *help_text = help_help;
  size_t help_text_len = sizeof (help_help) / sizeof (help_help[0]);
  char *command;
  size_t i;
  int status;

  status = buffer_get_field (&buffer, &buffer_size, &command);
  if (status == 0)
  {
    if (strcasecmp (command, "update") == 0)
    {
      help_text = help_update;
      help_text_len = sizeof (help_update) / sizeof (help_update[0]);
    }
    else if (strcasecmp (command, "flush") == 0)
    {
      help_text = help_flush;
      help_text_len = sizeof (help_flush) / sizeof (help_flush[0]);
    }
    else if (strcasecmp (command, "stats") == 0)
    {
      help_text = help_stats;
      help_text_len = sizeof (help_stats) / sizeof (help_stats[0]);
    }
    /* unknown topic: keep the overview */
  }

  for (i = 0; i < help_text_len; i++)
  {
    status = swrite (fd, help_text[i], strlen (help_text[i]));
    if (status < 0)
    {
      /* Grab errno before logging can overwrite it. */
      status = errno;
      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
      return (status);
    }
  }

  return (0);
} /* }}} int handle_request_help */
859 static int handle_request_stats (int fd, /* {{{ */
860 char *buffer __attribute__((unused)),
861 size_t buffer_size __attribute__((unused)))
862 {
863 int status;
864 char outbuf[CMD_MAX];
866 uint64_t copy_queue_length;
867 uint64_t copy_updates_received;
868 uint64_t copy_flush_received;
869 uint64_t copy_updates_written;
870 uint64_t copy_data_sets_written;
871 uint64_t copy_journal_bytes;
872 uint64_t copy_journal_rotate;
874 uint64_t tree_nodes_number;
875 uint64_t tree_depth;
877 pthread_mutex_lock (&stats_lock);
878 copy_queue_length = stats_queue_length;
879 copy_updates_received = stats_updates_received;
880 copy_flush_received = stats_flush_received;
881 copy_updates_written = stats_updates_written;
882 copy_data_sets_written = stats_data_sets_written;
883 copy_journal_bytes = stats_journal_bytes;
884 copy_journal_rotate = stats_journal_rotate;
885 pthread_mutex_unlock (&stats_lock);
887 pthread_mutex_lock (&cache_lock);
888 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
889 tree_depth = (uint64_t) g_tree_height (cache_tree);
890 pthread_mutex_unlock (&cache_lock);
892 #define RRDD_STATS_SEND \
893 outbuf[sizeof (outbuf) - 1] = 0; \
894 status = swrite (fd, outbuf, strlen (outbuf)); \
895 if (status < 0) \
896 { \
897 status = errno; \
898 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
899 return (status); \
900 }
902 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
903 RRDD_STATS_SEND;
905 snprintf (outbuf, sizeof (outbuf),
906 "QueueLength: %"PRIu64"\n", copy_queue_length);
907 RRDD_STATS_SEND;
909 snprintf (outbuf, sizeof (outbuf),
910 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
911 RRDD_STATS_SEND;
913 snprintf (outbuf, sizeof (outbuf),
914 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
915 RRDD_STATS_SEND;
917 snprintf (outbuf, sizeof (outbuf),
918 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
919 RRDD_STATS_SEND;
921 snprintf (outbuf, sizeof (outbuf),
922 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
923 RRDD_STATS_SEND;
925 snprintf (outbuf, sizeof (outbuf),
926 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
927 RRDD_STATS_SEND;
929 snprintf (outbuf, sizeof (outbuf),
930 "TreeDepth: %"PRIu64"\n", tree_depth);
931 RRDD_STATS_SEND;
933 snprintf (outbuf, sizeof(outbuf),
934 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
935 RRDD_STATS_SEND;
937 snprintf (outbuf, sizeof(outbuf),
938 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
939 RRDD_STATS_SEND;
941 return (0);
942 #undef RRDD_STATS_SEND
943 } /* }}} int handle_request_stats */
945 static int handle_request_flush (int fd, /* {{{ */
946 char *buffer, size_t buffer_size)
947 {
948 char *file;
949 int status;
950 char result[CMD_MAX];
952 status = buffer_get_field (&buffer, &buffer_size, &file);
953 if (status != 0)
954 {
955 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
956 }
957 else
958 {
959 pthread_mutex_lock(&stats_lock);
960 stats_flush_received++;
961 pthread_mutex_unlock(&stats_lock);
963 status = flush_file (file);
964 if (status == 0)
965 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
966 else if (status == ENOENT)
967 {
968 /* no file in our tree; see whether it exists at all */
969 struct stat statbuf;
971 memset(&statbuf, 0, sizeof(statbuf));
972 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
973 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
974 else
975 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
976 }
977 else if (status < 0)
978 strncpy (result, "-1 Internal error.\n", sizeof (result));
979 else
980 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
981 }
982 result[sizeof (result) - 1] = 0;
984 status = swrite (fd, result, strlen (result));
985 if (status < 0)
986 {
987 status = errno;
988 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
989 return (status);
990 }
992 return (0);
993 } /* }}} int handle_request_flush */
995 static int handle_request_update (int fd, /* {{{ */
996 char *buffer, size_t buffer_size)
997 {
998 char *file;
999 int values_num = 0;
1000 int status;
1002 time_t now;
1004 cache_item_t *ci;
1005 char answer[CMD_MAX];
1007 #define RRDD_UPDATE_SEND \
1008 answer[sizeof (answer) - 1] = 0; \
1009 status = swrite (fd, answer, strlen (answer)); \
1010 if (status < 0) \
1011 { \
1012 status = errno; \
1013 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1014 return (status); \
1015 }
1017 now = time (NULL);
1019 status = buffer_get_field (&buffer, &buffer_size, &file);
1020 if (status != 0)
1021 {
1022 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1023 sizeof (answer));
1024 RRDD_UPDATE_SEND;
1025 return (0);
1026 }
1028 pthread_mutex_lock(&stats_lock);
1029 stats_updates_received++;
1030 pthread_mutex_unlock(&stats_lock);
1032 pthread_mutex_lock (&cache_lock);
1033 ci = g_tree_lookup (cache_tree, file);
1035 if (ci == NULL) /* {{{ */
1036 {
1037 struct stat statbuf;
1039 /* don't hold the lock while we setup; stat(2) might block */
1040 pthread_mutex_unlock(&cache_lock);
1042 memset (&statbuf, 0, sizeof (statbuf));
1043 status = stat (file, &statbuf);
1044 if (status != 0)
1045 {
1046 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1048 status = errno;
1049 if (status == ENOENT)
1050 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1051 else
1052 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1053 status);
1054 RRDD_UPDATE_SEND;
1055 return (0);
1056 }
1057 if (!S_ISREG (statbuf.st_mode))
1058 {
1059 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1060 RRDD_UPDATE_SEND;
1061 return (0);
1062 }
1063 if (access(file, R_OK|W_OK) != 0)
1064 {
1065 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1066 file, rrd_strerror(errno));
1067 RRDD_UPDATE_SEND;
1068 return (0);
1069 }
1071 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1072 if (ci == NULL)
1073 {
1074 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1076 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1077 RRDD_UPDATE_SEND;
1078 return (0);
1079 }
1080 memset (ci, 0, sizeof (cache_item_t));
1082 ci->file = strdup (file);
1083 if (ci->file == NULL)
1084 {
1085 free (ci);
1086 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1088 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1089 RRDD_UPDATE_SEND;
1090 return (0);
1091 }
1093 _wipe_ci_values(ci, now);
1094 ci->flags = CI_FLAGS_IN_TREE;
1096 pthread_mutex_lock(&cache_lock);
1097 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1098 } /* }}} */
1099 assert (ci != NULL);
1101 while (buffer_size > 0)
1102 {
1103 char **temp;
1104 char *value;
1106 status = buffer_get_field (&buffer, &buffer_size, &value);
1107 if (status != 0)
1108 {
1109 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1110 break;
1111 }
1113 temp = (char **) realloc (ci->values,
1114 sizeof (char *) * (ci->values_num + 1));
1115 if (temp == NULL)
1116 {
1117 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1118 continue;
1119 }
1120 ci->values = temp;
1122 ci->values[ci->values_num] = strdup (value);
1123 if (ci->values[ci->values_num] == NULL)
1124 {
1125 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1126 continue;
1127 }
1128 ci->values_num++;
1130 values_num++;
1131 }
1133 if (((now - ci->last_flush_time) >= config_write_interval)
1134 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1135 && (ci->values_num > 0))
1136 {
1137 enqueue_cache_item (ci, TAIL);
1138 pthread_cond_signal (&cache_cond);
1139 }
1141 pthread_mutex_unlock (&cache_lock);
1143 if (values_num < 1)
1144 {
1145 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1146 }
1147 else
1148 {
1149 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1150 (values_num == 1) ? "" : "s");
1151 }
1152 RRDD_UPDATE_SEND;
1153 return (0);
1154 #undef RRDD_UPDATE_SEND
1155 } /* }}} int handle_request_update */
1157 /* we came across a "WROTE" entry during journal replay.
1158 * throw away any values that we have accumulated for this file
1159 */
1160 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1161 const char *buffer,
1162 size_t buffer_size __attribute__((unused)))
1163 {
1164 int i;
1165 cache_item_t *ci;
1166 const char *file = buffer;
1168 pthread_mutex_lock(&cache_lock);
1170 ci = g_tree_lookup(cache_tree, file);
1171 if (ci == NULL)
1172 {
1173 pthread_mutex_unlock(&cache_lock);
1174 return (0);
1175 }
1177 if (ci->values)
1178 {
1179 for (i=0; i < ci->values_num; i++)
1180 free(ci->values[i]);
1182 free(ci->values);
1183 }
1185 _wipe_ci_values(ci, time(NULL));
1187 pthread_mutex_unlock(&cache_lock);
1188 return (0);
1189 } /* }}} int handle_request_wrote */
1191 /* if fd < 0, we are in journal replay mode */
1192 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1193 {
1194 char *buffer_ptr;
1195 char *command;
1196 int status;
1198 assert (buffer[buffer_size - 1] == '\0');
1200 buffer_ptr = buffer;
1201 command = NULL;
1202 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1203 if (status != 0)
1204 {
1205 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1206 return (-1);
1207 }
1209 if (strcasecmp (command, "update") == 0)
1210 {
1211 /* don't re-write updates in replay mode */
1212 if (fd >= 0)
1213 journal_write(command, buffer_ptr);
1215 return (handle_request_update (fd, buffer_ptr, buffer_size));
1216 }
1217 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1218 {
1219 /* this is only valid in replay mode */
1220 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1221 }
1222 else if (strcasecmp (command, "flush") == 0)
1223 {
1224 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1225 }
1226 else if (strcasecmp (command, "stats") == 0)
1227 {
1228 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1229 }
1230 else if (strcasecmp (command, "help") == 0)
1231 {
1232 return (handle_request_help (fd, buffer_ptr, buffer_size));
1233 }
1234 else
1235 {
1236 char result[CMD_MAX];
1238 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1239 result[sizeof (result) - 1] = 0;
1241 status = swrite (fd, result, strlen (result));
1242 if (status < 0)
1243 {
1244 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1245 return (-1);
1246 }
1247 }
1249 return (0);
1250 } /* }}} int handle_request */
1252 /* MUST NOT hold journal_lock before calling this */
1253 static void journal_rotate(void) /* {{{ */
1254 {
1255 FILE *old_fh = NULL;
1257 if (journal_cur == NULL || journal_old == NULL)
1258 return;
1260 pthread_mutex_lock(&journal_lock);
1262 /* we rotate this way (rename before close) so that the we can release
1263 * the journal lock as fast as possible. Journal writes to the new
1264 * journal can proceed immediately after the new file is opened. The
1265 * fclose can then block without affecting new updates.
1266 */
1267 if (journal_fh != NULL)
1268 {
1269 old_fh = journal_fh;
1270 rename(journal_cur, journal_old);
1271 ++stats_journal_rotate;
1272 }
1274 journal_fh = fopen(journal_cur, "a");
1275 pthread_mutex_unlock(&journal_lock);
1277 if (old_fh != NULL)
1278 fclose(old_fh);
1280 if (journal_fh == NULL)
1281 RRDD_LOG(LOG_CRIT,
1282 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1283 journal_cur, rrd_strerror(errno));
1285 } /* }}} static void journal_rotate */
1287 static void journal_done(void) /* {{{ */
1288 {
1289 if (journal_cur == NULL)
1290 return;
1292 pthread_mutex_lock(&journal_lock);
1293 if (journal_fh != NULL)
1294 {
1295 fclose(journal_fh);
1296 journal_fh = NULL;
1297 }
1299 RRDD_LOG(LOG_INFO, "removing journals");
1301 unlink(journal_old);
1302 unlink(journal_cur);
1303 pthread_mutex_unlock(&journal_lock);
1305 } /* }}} static void journal_done */
1307 static int journal_write(char *cmd, char *args) /* {{{ */
1308 {
1309 int chars;
1311 if (journal_fh == NULL)
1312 return 0;
1314 pthread_mutex_lock(&journal_lock);
1315 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1316 pthread_mutex_unlock(&journal_lock);
1318 if (chars > 0)
1319 {
1320 pthread_mutex_lock(&stats_lock);
1321 stats_journal_bytes += chars;
1322 pthread_mutex_unlock(&stats_lock);
1323 }
1325 return chars;
1326 } /* }}} static int journal_write */
1328 static int journal_replay (const char *file) /* {{{ */
1329 {
1330 FILE *fh;
1331 int entry_cnt = 0;
1332 int fail_cnt = 0;
1333 uint64_t line = 0;
1334 char entry[CMD_MAX];
1336 if (file == NULL) return 0;
1338 fh = fopen(file, "r");
1339 if (fh == NULL)
1340 {
1341 if (errno != ENOENT)
1342 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1343 file, rrd_strerror(errno));
1344 return 0;
1345 }
1346 else
1347 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1349 while(!feof(fh))
1350 {
1351 size_t entry_len;
1353 ++line;
1354 fgets(entry, sizeof(entry), fh);
1355 entry_len = strlen(entry);
1357 /* check \n termination in case journal writing crashed mid-line */
1358 if (entry_len == 0)
1359 continue;
1360 else if (entry[entry_len - 1] != '\n')
1361 {
1362 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1363 ++fail_cnt;
1364 continue;
1365 }
1367 entry[entry_len - 1] = '\0';
1369 if (handle_request(-1, entry, entry_len) == 0)
1370 ++entry_cnt;
1371 else
1372 ++fail_cnt;
1373 }
1375 fclose(fh);
1377 if (entry_cnt > 0)
1378 {
1379 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1380 entry_cnt, fail_cnt);
1381 return 1;
1382 }
1383 else
1384 return 0;
1386 } /* }}} static int journal_replay */
1388 static void *connection_thread_main (void *args) /* {{{ */
1389 {
1390 pthread_t self;
1391 int i;
1392 int fd;
1394 fd = *((int *) args);
1395 free (args);
1397 pthread_mutex_lock (&connection_threads_lock);
1398 {
1399 pthread_t *temp;
1401 temp = (pthread_t *) realloc (connection_threads,
1402 sizeof (pthread_t) * (connection_threads_num + 1));
1403 if (temp == NULL)
1404 {
1405 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1406 }
1407 else
1408 {
1409 connection_threads = temp;
1410 connection_threads[connection_threads_num] = pthread_self ();
1411 connection_threads_num++;
1412 }
1413 }
1414 pthread_mutex_unlock (&connection_threads_lock);
1416 while (do_shutdown == 0)
1417 {
1418 char buffer[CMD_MAX];
1420 struct pollfd pollfd;
1421 int status;
1423 pollfd.fd = fd;
1424 pollfd.events = POLLIN | POLLPRI;
1425 pollfd.revents = 0;
1427 status = poll (&pollfd, 1, /* timeout = */ 500);
1428 if (status == 0) /* timeout */
1429 continue;
1430 else if (status < 0) /* error */
1431 {
1432 status = errno;
1433 if (status == EINTR)
1434 continue;
1435 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1436 continue;
1437 }
1439 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1440 {
1441 close (fd);
1442 break;
1443 }
1444 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1445 {
1446 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1447 "poll(2) returned something unexpected: %#04hx",
1448 pollfd.revents);
1449 close (fd);
1450 break;
1451 }
1453 status = (int) sread (fd, buffer, sizeof (buffer));
1454 if (status <= 0)
1455 {
1456 close (fd);
1458 if (status < 0)
1459 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1461 break;
1462 }
1464 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1465 if (status != 0)
1466 break;
1467 }
1469 close(fd);
1471 self = pthread_self ();
1472 /* Remove this thread from the connection threads list */
1473 pthread_mutex_lock (&connection_threads_lock);
1474 /* Find out own index in the array */
1475 for (i = 0; i < connection_threads_num; i++)
1476 if (pthread_equal (connection_threads[i], self) != 0)
1477 break;
1478 assert (i < connection_threads_num);
1480 /* Move the trailing threads forward. */
1481 if (i < (connection_threads_num - 1))
1482 {
1483 memmove (connection_threads + i,
1484 connection_threads + i + 1,
1485 sizeof (pthread_t) * (connection_threads_num - i - 1));
1486 }
1488 connection_threads_num--;
1489 pthread_mutex_unlock (&connection_threads_lock);
1491 return (NULL);
1492 } /* }}} void *connection_thread_main */
1494 static int open_listen_socket_unix (const char *path) /* {{{ */
1495 {
1496 int fd;
1497 struct sockaddr_un sa;
1498 listen_socket_t *temp;
1499 int status;
1501 temp = (listen_socket_t *) realloc (listen_fds,
1502 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1503 if (temp == NULL)
1504 {
1505 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1506 return (-1);
1507 }
1508 listen_fds = temp;
1509 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1511 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1512 if (fd < 0)
1513 {
1514 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1515 return (-1);
1516 }
1518 memset (&sa, 0, sizeof (sa));
1519 sa.sun_family = AF_UNIX;
1520 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1522 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1523 if (status != 0)
1524 {
1525 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1526 close (fd);
1527 unlink (path);
1528 return (-1);
1529 }
1531 status = listen (fd, /* backlog = */ 10);
1532 if (status != 0)
1533 {
1534 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1535 close (fd);
1536 unlink (path);
1537 return (-1);
1538 }
1540 listen_fds[listen_fds_num].fd = fd;
1541 snprintf (listen_fds[listen_fds_num].path,
1542 sizeof (listen_fds[listen_fds_num].path) - 1,
1543 "unix:%s", path);
1544 listen_fds_num++;
1546 return (0);
1547 } /* }}} int open_listen_socket_unix */
/*
 * Open listening socket(s) for the address specification `addr_orig'.
 *
 * Accepted forms:
 *   "unix:<path>" or "/<path>"   handed to open_listen_socket_unix()
 *   "[<ipv6>]" or "[<ipv6>]:<port>"
 *   "<host-or-ipv4>" or "<host-or-ipv4>:<port>" (port is only split off when
 *                                 the address contains a '.')
 * Without a port, RRDCACHED_DEFAULT_PORT is used.
 *
 * One socket is attempted for every address returned by getaddrinfo();
 * addresses for which socket() or bind() fails are skipped.
 *
 * Returns 0 unless the address is malformed, resolution fails or listen()
 * fails, in which case -1 is returned. A return value of 0 does not
 * guarantee that a socket was added to `listen_fds'.
 */
static int open_listen_socket (const char *addr_orig) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;
  int result = 0;

  assert (addr_orig != NULL);

  /* Work on a private copy: the address is split in place below. */
  strncpy (addr_copy, addr_orig, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
    return (open_listen_socket_unix (addr + strlen ("unix:")));
  else if (addr[0] == '/')
    return (open_listen_socket_unix (addr));

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
          addr_orig);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
      close (fd);
      /* Leave through the common exit so the address list is released. */
      result = -1;
      break;
    }

    /* The slot was zeroed above, so the bounded copy stays terminated. */
    listen_fds[listen_fds_num].fd = fd;
    strncpy (listen_fds[listen_fds_num].path, addr,
        sizeof (listen_fds[listen_fds_num].path) - 1);
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* The list returned by getaddrinfo() is heap-allocated and must be
   * released on every path. */
  freeaddrinfo (ai_res);

  return (result);
} /* }}} int open_listen_socket */
1675 static int close_listen_sockets (void) /* {{{ */
1676 {
1677 size_t i;
1679 for (i = 0; i < listen_fds_num; i++)
1680 {
1681 close (listen_fds[i].fd);
1682 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1683 unlink (listen_fds[i].path + strlen ("unix:"));
1684 }
1686 free (listen_fds);
1687 listen_fds = NULL;
1688 listen_fds_num = 0;
1690 return (0);
1691 } /* }}} int close_listen_sockets */
1693 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1694 {
1695 struct pollfd *pollfds;
1696 int pollfds_num;
1697 int status;
1698 int i;
1700 for (i = 0; i < config_listen_address_list_len; i++)
1701 open_listen_socket (config_listen_address_list[i]);
1703 if (config_listen_address_list_len < 1)
1704 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1706 if (listen_fds_num < 1)
1707 {
1708 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1709 "could be opened. Sorry.");
1710 return (NULL);
1711 }
1713 pollfds_num = listen_fds_num;
1714 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1715 if (pollfds == NULL)
1716 {
1717 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1718 return (NULL);
1719 }
1720 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1722 RRDD_LOG(LOG_INFO, "listening for connections");
1724 while (do_shutdown == 0)
1725 {
1726 assert (pollfds_num == ((int) listen_fds_num));
1727 for (i = 0; i < pollfds_num; i++)
1728 {
1729 pollfds[i].fd = listen_fds[i].fd;
1730 pollfds[i].events = POLLIN | POLLPRI;
1731 pollfds[i].revents = 0;
1732 }
1734 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1735 if (status == 0)
1736 {
1737 continue; /* timeout */
1738 }
1739 else if (status < 0)
1740 {
1741 status = errno;
1742 if (status != EINTR)
1743 {
1744 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1745 }
1746 continue;
1747 }
1749 for (i = 0; i < pollfds_num; i++)
1750 {
1751 int *client_sd;
1752 struct sockaddr_storage client_sa;
1753 socklen_t client_sa_size;
1754 pthread_t tid;
1755 pthread_attr_t attr;
1757 if (pollfds[i].revents == 0)
1758 continue;
1760 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1761 {
1762 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1763 "poll(2) returned something unexpected for listen FD #%i.",
1764 pollfds[i].fd);
1765 continue;
1766 }
1768 client_sd = (int *) malloc (sizeof (int));
1769 if (client_sd == NULL)
1770 {
1771 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1772 continue;
1773 }
1775 client_sa_size = sizeof (client_sa);
1776 *client_sd = accept (pollfds[i].fd,
1777 (struct sockaddr *) &client_sa, &client_sa_size);
1778 if (*client_sd < 0)
1779 {
1780 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1781 continue;
1782 }
1784 pthread_attr_init (&attr);
1785 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1787 status = pthread_create (&tid, &attr, connection_thread_main,
1788 /* args = */ (void *) client_sd);
1789 if (status != 0)
1790 {
1791 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1792 close (*client_sd);
1793 free (client_sd);
1794 continue;
1795 }
1796 } /* for (pollfds_num) */
1797 } /* while (do_shutdown == 0) */
1799 RRDD_LOG(LOG_INFO, "starting shutdown");
1801 close_listen_sockets ();
1803 pthread_mutex_lock (&connection_threads_lock);
1804 while (connection_threads_num > 0)
1805 {
1806 pthread_t wait_for;
1808 wait_for = connection_threads[0];
1810 pthread_mutex_unlock (&connection_threads_lock);
1811 pthread_join (wait_for, /* retval = */ NULL);
1812 pthread_mutex_lock (&connection_threads_lock);
1813 }
1814 pthread_mutex_unlock (&connection_threads_lock);
1816 return (NULL);
1817 } /* }}} void *listen_thread_main */
1819 static int daemonize (void) /* {{{ */
1820 {
1821 int status;
1822 int fd;
1824 /* These structures are static, because `sigaction' behaves weird if the are
1825 * overwritten.. */
1826 static struct sigaction sa_int;
1827 static struct sigaction sa_term;
1828 static struct sigaction sa_pipe;
1830 fd = open_pidfile();
1831 if (fd < 0) return fd;
1833 if (!stay_foreground)
1834 {
1835 pid_t child;
1836 char *base_dir;
1838 child = fork ();
1839 if (child < 0)
1840 {
1841 fprintf (stderr, "daemonize: fork(2) failed.\n");
1842 return (-1);
1843 }
1844 else if (child > 0)
1845 {
1846 return (1);
1847 }
1849 /* Change into the /tmp directory. */
1850 base_dir = (config_base_dir != NULL)
1851 ? config_base_dir
1852 : "/tmp";
1853 status = chdir (base_dir);
1854 if (status != 0)
1855 {
1856 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1857 return (-1);
1858 }
1860 /* Become session leader */
1861 setsid ();
1863 /* Open the first three file descriptors to /dev/null */
1864 close (2);
1865 close (1);
1866 close (0);
1868 open ("/dev/null", O_RDWR);
1869 dup (0);
1870 dup (0);
1871 } /* if (!stay_foreground) */
1873 /* Install signal handlers */
1874 memset (&sa_int, 0, sizeof (sa_int));
1875 sa_int.sa_handler = sig_int_handler;
1876 sigaction (SIGINT, &sa_int, NULL);
1878 memset (&sa_term, 0, sizeof (sa_term));
1879 sa_term.sa_handler = sig_term_handler;
1880 sigaction (SIGTERM, &sa_term, NULL);
1882 memset (&sa_pipe, 0, sizeof (sa_pipe));
1883 sa_pipe.sa_handler = SIG_IGN;
1884 sigaction (SIGPIPE, &sa_pipe, NULL);
1886 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1887 RRDD_LOG(LOG_INFO, "starting up");
1889 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1890 if (cache_tree == NULL)
1891 {
1892 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1893 return (-1);
1894 }
1896 status = write_pidfile (fd);
1897 return status;
1898 } /* }}} int daemonize */
1900 static int cleanup (void) /* {{{ */
1901 {
1902 do_shutdown++;
1904 pthread_cond_signal (&cache_cond);
1905 pthread_join (queue_thread, /* return = */ NULL);
1907 remove_pidfile ();
1909 RRDD_LOG(LOG_INFO, "goodbye");
1910 closelog ();
1912 return (0);
1913 } /* }}} int cleanup */
1915 static int read_options (int argc, char **argv) /* {{{ */
1916 {
1917 int option;
1918 int status = 0;
1920 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1921 {
1922 switch (option)
1923 {
1924 case 'g':
1925 stay_foreground=1;
1926 break;
1928 case 'l':
1929 {
1930 char **temp;
1932 temp = (char **) realloc (config_listen_address_list,
1933 sizeof (char *) * (config_listen_address_list_len + 1));
1934 if (temp == NULL)
1935 {
1936 fprintf (stderr, "read_options: realloc failed.\n");
1937 return (2);
1938 }
1939 config_listen_address_list = temp;
1941 temp[config_listen_address_list_len] = strdup (optarg);
1942 if (temp[config_listen_address_list_len] == NULL)
1943 {
1944 fprintf (stderr, "read_options: strdup failed.\n");
1945 return (2);
1946 }
1947 config_listen_address_list_len++;
1948 }
1949 break;
1951 case 'f':
1952 {
1953 int temp;
1955 temp = atoi (optarg);
1956 if (temp > 0)
1957 config_flush_interval = temp;
1958 else
1959 {
1960 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1961 status = 3;
1962 }
1963 }
1964 break;
1966 case 'w':
1967 {
1968 int temp;
1970 temp = atoi (optarg);
1971 if (temp > 0)
1972 config_write_interval = temp;
1973 else
1974 {
1975 fprintf (stderr, "Invalid write interval: %s\n", optarg);
1976 status = 2;
1977 }
1978 }
1979 break;
1981 case 'z':
1982 {
1983 int temp;
1985 temp = atoi(optarg);
1986 if (temp > 0)
1987 config_write_jitter = temp;
1988 else
1989 {
1990 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1991 status = 2;
1992 }
1994 break;
1995 }
1997 case 'b':
1998 {
1999 size_t len;
2001 if (config_base_dir != NULL)
2002 free (config_base_dir);
2003 config_base_dir = strdup (optarg);
2004 if (config_base_dir == NULL)
2005 {
2006 fprintf (stderr, "read_options: strdup failed.\n");
2007 return (3);
2008 }
2010 len = strlen (config_base_dir);
2011 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2012 {
2013 config_base_dir[len - 1] = 0;
2014 len--;
2015 }
2017 if (len < 1)
2018 {
2019 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2020 return (4);
2021 }
2022 }
2023 break;
2025 case 'p':
2026 {
2027 if (config_pid_file != NULL)
2028 free (config_pid_file);
2029 config_pid_file = strdup (optarg);
2030 if (config_pid_file == NULL)
2031 {
2032 fprintf (stderr, "read_options: strdup failed.\n");
2033 return (3);
2034 }
2035 }
2036 break;
2038 case 'j':
2039 {
2040 struct stat statbuf;
2041 const char *dir = optarg;
2043 status = stat(dir, &statbuf);
2044 if (status != 0)
2045 {
2046 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2047 return 6;
2048 }
2050 if (!S_ISDIR(statbuf.st_mode)
2051 || access(dir, R_OK|W_OK|X_OK) != 0)
2052 {
2053 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2054 errno ? rrd_strerror(errno) : "");
2055 return 6;
2056 }
2058 journal_cur = malloc(PATH_MAX + 1);
2059 journal_old = malloc(PATH_MAX + 1);
2060 if (journal_cur == NULL || journal_old == NULL)
2061 {
2062 fprintf(stderr, "malloc failure for journal files\n");
2063 return 6;
2064 }
2065 else
2066 {
2067 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2068 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2069 }
2070 }
2071 break;
2073 case 'h':
2074 case '?':
2075 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2076 "\n"
2077 "Usage: rrdcached [options]\n"
2078 "\n"
2079 "Valid options are:\n"
2080 " -l <address> Socket address to listen to.\n"
2081 " -w <seconds> Interval in which to write data.\n"
2082 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2083 " -f <seconds> Interval in which to flush dead data.\n"
2084 " -p <file> Location of the PID-file.\n"
2085 " -b <dir> Base directory to change to.\n"
2086 " -g Do not fork and run in the foreground.\n"
2087 " -j <dir> Directory in which to create the journal files.\n"
2088 "\n"
2089 "For more information and a detailed description of all options "
2090 "please refer\n"
2091 "to the rrdcached(1) manual page.\n",
2092 VERSION);
2093 status = -1;
2094 break;
2095 } /* switch (option) */
2096 } /* while (getopt) */
2098 /* advise the user when values are not sane */
2099 if (config_flush_interval < 2 * config_write_interval)
2100 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2101 " 2x write interval (-w) !\n");
2102 if (config_write_jitter > config_write_interval)
2103 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2104 " write interval (-w) !\n");
2106 return (status);
2107 } /* }}} int read_options */
2109 int main (int argc, char **argv)
2110 {
2111 int status;
2113 status = read_options (argc, argv);
2114 if (status != 0)
2115 {
2116 if (status < 0)
2117 status = 0;
2118 return (status);
2119 }
2121 status = daemonize ();
2122 if (status == 1)
2123 {
2124 struct sigaction sigchld;
2126 memset (&sigchld, 0, sizeof (sigchld));
2127 sigchld.sa_handler = SIG_IGN;
2128 sigaction (SIGCHLD, &sigchld, NULL);
2130 return (0);
2131 }
2132 else if (status != 0)
2133 {
2134 fprintf (stderr, "daemonize failed, exiting.\n");
2135 return (1);
2136 }
2138 if (journal_cur != NULL)
2139 {
2140 int had_journal = 0;
2142 pthread_mutex_lock(&journal_lock);
2144 RRDD_LOG(LOG_INFO, "checking for journal files");
2146 had_journal += journal_replay(journal_old);
2147 had_journal += journal_replay(journal_cur);
2149 if (had_journal)
2150 flush_old_values(-1);
2152 pthread_mutex_unlock(&journal_lock);
2153 journal_rotate();
2155 RRDD_LOG(LOG_INFO, "journal processing complete");
2156 }
2158 /* start the queue thread */
2159 memset (&queue_thread, 0, sizeof (queue_thread));
2160 status = pthread_create (&queue_thread,
2161 NULL, /* attr */
2162 queue_thread_main,
2163 NULL); /* args */
2164 if (status != 0)
2165 {
2166 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2167 cleanup();
2168 return (1);
2169 }
2171 listen_thread_main (NULL);
2172 cleanup ();
2174 return (0);
2175 } /* int main */
2177 /*
2178 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2179 */