1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
#if 0
/*
 * Disabled region (nothing between `#if 0' and its `#endif' is compiled).
 * When enabled it pins the feature-test macros so the compiler sticks to the
 * C99 and POSIX standards as closely as possible.
 */
/* {{{ */
#if !defined(__STRICT_ANSI__)
# define __STRICT_ANSI__
#endif

#if !defined(_ISOC99_SOURCE)
# define _ISOC99_SOURCE
#endif

/* Force a known POSIX level, whatever was set before. */
#undef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L

/* Single UNIX needed for strdup. */
#undef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500

#if !defined(_REENTRANT)
# define _REENTRANT
#endif

#if !defined(_THREAD_SAFE)
# define _THREAD_SAFE
#endif

/* GNU extensions are explicitly switched off. */
#undef _GNU_SOURCE
/* }}} */
#endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* Logging shorthand: forwards a priority and a printf-style message to
 * syslog(3). */
#define RRDD_LOG(prio, ...) syslog ((prio), __VA_ARGS__)
/* When the compiler does not define __GNUC__, make every
 * `__attribute__((...))' annotation expand to nothing. */
#if !defined(__GNUC__)
# define __attribute__(x) /**/
#endif
101 /*
102 * Types
103 */
/* One listening socket: a file descriptor plus a path buffer large enough
 * for PATH_MAX characters and a terminating NUL. */
typedef struct listen_socket_s
{
  int  fd;
  char path[PATH_MAX + 1];
} listen_socket_t;
/* Bits stored in cache_item_s.flags. */
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/*
 * cache_item_t: the pending updates for one file.
 *   file            - file name (also used as the key in the cache tree)
 *   values          - array of `values_num' heap-allocated update strings
 *   last_flush_time - time stamp set whenever the values are wiped
 *   flags           - combination of the CI_FLAGS_* bits
 *   next            - link to the following item in the write queue
 */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;
  char **values;
  int values_num;
  time_t last_flush_time;
  int flags;

  cache_item_t *next;
};
/*
 * User data handed to `tree_callback_flush' while walking the cache tree.
 *   now         - time stamp taken once before the walk
 *   abs_timeout - items last flushed at or before this time get enqueued
 *   keys        - collected keys of tree nodes to be removed afterwards
 *   keys_num    - number of entries in `keys'
 */
typedef struct callback_flush_data_s
{
  time_t now;
  time_t abs_timeout;
  char **keys;
  size_t keys_num;
} callback_flush_data_t;
/* Which end of the write queue `enqueue_cache_item' should insert at. */
typedef enum queue_side_e
{
  HEAD = 0,
  TAIL = 1
} queue_side_t;
/* Upper bound, in bytes, for one socket command or one response line. */
#define CMD_MAX (4096)
145 /*
146 * Variables
147 */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree *cache_tree = NULL;
163 static cache_item_t *cache_queue_head = NULL;
164 static cache_item_t *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
168 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
170 static int config_write_interval = 300;
171 static int config_write_jitter = 0;
172 static int config_flush_interval = 3600;
173 static char *config_pid_file = NULL;
174 static char *config_base_dir = NULL;
176 static char **config_listen_address_list = NULL;
177 static int config_listen_address_list_len = 0;
179 static uint64_t stats_queue_length = 0;
180 static uint64_t stats_updates_received = 0;
181 static uint64_t stats_flush_received = 0;
182 static uint64_t stats_updates_written = 0;
183 static uint64_t stats_data_sets_written = 0;
184 static uint64_t stats_journal_bytes = 0;
185 static uint64_t stats_journal_rotate = 0;
186 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
188 /* Journaled updates */
189 static char *journal_cur = NULL;
190 static char *journal_old = NULL;
191 static FILE *journal_fh = NULL;
192 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
193 static int journal_write(char *cmd, char *args);
194 static void journal_done(void);
195 static void journal_rotate(void);
197 /*
198 * Functions
199 */
200 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
201 {
202 RRDD_LOG(LOG_NOTICE, "caught SIGINT");
203 do_shutdown++;
204 pthread_cond_broadcast(&cache_cond);
205 } /* }}} void sig_int_handler */
207 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
208 {
209 RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
210 do_shutdown++;
211 pthread_cond_broadcast(&cache_cond);
212 } /* }}} void sig_term_handler */
214 static int write_pidfile (void) /* {{{ */
215 {
216 pid_t pid;
217 char *file;
218 int fd;
219 FILE *fh;
221 pid = getpid ();
223 file = (config_pid_file != NULL)
224 ? config_pid_file
225 : LOCALSTATEDIR "/run/rrdcached.pid";
227 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
228 if (fd < 0)
229 {
230 RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
231 file, rrd_strerror(errno));
232 return (-1);
233 }
235 fh = fdopen (fd, "w");
236 if (fh == NULL)
237 {
238 RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
239 close(fd);
240 return (-1);
241 }
243 fprintf (fh, "%i\n", (int) pid);
244 fclose (fh);
246 return (0);
247 } /* }}} int write_pidfile */
249 static int remove_pidfile (void) /* {{{ */
250 {
251 char *file;
252 int status;
254 file = (config_pid_file != NULL)
255 ? config_pid_file
256 : LOCALSTATEDIR "/run/rrdcached.pid";
258 status = unlink (file);
259 if (status == 0)
260 return (0);
261 return (errno);
262 } /* }}} int remove_pidfile */
/*
 * sread:
 *   Reads from `fd' into `buffer_void' until the data read so far ends in a
 *   newline or the buffer is full. read() calls that fail with EAGAIN or
 *   EINTR are retried.
 *
 *   On success the trailing "\n" (or "\r\n") is replaced by a NUL byte and
 *   the number of bytes used in the buffer *including that NUL* is returned,
 *   i.e. strlen (buffer) + 1 for a single line.
 *
 *   Returns 0 if read() reports end-of-file (any partial line is discarded)
 *   and -1 on error. `errno' is ENOBUFS if the buffer is NULL, has size zero,
 *   or filled up before a newline was seen; otherwise it is whatever read()
 *   set.
 */
static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
{
  char *buffer = (char *) buffer_void;
  size_t buffer_used = 0;
  size_t buffer_free = buffer_size;
  ssize_t status;

  /* Without room for at least one byte the loop below would never run and
   * the code after it would index buffer[-1]. */
  if ((buffer == NULL) || (buffer_size == 0))
  {
    errno = ENOBUFS;
    return (-1);
  }

  while (buffer_free > 0)
  {
    status = read (fd, buffer + buffer_used, buffer_free);
    if (status < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (-1);
    }

    if (status == 0)
      return (0);

    /* read() must never hand back more than we asked for. */
    assert (buffer_free >= (size_t) status);

    buffer_free -= (size_t) status;
    buffer_used += (size_t) status;

    if (buffer[buffer_used - 1] == '\n')
      break;
  }

  /* Guaranteed by the size check above: we either read something or the
   * (non-empty) buffer is completely full. */
  assert (buffer_used > 0);

  if (buffer[buffer_used - 1] != '\n')
  {
    errno = ENOBUFS;
    return (-1);
  }

  buffer[buffer_used - 1] = 0;

  /* Fix network line endings. */
  if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
  {
    buffer_used--;
    buffer[buffer_used - 1] = 0;
  }

  return ((ssize_t) buffer_used);
} /* }}} ssize_t sread */
/*
 * swrite:
 *   Writes all `count' bytes of `buf' to `fd', calling write() repeatedly
 *   until nothing is left. Calls failing with EAGAIN or EINTR are retried.
 *
 *   A negative `fd' is accepted and treated as "discard": nothing is written
 *   and 0 is returned (used during journal replay).
 *
 *   Returns 0 once everything has been written, or the negative value
 *   returned by write() on any other error.
 */
static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
{
  const char *cursor = (const char *) buf;
  size_t remaining = count;

  /* special case for journal replay */
  if (fd < 0)
    return 0;

  while (remaining > 0)
  {
    ssize_t written = write (fd, (const void *) cursor, remaining);

    if (written < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (written);
    }

    remaining -= written;
    cursor += written;
  }

  return (0);
} /* }}} ssize_t swrite */
345 static void _wipe_ci_values(cache_item_t *ci, time_t when)
346 {
347 ci->values = NULL;
348 ci->values_num = 0;
350 ci->last_flush_time = when;
351 if (config_write_jitter > 0)
352 ci->last_flush_time += (random() % config_write_jitter);
354 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
355 }
357 /*
358 * enqueue_cache_item:
359 * `cache_lock' must be acquired before calling this function!
360 */
361 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
362 queue_side_t side)
363 {
364 int did_insert = 0;
366 if (ci == NULL)
367 return (-1);
369 if (ci->values_num == 0)
370 return (0);
372 if (side == HEAD)
373 {
374 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
375 {
376 assert (ci->next == NULL);
377 ci->next = cache_queue_head;
378 cache_queue_head = ci;
380 if (cache_queue_tail == NULL)
381 cache_queue_tail = cache_queue_head;
383 did_insert = 1;
384 }
385 else if (cache_queue_head == ci)
386 {
387 /* do nothing */
388 }
389 else /* enqueued, but not first entry */
390 {
391 cache_item_t *prev;
393 /* find previous entry */
394 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
395 if (prev->next == ci)
396 break;
397 assert (prev != NULL);
399 /* move to the front */
400 prev->next = ci->next;
401 ci->next = cache_queue_head;
402 cache_queue_head = ci;
404 /* check if we need to adapt the tail */
405 if (cache_queue_tail == ci)
406 cache_queue_tail = prev;
407 }
408 }
409 else /* (side == TAIL) */
410 {
411 /* We don't move values back in the list.. */
412 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
413 return (0);
415 assert (ci->next == NULL);
417 if (cache_queue_tail == NULL)
418 cache_queue_head = ci;
419 else
420 cache_queue_tail->next = ci;
421 cache_queue_tail = ci;
423 did_insert = 1;
424 }
426 ci->flags |= CI_FLAGS_IN_QUEUE;
428 if (did_insert)
429 {
430 pthread_mutex_lock (&stats_lock);
431 stats_queue_length++;
432 pthread_mutex_unlock (&stats_lock);
433 }
435 return (0);
436 } /* }}} int enqueue_cache_item */
438 /*
439 * tree_callback_flush:
440 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
441 * while this is in progress.
442 */
443 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
444 gpointer data)
445 {
446 cache_item_t *ci;
447 callback_flush_data_t *cfd;
449 ci = (cache_item_t *) value;
450 cfd = (callback_flush_data_t *) data;
452 if ((ci->last_flush_time <= cfd->abs_timeout)
453 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
454 && (ci->values_num > 0))
455 {
456 enqueue_cache_item (ci, TAIL);
457 }
458 else if ((do_shutdown != 0)
459 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
460 && (ci->values_num > 0))
461 {
462 enqueue_cache_item (ci, TAIL);
463 }
464 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
465 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
466 && (ci->values_num <= 0))
467 {
468 char **temp;
470 temp = (char **) realloc (cfd->keys,
471 sizeof (char *) * (cfd->keys_num + 1));
472 if (temp == NULL)
473 {
474 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
475 return (FALSE);
476 }
477 cfd->keys = temp;
478 /* Make really sure this points to the _same_ place */
479 assert ((char *) key == ci->file);
480 cfd->keys[cfd->keys_num] = (char *) key;
481 cfd->keys_num++;
482 }
484 return (FALSE);
485 } /* }}} gboolean tree_callback_flush */
487 static int flush_old_values (int max_age)
488 {
489 callback_flush_data_t cfd;
490 size_t k;
492 memset (&cfd, 0, sizeof (cfd));
493 /* Pass the current time as user data so that we don't need to call
494 * `time' for each node. */
495 cfd.now = time (NULL);
496 cfd.keys = NULL;
497 cfd.keys_num = 0;
499 if (max_age > 0)
500 cfd.abs_timeout = cfd.now - max_age;
501 else
502 cfd.abs_timeout = cfd.now + 1;
504 /* `tree_callback_flush' will return the keys of all values that haven't
505 * been touched in the last `config_flush_interval' seconds in `cfd'.
506 * The char*'s in this array point to the same memory as ci->file, so we
507 * don't need to free them separately. */
508 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
510 for (k = 0; k < cfd.keys_num; k++)
511 {
512 cache_item_t *ci;
514 /* This must not fail. */
515 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
516 assert (ci != NULL);
518 /* If we end up here with values available, something's seriously
519 * messed up. */
520 assert (ci->values_num == 0);
522 /* Remove the node from the tree */
523 g_tree_remove (cache_tree, cfd.keys[k]);
524 cfd.keys[k] = NULL;
526 /* Now free and clean up `ci'. */
527 free (ci->file);
528 ci->file = NULL;
529 free (ci);
530 ci = NULL;
531 } /* for (k = 0; k < cfd.keys_num; k++) */
533 if (cfd.keys != NULL)
534 {
535 free (cfd.keys);
536 cfd.keys = NULL;
537 }
539 return (0);
540 } /* int flush_old_values */
542 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
543 {
544 struct timeval now;
545 struct timespec next_flush;
547 gettimeofday (&now, NULL);
548 next_flush.tv_sec = now.tv_sec + config_flush_interval;
549 next_flush.tv_nsec = 1000 * now.tv_usec;
551 pthread_mutex_lock (&cache_lock);
552 while ((do_shutdown == 0) || (cache_queue_head != NULL))
553 {
554 cache_item_t *ci;
555 char *file;
556 char **values;
557 int values_num;
558 int status;
559 int i;
561 /* First, check if it's time to do the cache flush. */
562 gettimeofday (&now, NULL);
563 if ((now.tv_sec > next_flush.tv_sec)
564 || ((now.tv_sec == next_flush.tv_sec)
565 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
566 {
567 /* Flush all values that haven't been written in the last
568 * `config_write_interval' seconds. */
569 flush_old_values (config_write_interval);
571 /* Determine the time of the next cache flush. */
572 while (next_flush.tv_sec <= now.tv_sec)
573 next_flush.tv_sec += config_flush_interval;
575 /* unlock the cache while we rotate so we don't block incoming
576 * updates if the fsync() blocks on disk I/O */
577 pthread_mutex_unlock(&cache_lock);
578 journal_rotate();
579 pthread_mutex_lock(&cache_lock);
580 }
582 /* Now, check if there's something to store away. If not, wait until
583 * something comes in or it's time to do the cache flush. */
584 if (cache_queue_head == NULL)
585 {
586 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
587 if ((status != 0) && (status != ETIMEDOUT))
588 {
589 RRDD_LOG (LOG_ERR, "queue_thread_main: "
590 "pthread_cond_timedwait returned %i.", status);
591 }
592 }
594 /* We're about to shut down, so lets flush the entire tree. */
595 if ((do_shutdown != 0) && (cache_queue_head == NULL))
596 flush_old_values (/* max age = */ -1);
598 /* Check if a value has arrived. This may be NULL if we timed out or there
599 * was an interrupt such as a signal. */
600 if (cache_queue_head == NULL)
601 continue;
603 ci = cache_queue_head;
605 /* copy the relevant parts */
606 file = strdup (ci->file);
607 if (file == NULL)
608 {
609 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
610 continue;
611 }
613 assert(ci->values != NULL);
614 assert(ci->values_num > 0);
616 values = ci->values;
617 values_num = ci->values_num;
619 _wipe_ci_values(ci, time(NULL));
621 cache_queue_head = ci->next;
622 if (cache_queue_head == NULL)
623 cache_queue_tail = NULL;
624 ci->next = NULL;
626 pthread_mutex_lock (&stats_lock);
627 assert (stats_queue_length > 0);
628 stats_queue_length--;
629 pthread_mutex_unlock (&stats_lock);
631 pthread_mutex_unlock (&cache_lock);
633 rrd_clear_error ();
634 status = rrd_update_r (file, NULL, values_num, (void *) values);
635 if (status != 0)
636 {
637 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
638 "rrd_update_r (%s) failed with status %i. (%s)",
639 file, status, rrd_get_error());
640 }
642 journal_write("wrote", file);
644 for (i = 0; i < values_num; i++)
645 free (values[i]);
647 free(values);
648 free(file);
650 if (status == 0)
651 {
652 pthread_mutex_lock (&stats_lock);
653 stats_updates_written++;
654 stats_data_sets_written += values_num;
655 pthread_mutex_unlock (&stats_lock);
656 }
658 pthread_mutex_lock (&cache_lock);
659 pthread_cond_broadcast (&flush_cond);
661 /* We're about to shut down, so lets flush the entire tree. */
662 if ((do_shutdown != 0) && (cache_queue_head == NULL))
663 flush_old_values (/* max age = */ -1);
664 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
665 pthread_mutex_unlock (&cache_lock);
667 assert(cache_queue_head == NULL);
668 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
669 journal_done();
671 return (NULL);
672 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 *   Splits the next space-separated field off the front of a NUL-terminated
 *   buffer. A backslash takes the following character literally (so "\ "
 *   yields a space inside the field). The field is unescaped and
 *   NUL-terminated *in place*, i.e. the buffer is modified.
 *
 *   In/out:
 *     *buffer_ret      - start of the buffer; advanced past the field and
 *                        its separator on success
 *     *buffer_size_ret - bytes in the buffer including the final NUL;
 *                        reduced accordingly on success
 *   Out:
 *     *field_ret       - start of the extracted field (on success)
 *
 *   Returns 0 on success. Returns -1, leaving all three outputs untouched,
 *   if the buffer is empty, if a backslash is the last character before the
 *   final NUL, or if no terminator was found.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const start = *buffer_ret;
  const size_t size = *buffer_size_ret;
  size_t rd = 0; /* next byte to examine */
  size_t wr = 0; /* next byte of the unescaped field to write */

  if (size <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (start[size - 1] == '\0');

  while (rd < size)
  {
    char c = start[rd];

    /* Check for end-of-field or end-of-buffer */
    if ((c == ' ') || (c == '\0'))
    {
      start[wr] = 0;
      rd++;

      *buffer_ret = start + rd;
      *buffer_size_ret = size - rd;
      *field_ret = start;
      return (0);
    }

    /* Handle escaped characters. */
    if (c == '\\')
    {
      /* A backslash with nothing but the final NUL behind it is an error. */
      if (rd >= (size - 1))
        break;
      rd++;
      c = start[rd];
    }

    /* `wr' never overtakes `rd', so writing in place is safe. */
    start[wr] = c;
    wr++;
    rd++;
  } /* while (rd < size) */

  return (-1);
} /* }}} int buffer_get_field */
737 static int flush_file (const char *filename) /* {{{ */
738 {
739 cache_item_t *ci;
741 pthread_mutex_lock (&cache_lock);
743 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
744 if (ci == NULL)
745 {
746 pthread_mutex_unlock (&cache_lock);
747 return (ENOENT);
748 }
750 /* Enqueue at head */
751 enqueue_cache_item (ci, HEAD);
752 pthread_cond_signal (&cache_cond);
754 while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
755 {
756 ci = NULL;
758 pthread_cond_wait (&flush_cond, &cache_lock);
760 ci = g_tree_lookup (cache_tree, filename);
761 if (ci == NULL)
762 {
763 RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
764 "while waiting for flush.");
765 pthread_mutex_unlock (&cache_lock);
766 return (-1);
767 }
768 }
770 pthread_mutex_unlock (&cache_lock);
771 return (0);
772 } /* }}} int flush_file */
/*
 * handle_request_help:
 *   Answers the HELP command. The first field of `buffer' (if any) selects
 *   the topic: "update", "flush" or "stats", compared case-insensitively.
 *   Without a topic, or with an unknown one, the command overview is sent.
 *
 *   The first line of every help text starts with the number of lines that
 *   follow it.
 *
 *   Returns 0 on success, or the `errno' value of the failed write.
 */
static int handle_request_help (int fd, /* {{{ */
    char *buffer, size_t buffer_size)
{
  /* The texts are read-only string literals, hence `static const'. */
  static const char *const help_help[] =
  {
    "4 Command overview\n",
    "FLUSH <filename>\n",
    "HELP [<command>]\n",
    "UPDATE <filename> <values> [<values> ...]\n",
    "STATS\n"
  };

  static const char *const help_flush[] =
  {
    "4 Help for FLUSH\n",
    "Usage: FLUSH <filename>\n",
    "\n",
    "Adds the given filename to the head of the update queue and returns\n",
    "after it has been dequeued.\n"
  };

  static const char *const help_update[] =
  {
    "9 Help for UPDATE\n",
    "Usage: UPDATE <filename> <values> [<values> ...]\n",
    "\n",
    "Adds the given file to the internal cache if it is not yet known and\n",
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
    "for details.\n",
    "\n",
    "Each <values> has the following form:\n",
    "  <values> = <time>:<value>[:<value>[...]]\n",
    "See the rrdupdate(1) manpage for details.\n"
  };

  static const char *const help_stats[] =
  {
    "4 Help for STATS\n",
    "Usage: STATS\n",
    "\n",
    "Returns some performance counters, see the rrdcached(1) manpage for\n",
    "a description of the values.\n"
  };

  /* Default to the overview; it covers "no topic" and "unknown topic". */
  const char *const *help_text = help_help;
  size_t help_text_len = sizeof (help_help) / sizeof (help_help[0]);

  char *command;
  size_t i;
  int status;

  status = buffer_get_field (&buffer, &buffer_size, &command);
  if (status == 0)
  {
    if (strcasecmp (command, "update") == 0)
    {
      help_text = help_update;
      help_text_len = sizeof (help_update) / sizeof (help_update[0]);
    }
    else if (strcasecmp (command, "flush") == 0)
    {
      help_text = help_flush;
      help_text_len = sizeof (help_flush) / sizeof (help_flush[0]);
    }
    else if (strcasecmp (command, "stats") == 0)
    {
      help_text = help_stats;
      help_text_len = sizeof (help_stats) / sizeof (help_stats[0]);
    }
  }

  for (i = 0; i < help_text_len; i++)
  {
    status = swrite (fd, help_text[i], strlen (help_text[i]));
    if (status < 0)
    {
      /* Save errno before logging, which may overwrite it. */
      status = errno;
      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
      return (status);
    }
  }

  return (0);
} /* }}} int handle_request_help */
872 static int handle_request_stats (int fd, /* {{{ */
873 char *buffer __attribute__((unused)),
874 size_t buffer_size __attribute__((unused)))
875 {
876 int status;
877 char outbuf[CMD_MAX];
879 uint64_t copy_queue_length;
880 uint64_t copy_updates_received;
881 uint64_t copy_flush_received;
882 uint64_t copy_updates_written;
883 uint64_t copy_data_sets_written;
884 uint64_t copy_journal_bytes;
885 uint64_t copy_journal_rotate;
887 uint64_t tree_nodes_number;
888 uint64_t tree_depth;
890 pthread_mutex_lock (&stats_lock);
891 copy_queue_length = stats_queue_length;
892 copy_updates_received = stats_updates_received;
893 copy_flush_received = stats_flush_received;
894 copy_updates_written = stats_updates_written;
895 copy_data_sets_written = stats_data_sets_written;
896 copy_journal_bytes = stats_journal_bytes;
897 copy_journal_rotate = stats_journal_rotate;
898 pthread_mutex_unlock (&stats_lock);
900 pthread_mutex_lock (&cache_lock);
901 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
902 tree_depth = (uint64_t) g_tree_height (cache_tree);
903 pthread_mutex_unlock (&cache_lock);
905 #define RRDD_STATS_SEND \
906 outbuf[sizeof (outbuf) - 1] = 0; \
907 status = swrite (fd, outbuf, strlen (outbuf)); \
908 if (status < 0) \
909 { \
910 status = errno; \
911 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
912 return (status); \
913 }
915 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
916 RRDD_STATS_SEND;
918 snprintf (outbuf, sizeof (outbuf),
919 "QueueLength: %"PRIu64"\n", copy_queue_length);
920 RRDD_STATS_SEND;
922 snprintf (outbuf, sizeof (outbuf),
923 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
924 RRDD_STATS_SEND;
926 snprintf (outbuf, sizeof (outbuf),
927 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
928 RRDD_STATS_SEND;
930 snprintf (outbuf, sizeof (outbuf),
931 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
932 RRDD_STATS_SEND;
934 snprintf (outbuf, sizeof (outbuf),
935 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
936 RRDD_STATS_SEND;
938 snprintf (outbuf, sizeof (outbuf),
939 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
940 RRDD_STATS_SEND;
942 snprintf (outbuf, sizeof (outbuf),
943 "TreeDepth: %"PRIu64"\n", tree_depth);
944 RRDD_STATS_SEND;
946 snprintf (outbuf, sizeof(outbuf),
947 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
948 RRDD_STATS_SEND;
950 snprintf (outbuf, sizeof(outbuf),
951 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
952 RRDD_STATS_SEND;
954 return (0);
955 #undef RRDD_STATS_SEND
956 } /* }}} int handle_request_stats */
958 static int handle_request_flush (int fd, /* {{{ */
959 char *buffer, size_t buffer_size)
960 {
961 char *file;
962 int status;
963 char result[CMD_MAX];
965 status = buffer_get_field (&buffer, &buffer_size, &file);
966 if (status != 0)
967 {
968 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
969 }
970 else
971 {
972 pthread_mutex_lock(&stats_lock);
973 stats_flush_received++;
974 pthread_mutex_unlock(&stats_lock);
976 status = flush_file (file);
977 if (status == 0)
978 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
979 else if (status == ENOENT)
980 {
981 /* no file in our tree; see whether it exists at all */
982 struct stat statbuf;
984 memset(&statbuf, 0, sizeof(statbuf));
985 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
986 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
987 else
988 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
989 }
990 else if (status < 0)
991 strncpy (result, "-1 Internal error.\n", sizeof (result));
992 else
993 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
994 }
995 result[sizeof (result) - 1] = 0;
997 status = swrite (fd, result, strlen (result));
998 if (status < 0)
999 {
1000 status = errno;
1001 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1002 return (status);
1003 }
1005 return (0);
1006 } /* }}} int handle_request_flush */
1008 static int handle_request_update (int fd, /* {{{ */
1009 char *buffer, size_t buffer_size)
1010 {
1011 char *file;
1012 int values_num = 0;
1013 int status;
1015 time_t now;
1017 cache_item_t *ci;
1018 char answer[CMD_MAX];
1020 #define RRDD_UPDATE_SEND \
1021 answer[sizeof (answer) - 1] = 0; \
1022 status = swrite (fd, answer, strlen (answer)); \
1023 if (status < 0) \
1024 { \
1025 status = errno; \
1026 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1027 return (status); \
1028 }
1030 now = time (NULL);
1032 status = buffer_get_field (&buffer, &buffer_size, &file);
1033 if (status != 0)
1034 {
1035 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1036 sizeof (answer));
1037 RRDD_UPDATE_SEND;
1038 return (0);
1039 }
1041 pthread_mutex_lock(&stats_lock);
1042 stats_updates_received++;
1043 pthread_mutex_unlock(&stats_lock);
1045 pthread_mutex_lock (&cache_lock);
1047 ci = g_tree_lookup (cache_tree, file);
1048 if (ci == NULL) /* {{{ */
1049 {
1050 struct stat statbuf;
1052 memset (&statbuf, 0, sizeof (statbuf));
1053 status = stat (file, &statbuf);
1054 if (status != 0)
1055 {
1056 pthread_mutex_unlock (&cache_lock);
1057 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1059 status = errno;
1060 if (status == ENOENT)
1061 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1062 else
1063 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1064 status);
1065 RRDD_UPDATE_SEND;
1066 return (0);
1067 }
1068 if (!S_ISREG (statbuf.st_mode))
1069 {
1070 pthread_mutex_unlock (&cache_lock);
1072 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1073 RRDD_UPDATE_SEND;
1074 return (0);
1075 }
1076 if (access(file, R_OK|W_OK) != 0)
1077 {
1078 pthread_mutex_unlock (&cache_lock);
1080 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1081 file, rrd_strerror(errno));
1082 RRDD_UPDATE_SEND;
1083 return (0);
1084 }
1086 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1087 if (ci == NULL)
1088 {
1089 pthread_mutex_unlock (&cache_lock);
1090 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1092 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1093 RRDD_UPDATE_SEND;
1094 return (0);
1095 }
1096 memset (ci, 0, sizeof (cache_item_t));
1098 ci->file = strdup (file);
1099 if (ci->file == NULL)
1100 {
1101 pthread_mutex_unlock (&cache_lock);
1102 free (ci);
1103 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1105 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1106 RRDD_UPDATE_SEND;
1107 return (0);
1108 }
1110 _wipe_ci_values(ci, now);
1111 ci->flags = CI_FLAGS_IN_TREE;
1113 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1114 } /* }}} */
1115 assert (ci != NULL);
1117 while (buffer_size > 0)
1118 {
1119 char **temp;
1120 char *value;
1122 status = buffer_get_field (&buffer, &buffer_size, &value);
1123 if (status != 0)
1124 {
1125 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1126 break;
1127 }
1129 temp = (char **) realloc (ci->values,
1130 sizeof (char *) * (ci->values_num + 1));
1131 if (temp == NULL)
1132 {
1133 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1134 continue;
1135 }
1136 ci->values = temp;
1138 ci->values[ci->values_num] = strdup (value);
1139 if (ci->values[ci->values_num] == NULL)
1140 {
1141 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1142 continue;
1143 }
1144 ci->values_num++;
1146 values_num++;
1147 }
1149 if (((now - ci->last_flush_time) >= config_write_interval)
1150 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1151 && (ci->values_num > 0))
1152 {
1153 enqueue_cache_item (ci, TAIL);
1154 pthread_cond_signal (&cache_cond);
1155 }
1157 pthread_mutex_unlock (&cache_lock);
1159 if (values_num < 1)
1160 {
1161 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1162 }
1163 else
1164 {
1165 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1166 (values_num == 1) ? "" : "s");
1167 }
1168 RRDD_UPDATE_SEND;
1169 return (0);
1170 #undef RRDD_UPDATE_SEND
1171 } /* }}} int handle_request_update */
1173 /* we came across a "WROTE" entry during journal replay.
1174 * throw away any values that we have accumulated for this file
1175 */
1176 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1177 const char *buffer,
1178 size_t buffer_size __attribute__((unused)))
1179 {
1180 int i;
1181 cache_item_t *ci;
1182 const char *file = buffer;
1184 pthread_mutex_lock(&cache_lock);
1186 ci = g_tree_lookup(cache_tree, file);
1187 if (ci == NULL)
1188 {
1189 pthread_mutex_unlock(&cache_lock);
1190 return (0);
1191 }
1193 if (ci->values)
1194 {
1195 for (i=0; i < ci->values_num; i++)
1196 free(ci->values[i]);
1198 free(ci->values);
1199 }
1201 _wipe_ci_values(ci, time(NULL));
1203 pthread_mutex_unlock(&cache_lock);
1204 return (0);
1205 } /* }}} int handle_request_wrote */
1207 /* if fd < 0, we are in journal replay mode */
1208 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1209 {
1210 char *buffer_ptr;
1211 char *command;
1212 int status;
1214 assert (buffer[buffer_size - 1] == '\0');
1216 buffer_ptr = buffer;
1217 command = NULL;
1218 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1219 if (status != 0)
1220 {
1221 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1222 return (-1);
1223 }
1225 if (strcasecmp (command, "update") == 0)
1226 {
1227 /* don't re-write updates in replay mode */
1228 if (fd >= 0)
1229 journal_write(command, buffer_ptr);
1231 return (handle_request_update (fd, buffer_ptr, buffer_size));
1232 }
1233 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1234 {
1235 /* this is only valid in replay mode */
1236 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1237 }
1238 else if (strcasecmp (command, "flush") == 0)
1239 {
1240 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1241 }
1242 else if (strcasecmp (command, "stats") == 0)
1243 {
1244 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1245 }
1246 else if (strcasecmp (command, "help") == 0)
1247 {
1248 return (handle_request_help (fd, buffer_ptr, buffer_size));
1249 }
1250 else
1251 {
1252 char result[CMD_MAX];
1254 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1255 result[sizeof (result) - 1] = 0;
1257 status = swrite (fd, result, strlen (result));
1258 if (status < 0)
1259 {
1260 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1261 return (-1);
1262 }
1263 }
1265 return (0);
1266 } /* }}} int handle_request */
1268 /* MUST NOT hold journal_lock before calling this */
1269 static void journal_rotate(void) /* {{{ */
1270 {
1271 FILE *old_fh = NULL;
1273 if (journal_cur == NULL || journal_old == NULL)
1274 return;
1276 pthread_mutex_lock(&journal_lock);
1278 /* we rotate this way (rename before close) so that the we can release
1279 * the journal lock as fast as possible. Journal writes to the new
1280 * journal can proceed immediately after the new file is opened. The
1281 * fclose can then block without affecting new updates.
1282 */
1283 if (journal_fh != NULL)
1284 {
1285 old_fh = journal_fh;
1286 rename(journal_cur, journal_old);
1287 ++stats_journal_rotate;
1288 }
1290 journal_fh = fopen(journal_cur, "a");
1291 pthread_mutex_unlock(&journal_lock);
1293 if (old_fh != NULL)
1294 fclose(old_fh);
1296 if (journal_fh == NULL)
1297 RRDD_LOG(LOG_CRIT,
1298 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1299 journal_cur, rrd_strerror(errno));
1301 } /* }}} static void journal_rotate */
1303 static void journal_done(void) /* {{{ */
1304 {
1305 if (journal_cur == NULL)
1306 return;
1308 pthread_mutex_lock(&journal_lock);
1309 if (journal_fh != NULL)
1310 {
1311 fclose(journal_fh);
1312 journal_fh = NULL;
1313 }
1315 RRDD_LOG(LOG_INFO, "removing journals");
1317 unlink(journal_old);
1318 unlink(journal_cur);
1319 pthread_mutex_unlock(&journal_lock);
1321 } /* }}} static void journal_done */
1323 static int journal_write(char *cmd, char *args) /* {{{ */
1324 {
1325 int chars;
1327 if (journal_fh == NULL)
1328 return 0;
1330 pthread_mutex_lock(&journal_lock);
1331 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1332 pthread_mutex_unlock(&journal_lock);
1334 if (chars > 0)
1335 {
1336 pthread_mutex_lock(&stats_lock);
1337 stats_journal_bytes += chars;
1338 pthread_mutex_unlock(&stats_lock);
1339 }
1341 return chars;
1342 } /* }}} static int journal_write */
1344 static int journal_replay (const char *file) /* {{{ */
1345 {
1346 FILE *fh;
1347 int entry_cnt = 0;
1348 int fail_cnt = 0;
1349 uint64_t line = 0;
1350 char entry[CMD_MAX];
1352 if (file == NULL) return 0;
1354 fh = fopen(file, "r");
1355 if (fh == NULL)
1356 {
1357 if (errno != ENOENT)
1358 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1359 file, rrd_strerror(errno));
1360 return 0;
1361 }
1362 else
1363 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1365 while(!feof(fh))
1366 {
1367 size_t entry_len;
1369 ++line;
1370 fgets(entry, sizeof(entry), fh);
1371 entry_len = strlen(entry);
1373 /* check \n termination in case journal writing crashed mid-line */
1374 if (entry_len == 0)
1375 continue;
1376 else if (entry[entry_len - 1] != '\n')
1377 {
1378 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1379 ++fail_cnt;
1380 continue;
1381 }
1383 entry[entry_len - 1] = '\0';
1385 if (handle_request(-1, entry, entry_len) == 0)
1386 ++entry_cnt;
1387 else
1388 ++fail_cnt;
1389 }
1391 fclose(fh);
1393 if (entry_cnt > 0)
1394 {
1395 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1396 entry_cnt, fail_cnt);
1397 return 1;
1398 }
1399 else
1400 return 0;
1402 } /* }}} static int journal_replay */
1404 static void *connection_thread_main (void *args) /* {{{ */
1405 {
1406 pthread_t self;
1407 int i;
1408 int fd;
1410 fd = *((int *) args);
1411 free (args);
1413 pthread_mutex_lock (&connection_threads_lock);
1414 {
1415 pthread_t *temp;
1417 temp = (pthread_t *) realloc (connection_threads,
1418 sizeof (pthread_t) * (connection_threads_num + 1));
1419 if (temp == NULL)
1420 {
1421 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1422 }
1423 else
1424 {
1425 connection_threads = temp;
1426 connection_threads[connection_threads_num] = pthread_self ();
1427 connection_threads_num++;
1428 }
1429 }
1430 pthread_mutex_unlock (&connection_threads_lock);
1432 while (do_shutdown == 0)
1433 {
1434 char buffer[CMD_MAX];
1436 struct pollfd pollfd;
1437 int status;
1439 pollfd.fd = fd;
1440 pollfd.events = POLLIN | POLLPRI;
1441 pollfd.revents = 0;
1443 status = poll (&pollfd, 1, /* timeout = */ 500);
1444 if (status == 0) /* timeout */
1445 continue;
1446 else if (status < 0) /* error */
1447 {
1448 status = errno;
1449 if (status == EINTR)
1450 continue;
1451 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1452 continue;
1453 }
1455 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1456 {
1457 close (fd);
1458 break;
1459 }
1460 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1461 {
1462 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1463 "poll(2) returned something unexpected: %#04hx",
1464 pollfd.revents);
1465 close (fd);
1466 break;
1467 }
1469 status = (int) sread (fd, buffer, sizeof (buffer));
1470 if (status <= 0)
1471 {
1472 close (fd);
1474 if (status < 0)
1475 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1477 break;
1478 }
1480 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1481 if (status != 0)
1482 {
1483 close (fd);
1484 break;
1485 }
1486 }
1488 self = pthread_self ();
1489 /* Remove this thread from the connection threads list */
1490 pthread_mutex_lock (&connection_threads_lock);
1491 /* Find out own index in the array */
1492 for (i = 0; i < connection_threads_num; i++)
1493 if (pthread_equal (connection_threads[i], self) != 0)
1494 break;
1495 assert (i < connection_threads_num);
1497 /* Move the trailing threads forward. */
1498 if (i < (connection_threads_num - 1))
1499 {
1500 memmove (connection_threads + i,
1501 connection_threads + i + 1,
1502 sizeof (pthread_t) * (connection_threads_num - i - 1));
1503 }
1505 connection_threads_num--;
1506 pthread_mutex_unlock (&connection_threads_lock);
1508 return (NULL);
1509 } /* }}} void *connection_thread_main */
1511 static int open_listen_socket_unix (const char *path) /* {{{ */
1512 {
1513 int fd;
1514 struct sockaddr_un sa;
1515 listen_socket_t *temp;
1516 int status;
1518 temp = (listen_socket_t *) realloc (listen_fds,
1519 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1520 if (temp == NULL)
1521 {
1522 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1523 return (-1);
1524 }
1525 listen_fds = temp;
1526 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1528 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1529 if (fd < 0)
1530 {
1531 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1532 return (-1);
1533 }
1535 memset (&sa, 0, sizeof (sa));
1536 sa.sun_family = AF_UNIX;
1537 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1539 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1540 if (status != 0)
1541 {
1542 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1543 close (fd);
1544 unlink (path);
1545 return (-1);
1546 }
1548 status = listen (fd, /* backlog = */ 10);
1549 if (status != 0)
1550 {
1551 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1552 close (fd);
1553 unlink (path);
1554 return (-1);
1555 }
1557 listen_fds[listen_fds_num].fd = fd;
1558 snprintf (listen_fds[listen_fds_num].path,
1559 sizeof (listen_fds[listen_fds_num].path) - 1,
1560 "unix:%s", path);
1561 listen_fds_num++;
1563 return (0);
1564 } /* }}} int open_listen_socket_unix */
/*
 * Open listening socket(s) for the address specification `addr_orig'.
 *
 * Accepted forms:
 *   "unix:<path>" or "/<path>"   UNIX domain socket
 *   "[<ipv6>]" or "[<ipv6>]:<port>"
 *   "<host-or-ipv4>" or "<host-or-ipv4>:<port>" (port is only split off
 *                                               when the string contains
 *                                               a '.')
 * Without a port, RRDCACHED_DEFAULT_PORT is used.  One socket is opened for
 * each address returned by getaddrinfo(); addresses that fail at socket(2)
 * or bind(2) are skipped.
 *
 * Returns -1 on a malformed address, a resolver failure or a listen(2)
 * failure; otherwise 0, even if no socket could be opened.
 */
static int open_listen_socket (const char *addr_orig) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  assert (addr_orig != NULL);

  /* Work on a private copy: the parsing below writes NUL bytes into it. */
  strncpy (addr_copy, addr_orig, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
    return (open_listen_socket_unix (addr + strlen ("unix:")));
  else if (addr[0] == '/')
    return (open_listen_socket_unix (addr));

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
          addr_orig);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
      continue;
    }
    listen_fds = temp;
    /* Zero the new slot; the strncpy below relies on this for termination. */
    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
      close (fd);
      freeaddrinfo (ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    strncpy (listen_fds[listen_fds_num].path, addr,
        sizeof (listen_fds[listen_fds_num].path) - 1);
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* The address list is owned by us and must be released explicitly. */
  freeaddrinfo (ai_res);

  return (0);
} /* }}} int open_listen_socket */
1692 static int close_listen_sockets (void) /* {{{ */
1693 {
1694 size_t i;
1696 for (i = 0; i < listen_fds_num; i++)
1697 {
1698 close (listen_fds[i].fd);
1699 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1700 unlink (listen_fds[i].path + strlen ("unix:"));
1701 }
1703 free (listen_fds);
1704 listen_fds = NULL;
1705 listen_fds_num = 0;
1707 return (0);
1708 } /* }}} int close_listen_sockets */
1710 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1711 {
1712 struct pollfd *pollfds;
1713 int pollfds_num;
1714 int status;
1715 int i;
1717 for (i = 0; i < config_listen_address_list_len; i++)
1718 open_listen_socket (config_listen_address_list[i]);
1720 if (config_listen_address_list_len < 1)
1721 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1723 if (listen_fds_num < 1)
1724 {
1725 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1726 "could be opened. Sorry.");
1727 return (NULL);
1728 }
1730 pollfds_num = listen_fds_num;
1731 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1732 if (pollfds == NULL)
1733 {
1734 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1735 return (NULL);
1736 }
1737 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1739 RRDD_LOG(LOG_INFO, "listening for connections");
1741 while (do_shutdown == 0)
1742 {
1743 assert (pollfds_num == ((int) listen_fds_num));
1744 for (i = 0; i < pollfds_num; i++)
1745 {
1746 pollfds[i].fd = listen_fds[i].fd;
1747 pollfds[i].events = POLLIN | POLLPRI;
1748 pollfds[i].revents = 0;
1749 }
1751 status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1752 if (status < 1)
1753 {
1754 status = errno;
1755 if (status != EINTR)
1756 {
1757 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1758 }
1759 continue;
1760 }
1762 for (i = 0; i < pollfds_num; i++)
1763 {
1764 int *client_sd;
1765 struct sockaddr_storage client_sa;
1766 socklen_t client_sa_size;
1767 pthread_t tid;
1768 pthread_attr_t attr;
1770 if (pollfds[i].revents == 0)
1771 continue;
1773 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1774 {
1775 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1776 "poll(2) returned something unexpected for listen FD #%i.",
1777 pollfds[i].fd);
1778 continue;
1779 }
1781 client_sd = (int *) malloc (sizeof (int));
1782 if (client_sd == NULL)
1783 {
1784 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1785 continue;
1786 }
1788 client_sa_size = sizeof (client_sa);
1789 *client_sd = accept (pollfds[i].fd,
1790 (struct sockaddr *) &client_sa, &client_sa_size);
1791 if (*client_sd < 0)
1792 {
1793 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1794 continue;
1795 }
1797 pthread_attr_init (&attr);
1798 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1800 status = pthread_create (&tid, &attr, connection_thread_main,
1801 /* args = */ (void *) client_sd);
1802 if (status != 0)
1803 {
1804 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1805 close (*client_sd);
1806 free (client_sd);
1807 continue;
1808 }
1809 } /* for (pollfds_num) */
1810 } /* while (do_shutdown == 0) */
1812 RRDD_LOG(LOG_INFO, "starting shutdown");
1814 close_listen_sockets ();
1816 pthread_mutex_lock (&connection_threads_lock);
1817 while (connection_threads_num > 0)
1818 {
1819 pthread_t wait_for;
1821 wait_for = connection_threads[0];
1823 pthread_mutex_unlock (&connection_threads_lock);
1824 pthread_join (wait_for, /* retval = */ NULL);
1825 pthread_mutex_lock (&connection_threads_lock);
1826 }
1827 pthread_mutex_unlock (&connection_threads_lock);
1829 return (NULL);
1830 } /* }}} void *listen_thread_main */
1832 static int daemonize (void) /* {{{ */
1833 {
1834 int status;
1836 /* These structures are static, because `sigaction' behaves weird if the are
1837 * overwritten.. */
1838 static struct sigaction sa_int;
1839 static struct sigaction sa_term;
1840 static struct sigaction sa_pipe;
1842 if (!stay_foreground)
1843 {
1844 pid_t child;
1845 char *base_dir;
1847 child = fork ();
1848 if (child < 0)
1849 {
1850 fprintf (stderr, "daemonize: fork(2) failed.\n");
1851 return (-1);
1852 }
1853 else if (child > 0)
1854 {
1855 return (1);
1856 }
1858 /* Change into the /tmp directory. */
1859 base_dir = (config_base_dir != NULL)
1860 ? config_base_dir
1861 : "/tmp";
1862 status = chdir (base_dir);
1863 if (status != 0)
1864 {
1865 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1866 return (-1);
1867 }
1869 /* Become session leader */
1870 setsid ();
1872 /* Open the first three file descriptors to /dev/null */
1873 close (2);
1874 close (1);
1875 close (0);
1877 open ("/dev/null", O_RDWR);
1878 dup (0);
1879 dup (0);
1880 } /* if (!stay_foreground) */
1882 /* Install signal handlers */
1883 memset (&sa_int, 0, sizeof (sa_int));
1884 sa_int.sa_handler = sig_int_handler;
1885 sigaction (SIGINT, &sa_int, NULL);
1887 memset (&sa_term, 0, sizeof (sa_term));
1888 sa_term.sa_handler = sig_term_handler;
1889 sigaction (SIGTERM, &sa_term, NULL);
1891 memset (&sa_pipe, 0, sizeof (sa_pipe));
1892 sa_pipe.sa_handler = SIG_IGN;
1893 sigaction (SIGPIPE, &sa_pipe, NULL);
1895 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1896 RRDD_LOG(LOG_INFO, "starting up");
1898 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1899 if (cache_tree == NULL)
1900 {
1901 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1902 return (-1);
1903 }
1905 status = write_pidfile ();
1906 return status;
1907 } /* }}} int daemonize */
1909 static int cleanup (void) /* {{{ */
1910 {
1911 do_shutdown++;
1913 pthread_cond_signal (&cache_cond);
1914 pthread_join (queue_thread, /* return = */ NULL);
1916 remove_pidfile ();
1918 RRDD_LOG(LOG_INFO, "goodbye");
1919 closelog ();
1921 return (0);
1922 } /* }}} int cleanup */
1924 static int read_options (int argc, char **argv) /* {{{ */
1925 {
1926 int option;
1927 int status = 0;
1929 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1930 {
1931 switch (option)
1932 {
1933 case 'g':
1934 stay_foreground=1;
1935 break;
1937 case 'l':
1938 {
1939 char **temp;
1941 temp = (char **) realloc (config_listen_address_list,
1942 sizeof (char *) * (config_listen_address_list_len + 1));
1943 if (temp == NULL)
1944 {
1945 fprintf (stderr, "read_options: realloc failed.\n");
1946 return (2);
1947 }
1948 config_listen_address_list = temp;
1950 temp[config_listen_address_list_len] = strdup (optarg);
1951 if (temp[config_listen_address_list_len] == NULL)
1952 {
1953 fprintf (stderr, "read_options: strdup failed.\n");
1954 return (2);
1955 }
1956 config_listen_address_list_len++;
1957 }
1958 break;
1960 case 'f':
1961 {
1962 int temp;
1964 temp = atoi (optarg);
1965 if (temp > 0)
1966 config_flush_interval = temp;
1967 else
1968 {
1969 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1970 status = 3;
1971 }
1972 }
1973 break;
1975 case 'w':
1976 {
1977 int temp;
1979 temp = atoi (optarg);
1980 if (temp > 0)
1981 config_write_interval = temp;
1982 else
1983 {
1984 fprintf (stderr, "Invalid write interval: %s\n", optarg);
1985 status = 2;
1986 }
1987 }
1988 break;
1990 case 'z':
1991 {
1992 int temp;
1994 temp = atoi(optarg);
1995 if (temp > 0)
1996 config_write_jitter = temp;
1997 else
1998 {
1999 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2000 status = 2;
2001 }
2003 break;
2004 }
2006 case 'b':
2007 {
2008 size_t len;
2010 if (config_base_dir != NULL)
2011 free (config_base_dir);
2012 config_base_dir = strdup (optarg);
2013 if (config_base_dir == NULL)
2014 {
2015 fprintf (stderr, "read_options: strdup failed.\n");
2016 return (3);
2017 }
2019 len = strlen (config_base_dir);
2020 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2021 {
2022 config_base_dir[len - 1] = 0;
2023 len--;
2024 }
2026 if (len < 1)
2027 {
2028 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2029 return (4);
2030 }
2031 }
2032 break;
2034 case 'p':
2035 {
2036 if (config_pid_file != NULL)
2037 free (config_pid_file);
2038 config_pid_file = strdup (optarg);
2039 if (config_pid_file == NULL)
2040 {
2041 fprintf (stderr, "read_options: strdup failed.\n");
2042 return (3);
2043 }
2044 }
2045 break;
2047 case 'j':
2048 {
2049 struct stat statbuf;
2050 const char *dir = optarg;
2052 status = stat(dir, &statbuf);
2053 if (status != 0)
2054 {
2055 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2056 return 6;
2057 }
2059 if (!S_ISDIR(statbuf.st_mode)
2060 || access(dir, R_OK|W_OK|X_OK) != 0)
2061 {
2062 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2063 errno ? rrd_strerror(errno) : "");
2064 return 6;
2065 }
2067 journal_cur = malloc(PATH_MAX + 1);
2068 journal_old = malloc(PATH_MAX + 1);
2069 if (journal_cur == NULL || journal_old == NULL)
2070 {
2071 fprintf(stderr, "malloc failure for journal files\n");
2072 return 6;
2073 }
2074 else
2075 {
2076 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2077 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2078 }
2079 }
2080 break;
2082 case 'h':
2083 case '?':
2084 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2085 "\n"
2086 "Usage: rrdcached [options]\n"
2087 "\n"
2088 "Valid options are:\n"
2089 " -l <address> Socket address to listen to.\n"
2090 " -w <seconds> Interval in which to write data.\n"
2091 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2092 " -f <seconds> Interval in which to flush dead data.\n"
2093 " -p <file> Location of the PID-file.\n"
2094 " -b <dir> Base directory to change to.\n"
2095 " -g Do not fork and run in the foreground.\n"
2096 " -j <dir> Directory in which to create the journal files.\n"
2097 "\n"
2098 "For more information and a detailed description of all options "
2099 "please refer\n"
2100 "to the rrdcached(1) manual page.\n",
2101 VERSION);
2102 status = -1;
2103 break;
2104 } /* switch (option) */
2105 } /* while (getopt) */
2107 /* advise the user when values are not sane */
2108 if (config_flush_interval < 2 * config_write_interval)
2109 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2110 " 2x write interval (-w) !\n");
2111 if (config_write_jitter > config_write_interval)
2112 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2113 " write interval (-w) !\n");
2115 return (status);
2116 } /* }}} int read_options */
2118 int main (int argc, char **argv)
2119 {
2120 int status;
2122 status = read_options (argc, argv);
2123 if (status != 0)
2124 {
2125 if (status < 0)
2126 status = 0;
2127 return (status);
2128 }
2130 status = daemonize ();
2131 if (status == 1)
2132 {
2133 struct sigaction sigchld;
2135 memset (&sigchld, 0, sizeof (sigchld));
2136 sigchld.sa_handler = SIG_IGN;
2137 sigaction (SIGCHLD, &sigchld, NULL);
2139 return (0);
2140 }
2141 else if (status != 0)
2142 {
2143 fprintf (stderr, "daemonize failed, exiting.\n");
2144 return (1);
2145 }
2147 if (journal_cur != NULL)
2148 {
2149 int had_journal = 0;
2151 pthread_mutex_lock(&journal_lock);
2153 RRDD_LOG(LOG_INFO, "checking for journal files");
2155 had_journal += journal_replay(journal_old);
2156 had_journal += journal_replay(journal_cur);
2158 if (had_journal)
2159 flush_old_values(-1);
2161 pthread_mutex_unlock(&journal_lock);
2162 journal_rotate();
2164 RRDD_LOG(LOG_INFO, "journal processing complete");
2165 }
2167 /* start the queue thread */
2168 memset (&queue_thread, 0, sizeof (queue_thread));
2169 status = pthread_create (&queue_thread,
2170 NULL, /* attr */
2171 queue_thread_main,
2172 NULL); /* args */
2173 if (status != 0)
2174 {
2175 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2176 cleanup();
2177 return (1);
2178 }
2180 listen_thread_main (NULL);
2181 cleanup ();
2183 return (0);
2184 } /* int main */
2186 /*
2187 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2188 */