c438028134ed5c27b2f89edf8853c529d1c1ad89
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 struct listen_socket_s
105 {
106 int fd;
107 char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115 char *file;
116 char **values;
117 int values_num;
118 time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121 int flags;
123 cache_item_t *next;
124 };
126 struct callback_flush_data_s
127 {
128 time_t now;
129 time_t abs_timeout;
130 char **keys;
131 size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
135 enum queue_side_e
136 {
137 HEAD,
138 TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146 * Variables
147 */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree *cache_tree = NULL;
163 static cache_item_t *cache_queue_head = NULL;
164 static cache_item_t *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
168 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
170 static int config_write_interval = 300;
171 static int config_write_jitter = 0;
172 static int config_flush_interval = 3600;
173 static char *config_pid_file = NULL;
174 static char *config_base_dir = NULL;
176 static char **config_listen_address_list = NULL;
177 static int config_listen_address_list_len = 0;
179 static uint64_t stats_queue_length = 0;
180 static uint64_t stats_updates_received = 0;
181 static uint64_t stats_flush_received = 0;
182 static uint64_t stats_updates_written = 0;
183 static uint64_t stats_data_sets_written = 0;
184 static uint64_t stats_journal_bytes = 0;
185 static uint64_t stats_journal_rotate = 0;
186 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
188 /* Journaled updates */
189 static char *journal_cur = NULL;
190 static char *journal_old = NULL;
191 static FILE *journal_fh = NULL;
192 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
193 static int journal_write(char *cmd, char *args);
194 static void journal_done(void);
195 static void journal_rotate(void);
197 /*
198 * Functions
199 */
200 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
201 {
202 RRDD_LOG(LOG_NOTICE, "caught SIGINT");
203 do_shutdown++;
204 pthread_cond_broadcast(&cache_cond);
205 } /* }}} void sig_int_handler */
207 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
208 {
209 RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
210 do_shutdown++;
211 pthread_cond_broadcast(&cache_cond);
212 } /* }}} void sig_term_handler */
214 static int write_pidfile (void) /* {{{ */
215 {
216 pid_t pid;
217 char *file;
218 int fd;
219 FILE *fh;
221 pid = getpid ();
223 file = (config_pid_file != NULL)
224 ? config_pid_file
225 : LOCALSTATEDIR "/run/rrdcached.pid";
227 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
228 if (fd < 0)
229 {
230 RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
231 file, rrd_strerror(errno));
232 return (-1);
233 }
235 fh = fdopen (fd, "w");
236 if (fh == NULL)
237 {
238 RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
239 close(fd);
240 return (-1);
241 }
243 fprintf (fh, "%i\n", (int) pid);
244 fclose (fh);
246 return (0);
247 } /* }}} int write_pidfile */
249 static int remove_pidfile (void) /* {{{ */
250 {
251 char *file;
252 int status;
254 file = (config_pid_file != NULL)
255 ? config_pid_file
256 : LOCALSTATEDIR "/run/rrdcached.pid";
258 status = unlink (file);
259 if (status == 0)
260 return (0);
261 return (errno);
262 } /* }}} int remove_pidfile */
264 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
265 {
266 char *buffer;
267 size_t buffer_used;
268 size_t buffer_free;
269 ssize_t status;
271 buffer = (char *) buffer_void;
272 buffer_used = 0;
273 buffer_free = buffer_size;
275 while (buffer_free > 0)
276 {
277 status = read (fd, buffer + buffer_used, buffer_free);
278 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
279 continue;
281 if (status < 0)
282 return (-1);
284 if (status == 0)
285 return (0);
287 assert ((0 > status) || (buffer_free >= (size_t) status));
289 buffer_free = buffer_free - status;
290 buffer_used = buffer_used + status;
292 if (buffer[buffer_used - 1] == '\n')
293 break;
294 }
296 assert (buffer_used > 0);
298 if (buffer[buffer_used - 1] != '\n')
299 {
300 errno = ENOBUFS;
301 return (-1);
302 }
304 buffer[buffer_used - 1] = 0;
306 /* Fix network line endings. */
307 if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
308 {
309 buffer_used--;
310 buffer[buffer_used - 1] = 0;
311 }
313 return (buffer_used);
314 } /* }}} ssize_t sread */
316 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
317 {
318 const char *ptr;
319 size_t nleft;
320 ssize_t status;
322 /* special case for journal replay */
323 if (fd < 0) return 0;
325 ptr = (const char *) buf;
326 nleft = count;
328 while (nleft > 0)
329 {
330 status = write (fd, (const void *) ptr, nleft);
332 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
333 continue;
335 if (status < 0)
336 return (status);
338 nleft -= status;
339 ptr += status;
340 }
342 return (0);
343 } /* }}} ssize_t swrite */
345 static void _wipe_ci_values(cache_item_t *ci, time_t when)
346 {
347 ci->values = NULL;
348 ci->values_num = 0;
350 ci->last_flush_time = when;
351 if (config_write_jitter > 0)
352 ci->last_flush_time += (random() % config_write_jitter);
354 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
355 }
357 /*
358 * enqueue_cache_item:
359 * `cache_lock' must be acquired before calling this function!
360 */
361 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
362 queue_side_t side)
363 {
364 int did_insert = 0;
366 if (ci == NULL)
367 return (-1);
369 if (ci->values_num == 0)
370 return (0);
372 if (side == HEAD)
373 {
374 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
375 {
376 assert (ci->next == NULL);
377 ci->next = cache_queue_head;
378 cache_queue_head = ci;
380 if (cache_queue_tail == NULL)
381 cache_queue_tail = cache_queue_head;
383 did_insert = 1;
384 }
385 else if (cache_queue_head == ci)
386 {
387 /* do nothing */
388 }
389 else /* enqueued, but not first entry */
390 {
391 cache_item_t *prev;
393 /* find previous entry */
394 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
395 if (prev->next == ci)
396 break;
397 assert (prev != NULL);
399 /* move to the front */
400 prev->next = ci->next;
401 ci->next = cache_queue_head;
402 cache_queue_head = ci;
404 /* check if we need to adapt the tail */
405 if (cache_queue_tail == ci)
406 cache_queue_tail = prev;
407 }
408 }
409 else /* (side == TAIL) */
410 {
411 /* We don't move values back in the list.. */
412 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
413 return (0);
415 assert (ci->next == NULL);
417 if (cache_queue_tail == NULL)
418 cache_queue_head = ci;
419 else
420 cache_queue_tail->next = ci;
421 cache_queue_tail = ci;
423 did_insert = 1;
424 }
426 ci->flags |= CI_FLAGS_IN_QUEUE;
428 if (did_insert)
429 {
430 pthread_mutex_lock (&stats_lock);
431 stats_queue_length++;
432 pthread_mutex_unlock (&stats_lock);
433 }
435 return (0);
436 } /* }}} int enqueue_cache_item */
438 /*
439 * tree_callback_flush:
440 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
441 * while this is in progress.
442 */
443 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
444 gpointer data)
445 {
446 cache_item_t *ci;
447 callback_flush_data_t *cfd;
449 ci = (cache_item_t *) value;
450 cfd = (callback_flush_data_t *) data;
452 if ((ci->last_flush_time <= cfd->abs_timeout)
453 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
454 && (ci->values_num > 0))
455 {
456 enqueue_cache_item (ci, TAIL);
457 }
458 else if ((do_shutdown != 0)
459 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
460 && (ci->values_num > 0))
461 {
462 enqueue_cache_item (ci, TAIL);
463 }
464 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
465 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
466 && (ci->values_num <= 0))
467 {
468 char **temp;
470 temp = (char **) realloc (cfd->keys,
471 sizeof (char *) * (cfd->keys_num + 1));
472 if (temp == NULL)
473 {
474 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
475 return (FALSE);
476 }
477 cfd->keys = temp;
478 /* Make really sure this points to the _same_ place */
479 assert ((char *) key == ci->file);
480 cfd->keys[cfd->keys_num] = (char *) key;
481 cfd->keys_num++;
482 }
484 return (FALSE);
485 } /* }}} gboolean tree_callback_flush */
487 static int flush_old_values (int max_age)
488 {
489 callback_flush_data_t cfd;
490 size_t k;
492 memset (&cfd, 0, sizeof (cfd));
493 /* Pass the current time as user data so that we don't need to call
494 * `time' for each node. */
495 cfd.now = time (NULL);
496 cfd.keys = NULL;
497 cfd.keys_num = 0;
499 if (max_age > 0)
500 cfd.abs_timeout = cfd.now - max_age;
501 else
502 cfd.abs_timeout = cfd.now + 1;
504 /* `tree_callback_flush' will return the keys of all values that haven't
505 * been touched in the last `config_flush_interval' seconds in `cfd'.
506 * The char*'s in this array point to the same memory as ci->file, so we
507 * don't need to free them separately. */
508 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
510 for (k = 0; k < cfd.keys_num; k++)
511 {
512 cache_item_t *ci;
514 /* This must not fail. */
515 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
516 assert (ci != NULL);
518 /* If we end up here with values available, something's seriously
519 * messed up. */
520 assert (ci->values_num == 0);
522 /* Remove the node from the tree */
523 g_tree_remove (cache_tree, cfd.keys[k]);
524 cfd.keys[k] = NULL;
526 /* Now free and clean up `ci'. */
527 free (ci->file);
528 ci->file = NULL;
529 free (ci);
530 ci = NULL;
531 } /* for (k = 0; k < cfd.keys_num; k++) */
533 if (cfd.keys != NULL)
534 {
535 free (cfd.keys);
536 cfd.keys = NULL;
537 }
539 return (0);
540 } /* int flush_old_values */
542 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
543 {
544 struct timeval now;
545 struct timespec next_flush;
547 gettimeofday (&now, NULL);
548 next_flush.tv_sec = now.tv_sec + config_flush_interval;
549 next_flush.tv_nsec = 1000 * now.tv_usec;
551 pthread_mutex_lock (&cache_lock);
552 while ((do_shutdown == 0) || (cache_queue_head != NULL))
553 {
554 cache_item_t *ci;
555 char *file;
556 char **values;
557 int values_num;
558 int status;
559 int i;
561 /* First, check if it's time to do the cache flush. */
562 gettimeofday (&now, NULL);
563 if ((now.tv_sec > next_flush.tv_sec)
564 || ((now.tv_sec == next_flush.tv_sec)
565 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
566 {
567 /* Flush all values that haven't been written in the last
568 * `config_write_interval' seconds. */
569 flush_old_values (config_write_interval);
571 /* Determine the time of the next cache flush. */
572 while (next_flush.tv_sec <= now.tv_sec)
573 next_flush.tv_sec += config_flush_interval;
575 /* unlock the cache while we rotate so we don't block incoming
576 * updates if the fsync() blocks on disk I/O */
577 pthread_mutex_unlock(&cache_lock);
578 journal_rotate();
579 pthread_mutex_lock(&cache_lock);
580 }
582 /* Now, check if there's something to store away. If not, wait until
583 * something comes in or it's time to do the cache flush. */
584 if (cache_queue_head == NULL)
585 {
586 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
587 if ((status != 0) && (status != ETIMEDOUT))
588 {
589 RRDD_LOG (LOG_ERR, "queue_thread_main: "
590 "pthread_cond_timedwait returned %i.", status);
591 }
592 }
594 /* We're about to shut down, so lets flush the entire tree. */
595 if ((do_shutdown != 0) && (cache_queue_head == NULL))
596 flush_old_values (/* max age = */ -1);
598 /* Check if a value has arrived. This may be NULL if we timed out or there
599 * was an interrupt such as a signal. */
600 if (cache_queue_head == NULL)
601 continue;
603 ci = cache_queue_head;
605 /* copy the relevant parts */
606 file = strdup (ci->file);
607 if (file == NULL)
608 {
609 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
610 continue;
611 }
613 assert(ci->values != NULL);
614 assert(ci->values_num > 0);
616 values = ci->values;
617 values_num = ci->values_num;
619 _wipe_ci_values(ci, time(NULL));
621 cache_queue_head = ci->next;
622 if (cache_queue_head == NULL)
623 cache_queue_tail = NULL;
624 ci->next = NULL;
626 pthread_mutex_lock (&stats_lock);
627 assert (stats_queue_length > 0);
628 stats_queue_length--;
629 pthread_mutex_unlock (&stats_lock);
631 pthread_mutex_unlock (&cache_lock);
633 rrd_clear_error ();
634 status = rrd_update_r (file, NULL, values_num, (void *) values);
635 if (status != 0)
636 {
637 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
638 "rrd_update_r (%s) failed with status %i. (%s)",
639 file, status, rrd_get_error());
640 }
642 journal_write("wrote", file);
644 for (i = 0; i < values_num; i++)
645 free (values[i]);
647 free(values);
648 free(file);
650 if (status == 0)
651 {
652 pthread_mutex_lock (&stats_lock);
653 stats_updates_written++;
654 stats_data_sets_written += values_num;
655 pthread_mutex_unlock (&stats_lock);
656 }
658 pthread_mutex_lock (&cache_lock);
659 pthread_cond_broadcast (&flush_cond);
661 /* We're about to shut down, so lets flush the entire tree. */
662 if ((do_shutdown != 0) && (cache_queue_head == NULL))
663 flush_old_values (/* max age = */ -1);
664 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
665 pthread_mutex_unlock (&cache_lock);
667 assert(cache_queue_head == NULL);
668 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
669 journal_done();
671 return (NULL);
672 } /* }}} void *queue_thread_main */
674 static int buffer_get_field (char **buffer_ret, /* {{{ */
675 size_t *buffer_size_ret, char **field_ret)
676 {
677 char *buffer;
678 size_t buffer_pos;
679 size_t buffer_size;
680 char *field;
681 size_t field_size;
682 int status;
684 buffer = *buffer_ret;
685 buffer_pos = 0;
686 buffer_size = *buffer_size_ret;
687 field = *buffer_ret;
688 field_size = 0;
690 if (buffer_size <= 0)
691 return (-1);
693 /* This is ensured by `handle_request'. */
694 assert (buffer[buffer_size - 1] == '\0');
696 status = -1;
697 while (buffer_pos < buffer_size)
698 {
699 /* Check for end-of-field or end-of-buffer */
700 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
701 {
702 field[field_size] = 0;
703 field_size++;
704 buffer_pos++;
705 status = 0;
706 break;
707 }
708 /* Handle escaped characters. */
709 else if (buffer[buffer_pos] == '\\')
710 {
711 if (buffer_pos >= (buffer_size - 1))
712 break;
713 buffer_pos++;
714 field[field_size] = buffer[buffer_pos];
715 field_size++;
716 buffer_pos++;
717 }
718 /* Normal operation */
719 else
720 {
721 field[field_size] = buffer[buffer_pos];
722 field_size++;
723 buffer_pos++;
724 }
725 } /* while (buffer_pos < buffer_size) */
727 if (status != 0)
728 return (status);
730 *buffer_ret = buffer + buffer_pos;
731 *buffer_size_ret = buffer_size - buffer_pos;
732 *field_ret = field;
734 return (0);
735 } /* }}} int buffer_get_field */
737 static int flush_file (const char *filename) /* {{{ */
738 {
739 cache_item_t *ci;
741 pthread_mutex_lock (&cache_lock);
743 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
744 if (ci == NULL)
745 {
746 pthread_mutex_unlock (&cache_lock);
747 return (ENOENT);
748 }
750 /* Enqueue at head */
751 enqueue_cache_item (ci, HEAD);
752 pthread_cond_signal (&cache_cond);
754 while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
755 {
756 ci = NULL;
758 pthread_cond_wait (&flush_cond, &cache_lock);
760 ci = g_tree_lookup (cache_tree, filename);
761 if (ci == NULL)
762 {
763 RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
764 "while waiting for flush.");
765 pthread_mutex_unlock (&cache_lock);
766 return (-1);
767 }
768 }
770 pthread_mutex_unlock (&cache_lock);
771 return (0);
772 } /* }}} int flush_file */
774 static int handle_request_help (int fd, /* {{{ */
775 char *buffer, size_t buffer_size)
776 {
777 int status;
778 char **help_text;
779 size_t help_text_len;
780 char *command;
781 size_t i;
783 char *help_help[] =
784 {
785 "4 Command overview\n",
786 "FLUSH <filename>\n",
787 "HELP [<command>]\n",
788 "UPDATE <filename> <values> [<values> ...]\n",
789 "STATS\n"
790 };
791 size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
793 char *help_flush[] =
794 {
795 "4 Help for FLUSH\n",
796 "Usage: FLUSH <filename>\n",
797 "\n",
798 "Adds the given filename to the head of the update queue and returns\n",
799 "after is has been dequeued.\n"
800 };
801 size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
803 char *help_update[] =
804 {
805 "9 Help for UPDATE\n",
806 "Usage: UPDATE <filename> <values> [<values> ...]\n"
807 "\n",
808 "Adds the given file to the internal cache if it is not yet known and\n",
809 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
810 "for details.\n",
811 "\n",
812 "Each <values> has the following form:\n",
813 " <values> = <time>:<value>[:<value>[...]]\n",
814 "See the rrdupdate(1) manpage for details.\n"
815 };
816 size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
818 char *help_stats[] =
819 {
820 "4 Help for STATS\n",
821 "Usage: STATS\n",
822 "\n",
823 "Returns some performance counters, see the rrdcached(1) manpage for\n",
824 "a description of the values.\n"
825 };
826 size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
828 status = buffer_get_field (&buffer, &buffer_size, &command);
829 if (status != 0)
830 {
831 help_text = help_help;
832 help_text_len = help_help_len;
833 }
834 else
835 {
836 if (strcasecmp (command, "update") == 0)
837 {
838 help_text = help_update;
839 help_text_len = help_update_len;
840 }
841 else if (strcasecmp (command, "flush") == 0)
842 {
843 help_text = help_flush;
844 help_text_len = help_flush_len;
845 }
846 else if (strcasecmp (command, "stats") == 0)
847 {
848 help_text = help_stats;
849 help_text_len = help_stats_len;
850 }
851 else
852 {
853 help_text = help_help;
854 help_text_len = help_help_len;
855 }
856 }
858 for (i = 0; i < help_text_len; i++)
859 {
860 status = swrite (fd, help_text[i], strlen (help_text[i]));
861 if (status < 0)
862 {
863 status = errno;
864 RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
865 return (status);
866 }
867 }
869 return (0);
870 } /* }}} int handle_request_help */
872 static int handle_request_stats (int fd, /* {{{ */
873 char *buffer __attribute__((unused)),
874 size_t buffer_size __attribute__((unused)))
875 {
876 int status;
877 char outbuf[CMD_MAX];
879 uint64_t copy_queue_length;
880 uint64_t copy_updates_received;
881 uint64_t copy_flush_received;
882 uint64_t copy_updates_written;
883 uint64_t copy_data_sets_written;
884 uint64_t copy_journal_bytes;
885 uint64_t copy_journal_rotate;
887 uint64_t tree_nodes_number;
888 uint64_t tree_depth;
890 pthread_mutex_lock (&stats_lock);
891 copy_queue_length = stats_queue_length;
892 copy_updates_received = stats_updates_received;
893 copy_flush_received = stats_flush_received;
894 copy_updates_written = stats_updates_written;
895 copy_data_sets_written = stats_data_sets_written;
896 copy_journal_bytes = stats_journal_bytes;
897 copy_journal_rotate = stats_journal_rotate;
898 pthread_mutex_unlock (&stats_lock);
900 pthread_mutex_lock (&cache_lock);
901 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
902 tree_depth = (uint64_t) g_tree_height (cache_tree);
903 pthread_mutex_unlock (&cache_lock);
905 #define RRDD_STATS_SEND \
906 outbuf[sizeof (outbuf) - 1] = 0; \
907 status = swrite (fd, outbuf, strlen (outbuf)); \
908 if (status < 0) \
909 { \
910 status = errno; \
911 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
912 return (status); \
913 }
915 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
916 RRDD_STATS_SEND;
918 snprintf (outbuf, sizeof (outbuf),
919 "QueueLength: %"PRIu64"\n", copy_queue_length);
920 RRDD_STATS_SEND;
922 snprintf (outbuf, sizeof (outbuf),
923 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
924 RRDD_STATS_SEND;
926 snprintf (outbuf, sizeof (outbuf),
927 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
928 RRDD_STATS_SEND;
930 snprintf (outbuf, sizeof (outbuf),
931 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
932 RRDD_STATS_SEND;
934 snprintf (outbuf, sizeof (outbuf),
935 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
936 RRDD_STATS_SEND;
938 snprintf (outbuf, sizeof (outbuf),
939 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
940 RRDD_STATS_SEND;
942 snprintf (outbuf, sizeof (outbuf),
943 "TreeDepth: %"PRIu64"\n", tree_depth);
944 RRDD_STATS_SEND;
946 snprintf (outbuf, sizeof(outbuf),
947 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
948 RRDD_STATS_SEND;
950 snprintf (outbuf, sizeof(outbuf),
951 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
952 RRDD_STATS_SEND;
954 return (0);
955 #undef RRDD_STATS_SEND
956 } /* }}} int handle_request_stats */
958 static int handle_request_flush (int fd, /* {{{ */
959 char *buffer, size_t buffer_size)
960 {
961 char *file;
962 int status;
963 char result[CMD_MAX];
965 status = buffer_get_field (&buffer, &buffer_size, &file);
966 if (status != 0)
967 {
968 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
969 }
970 else
971 {
972 pthread_mutex_lock(&stats_lock);
973 stats_flush_received++;
974 pthread_mutex_unlock(&stats_lock);
976 status = flush_file (file);
977 if (status == 0)
978 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
979 else if (status == ENOENT)
980 {
981 /* no file in our tree; see whether it exists at all */
982 struct stat statbuf;
984 memset(&statbuf, 0, sizeof(statbuf));
985 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
986 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
987 else
988 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
989 }
990 else if (status < 0)
991 strncpy (result, "-1 Internal error.\n", sizeof (result));
992 else
993 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
994 }
995 result[sizeof (result) - 1] = 0;
997 status = swrite (fd, result, strlen (result));
998 if (status < 0)
999 {
1000 status = errno;
1001 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1002 return (status);
1003 }
1005 return (0);
1006 } /* }}} int handle_request_flush */
1008 static int handle_request_update (int fd, /* {{{ */
1009 char *buffer, size_t buffer_size)
1010 {
1011 char *file;
1012 int values_num = 0;
1013 int status;
1015 time_t now;
1017 cache_item_t *ci;
1018 char answer[CMD_MAX];
1020 #define RRDD_UPDATE_SEND \
1021 answer[sizeof (answer) - 1] = 0; \
1022 status = swrite (fd, answer, strlen (answer)); \
1023 if (status < 0) \
1024 { \
1025 status = errno; \
1026 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1027 return (status); \
1028 }
1030 now = time (NULL);
1032 status = buffer_get_field (&buffer, &buffer_size, &file);
1033 if (status != 0)
1034 {
1035 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1036 sizeof (answer));
1037 RRDD_UPDATE_SEND;
1038 return (0);
1039 }
1041 pthread_mutex_lock(&stats_lock);
1042 stats_updates_received++;
1043 pthread_mutex_unlock(&stats_lock);
1045 pthread_mutex_lock (&cache_lock);
1047 ci = g_tree_lookup (cache_tree, file);
1048 if (ci == NULL) /* {{{ */
1049 {
1050 struct stat statbuf;
1052 memset (&statbuf, 0, sizeof (statbuf));
1053 status = stat (file, &statbuf);
1054 if (status != 0)
1055 {
1056 pthread_mutex_unlock (&cache_lock);
1057 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1059 status = errno;
1060 if (status == ENOENT)
1061 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1062 else
1063 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1064 status);
1065 RRDD_UPDATE_SEND;
1066 return (0);
1067 }
1068 if (!S_ISREG (statbuf.st_mode))
1069 {
1070 pthread_mutex_unlock (&cache_lock);
1072 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1073 RRDD_UPDATE_SEND;
1074 return (0);
1075 }
1076 if (access(file, R_OK|W_OK) != 0)
1077 {
1078 pthread_mutex_unlock (&cache_lock);
1080 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1081 file, rrd_strerror(errno));
1082 RRDD_UPDATE_SEND;
1083 return (0);
1084 }
1086 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1087 if (ci == NULL)
1088 {
1089 pthread_mutex_unlock (&cache_lock);
1090 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1092 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1093 RRDD_UPDATE_SEND;
1094 return (0);
1095 }
1096 memset (ci, 0, sizeof (cache_item_t));
1098 ci->file = strdup (file);
1099 if (ci->file == NULL)
1100 {
1101 pthread_mutex_unlock (&cache_lock);
1102 free (ci);
1103 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1105 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1106 RRDD_UPDATE_SEND;
1107 return (0);
1108 }
1110 _wipe_ci_values(ci, now);
1111 ci->flags = CI_FLAGS_IN_TREE;
1113 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1114 } /* }}} */
1115 assert (ci != NULL);
1117 while (buffer_size > 0)
1118 {
1119 char **temp;
1120 char *value;
1122 status = buffer_get_field (&buffer, &buffer_size, &value);
1123 if (status != 0)
1124 {
1125 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1126 break;
1127 }
1129 temp = (char **) realloc (ci->values,
1130 sizeof (char *) * (ci->values_num + 1));
1131 if (temp == NULL)
1132 {
1133 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1134 continue;
1135 }
1136 ci->values = temp;
1138 ci->values[ci->values_num] = strdup (value);
1139 if (ci->values[ci->values_num] == NULL)
1140 {
1141 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1142 continue;
1143 }
1144 ci->values_num++;
1146 values_num++;
1147 }
1149 if (((now - ci->last_flush_time) >= config_write_interval)
1150 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1151 && (ci->values_num > 0))
1152 {
1153 enqueue_cache_item (ci, TAIL);
1154 pthread_cond_signal (&cache_cond);
1155 }
1157 pthread_mutex_unlock (&cache_lock);
1159 if (values_num < 1)
1160 {
1161 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1162 }
1163 else
1164 {
1165 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1166 (values_num == 1) ? "" : "s");
1167 }
1168 RRDD_UPDATE_SEND;
1169 return (0);
1170 #undef RRDD_UPDATE_SEND
1171 } /* }}} int handle_request_update */
1173 /* we came across a "WROTE" entry during journal replay.
1174 * throw away any values that we have accumulated for this file
1175 */
1176 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1177 const char *buffer,
1178 size_t buffer_size __attribute__((unused)))
1179 {
1180 int i;
1181 cache_item_t *ci;
1182 const char *file = buffer;
1184 pthread_mutex_lock(&cache_lock);
1186 ci = g_tree_lookup(cache_tree, file);
1187 if (ci == NULL)
1188 {
1189 pthread_mutex_unlock(&cache_lock);
1190 return (0);
1191 }
1193 if (ci->values)
1194 {
1195 for (i=0; i < ci->values_num; i++)
1196 free(ci->values[i]);
1198 free(ci->values);
1199 }
1201 _wipe_ci_values(ci, time(NULL));
1203 pthread_mutex_unlock(&cache_lock);
1204 return (0);
1205 } /* }}} int handle_request_wrote */
1207 /* if fd < 0, we are in journal replay mode */
1208 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1209 {
1210 char *buffer_ptr;
1211 char *command;
1212 int status;
1214 assert (buffer[buffer_size - 1] == '\0');
1216 buffer_ptr = buffer;
1217 command = NULL;
1218 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1219 if (status != 0)
1220 {
1221 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1222 return (-1);
1223 }
1225 if (strcasecmp (command, "update") == 0)
1226 {
1227 /* don't re-write updates in replay mode */
1228 if (fd >= 0)
1229 journal_write(command, buffer_ptr);
1231 return (handle_request_update (fd, buffer_ptr, buffer_size));
1232 }
1233 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1234 {
1235 /* this is only valid in replay mode */
1236 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1237 }
1238 else if (strcasecmp (command, "flush") == 0)
1239 {
1240 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1241 }
1242 else if (strcasecmp (command, "stats") == 0)
1243 {
1244 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1245 }
1246 else if (strcasecmp (command, "help") == 0)
1247 {
1248 return (handle_request_help (fd, buffer_ptr, buffer_size));
1249 }
1250 else
1251 {
1252 char result[CMD_MAX];
1254 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1255 result[sizeof (result) - 1] = 0;
1257 status = swrite (fd, result, strlen (result));
1258 if (status < 0)
1259 {
1260 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1261 return (-1);
1262 }
1263 }
1265 return (0);
1266 } /* }}} int handle_request */
1268 /* MUST NOT hold journal_lock before calling this */
1269 static void journal_rotate(void) /* {{{ */
1270 {
1271 FILE *old_fh = NULL;
1273 if (journal_cur == NULL || journal_old == NULL)
1274 return;
1276 pthread_mutex_lock(&journal_lock);
1278 /* we rotate this way (rename before close) so that the we can release
1279 * the journal lock as fast as possible. Journal writes to the new
1280 * journal can proceed immediately after the new file is opened. The
1281 * fclose can then block without affecting new updates.
1282 */
1283 if (journal_fh != NULL)
1284 {
1285 old_fh = journal_fh;
1286 rename(journal_cur, journal_old);
1287 ++stats_journal_rotate;
1288 }
1290 journal_fh = fopen(journal_cur, "a");
1291 pthread_mutex_unlock(&journal_lock);
1293 if (old_fh != NULL)
1294 fclose(old_fh);
1296 if (journal_fh == NULL)
1297 RRDD_LOG(LOG_CRIT,
1298 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1299 journal_cur, rrd_strerror(errno));
1301 } /* }}} static void journal_rotate */
1303 static void journal_done(void) /* {{{ */
1304 {
1305 if (journal_cur == NULL)
1306 return;
1308 pthread_mutex_lock(&journal_lock);
1309 if (journal_fh != NULL)
1310 {
1311 fclose(journal_fh);
1312 journal_fh = NULL;
1313 }
1315 RRDD_LOG(LOG_INFO, "removing journals");
1317 unlink(journal_old);
1318 unlink(journal_cur);
1319 pthread_mutex_unlock(&journal_lock);
1321 } /* }}} static void journal_done */
1323 static int journal_write(char *cmd, char *args) /* {{{ */
1324 {
1325 int chars;
1327 if (journal_fh == NULL)
1328 return 0;
1330 pthread_mutex_lock(&journal_lock);
1331 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1332 pthread_mutex_unlock(&journal_lock);
1334 if (chars > 0)
1335 {
1336 pthread_mutex_lock(&stats_lock);
1337 stats_journal_bytes += chars;
1338 pthread_mutex_unlock(&stats_lock);
1339 }
1341 return chars;
1342 } /* }}} static int journal_write */
1344 static int journal_replay (const char *file) /* {{{ */
1345 {
1346 FILE *fh;
1347 int entry_cnt = 0;
1348 int fail_cnt = 0;
1349 uint64_t line = 0;
1350 char entry[CMD_MAX];
1352 if (file == NULL) return 0;
1354 fh = fopen(file, "r");
1355 if (fh == NULL)
1356 {
1357 if (errno != ENOENT)
1358 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1359 file, rrd_strerror(errno));
1360 return 0;
1361 }
1362 else
1363 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1365 while(!feof(fh))
1366 {
1367 size_t entry_len;
1369 ++line;
1370 fgets(entry, sizeof(entry), fh);
1371 entry_len = strlen(entry);
1373 /* check \n termination in case journal writing crashed mid-line */
1374 if (entry_len == 0)
1375 continue;
1376 else if (entry[entry_len - 1] != '\n')
1377 {
1378 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1379 ++fail_cnt;
1380 continue;
1381 }
1383 entry[entry_len - 1] = '\0';
1385 if (handle_request(-1, entry, entry_len) == 0)
1386 ++entry_cnt;
1387 else
1388 ++fail_cnt;
1389 }
1391 fclose(fh);
1393 if (entry_cnt > 0)
1394 {
1395 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1396 entry_cnt, fail_cnt);
1397 return 1;
1398 }
1399 else
1400 return 0;
1402 } /* }}} static int journal_replay */
1404 static void *connection_thread_main (void *args) /* {{{ */
1405 {
1406 pthread_t self;
1407 int i;
1408 int fd;
1410 fd = *((int *) args);
1411 free (args);
1413 pthread_mutex_lock (&connection_threads_lock);
1414 {
1415 pthread_t *temp;
1417 temp = (pthread_t *) realloc (connection_threads,
1418 sizeof (pthread_t) * (connection_threads_num + 1));
1419 if (temp == NULL)
1420 {
1421 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1422 }
1423 else
1424 {
1425 connection_threads = temp;
1426 connection_threads[connection_threads_num] = pthread_self ();
1427 connection_threads_num++;
1428 }
1429 }
1430 pthread_mutex_unlock (&connection_threads_lock);
1432 while (do_shutdown == 0)
1433 {
1434 char buffer[CMD_MAX];
1436 struct pollfd pollfd;
1437 int status;
1439 pollfd.fd = fd;
1440 pollfd.events = POLLIN | POLLPRI;
1441 pollfd.revents = 0;
1443 status = poll (&pollfd, 1, /* timeout = */ 500);
1444 if (status == 0) /* timeout */
1445 continue;
1446 else if (status < 0) /* error */
1447 {
1448 status = errno;
1449 if (status == EINTR)
1450 continue;
1451 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1452 continue;
1453 }
1455 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1456 {
1457 close (fd);
1458 break;
1459 }
1460 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1461 {
1462 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1463 "poll(2) returned something unexpected: %#04hx",
1464 pollfd.revents);
1465 close (fd);
1466 break;
1467 }
1469 status = (int) sread (fd, buffer, sizeof (buffer));
1470 if (status <= 0)
1471 {
1472 close (fd);
1474 if (status < 0)
1475 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1477 break;
1478 }
1480 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1481 if (status != 0)
1482 break;
1483 }
1485 close(fd);
1487 self = pthread_self ();
1488 /* Remove this thread from the connection threads list */
1489 pthread_mutex_lock (&connection_threads_lock);
1490 /* Find out own index in the array */
1491 for (i = 0; i < connection_threads_num; i++)
1492 if (pthread_equal (connection_threads[i], self) != 0)
1493 break;
1494 assert (i < connection_threads_num);
1496 /* Move the trailing threads forward. */
1497 if (i < (connection_threads_num - 1))
1498 {
1499 memmove (connection_threads + i,
1500 connection_threads + i + 1,
1501 sizeof (pthread_t) * (connection_threads_num - i - 1));
1502 }
1504 connection_threads_num--;
1505 pthread_mutex_unlock (&connection_threads_lock);
1507 return (NULL);
1508 } /* }}} void *connection_thread_main */
1510 static int open_listen_socket_unix (const char *path) /* {{{ */
1511 {
1512 int fd;
1513 struct sockaddr_un sa;
1514 listen_socket_t *temp;
1515 int status;
1517 temp = (listen_socket_t *) realloc (listen_fds,
1518 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1519 if (temp == NULL)
1520 {
1521 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1522 return (-1);
1523 }
1524 listen_fds = temp;
1525 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1527 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1528 if (fd < 0)
1529 {
1530 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1531 return (-1);
1532 }
1534 memset (&sa, 0, sizeof (sa));
1535 sa.sun_family = AF_UNIX;
1536 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1538 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1539 if (status != 0)
1540 {
1541 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1542 close (fd);
1543 unlink (path);
1544 return (-1);
1545 }
1547 status = listen (fd, /* backlog = */ 10);
1548 if (status != 0)
1549 {
1550 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1551 close (fd);
1552 unlink (path);
1553 return (-1);
1554 }
1556 listen_fds[listen_fds_num].fd = fd;
1557 snprintf (listen_fds[listen_fds_num].path,
1558 sizeof (listen_fds[listen_fds_num].path) - 1,
1559 "unix:%s", path);
1560 listen_fds_num++;
1562 return (0);
1563 } /* }}} int open_listen_socket_unix */
1565 static int open_listen_socket (const char *addr_orig) /* {{{ */
1566 {
1567 struct addrinfo ai_hints;
1568 struct addrinfo *ai_res;
1569 struct addrinfo *ai_ptr;
1570 char addr_copy[NI_MAXHOST];
1571 char *addr;
1572 char *port;
1573 int status;
1575 assert (addr_orig != NULL);
1577 strncpy (addr_copy, addr_orig, sizeof (addr_copy));
1578 addr_copy[sizeof (addr_copy) - 1] = 0;
1579 addr = addr_copy;
1581 if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1582 return (open_listen_socket_unix (addr + strlen ("unix:")));
1583 else if (addr[0] == '/')
1584 return (open_listen_socket_unix (addr));
1586 memset (&ai_hints, 0, sizeof (ai_hints));
1587 ai_hints.ai_flags = 0;
1588 #ifdef AI_ADDRCONFIG
1589 ai_hints.ai_flags |= AI_ADDRCONFIG;
1590 #endif
1591 ai_hints.ai_family = AF_UNSPEC;
1592 ai_hints.ai_socktype = SOCK_STREAM;
1594 port = NULL;
1595 if (*addr == '[') /* IPv6+port format */
1596 {
1597 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1598 addr++;
1600 port = strchr (addr, ']');
1601 if (port == NULL)
1602 {
1603 RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
1604 addr_orig);
1605 return (-1);
1606 }
1607 *port = 0;
1608 port++;
1610 if (*port == ':')
1611 port++;
1612 else if (*port == 0)
1613 port = NULL;
1614 else
1615 {
1616 RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
1617 port);
1618 return (-1);
1619 }
1620 } /* if (*addr = ']') */
1621 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1622 {
1623 port = rindex(addr, ':');
1624 if (port != NULL)
1625 {
1626 *port = 0;
1627 port++;
1628 }
1629 }
1630 ai_res = NULL;
1631 status = getaddrinfo (addr,
1632 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1633 &ai_hints, &ai_res);
1634 if (status != 0)
1635 {
1636 RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1637 "%s", addr, gai_strerror (status));
1638 return (-1);
1639 }
1641 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1642 {
1643 int fd;
1644 listen_socket_t *temp;
1645 int one = 1;
1647 temp = (listen_socket_t *) realloc (listen_fds,
1648 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1649 if (temp == NULL)
1650 {
1651 RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1652 continue;
1653 }
1654 listen_fds = temp;
1655 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1657 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1658 if (fd < 0)
1659 {
1660 RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1661 continue;
1662 }
1664 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1666 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1667 if (status != 0)
1668 {
1669 RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1670 close (fd);
1671 continue;
1672 }
1674 status = listen (fd, /* backlog = */ 10);
1675 if (status != 0)
1676 {
1677 RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1678 close (fd);
1679 return (-1);
1680 }
1682 listen_fds[listen_fds_num].fd = fd;
1683 strncpy (listen_fds[listen_fds_num].path, addr,
1684 sizeof (listen_fds[listen_fds_num].path) - 1);
1685 listen_fds_num++;
1686 } /* for (ai_ptr) */
1688 return (0);
1689 } /* }}} int open_listen_socket */
1691 static int close_listen_sockets (void) /* {{{ */
1692 {
1693 size_t i;
1695 for (i = 0; i < listen_fds_num; i++)
1696 {
1697 close (listen_fds[i].fd);
1698 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1699 unlink (listen_fds[i].path + strlen ("unix:"));
1700 }
1702 free (listen_fds);
1703 listen_fds = NULL;
1704 listen_fds_num = 0;
1706 return (0);
1707 } /* }}} int close_listen_sockets */
1709 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1710 {
1711 struct pollfd *pollfds;
1712 int pollfds_num;
1713 int status;
1714 int i;
1716 for (i = 0; i < config_listen_address_list_len; i++)
1717 open_listen_socket (config_listen_address_list[i]);
1719 if (config_listen_address_list_len < 1)
1720 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1722 if (listen_fds_num < 1)
1723 {
1724 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1725 "could be opened. Sorry.");
1726 return (NULL);
1727 }
1729 pollfds_num = listen_fds_num;
1730 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1731 if (pollfds == NULL)
1732 {
1733 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1734 return (NULL);
1735 }
1736 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1738 RRDD_LOG(LOG_INFO, "listening for connections");
1740 while (do_shutdown == 0)
1741 {
1742 assert (pollfds_num == ((int) listen_fds_num));
1743 for (i = 0; i < pollfds_num; i++)
1744 {
1745 pollfds[i].fd = listen_fds[i].fd;
1746 pollfds[i].events = POLLIN | POLLPRI;
1747 pollfds[i].revents = 0;
1748 }
1750 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1751 if (status == 0)
1752 {
1753 continue; /* timeout */
1754 }
1755 else if (status < 0)
1756 {
1757 status = errno;
1758 if (status != EINTR)
1759 {
1760 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1761 }
1762 continue;
1763 }
1765 for (i = 0; i < pollfds_num; i++)
1766 {
1767 int *client_sd;
1768 struct sockaddr_storage client_sa;
1769 socklen_t client_sa_size;
1770 pthread_t tid;
1771 pthread_attr_t attr;
1773 if (pollfds[i].revents == 0)
1774 continue;
1776 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1777 {
1778 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1779 "poll(2) returned something unexpected for listen FD #%i.",
1780 pollfds[i].fd);
1781 continue;
1782 }
1784 client_sd = (int *) malloc (sizeof (int));
1785 if (client_sd == NULL)
1786 {
1787 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1788 continue;
1789 }
1791 client_sa_size = sizeof (client_sa);
1792 *client_sd = accept (pollfds[i].fd,
1793 (struct sockaddr *) &client_sa, &client_sa_size);
1794 if (*client_sd < 0)
1795 {
1796 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1797 continue;
1798 }
1800 pthread_attr_init (&attr);
1801 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1803 status = pthread_create (&tid, &attr, connection_thread_main,
1804 /* args = */ (void *) client_sd);
1805 if (status != 0)
1806 {
1807 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1808 close (*client_sd);
1809 free (client_sd);
1810 continue;
1811 }
1812 } /* for (pollfds_num) */
1813 } /* while (do_shutdown == 0) */
1815 RRDD_LOG(LOG_INFO, "starting shutdown");
1817 close_listen_sockets ();
1819 pthread_mutex_lock (&connection_threads_lock);
1820 while (connection_threads_num > 0)
1821 {
1822 pthread_t wait_for;
1824 wait_for = connection_threads[0];
1826 pthread_mutex_unlock (&connection_threads_lock);
1827 pthread_join (wait_for, /* retval = */ NULL);
1828 pthread_mutex_lock (&connection_threads_lock);
1829 }
1830 pthread_mutex_unlock (&connection_threads_lock);
1832 return (NULL);
1833 } /* }}} void *listen_thread_main */
1835 static int daemonize (void) /* {{{ */
1836 {
1837 int status;
1839 /* These structures are static, because `sigaction' behaves weird if the are
1840 * overwritten.. */
1841 static struct sigaction sa_int;
1842 static struct sigaction sa_term;
1843 static struct sigaction sa_pipe;
1845 if (!stay_foreground)
1846 {
1847 pid_t child;
1848 char *base_dir;
1850 child = fork ();
1851 if (child < 0)
1852 {
1853 fprintf (stderr, "daemonize: fork(2) failed.\n");
1854 return (-1);
1855 }
1856 else if (child > 0)
1857 {
1858 return (1);
1859 }
1861 /* Change into the /tmp directory. */
1862 base_dir = (config_base_dir != NULL)
1863 ? config_base_dir
1864 : "/tmp";
1865 status = chdir (base_dir);
1866 if (status != 0)
1867 {
1868 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1869 return (-1);
1870 }
1872 /* Become session leader */
1873 setsid ();
1875 /* Open the first three file descriptors to /dev/null */
1876 close (2);
1877 close (1);
1878 close (0);
1880 open ("/dev/null", O_RDWR);
1881 dup (0);
1882 dup (0);
1883 } /* if (!stay_foreground) */
1885 /* Install signal handlers */
1886 memset (&sa_int, 0, sizeof (sa_int));
1887 sa_int.sa_handler = sig_int_handler;
1888 sigaction (SIGINT, &sa_int, NULL);
1890 memset (&sa_term, 0, sizeof (sa_term));
1891 sa_term.sa_handler = sig_term_handler;
1892 sigaction (SIGTERM, &sa_term, NULL);
1894 memset (&sa_pipe, 0, sizeof (sa_pipe));
1895 sa_pipe.sa_handler = SIG_IGN;
1896 sigaction (SIGPIPE, &sa_pipe, NULL);
1898 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1899 RRDD_LOG(LOG_INFO, "starting up");
1901 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1902 if (cache_tree == NULL)
1903 {
1904 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1905 return (-1);
1906 }
1908 status = write_pidfile ();
1909 return status;
1910 } /* }}} int daemonize */
1912 static int cleanup (void) /* {{{ */
1913 {
1914 do_shutdown++;
1916 pthread_cond_signal (&cache_cond);
1917 pthread_join (queue_thread, /* return = */ NULL);
1919 remove_pidfile ();
1921 RRDD_LOG(LOG_INFO, "goodbye");
1922 closelog ();
1924 return (0);
1925 } /* }}} int cleanup */
1927 static int read_options (int argc, char **argv) /* {{{ */
1928 {
1929 int option;
1930 int status = 0;
1932 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1933 {
1934 switch (option)
1935 {
1936 case 'g':
1937 stay_foreground=1;
1938 break;
1940 case 'l':
1941 {
1942 char **temp;
1944 temp = (char **) realloc (config_listen_address_list,
1945 sizeof (char *) * (config_listen_address_list_len + 1));
1946 if (temp == NULL)
1947 {
1948 fprintf (stderr, "read_options: realloc failed.\n");
1949 return (2);
1950 }
1951 config_listen_address_list = temp;
1953 temp[config_listen_address_list_len] = strdup (optarg);
1954 if (temp[config_listen_address_list_len] == NULL)
1955 {
1956 fprintf (stderr, "read_options: strdup failed.\n");
1957 return (2);
1958 }
1959 config_listen_address_list_len++;
1960 }
1961 break;
1963 case 'f':
1964 {
1965 int temp;
1967 temp = atoi (optarg);
1968 if (temp > 0)
1969 config_flush_interval = temp;
1970 else
1971 {
1972 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1973 status = 3;
1974 }
1975 }
1976 break;
1978 case 'w':
1979 {
1980 int temp;
1982 temp = atoi (optarg);
1983 if (temp > 0)
1984 config_write_interval = temp;
1985 else
1986 {
1987 fprintf (stderr, "Invalid write interval: %s\n", optarg);
1988 status = 2;
1989 }
1990 }
1991 break;
1993 case 'z':
1994 {
1995 int temp;
1997 temp = atoi(optarg);
1998 if (temp > 0)
1999 config_write_jitter = temp;
2000 else
2001 {
2002 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2003 status = 2;
2004 }
2006 break;
2007 }
2009 case 'b':
2010 {
2011 size_t len;
2013 if (config_base_dir != NULL)
2014 free (config_base_dir);
2015 config_base_dir = strdup (optarg);
2016 if (config_base_dir == NULL)
2017 {
2018 fprintf (stderr, "read_options: strdup failed.\n");
2019 return (3);
2020 }
2022 len = strlen (config_base_dir);
2023 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2024 {
2025 config_base_dir[len - 1] = 0;
2026 len--;
2027 }
2029 if (len < 1)
2030 {
2031 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2032 return (4);
2033 }
2034 }
2035 break;
2037 case 'p':
2038 {
2039 if (config_pid_file != NULL)
2040 free (config_pid_file);
2041 config_pid_file = strdup (optarg);
2042 if (config_pid_file == NULL)
2043 {
2044 fprintf (stderr, "read_options: strdup failed.\n");
2045 return (3);
2046 }
2047 }
2048 break;
2050 case 'j':
2051 {
2052 struct stat statbuf;
2053 const char *dir = optarg;
2055 status = stat(dir, &statbuf);
2056 if (status != 0)
2057 {
2058 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2059 return 6;
2060 }
2062 if (!S_ISDIR(statbuf.st_mode)
2063 || access(dir, R_OK|W_OK|X_OK) != 0)
2064 {
2065 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2066 errno ? rrd_strerror(errno) : "");
2067 return 6;
2068 }
2070 journal_cur = malloc(PATH_MAX + 1);
2071 journal_old = malloc(PATH_MAX + 1);
2072 if (journal_cur == NULL || journal_old == NULL)
2073 {
2074 fprintf(stderr, "malloc failure for journal files\n");
2075 return 6;
2076 }
2077 else
2078 {
2079 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2080 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2081 }
2082 }
2083 break;
2085 case 'h':
2086 case '?':
2087 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2088 "\n"
2089 "Usage: rrdcached [options]\n"
2090 "\n"
2091 "Valid options are:\n"
2092 " -l <address> Socket address to listen to.\n"
2093 " -w <seconds> Interval in which to write data.\n"
2094 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2095 " -f <seconds> Interval in which to flush dead data.\n"
2096 " -p <file> Location of the PID-file.\n"
2097 " -b <dir> Base directory to change to.\n"
2098 " -g Do not fork and run in the foreground.\n"
2099 " -j <dir> Directory in which to create the journal files.\n"
2100 "\n"
2101 "For more information and a detailed description of all options "
2102 "please refer\n"
2103 "to the rrdcached(1) manual page.\n",
2104 VERSION);
2105 status = -1;
2106 break;
2107 } /* switch (option) */
2108 } /* while (getopt) */
2110 /* advise the user when values are not sane */
2111 if (config_flush_interval < 2 * config_write_interval)
2112 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2113 " 2x write interval (-w) !\n");
2114 if (config_write_jitter > config_write_interval)
2115 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2116 " write interval (-w) !\n");
2118 return (status);
2119 } /* }}} int read_options */
2121 int main (int argc, char **argv)
2122 {
2123 int status;
2125 status = read_options (argc, argv);
2126 if (status != 0)
2127 {
2128 if (status < 0)
2129 status = 0;
2130 return (status);
2131 }
2133 status = daemonize ();
2134 if (status == 1)
2135 {
2136 struct sigaction sigchld;
2138 memset (&sigchld, 0, sizeof (sigchld));
2139 sigchld.sa_handler = SIG_IGN;
2140 sigaction (SIGCHLD, &sigchld, NULL);
2142 return (0);
2143 }
2144 else if (status != 0)
2145 {
2146 fprintf (stderr, "daemonize failed, exiting.\n");
2147 return (1);
2148 }
2150 if (journal_cur != NULL)
2151 {
2152 int had_journal = 0;
2154 pthread_mutex_lock(&journal_lock);
2156 RRDD_LOG(LOG_INFO, "checking for journal files");
2158 had_journal += journal_replay(journal_old);
2159 had_journal += journal_replay(journal_cur);
2161 if (had_journal)
2162 flush_old_values(-1);
2164 pthread_mutex_unlock(&journal_lock);
2165 journal_rotate();
2167 RRDD_LOG(LOG_INFO, "journal processing complete");
2168 }
2170 /* start the queue thread */
2171 memset (&queue_thread, 0, sizeof (queue_thread));
2172 status = pthread_create (&queue_thread,
2173 NULL, /* attr */
2174 queue_thread_main,
2175 NULL); /* args */
2176 if (status != 0)
2177 {
2178 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2179 cleanup();
2180 return (1);
2181 }
2183 listen_thread_main (NULL);
2184 cleanup ();
2186 return (0);
2187 } /* int main */
2189 /*
2190 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2191 */