a94e0794f649a9b323bf4bd42973b75d00d5d929
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 struct listen_socket_s
105 {
106 int fd;
107 char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115 char *file;
116 char **values;
117 int values_num;
118 time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121 int flags;
123 cache_item_t *next;
124 };
126 struct callback_flush_data_s
127 {
128 time_t now;
129 time_t abs_timeout;
130 char **keys;
131 size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
135 enum queue_side_e
136 {
137 HEAD,
138 TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146 * Variables
147 */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree *cache_tree = NULL;
163 static cache_item_t *cache_queue_head = NULL;
164 static cache_item_t *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
168 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
170 static int config_write_interval = 300;
171 static int config_write_jitter = 0;
172 static int config_flush_interval = 3600;
173 static char *config_pid_file = NULL;
174 static char *config_base_dir = NULL;
176 static char **config_listen_address_list = NULL;
177 static int config_listen_address_list_len = 0;
179 static uint64_t stats_queue_length = 0;
180 static uint64_t stats_updates_received = 0;
181 static uint64_t stats_flush_received = 0;
182 static uint64_t stats_updates_written = 0;
183 static uint64_t stats_data_sets_written = 0;
184 static uint64_t stats_journal_bytes = 0;
185 static uint64_t stats_journal_rotate = 0;
186 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
188 /* Journaled updates */
189 static char *journal_cur = NULL;
190 static char *journal_old = NULL;
191 static FILE *journal_fh = NULL;
192 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
193 static int journal_write(char *cmd, char *args);
194 static void journal_done(void);
195 static void journal_rotate(void);
197 /*
198 * Functions
199 */
200 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
201 {
202 RRDD_LOG(LOG_NOTICE, "caught SIGINT");
203 do_shutdown++;
204 pthread_cond_broadcast(&cache_cond);
205 } /* }}} void sig_int_handler */
207 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
208 {
209 RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
210 do_shutdown++;
211 pthread_cond_broadcast(&cache_cond);
212 } /* }}} void sig_term_handler */
214 static int write_pidfile (void) /* {{{ */
215 {
216 pid_t pid;
217 char *file;
218 int fd;
219 FILE *fh;
221 pid = getpid ();
223 file = (config_pid_file != NULL)
224 ? config_pid_file
225 : LOCALSTATEDIR "/run/rrdcached.pid";
227 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
228 if (fd < 0)
229 {
230 RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
231 file, rrd_strerror(errno));
232 return (-1);
233 }
235 fh = fdopen (fd, "w");
236 if (fh == NULL)
237 {
238 RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
239 close(fd);
240 return (-1);
241 }
243 fprintf (fh, "%i\n", (int) pid);
244 fclose (fh);
246 return (0);
247 } /* }}} int write_pidfile */
249 static int remove_pidfile (void) /* {{{ */
250 {
251 char *file;
252 int status;
254 file = (config_pid_file != NULL)
255 ? config_pid_file
256 : LOCALSTATEDIR "/run/rrdcached.pid";
258 status = unlink (file);
259 if (status == 0)
260 return (0);
261 return (errno);
262 } /* }}} int remove_pidfile */
264 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
265 {
266 char *buffer;
267 size_t buffer_used;
268 size_t buffer_free;
269 ssize_t status;
271 buffer = (char *) buffer_void;
272 buffer_used = 0;
273 buffer_free = buffer_size;
275 while (buffer_free > 0)
276 {
277 status = read (fd, buffer + buffer_used, buffer_free);
278 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
279 continue;
281 if (status < 0)
282 return (-1);
284 if (status == 0)
285 return (0);
287 assert ((0 > status) || (buffer_free >= (size_t) status));
289 buffer_free = buffer_free - status;
290 buffer_used = buffer_used + status;
292 if (buffer[buffer_used - 1] == '\n')
293 break;
294 }
296 assert (buffer_used > 0);
298 if (buffer[buffer_used - 1] != '\n')
299 {
300 errno = ENOBUFS;
301 return (-1);
302 }
304 buffer[buffer_used - 1] = 0;
306 /* Fix network line endings. */
307 if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
308 {
309 buffer_used--;
310 buffer[buffer_used - 1] = 0;
311 }
313 return (buffer_used);
314 } /* }}} ssize_t sread */
316 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
317 {
318 const char *ptr;
319 size_t nleft;
320 ssize_t status;
322 /* special case for journal replay */
323 if (fd < 0) return 0;
325 ptr = (const char *) buf;
326 nleft = count;
328 while (nleft > 0)
329 {
330 status = write (fd, (const void *) ptr, nleft);
332 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
333 continue;
335 if (status < 0)
336 return (status);
338 nleft -= status;
339 ptr += status;
340 }
342 return (0);
343 } /* }}} ssize_t swrite */
345 static void _wipe_ci_values(cache_item_t *ci, time_t when)
346 {
347 ci->values = NULL;
348 ci->values_num = 0;
350 ci->last_flush_time = when;
351 if (config_write_jitter > 0)
352 ci->last_flush_time += (random() % config_write_jitter);
354 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
355 }
357 /*
358 * enqueue_cache_item:
359 * `cache_lock' must be acquired before calling this function!
360 */
361 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
362 queue_side_t side)
363 {
364 int did_insert = 0;
366 if (ci == NULL)
367 return (-1);
369 if (ci->values_num == 0)
370 return (0);
372 if (side == HEAD)
373 {
374 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
375 {
376 assert (ci->next == NULL);
377 ci->next = cache_queue_head;
378 cache_queue_head = ci;
380 if (cache_queue_tail == NULL)
381 cache_queue_tail = cache_queue_head;
383 did_insert = 1;
384 }
385 else if (cache_queue_head == ci)
386 {
387 /* do nothing */
388 }
389 else /* enqueued, but not first entry */
390 {
391 cache_item_t *prev;
393 /* find previous entry */
394 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
395 if (prev->next == ci)
396 break;
397 assert (prev != NULL);
399 /* move to the front */
400 prev->next = ci->next;
401 ci->next = cache_queue_head;
402 cache_queue_head = ci;
404 /* check if we need to adapt the tail */
405 if (cache_queue_tail == ci)
406 cache_queue_tail = prev;
407 }
408 }
409 else /* (side == TAIL) */
410 {
411 /* We don't move values back in the list.. */
412 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
413 return (0);
415 assert (ci->next == NULL);
417 if (cache_queue_tail == NULL)
418 cache_queue_head = ci;
419 else
420 cache_queue_tail->next = ci;
421 cache_queue_tail = ci;
423 did_insert = 1;
424 }
426 ci->flags |= CI_FLAGS_IN_QUEUE;
428 if (did_insert)
429 {
430 pthread_mutex_lock (&stats_lock);
431 stats_queue_length++;
432 pthread_mutex_unlock (&stats_lock);
433 }
435 return (0);
436 } /* }}} int enqueue_cache_item */
438 /*
439 * tree_callback_flush:
440 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
441 * while this is in progress.
442 */
443 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
444 gpointer data)
445 {
446 cache_item_t *ci;
447 callback_flush_data_t *cfd;
449 ci = (cache_item_t *) value;
450 cfd = (callback_flush_data_t *) data;
452 if ((ci->last_flush_time <= cfd->abs_timeout)
453 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
454 && (ci->values_num > 0))
455 {
456 enqueue_cache_item (ci, TAIL);
457 }
458 else if ((do_shutdown != 0)
459 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
460 && (ci->values_num > 0))
461 {
462 enqueue_cache_item (ci, TAIL);
463 }
464 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
465 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
466 && (ci->values_num <= 0))
467 {
468 char **temp;
470 temp = (char **) realloc (cfd->keys,
471 sizeof (char *) * (cfd->keys_num + 1));
472 if (temp == NULL)
473 {
474 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
475 return (FALSE);
476 }
477 cfd->keys = temp;
478 /* Make really sure this points to the _same_ place */
479 assert ((char *) key == ci->file);
480 cfd->keys[cfd->keys_num] = (char *) key;
481 cfd->keys_num++;
482 }
484 return (FALSE);
485 } /* }}} gboolean tree_callback_flush */
487 static int flush_old_values (int max_age)
488 {
489 callback_flush_data_t cfd;
490 size_t k;
492 memset (&cfd, 0, sizeof (cfd));
493 /* Pass the current time as user data so that we don't need to call
494 * `time' for each node. */
495 cfd.now = time (NULL);
496 cfd.keys = NULL;
497 cfd.keys_num = 0;
499 if (max_age > 0)
500 cfd.abs_timeout = cfd.now - max_age;
501 else
502 cfd.abs_timeout = cfd.now + 1;
504 /* `tree_callback_flush' will return the keys of all values that haven't
505 * been touched in the last `config_flush_interval' seconds in `cfd'.
506 * The char*'s in this array point to the same memory as ci->file, so we
507 * don't need to free them separately. */
508 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
510 for (k = 0; k < cfd.keys_num; k++)
511 {
512 cache_item_t *ci;
514 /* This must not fail. */
515 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
516 assert (ci != NULL);
518 /* If we end up here with values available, something's seriously
519 * messed up. */
520 assert (ci->values_num == 0);
522 /* Remove the node from the tree */
523 g_tree_remove (cache_tree, cfd.keys[k]);
524 cfd.keys[k] = NULL;
526 /* Now free and clean up `ci'. */
527 free (ci->file);
528 ci->file = NULL;
529 free (ci);
530 ci = NULL;
531 } /* for (k = 0; k < cfd.keys_num; k++) */
533 if (cfd.keys != NULL)
534 {
535 free (cfd.keys);
536 cfd.keys = NULL;
537 }
539 return (0);
540 } /* int flush_old_values */
542 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
543 {
544 struct timeval now;
545 struct timespec next_flush;
547 gettimeofday (&now, NULL);
548 next_flush.tv_sec = now.tv_sec + config_flush_interval;
549 next_flush.tv_nsec = 1000 * now.tv_usec;
551 pthread_mutex_lock (&cache_lock);
552 while ((do_shutdown == 0) || (cache_queue_head != NULL))
553 {
554 cache_item_t *ci;
555 char *file;
556 char **values;
557 int values_num;
558 int status;
559 int i;
561 /* First, check if it's time to do the cache flush. */
562 gettimeofday (&now, NULL);
563 if ((now.tv_sec > next_flush.tv_sec)
564 || ((now.tv_sec == next_flush.tv_sec)
565 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
566 {
567 /* Flush all values that haven't been written in the last
568 * `config_write_interval' seconds. */
569 flush_old_values (config_write_interval);
571 /* Determine the time of the next cache flush. */
572 while (next_flush.tv_sec <= now.tv_sec)
573 next_flush.tv_sec += config_flush_interval;
575 /* unlock the cache while we rotate so we don't block incoming
576 * updates if the fsync() blocks on disk I/O */
577 pthread_mutex_unlock(&cache_lock);
578 journal_rotate();
579 pthread_mutex_lock(&cache_lock);
580 }
582 /* Now, check if there's something to store away. If not, wait until
583 * something comes in or it's time to do the cache flush. */
584 if (cache_queue_head == NULL)
585 {
586 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
587 if ((status != 0) && (status != ETIMEDOUT))
588 {
589 RRDD_LOG (LOG_ERR, "queue_thread_main: "
590 "pthread_cond_timedwait returned %i.", status);
591 }
592 }
594 /* We're about to shut down, so lets flush the entire tree. */
595 if ((do_shutdown != 0) && (cache_queue_head == NULL))
596 flush_old_values (/* max age = */ -1);
598 /* Check if a value has arrived. This may be NULL if we timed out or there
599 * was an interrupt such as a signal. */
600 if (cache_queue_head == NULL)
601 continue;
603 ci = cache_queue_head;
605 /* copy the relevant parts */
606 file = strdup (ci->file);
607 if (file == NULL)
608 {
609 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
610 continue;
611 }
613 assert(ci->values != NULL);
614 assert(ci->values_num > 0);
616 values = ci->values;
617 values_num = ci->values_num;
619 _wipe_ci_values(ci, time(NULL));
621 cache_queue_head = ci->next;
622 if (cache_queue_head == NULL)
623 cache_queue_tail = NULL;
624 ci->next = NULL;
626 pthread_mutex_lock (&stats_lock);
627 assert (stats_queue_length > 0);
628 stats_queue_length--;
629 pthread_mutex_unlock (&stats_lock);
631 pthread_mutex_unlock (&cache_lock);
633 rrd_clear_error ();
634 status = rrd_update_r (file, NULL, values_num, (void *) values);
635 if (status != 0)
636 {
637 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
638 "rrd_update_r (%s) failed with status %i. (%s)",
639 file, status, rrd_get_error());
640 }
642 journal_write("wrote", file);
644 for (i = 0; i < values_num; i++)
645 free (values[i]);
647 free(values);
648 free(file);
650 if (status == 0)
651 {
652 pthread_mutex_lock (&stats_lock);
653 stats_updates_written++;
654 stats_data_sets_written += values_num;
655 pthread_mutex_unlock (&stats_lock);
656 }
658 pthread_mutex_lock (&cache_lock);
659 pthread_cond_broadcast (&flush_cond);
661 /* We're about to shut down, so lets flush the entire tree. */
662 if ((do_shutdown != 0) && (cache_queue_head == NULL))
663 flush_old_values (/* max age = */ -1);
664 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
665 pthread_mutex_unlock (&cache_lock);
667 assert(cache_queue_head == NULL);
668 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
669 journal_done();
671 return (NULL);
672 } /* }}} void *queue_thread_main */
674 static int buffer_get_field (char **buffer_ret, /* {{{ */
675 size_t *buffer_size_ret, char **field_ret)
676 {
677 char *buffer;
678 size_t buffer_pos;
679 size_t buffer_size;
680 char *field;
681 size_t field_size;
682 int status;
684 buffer = *buffer_ret;
685 buffer_pos = 0;
686 buffer_size = *buffer_size_ret;
687 field = *buffer_ret;
688 field_size = 0;
690 if (buffer_size <= 0)
691 return (-1);
693 /* This is ensured by `handle_request'. */
694 assert (buffer[buffer_size - 1] == '\0');
696 status = -1;
697 while (buffer_pos < buffer_size)
698 {
699 /* Check for end-of-field or end-of-buffer */
700 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
701 {
702 field[field_size] = 0;
703 field_size++;
704 buffer_pos++;
705 status = 0;
706 break;
707 }
708 /* Handle escaped characters. */
709 else if (buffer[buffer_pos] == '\\')
710 {
711 if (buffer_pos >= (buffer_size - 1))
712 break;
713 buffer_pos++;
714 field[field_size] = buffer[buffer_pos];
715 field_size++;
716 buffer_pos++;
717 }
718 /* Normal operation */
719 else
720 {
721 field[field_size] = buffer[buffer_pos];
722 field_size++;
723 buffer_pos++;
724 }
725 } /* while (buffer_pos < buffer_size) */
727 if (status != 0)
728 return (status);
730 *buffer_ret = buffer + buffer_pos;
731 *buffer_size_ret = buffer_size - buffer_pos;
732 *field_ret = field;
734 return (0);
735 } /* }}} int buffer_get_field */
737 static int flush_file (const char *filename) /* {{{ */
738 {
739 cache_item_t *ci;
741 pthread_mutex_lock (&cache_lock);
743 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
744 if (ci == NULL)
745 {
746 pthread_mutex_unlock (&cache_lock);
747 return (ENOENT);
748 }
750 /* Enqueue at head */
751 enqueue_cache_item (ci, HEAD);
752 pthread_cond_signal (&cache_cond);
754 while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
755 {
756 ci = NULL;
758 pthread_cond_wait (&flush_cond, &cache_lock);
760 ci = g_tree_lookup (cache_tree, filename);
761 if (ci == NULL)
762 {
763 RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
764 "while waiting for flush.");
765 pthread_mutex_unlock (&cache_lock);
766 return (-1);
767 }
768 }
770 pthread_mutex_unlock (&cache_lock);
771 return (0);
772 } /* }}} int flush_file */
774 static int handle_request_help (int fd, /* {{{ */
775 char *buffer, size_t buffer_size)
776 {
777 int status;
778 char **help_text;
779 size_t help_text_len;
780 char *command;
781 size_t i;
783 char *help_help[] =
784 {
785 "4 Command overview\n",
786 "FLUSH <filename>\n",
787 "HELP [<command>]\n",
788 "UPDATE <filename> <values> [<values> ...]\n",
789 "STATS\n"
790 };
791 size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
793 char *help_flush[] =
794 {
795 "4 Help for FLUSH\n",
796 "Usage: FLUSH <filename>\n",
797 "\n",
798 "Adds the given filename to the head of the update queue and returns\n",
799 "after is has been dequeued.\n"
800 };
801 size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
803 char *help_update[] =
804 {
805 "9 Help for UPDATE\n",
806 "Usage: UPDATE <filename> <values> [<values> ...]\n"
807 "\n",
808 "Adds the given file to the internal cache if it is not yet known and\n",
809 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
810 "for details.\n",
811 "\n",
812 "Each <values> has the following form:\n",
813 " <values> = <time>:<value>[:<value>[...]]\n",
814 "See the rrdupdate(1) manpage for details.\n"
815 };
816 size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
818 char *help_stats[] =
819 {
820 "4 Help for STATS\n",
821 "Usage: STATS\n",
822 "\n",
823 "Returns some performance counters, see the rrdcached(1) manpage for\n",
824 "a description of the values.\n"
825 };
826 size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
828 status = buffer_get_field (&buffer, &buffer_size, &command);
829 if (status != 0)
830 {
831 help_text = help_help;
832 help_text_len = help_help_len;
833 }
834 else
835 {
836 if (strcasecmp (command, "update") == 0)
837 {
838 help_text = help_update;
839 help_text_len = help_update_len;
840 }
841 else if (strcasecmp (command, "flush") == 0)
842 {
843 help_text = help_flush;
844 help_text_len = help_flush_len;
845 }
846 else if (strcasecmp (command, "stats") == 0)
847 {
848 help_text = help_stats;
849 help_text_len = help_stats_len;
850 }
851 else
852 {
853 help_text = help_help;
854 help_text_len = help_help_len;
855 }
856 }
858 for (i = 0; i < help_text_len; i++)
859 {
860 status = swrite (fd, help_text[i], strlen (help_text[i]));
861 if (status < 0)
862 {
863 status = errno;
864 RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
865 return (status);
866 }
867 }
869 return (0);
870 } /* }}} int handle_request_help */
872 static int handle_request_stats (int fd, /* {{{ */
873 char *buffer __attribute__((unused)),
874 size_t buffer_size __attribute__((unused)))
875 {
876 int status;
877 char outbuf[CMD_MAX];
879 uint64_t copy_queue_length;
880 uint64_t copy_updates_received;
881 uint64_t copy_flush_received;
882 uint64_t copy_updates_written;
883 uint64_t copy_data_sets_written;
884 uint64_t copy_journal_bytes;
885 uint64_t copy_journal_rotate;
887 uint64_t tree_nodes_number;
888 uint64_t tree_depth;
890 pthread_mutex_lock (&stats_lock);
891 copy_queue_length = stats_queue_length;
892 copy_updates_received = stats_updates_received;
893 copy_flush_received = stats_flush_received;
894 copy_updates_written = stats_updates_written;
895 copy_data_sets_written = stats_data_sets_written;
896 copy_journal_bytes = stats_journal_bytes;
897 copy_journal_rotate = stats_journal_rotate;
898 pthread_mutex_unlock (&stats_lock);
900 pthread_mutex_lock (&cache_lock);
901 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
902 tree_depth = (uint64_t) g_tree_height (cache_tree);
903 pthread_mutex_unlock (&cache_lock);
905 #define RRDD_STATS_SEND \
906 outbuf[sizeof (outbuf) - 1] = 0; \
907 status = swrite (fd, outbuf, strlen (outbuf)); \
908 if (status < 0) \
909 { \
910 status = errno; \
911 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
912 return (status); \
913 }
915 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
916 RRDD_STATS_SEND;
918 snprintf (outbuf, sizeof (outbuf),
919 "QueueLength: %"PRIu64"\n", copy_queue_length);
920 RRDD_STATS_SEND;
922 snprintf (outbuf, sizeof (outbuf),
923 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
924 RRDD_STATS_SEND;
926 snprintf (outbuf, sizeof (outbuf),
927 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
928 RRDD_STATS_SEND;
930 snprintf (outbuf, sizeof (outbuf),
931 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
932 RRDD_STATS_SEND;
934 snprintf (outbuf, sizeof (outbuf),
935 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
936 RRDD_STATS_SEND;
938 snprintf (outbuf, sizeof (outbuf),
939 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
940 RRDD_STATS_SEND;
942 snprintf (outbuf, sizeof (outbuf),
943 "TreeDepth: %"PRIu64"\n", tree_depth);
944 RRDD_STATS_SEND;
946 snprintf (outbuf, sizeof(outbuf),
947 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
948 RRDD_STATS_SEND;
950 snprintf (outbuf, sizeof(outbuf),
951 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
952 RRDD_STATS_SEND;
954 return (0);
955 #undef RRDD_STATS_SEND
956 } /* }}} int handle_request_stats */
958 static int handle_request_flush (int fd, /* {{{ */
959 char *buffer, size_t buffer_size)
960 {
961 char *file;
962 int status;
963 char result[CMD_MAX];
965 status = buffer_get_field (&buffer, &buffer_size, &file);
966 if (status != 0)
967 {
968 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
969 }
970 else
971 {
972 pthread_mutex_lock(&stats_lock);
973 stats_flush_received++;
974 pthread_mutex_unlock(&stats_lock);
976 status = flush_file (file);
977 if (status == 0)
978 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
979 else if (status == ENOENT)
980 {
981 /* no file in our tree; see whether it exists at all */
982 struct stat statbuf;
984 memset(&statbuf, 0, sizeof(statbuf));
985 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
986 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
987 else
988 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
989 }
990 else if (status < 0)
991 strncpy (result, "-1 Internal error.\n", sizeof (result));
992 else
993 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
994 }
995 result[sizeof (result) - 1] = 0;
997 status = swrite (fd, result, strlen (result));
998 if (status < 0)
999 {
1000 status = errno;
1001 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1002 return (status);
1003 }
1005 return (0);
1006 } /* }}} int handle_request_flush */
1008 static int handle_request_update (int fd, /* {{{ */
1009 char *buffer, size_t buffer_size)
1010 {
1011 char *file;
1012 int values_num = 0;
1013 int status;
1015 time_t now;
1017 cache_item_t *ci;
1018 char answer[CMD_MAX];
1020 #define RRDD_UPDATE_SEND \
1021 answer[sizeof (answer) - 1] = 0; \
1022 status = swrite (fd, answer, strlen (answer)); \
1023 if (status < 0) \
1024 { \
1025 status = errno; \
1026 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1027 return (status); \
1028 }
1030 now = time (NULL);
1032 status = buffer_get_field (&buffer, &buffer_size, &file);
1033 if (status != 0)
1034 {
1035 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1036 sizeof (answer));
1037 RRDD_UPDATE_SEND;
1038 return (0);
1039 }
1041 pthread_mutex_lock(&stats_lock);
1042 stats_updates_received++;
1043 pthread_mutex_unlock(&stats_lock);
1045 pthread_mutex_lock (&cache_lock);
1047 ci = g_tree_lookup (cache_tree, file);
1048 if (ci == NULL) /* {{{ */
1049 {
1050 struct stat statbuf;
1052 memset (&statbuf, 0, sizeof (statbuf));
1053 status = stat (file, &statbuf);
1054 if (status != 0)
1055 {
1056 pthread_mutex_unlock (&cache_lock);
1057 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1059 status = errno;
1060 if (status == ENOENT)
1061 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1062 else
1063 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1064 status);
1065 RRDD_UPDATE_SEND;
1066 return (0);
1067 }
1068 if (!S_ISREG (statbuf.st_mode))
1069 {
1070 pthread_mutex_unlock (&cache_lock);
1072 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1073 RRDD_UPDATE_SEND;
1074 return (0);
1075 }
1076 if (access(file, R_OK|W_OK) != 0)
1077 {
1078 pthread_mutex_unlock (&cache_lock);
1080 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1081 file, rrd_strerror(errno));
1082 RRDD_UPDATE_SEND;
1083 return (0);
1084 }
1086 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1087 if (ci == NULL)
1088 {
1089 pthread_mutex_unlock (&cache_lock);
1090 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1092 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1093 RRDD_UPDATE_SEND;
1094 return (0);
1095 }
1096 memset (ci, 0, sizeof (cache_item_t));
1098 ci->file = strdup (file);
1099 if (ci->file == NULL)
1100 {
1101 pthread_mutex_unlock (&cache_lock);
1102 free (ci);
1103 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1105 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1106 RRDD_UPDATE_SEND;
1107 return (0);
1108 }
1110 _wipe_ci_values(ci, now);
1111 ci->flags = CI_FLAGS_IN_TREE;
1113 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1114 } /* }}} */
1115 assert (ci != NULL);
1117 while (buffer_size > 0)
1118 {
1119 char **temp;
1120 char *value;
1122 status = buffer_get_field (&buffer, &buffer_size, &value);
1123 if (status != 0)
1124 {
1125 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1126 break;
1127 }
1129 temp = (char **) realloc (ci->values,
1130 sizeof (char *) * (ci->values_num + 1));
1131 if (temp == NULL)
1132 {
1133 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1134 continue;
1135 }
1136 ci->values = temp;
1138 ci->values[ci->values_num] = strdup (value);
1139 if (ci->values[ci->values_num] == NULL)
1140 {
1141 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1142 continue;
1143 }
1144 ci->values_num++;
1146 values_num++;
1147 }
1149 if (((now - ci->last_flush_time) >= config_write_interval)
1150 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1151 && (ci->values_num > 0))
1152 {
1153 enqueue_cache_item (ci, TAIL);
1154 pthread_cond_signal (&cache_cond);
1155 }
1157 pthread_mutex_unlock (&cache_lock);
1159 if (values_num < 1)
1160 {
1161 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1162 }
1163 else
1164 {
1165 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1166 (values_num == 1) ? "" : "s");
1167 }
1168 RRDD_UPDATE_SEND;
1169 return (0);
1170 #undef RRDD_UPDATE_SEND
1171 } /* }}} int handle_request_update */
1173 /* we came across a "WROTE" entry during journal replay.
1174 * throw away any values that we have accumulated for this file
1175 */
1176 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1177 const char *buffer,
1178 size_t buffer_size __attribute__((unused)))
1179 {
1180 int i;
1181 cache_item_t *ci;
1182 const char *file = buffer;
1184 pthread_mutex_lock(&cache_lock);
1186 ci = g_tree_lookup(cache_tree, file);
1187 if (ci == NULL)
1188 {
1189 pthread_mutex_unlock(&cache_lock);
1190 return (0);
1191 }
1193 if (ci->values)
1194 {
1195 for (i=0; i < ci->values_num; i++)
1196 free(ci->values[i]);
1198 free(ci->values);
1199 }
1201 _wipe_ci_values(ci, time(NULL));
1203 pthread_mutex_unlock(&cache_lock);
1204 return (0);
1205 } /* }}} int handle_request_wrote */
1207 /* if fd < 0, we are in journal replay mode */
1208 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1209 {
1210 char *buffer_ptr;
1211 char *command;
1212 int status;
1214 assert (buffer[buffer_size - 1] == '\0');
1216 buffer_ptr = buffer;
1217 command = NULL;
1218 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1219 if (status != 0)
1220 {
1221 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1222 return (-1);
1223 }
1225 if (strcasecmp (command, "update") == 0)
1226 {
1227 /* don't re-write updates in replay mode */
1228 if (fd >= 0)
1229 journal_write(command, buffer_ptr);
1231 return (handle_request_update (fd, buffer_ptr, buffer_size));
1232 }
1233 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1234 {
1235 /* this is only valid in replay mode */
1236 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1237 }
1238 else if (strcasecmp (command, "flush") == 0)
1239 {
1240 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1241 }
1242 else if (strcasecmp (command, "stats") == 0)
1243 {
1244 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1245 }
1246 else if (strcasecmp (command, "help") == 0)
1247 {
1248 return (handle_request_help (fd, buffer_ptr, buffer_size));
1249 }
1250 else
1251 {
1252 char result[CMD_MAX];
1254 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1255 result[sizeof (result) - 1] = 0;
1257 status = swrite (fd, result, strlen (result));
1258 if (status < 0)
1259 {
1260 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1261 return (-1);
1262 }
1263 }
1265 return (0);
1266 } /* }}} int handle_request */
1268 /* MUST NOT hold journal_lock before calling this */
1269 static void journal_rotate(void) /* {{{ */
1270 {
1271 FILE *old_fh = NULL;
1273 if (journal_cur == NULL || journal_old == NULL)
1274 return;
1276 pthread_mutex_lock(&journal_lock);
1278 /* we rotate this way (rename before close) so that the we can release
1279 * the journal lock as fast as possible. Journal writes to the new
1280 * journal can proceed immediately after the new file is opened. The
1281 * fclose can then block without affecting new updates.
1282 */
1283 if (journal_fh != NULL)
1284 {
1285 old_fh = journal_fh;
1286 rename(journal_cur, journal_old);
1287 ++stats_journal_rotate;
1288 }
1290 journal_fh = fopen(journal_cur, "a");
1291 pthread_mutex_unlock(&journal_lock);
1293 if (old_fh != NULL)
1294 fclose(old_fh);
1296 if (journal_fh == NULL)
1297 RRDD_LOG(LOG_CRIT,
1298 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1299 journal_cur, rrd_strerror(errno));
1301 } /* }}} static void journal_rotate */
1303 static void journal_done(void) /* {{{ */
1304 {
1305 if (journal_cur == NULL)
1306 return;
1308 pthread_mutex_lock(&journal_lock);
1309 if (journal_fh != NULL)
1310 {
1311 fclose(journal_fh);
1312 journal_fh = NULL;
1313 }
1315 RRDD_LOG(LOG_INFO, "removing journals");
1317 unlink(journal_old);
1318 unlink(journal_cur);
1319 pthread_mutex_unlock(&journal_lock);
1321 } /* }}} static void journal_done */
1323 static int journal_write(char *cmd, char *args) /* {{{ */
1324 {
1325 int chars;
1327 if (journal_fh == NULL)
1328 return 0;
1330 pthread_mutex_lock(&journal_lock);
1331 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1332 pthread_mutex_unlock(&journal_lock);
1334 if (chars > 0)
1335 {
1336 pthread_mutex_lock(&stats_lock);
1337 stats_journal_bytes += chars;
1338 pthread_mutex_unlock(&stats_lock);
1339 }
1341 return chars;
1342 } /* }}} static int journal_write */
1344 static int journal_replay (const char *file) /* {{{ */
1345 {
1346 FILE *fh;
1347 int entry_cnt = 0;
1348 int fail_cnt = 0;
1349 uint64_t line = 0;
1350 char entry[CMD_MAX];
1352 if (file == NULL) return 0;
1354 fh = fopen(file, "r");
1355 if (fh == NULL)
1356 {
1357 if (errno != ENOENT)
1358 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1359 file, rrd_strerror(errno));
1360 return 0;
1361 }
1362 else
1363 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1365 while(!feof(fh))
1366 {
1367 size_t entry_len;
1369 ++line;
1370 fgets(entry, sizeof(entry), fh);
1371 entry_len = strlen(entry);
1373 /* check \n termination in case journal writing crashed mid-line */
1374 if (entry_len == 0)
1375 continue;
1376 else if (entry[entry_len - 1] != '\n')
1377 {
1378 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1379 ++fail_cnt;
1380 continue;
1381 }
1383 entry[entry_len - 1] = '\0';
1385 if (handle_request(-1, entry, entry_len) == 0)
1386 ++entry_cnt;
1387 else
1388 ++fail_cnt;
1389 }
1391 fclose(fh);
1393 if (entry_cnt > 0)
1394 {
1395 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1396 entry_cnt, fail_cnt);
1397 return 1;
1398 }
1399 else
1400 return 0;
1402 } /* }}} static int journal_replay */
1404 static void *connection_thread_main (void *args) /* {{{ */
1405 {
1406 pthread_t self;
1407 int i;
1408 int fd;
1410 fd = *((int *) args);
1411 free (args);
1413 pthread_mutex_lock (&connection_threads_lock);
1414 {
1415 pthread_t *temp;
1417 temp = (pthread_t *) realloc (connection_threads,
1418 sizeof (pthread_t) * (connection_threads_num + 1));
1419 if (temp == NULL)
1420 {
1421 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1422 }
1423 else
1424 {
1425 connection_threads = temp;
1426 connection_threads[connection_threads_num] = pthread_self ();
1427 connection_threads_num++;
1428 }
1429 }
1430 pthread_mutex_unlock (&connection_threads_lock);
1432 while (do_shutdown == 0)
1433 {
1434 char buffer[CMD_MAX];
1436 struct pollfd pollfd;
1437 int status;
1439 pollfd.fd = fd;
1440 pollfd.events = POLLIN | POLLPRI;
1441 pollfd.revents = 0;
1443 status = poll (&pollfd, 1, /* timeout = */ 500);
1444 if (status == 0) /* timeout */
1445 continue;
1446 else if (status < 0) /* error */
1447 {
1448 status = errno;
1449 if (status == EINTR)
1450 continue;
1451 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1452 continue;
1453 }
1455 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1456 {
1457 close (fd);
1458 break;
1459 }
1460 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1461 {
1462 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1463 "poll(2) returned something unexpected: %#04hx",
1464 pollfd.revents);
1465 close (fd);
1466 break;
1467 }
1469 status = (int) sread (fd, buffer, sizeof (buffer));
1470 if (status <= 0)
1471 {
1472 close (fd);
1474 if (status < 0)
1475 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1477 break;
1478 }
1480 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1481 if (status != 0)
1482 {
1483 close (fd);
1484 break;
1485 }
1486 }
1488 self = pthread_self ();
1489 /* Remove this thread from the connection threads list */
1490 pthread_mutex_lock (&connection_threads_lock);
1491 /* Find out own index in the array */
1492 for (i = 0; i < connection_threads_num; i++)
1493 if (pthread_equal (connection_threads[i], self) != 0)
1494 break;
1495 assert (i < connection_threads_num);
1497 /* Move the trailing threads forward. */
1498 if (i < (connection_threads_num - 1))
1499 {
1500 memmove (connection_threads + i,
1501 connection_threads + i + 1,
1502 sizeof (pthread_t) * (connection_threads_num - i - 1));
1503 }
1505 connection_threads_num--;
1506 pthread_mutex_unlock (&connection_threads_lock);
1508 return (NULL);
1509 } /* }}} void *connection_thread_main */
1511 static int open_listen_socket_unix (const char *path) /* {{{ */
1512 {
1513 int fd;
1514 struct sockaddr_un sa;
1515 listen_socket_t *temp;
1516 int status;
1518 temp = (listen_socket_t *) realloc (listen_fds,
1519 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1520 if (temp == NULL)
1521 {
1522 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1523 return (-1);
1524 }
1525 listen_fds = temp;
1526 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1528 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1529 if (fd < 0)
1530 {
1531 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1532 return (-1);
1533 }
1535 memset (&sa, 0, sizeof (sa));
1536 sa.sun_family = AF_UNIX;
1537 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1539 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1540 if (status != 0)
1541 {
1542 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1543 close (fd);
1544 unlink (path);
1545 return (-1);
1546 }
1548 status = listen (fd, /* backlog = */ 10);
1549 if (status != 0)
1550 {
1551 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1552 close (fd);
1553 unlink (path);
1554 return (-1);
1555 }
1557 listen_fds[listen_fds_num].fd = fd;
1558 snprintf (listen_fds[listen_fds_num].path,
1559 sizeof (listen_fds[listen_fds_num].path) - 1,
1560 "unix:%s", path);
1561 listen_fds_num++;
1563 return (0);
1564 } /* }}} int open_listen_socket_unix */
1566 static int open_listen_socket (const char *addr_orig) /* {{{ */
1567 {
1568 struct addrinfo ai_hints;
1569 struct addrinfo *ai_res;
1570 struct addrinfo *ai_ptr;
1571 char addr_copy[NI_MAXHOST];
1572 char *addr;
1573 char *port;
1574 int status;
1576 assert (addr_orig != NULL);
1578 strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1579 addr_copy[sizeof (addr_copy) - 1] = 0;
1580 addr = addr_copy;
1582 if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1583 return (open_listen_socket_unix (addr + strlen ("unix:")));
1584 else if (addr[0] == '/')
1585 return (open_listen_socket_unix (addr));
1587 memset (&ai_hints, 0, sizeof (ai_hints));
1588 ai_hints.ai_flags = 0;
1589 #ifdef AI_ADDRCONFIG
1590 ai_hints.ai_flags |= AI_ADDRCONFIG;
1591 #endif
1592 ai_hints.ai_family = AF_UNSPEC;
1593 ai_hints.ai_socktype = SOCK_STREAM;
1595 port = NULL;
1596 if (*addr == '[') /* IPv6+port format */
1597 {
1598 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1599 addr++;
1601 port = strchr (addr, ']');
1602 if (port == NULL)
1603 {
1604 RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1605 addr_orig);
1606 return (-1);
1607 }
1608 *port = 0;
1609 port++;
1611 if (*port == ':')
1612 port++;
1613 else if (*port == 0)
1614 port = NULL;
1615 else
1616 {
1617 RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1618 port);
1619 return (-1);
1620 }
1621 } /* if (*addr = ']') */
1622 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1623 {
1624 port = rindex(addr, ':');
1625 if (port != NULL)
1626 {
1627 *port = 0;
1628 port++;
1629 }
1630 }
1631 ai_res = NULL;
1632 status = getaddrinfo (addr,
1633 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1634 &ai_hints, &ai_res);
1635 if (status != 0)
1636 {
1637 RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1638 "%s", addr, gai_strerror (status));
1639 return (-1);
1640 }
1642 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1643 {
1644 int fd;
1645 listen_socket_t *temp;
1647 temp = (listen_socket_t *) realloc (listen_fds,
1648 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1649 if (temp == NULL)
1650 {
1651 RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1652 continue;
1653 }
1654 listen_fds = temp;
1655 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1657 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1658 if (fd < 0)
1659 {
1660 RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1661 continue;
1662 }
1664 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1665 if (status != 0)
1666 {
1667 RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1668 close (fd);
1669 continue;
1670 }
1672 status = listen (fd, /* backlog = */ 10);
1673 if (status != 0)
1674 {
1675 RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1676 close (fd);
1677 return (-1);
1678 }
1680 listen_fds[listen_fds_num].fd = fd;
1681 strncpy (listen_fds[listen_fds_num].path, addr,
1682 sizeof (listen_fds[listen_fds_num].path) - 1);
1683 listen_fds_num++;
1684 } /* for (ai_ptr) */
1686 return (0);
1687 } /* }}} int open_listen_socket */
1689 static int close_listen_sockets (void) /* {{{ */
1690 {
1691 size_t i;
1693 for (i = 0; i < listen_fds_num; i++)
1694 {
1695 close (listen_fds[i].fd);
1696 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1697 unlink (listen_fds[i].path + strlen ("unix:"));
1698 }
1700 free (listen_fds);
1701 listen_fds = NULL;
1702 listen_fds_num = 0;
1704 return (0);
1705 } /* }}} int close_listen_sockets */
1707 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1708 {
1709 struct pollfd *pollfds;
1710 int pollfds_num;
1711 int status;
1712 int i;
1714 for (i = 0; i < config_listen_address_list_len; i++)
1715 open_listen_socket (config_listen_address_list[i]);
1717 if (config_listen_address_list_len < 1)
1718 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1720 if (listen_fds_num < 1)
1721 {
1722 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1723 "could be opened. Sorry.");
1724 return (NULL);
1725 }
1727 pollfds_num = listen_fds_num;
1728 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1729 if (pollfds == NULL)
1730 {
1731 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1732 return (NULL);
1733 }
1734 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1736 RRDD_LOG(LOG_INFO, "listening for connections");
1738 while (do_shutdown == 0)
1739 {
1740 assert (pollfds_num == ((int) listen_fds_num));
1741 for (i = 0; i < pollfds_num; i++)
1742 {
1743 pollfds[i].fd = listen_fds[i].fd;
1744 pollfds[i].events = POLLIN | POLLPRI;
1745 pollfds[i].revents = 0;
1746 }
1748 status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1749 if (status < 1)
1750 {
1751 status = errno;
1752 if (status != EINTR)
1753 {
1754 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1755 }
1756 continue;
1757 }
1759 for (i = 0; i < pollfds_num; i++)
1760 {
1761 int *client_sd;
1762 struct sockaddr_storage client_sa;
1763 socklen_t client_sa_size;
1764 pthread_t tid;
1765 pthread_attr_t attr;
1767 if (pollfds[i].revents == 0)
1768 continue;
1770 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1771 {
1772 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1773 "poll(2) returned something unexpected for listen FD #%i.",
1774 pollfds[i].fd);
1775 continue;
1776 }
1778 client_sd = (int *) malloc (sizeof (int));
1779 if (client_sd == NULL)
1780 {
1781 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1782 continue;
1783 }
1785 client_sa_size = sizeof (client_sa);
1786 *client_sd = accept (pollfds[i].fd,
1787 (struct sockaddr *) &client_sa, &client_sa_size);
1788 if (*client_sd < 0)
1789 {
1790 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1791 continue;
1792 }
1794 pthread_attr_init (&attr);
1795 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1797 status = pthread_create (&tid, &attr, connection_thread_main,
1798 /* args = */ (void *) client_sd);
1799 if (status != 0)
1800 {
1801 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1802 close (*client_sd);
1803 free (client_sd);
1804 continue;
1805 }
1806 } /* for (pollfds_num) */
1807 } /* while (do_shutdown == 0) */
1809 RRDD_LOG(LOG_INFO, "starting shutdown");
1811 close_listen_sockets ();
1813 pthread_mutex_lock (&connection_threads_lock);
1814 while (connection_threads_num > 0)
1815 {
1816 pthread_t wait_for;
1818 wait_for = connection_threads[0];
1820 pthread_mutex_unlock (&connection_threads_lock);
1821 pthread_join (wait_for, /* retval = */ NULL);
1822 pthread_mutex_lock (&connection_threads_lock);
1823 }
1824 pthread_mutex_unlock (&connection_threads_lock);
1826 return (NULL);
1827 } /* }}} void *listen_thread_main */
1829 static int daemonize (void) /* {{{ */
1830 {
1831 int status;
1833 /* These structures are static, because `sigaction' behaves weird if the are
1834 * overwritten.. */
1835 static struct sigaction sa_int;
1836 static struct sigaction sa_term;
1837 static struct sigaction sa_pipe;
1839 if (!stay_foreground)
1840 {
1841 pid_t child;
1842 char *base_dir;
1844 child = fork ();
1845 if (child < 0)
1846 {
1847 fprintf (stderr, "daemonize: fork(2) failed.\n");
1848 return (-1);
1849 }
1850 else if (child > 0)
1851 {
1852 return (1);
1853 }
1855 /* Change into the /tmp directory. */
1856 base_dir = (config_base_dir != NULL)
1857 ? config_base_dir
1858 : "/tmp";
1859 status = chdir (base_dir);
1860 if (status != 0)
1861 {
1862 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1863 return (-1);
1864 }
1866 /* Become session leader */
1867 setsid ();
1869 /* Open the first three file descriptors to /dev/null */
1870 close (2);
1871 close (1);
1872 close (0);
1874 open ("/dev/null", O_RDWR);
1875 dup (0);
1876 dup (0);
1877 } /* if (!stay_foreground) */
1879 /* Install signal handlers */
1880 memset (&sa_int, 0, sizeof (sa_int));
1881 sa_int.sa_handler = sig_int_handler;
1882 sigaction (SIGINT, &sa_int, NULL);
1884 memset (&sa_term, 0, sizeof (sa_term));
1885 sa_term.sa_handler = sig_term_handler;
1886 sigaction (SIGTERM, &sa_term, NULL);
1888 memset (&sa_pipe, 0, sizeof (sa_pipe));
1889 sa_pipe.sa_handler = SIG_IGN;
1890 sigaction (SIGPIPE, &sa_pipe, NULL);
1892 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1893 RRDD_LOG(LOG_INFO, "starting up");
1895 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1896 if (cache_tree == NULL)
1897 {
1898 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1899 return (-1);
1900 }
1902 status = write_pidfile ();
1903 return status;
1904 } /* }}} int daemonize */
1906 static int cleanup (void) /* {{{ */
1907 {
1908 do_shutdown++;
1910 pthread_cond_signal (&cache_cond);
1911 pthread_join (queue_thread, /* return = */ NULL);
1913 remove_pidfile ();
1915 RRDD_LOG(LOG_INFO, "goodbye");
1916 closelog ();
1918 return (0);
1919 } /* }}} int cleanup */
1921 static int read_options (int argc, char **argv) /* {{{ */
1922 {
1923 int option;
1924 int status = 0;
1926 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1927 {
1928 switch (option)
1929 {
1930 case 'g':
1931 stay_foreground=1;
1932 break;
1934 case 'l':
1935 {
1936 char **temp;
1938 temp = (char **) realloc (config_listen_address_list,
1939 sizeof (char *) * (config_listen_address_list_len + 1));
1940 if (temp == NULL)
1941 {
1942 fprintf (stderr, "read_options: realloc failed.\n");
1943 return (2);
1944 }
1945 config_listen_address_list = temp;
1947 temp[config_listen_address_list_len] = strdup (optarg);
1948 if (temp[config_listen_address_list_len] == NULL)
1949 {
1950 fprintf (stderr, "read_options: strdup failed.\n");
1951 return (2);
1952 }
1953 config_listen_address_list_len++;
1954 }
1955 break;
1957 case 'f':
1958 {
1959 int temp;
1961 temp = atoi (optarg);
1962 if (temp > 0)
1963 config_flush_interval = temp;
1964 else
1965 {
1966 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1967 status = 3;
1968 }
1969 }
1970 break;
1972 case 'w':
1973 {
1974 int temp;
1976 temp = atoi (optarg);
1977 if (temp > 0)
1978 config_write_interval = temp;
1979 else
1980 {
1981 fprintf (stderr, "Invalid write interval: %s\n", optarg);
1982 status = 2;
1983 }
1984 }
1985 break;
1987 case 'z':
1988 {
1989 int temp;
1991 temp = atoi(optarg);
1992 if (temp > 0)
1993 config_write_jitter = temp;
1994 else
1995 {
1996 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1997 status = 2;
1998 }
2000 break;
2001 }
2003 case 'b':
2004 {
2005 size_t len;
2007 if (config_base_dir != NULL)
2008 free (config_base_dir);
2009 config_base_dir = strdup (optarg);
2010 if (config_base_dir == NULL)
2011 {
2012 fprintf (stderr, "read_options: strdup failed.\n");
2013 return (3);
2014 }
2016 len = strlen (config_base_dir);
2017 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2018 {
2019 config_base_dir[len - 1] = 0;
2020 len--;
2021 }
2023 if (len < 1)
2024 {
2025 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2026 return (4);
2027 }
2028 }
2029 break;
2031 case 'p':
2032 {
2033 if (config_pid_file != NULL)
2034 free (config_pid_file);
2035 config_pid_file = strdup (optarg);
2036 if (config_pid_file == NULL)
2037 {
2038 fprintf (stderr, "read_options: strdup failed.\n");
2039 return (3);
2040 }
2041 }
2042 break;
2044 case 'j':
2045 {
2046 struct stat statbuf;
2047 const char *dir = optarg;
2049 status = stat(dir, &statbuf);
2050 if (status != 0)
2051 {
2052 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2053 return 6;
2054 }
2056 if (!S_ISDIR(statbuf.st_mode)
2057 || access(dir, R_OK|W_OK|X_OK) != 0)
2058 {
2059 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2060 errno ? rrd_strerror(errno) : "");
2061 return 6;
2062 }
2064 journal_cur = malloc(PATH_MAX + 1);
2065 journal_old = malloc(PATH_MAX + 1);
2066 if (journal_cur == NULL || journal_old == NULL)
2067 {
2068 fprintf(stderr, "malloc failure for journal files\n");
2069 return 6;
2070 }
2071 else
2072 {
2073 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2074 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2075 }
2076 }
2077 break;
2079 case 'h':
2080 case '?':
2081 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2082 "\n"
2083 "Usage: rrdcached [options]\n"
2084 "\n"
2085 "Valid options are:\n"
2086 " -l <address> Socket address to listen to.\n"
2087 " -w <seconds> Interval in which to write data.\n"
2088 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2089 " -f <seconds> Interval in which to flush dead data.\n"
2090 " -p <file> Location of the PID-file.\n"
2091 " -b <dir> Base directory to change to.\n"
2092 " -g Do not fork and run in the foreground.\n"
2093 " -j <dir> Directory in which to create the journal files.\n"
2094 "\n"
2095 "For more information and a detailed description of all options "
2096 "please refer\n"
2097 "to the rrdcached(1) manual page.\n",
2098 VERSION);
2099 status = -1;
2100 break;
2101 } /* switch (option) */
2102 } /* while (getopt) */
2104 /* advise the user when values are not sane */
2105 if (config_flush_interval < 2 * config_write_interval)
2106 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2107 " 2x write interval (-w) !\n");
2108 if (config_write_jitter > config_write_interval)
2109 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2110 " write interval (-w) !\n");
2112 return (status);
2113 } /* }}} int read_options */
2115 int main (int argc, char **argv)
2116 {
2117 int status;
2119 status = read_options (argc, argv);
2120 if (status != 0)
2121 {
2122 if (status < 0)
2123 status = 0;
2124 return (status);
2125 }
2127 status = daemonize ();
2128 if (status == 1)
2129 {
2130 struct sigaction sigchld;
2132 memset (&sigchld, 0, sizeof (sigchld));
2133 sigchld.sa_handler = SIG_IGN;
2134 sigaction (SIGCHLD, &sigchld, NULL);
2136 return (0);
2137 }
2138 else if (status != 0)
2139 {
2140 fprintf (stderr, "daemonize failed, exiting.\n");
2141 return (1);
2142 }
2144 if (journal_cur != NULL)
2145 {
2146 int had_journal = 0;
2148 pthread_mutex_lock(&journal_lock);
2150 RRDD_LOG(LOG_INFO, "checking for journal files");
2152 had_journal += journal_replay(journal_old);
2153 had_journal += journal_replay(journal_cur);
2155 if (had_journal)
2156 flush_old_values(-1);
2158 pthread_mutex_unlock(&journal_lock);
2159 journal_rotate();
2161 RRDD_LOG(LOG_INFO, "journal processing complete");
2162 }
2164 /* start the queue thread */
2165 memset (&queue_thread, 0, sizeof (queue_thread));
2166 status = pthread_create (&queue_thread,
2167 NULL, /* attr */
2168 queue_thread_main,
2169 NULL); /* args */
2170 if (status != 0)
2171 {
2172 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2173 cleanup();
2174 return (1);
2175 }
2177 listen_thread_main (NULL);
2178 cleanup ();
2180 return (0);
2181 } /* int main */
2183 /*
2184 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2185 */