1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
/* A listening socket: the descriptor together with a path buffer that has
 * room for PATH_MAX characters plus one extra byte. */
typedef struct listen_socket_s
{
  int  fd;
  char path[PATH_MAX + 1];
} listen_socket_t;
/* Bits stored in cache_item_s.flags. */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/* One cached RRD file: the pending update strings plus the bookkeeping
 * needed to keep the entry in the update queue. */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char   *file;             /* file name of the entry */
  char  **values;           /* array of pending value strings */
  int     values_num;       /* number of entries in `values' */
  time_t  last_flush_time;  /* set whenever the values are wiped */
  int     flags;            /* combination of CI_FLAGS_* */

  cache_item_t *next;       /* link to the next item in the update queue */
};
/* User data handed to the tree traversal callback: the reference times and
 * the list of keys collected during the walk. */
typedef struct callback_flush_data_s
{
  time_t   now;          /* time at which the traversal started */
  time_t   abs_timeout;  /* entries flushed at or before this are enqueued */
  char   **keys;         /* keys collected for removal from the tree */
  size_t   keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* Which end of the update queue an item is inserted at. */
typedef enum queue_side_e
{
  HEAD = 0,
  TAIL = 1
} queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146 * Variables
147 */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree *cache_tree = NULL;
163 static cache_item_t *cache_queue_head = NULL;
164 static cache_item_t *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
168 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
170 static int config_write_interval = 300;
171 static int config_write_jitter = 0;
172 static int config_flush_interval = 3600;
173 static char *config_pid_file = NULL;
174 static char *config_base_dir = NULL;
176 static char **config_listen_address_list = NULL;
177 static int config_listen_address_list_len = 0;
179 static uint64_t stats_queue_length = 0;
180 static uint64_t stats_updates_received = 0;
181 static uint64_t stats_flush_received = 0;
182 static uint64_t stats_updates_written = 0;
183 static uint64_t stats_data_sets_written = 0;
184 static uint64_t stats_journal_bytes = 0;
185 static uint64_t stats_journal_rotate = 0;
186 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
188 /* Journaled updates */
189 static char *journal_cur = NULL;
190 static char *journal_old = NULL;
191 static FILE *journal_fh = NULL;
192 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
193 static int journal_write(char *cmd, char *args);
194 static void journal_done(void);
195 static void journal_rotate(void);
197 /*
198 * Functions
199 */
200 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
201 {
202 RRDD_LOG(LOG_NOTICE, "caught SIGINT");
203 do_shutdown++;
204 pthread_cond_broadcast(&cache_cond);
205 } /* }}} void sig_int_handler */
207 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
208 {
209 RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
210 do_shutdown++;
211 pthread_cond_broadcast(&cache_cond);
212 } /* }}} void sig_term_handler */
214 static int write_pidfile (void) /* {{{ */
215 {
216 pid_t pid;
217 char *file;
218 int fd;
219 FILE *fh;
221 pid = getpid ();
223 file = (config_pid_file != NULL)
224 ? config_pid_file
225 : LOCALSTATEDIR "/run/rrdcached.pid";
227 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
228 if (fd < 0)
229 {
230 RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
231 file, rrd_strerror(errno));
232 return (-1);
233 }
235 fh = fdopen (fd, "w");
236 if (fh == NULL)
237 {
238 RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
239 close(fd);
240 return (-1);
241 }
243 fprintf (fh, "%i\n", (int) pid);
244 fclose (fh);
246 return (0);
247 } /* }}} int write_pidfile */
249 static int remove_pidfile (void) /* {{{ */
250 {
251 char *file;
252 int status;
254 file = (config_pid_file != NULL)
255 ? config_pid_file
256 : LOCALSTATEDIR "/run/rrdcached.pid";
258 status = unlink (file);
259 if (status == 0)
260 return (0);
261 return (errno);
262 } /* }}} int remove_pidfile */
/*
 * sread:
 * Reads from `fd' into `buffer_void' until the data read so far ends in a
 * newline or the buffer is full. The trailing "\n" (or "\r\n") is replaced
 * by a NUL byte.
 *
 * Returns the number of bytes in the buffer *including* the terminating NUL
 * byte, 0 if read(2) reported end-of-file, or -1 on error:
 *   EINVAL  - `buffer_void' is NULL or `buffer_size' is zero
 *   ENOBUFS - the buffer filled up before a newline was seen
 *   other   - as set by read(2)
 * EAGAIN and EINTR from read(2) are retried.
 */
static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
{
  char *buffer = (char *) buffer_void;
  size_t buffer_used = 0;
  size_t buffer_free = buffer_size;
  ssize_t status;

  /* Without this check an empty buffer would skip the loop entirely and
   * the code below would look at buffer[-1]. */
  if ((buffer == NULL) || (buffer_size == 0))
  {
    errno = EINVAL;
    return (-1);
  }

  while (buffer_free > 0)
  {
    status = read (fd, buffer + buffer_used, buffer_free);
    if (status < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (-1);
    }

    if (status == 0)
      return (0);

    assert (buffer_free >= (size_t) status);

    buffer_free -= (size_t) status;
    buffer_used += (size_t) status;

    if (buffer[buffer_used - 1] == '\n')
      break;
  }

  /* The loop either returned or consumed at least one byte. */
  assert (buffer_used > 0);

  if (buffer[buffer_used - 1] != '\n')
  {
    errno = ENOBUFS;
    return (-1);
  }

  buffer[buffer_used - 1] = 0;

  /* Fix network line endings. */
  if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
  {
    buffer_used--;
    buffer[buffer_used - 1] = 0;
  }

  return ((ssize_t) buffer_used);
} /* }}} ssize_t sread */
/*
 * swrite:
 * Writes all `count' bytes of `buf' to `fd', restarting write(2) after
 * EAGAIN / EINTR and after short writes. Returns 0 once everything has been
 * written, or the negative result of the failed write(2). A negative `fd'
 * (journal replay) is accepted and silently ignored.
 */
static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
{
  const char *cursor = (const char *) buf;
  size_t remaining = count;

  /* special case for journal replay */
  if (fd < 0)
    return (0);

  while (remaining > 0)
  {
    ssize_t written = write (fd, (const void *) cursor, remaining);

    if (written >= 0)
    {
      cursor += written;
      remaining -= written;
      continue;
    }

    if ((errno != EAGAIN) && (errno != EINTR))
      return (written);
  }

  return (0);
} /* }}} ssize_t swrite */
345 static void _wipe_ci_values(cache_item_t *ci, time_t when)
346 {
347 ci->values = NULL;
348 ci->values_num = 0;
350 ci->last_flush_time = when;
351 if (config_write_jitter > 0)
352 ci->last_flush_time += (random() % config_write_jitter);
354 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
355 }
357 /*
358 * enqueue_cache_item:
359 * `cache_lock' must be acquired before calling this function!
360 */
361 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
362 queue_side_t side)
363 {
364 int did_insert = 0;
366 if (ci == NULL)
367 return (-1);
369 if (ci->values_num == 0)
370 return (0);
372 if (side == HEAD)
373 {
374 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
375 {
376 assert (ci->next == NULL);
377 ci->next = cache_queue_head;
378 cache_queue_head = ci;
380 if (cache_queue_tail == NULL)
381 cache_queue_tail = cache_queue_head;
383 did_insert = 1;
384 }
385 else if (cache_queue_head == ci)
386 {
387 /* do nothing */
388 }
389 else /* enqueued, but not first entry */
390 {
391 cache_item_t *prev;
393 /* find previous entry */
394 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
395 if (prev->next == ci)
396 break;
397 assert (prev != NULL);
399 /* move to the front */
400 prev->next = ci->next;
401 ci->next = cache_queue_head;
402 cache_queue_head = ci;
404 /* check if we need to adapt the tail */
405 if (cache_queue_tail == ci)
406 cache_queue_tail = prev;
407 }
408 }
409 else /* (side == TAIL) */
410 {
411 /* We don't move values back in the list.. */
412 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
413 return (0);
415 assert (ci->next == NULL);
417 if (cache_queue_tail == NULL)
418 cache_queue_head = ci;
419 else
420 cache_queue_tail->next = ci;
421 cache_queue_tail = ci;
423 did_insert = 1;
424 }
426 ci->flags |= CI_FLAGS_IN_QUEUE;
428 if (did_insert)
429 {
430 pthread_mutex_lock (&stats_lock);
431 stats_queue_length++;
432 pthread_mutex_unlock (&stats_lock);
433 }
435 return (0);
436 } /* }}} int enqueue_cache_item */
438 /*
439 * tree_callback_flush:
440 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
441 * while this is in progress.
442 */
443 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
444 gpointer data)
445 {
446 cache_item_t *ci;
447 callback_flush_data_t *cfd;
449 ci = (cache_item_t *) value;
450 cfd = (callback_flush_data_t *) data;
452 if ((ci->last_flush_time <= cfd->abs_timeout)
453 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
454 && (ci->values_num > 0))
455 {
456 enqueue_cache_item (ci, TAIL);
457 }
458 else if ((do_shutdown != 0)
459 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
460 && (ci->values_num > 0))
461 {
462 enqueue_cache_item (ci, TAIL);
463 }
464 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
465 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
466 && (ci->values_num <= 0))
467 {
468 char **temp;
470 temp = (char **) realloc (cfd->keys,
471 sizeof (char *) * (cfd->keys_num + 1));
472 if (temp == NULL)
473 {
474 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
475 return (FALSE);
476 }
477 cfd->keys = temp;
478 /* Make really sure this points to the _same_ place */
479 assert ((char *) key == ci->file);
480 cfd->keys[cfd->keys_num] = (char *) key;
481 cfd->keys_num++;
482 }
484 return (FALSE);
485 } /* }}} gboolean tree_callback_flush */
487 static int flush_old_values (int max_age)
488 {
489 callback_flush_data_t cfd;
490 size_t k;
492 memset (&cfd, 0, sizeof (cfd));
493 /* Pass the current time as user data so that we don't need to call
494 * `time' for each node. */
495 cfd.now = time (NULL);
496 cfd.keys = NULL;
497 cfd.keys_num = 0;
499 if (max_age > 0)
500 cfd.abs_timeout = cfd.now - max_age;
501 else
502 cfd.abs_timeout = cfd.now + 1;
504 /* `tree_callback_flush' will return the keys of all values that haven't
505 * been touched in the last `config_flush_interval' seconds in `cfd'.
506 * The char*'s in this array point to the same memory as ci->file, so we
507 * don't need to free them separately. */
508 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
510 for (k = 0; k < cfd.keys_num; k++)
511 {
512 cache_item_t *ci;
514 /* This must not fail. */
515 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
516 assert (ci != NULL);
518 /* If we end up here with values available, something's seriously
519 * messed up. */
520 assert (ci->values_num == 0);
522 /* Remove the node from the tree */
523 g_tree_remove (cache_tree, cfd.keys[k]);
524 cfd.keys[k] = NULL;
526 /* Now free and clean up `ci'. */
527 free (ci->file);
528 ci->file = NULL;
529 free (ci);
530 ci = NULL;
531 } /* for (k = 0; k < cfd.keys_num; k++) */
533 if (cfd.keys != NULL)
534 {
535 free (cfd.keys);
536 cfd.keys = NULL;
537 }
539 return (0);
540 } /* int flush_old_values */
542 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
543 {
544 struct timeval now;
545 struct timespec next_flush;
547 gettimeofday (&now, NULL);
548 next_flush.tv_sec = now.tv_sec + config_flush_interval;
549 next_flush.tv_nsec = 1000 * now.tv_usec;
551 pthread_mutex_lock (&cache_lock);
552 while ((do_shutdown == 0) || (cache_queue_head != NULL))
553 {
554 cache_item_t *ci;
555 char *file;
556 char **values;
557 int values_num;
558 int status;
559 int i;
561 /* First, check if it's time to do the cache flush. */
562 gettimeofday (&now, NULL);
563 if ((now.tv_sec > next_flush.tv_sec)
564 || ((now.tv_sec == next_flush.tv_sec)
565 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
566 {
567 /* Flush all values that haven't been written in the last
568 * `config_write_interval' seconds. */
569 flush_old_values (config_write_interval);
571 /* Determine the time of the next cache flush. */
572 while (next_flush.tv_sec <= now.tv_sec)
573 next_flush.tv_sec += config_flush_interval;
575 /* unlock the cache while we rotate so we don't block incoming
576 * updates if the fsync() blocks on disk I/O */
577 pthread_mutex_unlock(&cache_lock);
578 journal_rotate();
579 pthread_mutex_lock(&cache_lock);
580 }
582 /* Now, check if there's something to store away. If not, wait until
583 * something comes in or it's time to do the cache flush. */
584 if (cache_queue_head == NULL)
585 {
586 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
587 if ((status != 0) && (status != ETIMEDOUT))
588 {
589 RRDD_LOG (LOG_ERR, "queue_thread_main: "
590 "pthread_cond_timedwait returned %i.", status);
591 }
592 }
594 /* We're about to shut down, so lets flush the entire tree. */
595 if ((do_shutdown != 0) && (cache_queue_head == NULL))
596 flush_old_values (/* max age = */ -1);
598 /* Check if a value has arrived. This may be NULL if we timed out or there
599 * was an interrupt such as a signal. */
600 if (cache_queue_head == NULL)
601 continue;
603 ci = cache_queue_head;
605 /* copy the relevant parts */
606 file = strdup (ci->file);
607 if (file == NULL)
608 {
609 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
610 continue;
611 }
613 assert(ci->values != NULL);
614 assert(ci->values_num > 0);
616 values = ci->values;
617 values_num = ci->values_num;
619 _wipe_ci_values(ci, time(NULL));
621 cache_queue_head = ci->next;
622 if (cache_queue_head == NULL)
623 cache_queue_tail = NULL;
624 ci->next = NULL;
626 pthread_mutex_lock (&stats_lock);
627 assert (stats_queue_length > 0);
628 stats_queue_length--;
629 pthread_mutex_unlock (&stats_lock);
631 pthread_mutex_unlock (&cache_lock);
633 rrd_clear_error ();
634 status = rrd_update_r (file, NULL, values_num, (void *) values);
635 if (status != 0)
636 {
637 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
638 "rrd_update_r (%s) failed with status %i. (%s)",
639 file, status, rrd_get_error());
640 }
642 journal_write("wrote", file);
644 for (i = 0; i < values_num; i++)
645 free (values[i]);
647 free(values);
648 free(file);
650 if (status == 0)
651 {
652 pthread_mutex_lock (&stats_lock);
653 stats_updates_written++;
654 stats_data_sets_written += values_num;
655 pthread_mutex_unlock (&stats_lock);
656 }
658 pthread_mutex_lock (&cache_lock);
659 pthread_cond_broadcast (&flush_cond);
661 /* We're about to shut down, so lets flush the entire tree. */
662 if ((do_shutdown != 0) && (cache_queue_head == NULL))
663 flush_old_values (/* max age = */ -1);
664 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
665 pthread_mutex_unlock (&cache_lock);
667 assert(cache_queue_head == NULL);
668 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
669 journal_done();
671 return (NULL);
672 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Splits the next field off `*buffer_ret'. A field ends at an unescaped
 * space or at the terminating NUL byte; a backslash makes the following
 * character part of the field. Unescaping is done in place, i.e. the buffer
 * is modified.
 *
 * On success 0 is returned, `*field_ret' points to the NUL-terminated field
 * and `*buffer_ret' / `*buffer_size_ret' describe the rest of the buffer.
 * On failure (empty buffer or an escape without a usable successor) -1 is
 * returned and the three output arguments are left unchanged.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  char *dst = *buffer_ret;
  size_t total = *buffer_size_ret;
  size_t rd = 0;
  size_t wr = 0;

  if (total <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[total - 1] == '\0');

  for (;;)
  {
    char c;

    /* Ran off the end without having seen a delimiter. */
    if (rd >= total)
      return (-1);

    c = src[rd];

    /* End-of-field or end-of-buffer */
    if ((c == ' ') || (c == '\0'))
    {
      dst[wr] = 0;
      rd++;
      break;
    }

    /* Escaped character: copy whatever follows the backslash. */
    if (c == '\\')
    {
      if (rd >= (total - 1))
        return (-1);
      rd++;
      c = src[rd];
    }

    dst[wr] = c;
    wr++;
    rd++;
  }

  *buffer_ret = src + rd;
  *buffer_size_ret = total - rd;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
737 static int flush_file (const char *filename) /* {{{ */
738 {
739 cache_item_t *ci;
741 pthread_mutex_lock (&cache_lock);
743 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
744 if (ci == NULL)
745 {
746 pthread_mutex_unlock (&cache_lock);
747 return (ENOENT);
748 }
750 /* Enqueue at head */
751 enqueue_cache_item (ci, HEAD);
752 pthread_cond_signal (&cache_cond);
754 while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
755 {
756 ci = NULL;
758 pthread_cond_wait (&flush_cond, &cache_lock);
760 ci = g_tree_lookup (cache_tree, filename);
761 if (ci == NULL)
762 {
763 RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
764 "while waiting for flush.");
765 pthread_mutex_unlock (&cache_lock);
766 return (-1);
767 }
768 }
770 pthread_mutex_unlock (&cache_lock);
771 return (0);
772 } /* }}} int flush_file */
/*
 * handle_request_help:
 * Answers the HELP command. The first field of `buffer' selects the help
 * text ("update", "flush" or "stats", case-insensitive); anything else,
 * including no argument at all, yields the command overview. The first line
 * of every text starts with the number of lines that follow it.
 * Returns 0 on success or the `errno' value of a failed write.
 */
static int handle_request_help (int fd, /* {{{ */
    char *buffer, size_t buffer_size)
{
  static const char *const help_help[] =
  {
    "4 Command overview\n",
    "FLUSH <filename>\n",
    "HELP [<command>]\n",
    "UPDATE <filename> <values> [<values> ...]\n",
    "STATS\n"
  };

  static const char *const help_flush[] =
  {
    "4 Help for FLUSH\n",
    "Usage: FLUSH <filename>\n",
    "\n",
    "Adds the given filename to the head of the update queue and returns\n",
    "after it has been dequeued.\n"
  };

  static const char *const help_update[] =
  {
    "9 Help for UPDATE\n",
    "Usage: UPDATE <filename> <values> [<values> ...]\n",
    "\n",
    "Adds the given file to the internal cache if it is not yet known and\n",
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
    "for details.\n",
    "\n",
    "Each <values> has the following form:\n",
    " <values> = <time>:<value>[:<value>[...]]\n",
    "See the rrdupdate(1) manpage for details.\n"
  };

  static const char *const help_stats[] =
  {
    "4 Help for STATS\n",
    "Usage: STATS\n",
    "\n",
    "Returns some performance counters, see the rrdcached(1) manpage for\n",
    "a description of the values.\n"
  };

  /* The overview is the fallback for "no argument" and "unknown command". */
  const char *const *help_text = help_help;
  size_t help_text_len = sizeof (help_help) / sizeof (help_help[0]);
  char *command;
  size_t i;

  if (buffer_get_field (&buffer, &buffer_size, &command) == 0)
  {
    if (strcasecmp (command, "update") == 0)
    {
      help_text = help_update;
      help_text_len = sizeof (help_update) / sizeof (help_update[0]);
    }
    else if (strcasecmp (command, "flush") == 0)
    {
      help_text = help_flush;
      help_text_len = sizeof (help_flush) / sizeof (help_flush[0]);
    }
    else if (strcasecmp (command, "stats") == 0)
    {
      help_text = help_stats;
      help_text_len = sizeof (help_stats) / sizeof (help_stats[0]);
    }
  }

  for (i = 0; i < help_text_len; i++)
  {
    if (swrite (fd, help_text[i], strlen (help_text[i])) < 0)
    {
      /* Save errno before the log call gets a chance to change it. */
      int status = errno;
      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
      return (status);
    }
  }

  return (0);
} /* }}} int handle_request_help */
872 static int handle_request_stats (int fd, /* {{{ */
873 char *buffer __attribute__((unused)),
874 size_t buffer_size __attribute__((unused)))
875 {
876 int status;
877 char outbuf[CMD_MAX];
879 uint64_t copy_queue_length;
880 uint64_t copy_updates_received;
881 uint64_t copy_flush_received;
882 uint64_t copy_updates_written;
883 uint64_t copy_data_sets_written;
884 uint64_t copy_journal_bytes;
885 uint64_t copy_journal_rotate;
887 uint64_t tree_nodes_number;
888 uint64_t tree_depth;
890 pthread_mutex_lock (&stats_lock);
891 copy_queue_length = stats_queue_length;
892 copy_updates_received = stats_updates_received;
893 copy_flush_received = stats_flush_received;
894 copy_updates_written = stats_updates_written;
895 copy_data_sets_written = stats_data_sets_written;
896 copy_journal_bytes = stats_journal_bytes;
897 copy_journal_rotate = stats_journal_rotate;
898 pthread_mutex_unlock (&stats_lock);
900 pthread_mutex_lock (&cache_lock);
901 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
902 tree_depth = (uint64_t) g_tree_height (cache_tree);
903 pthread_mutex_unlock (&cache_lock);
905 #define RRDD_STATS_SEND \
906 outbuf[sizeof (outbuf) - 1] = 0; \
907 status = swrite (fd, outbuf, strlen (outbuf)); \
908 if (status < 0) \
909 { \
910 status = errno; \
911 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
912 return (status); \
913 }
915 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
916 RRDD_STATS_SEND;
918 snprintf (outbuf, sizeof (outbuf),
919 "QueueLength: %"PRIu64"\n", copy_queue_length);
920 RRDD_STATS_SEND;
922 snprintf (outbuf, sizeof (outbuf),
923 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
924 RRDD_STATS_SEND;
926 snprintf (outbuf, sizeof (outbuf),
927 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
928 RRDD_STATS_SEND;
930 snprintf (outbuf, sizeof (outbuf),
931 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
932 RRDD_STATS_SEND;
934 snprintf (outbuf, sizeof (outbuf),
935 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
936 RRDD_STATS_SEND;
938 snprintf (outbuf, sizeof (outbuf),
939 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
940 RRDD_STATS_SEND;
942 snprintf (outbuf, sizeof (outbuf),
943 "TreeDepth: %"PRIu64"\n", tree_depth);
944 RRDD_STATS_SEND;
946 snprintf (outbuf, sizeof(outbuf),
947 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
948 RRDD_STATS_SEND;
950 snprintf (outbuf, sizeof(outbuf),
951 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
952 RRDD_STATS_SEND;
954 return (0);
955 #undef RRDD_STATS_SEND
956 } /* }}} int handle_request_stats */
958 static int handle_request_flush (int fd, /* {{{ */
959 char *buffer, size_t buffer_size)
960 {
961 char *file;
962 int status;
963 char result[CMD_MAX];
965 status = buffer_get_field (&buffer, &buffer_size, &file);
966 if (status != 0)
967 {
968 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
969 }
970 else
971 {
972 pthread_mutex_lock(&stats_lock);
973 stats_flush_received++;
974 pthread_mutex_unlock(&stats_lock);
976 status = flush_file (file);
977 if (status == 0)
978 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
979 else if (status == ENOENT)
980 {
981 /* no file in our tree; see whether it exists at all */
982 struct stat statbuf;
984 memset(&statbuf, 0, sizeof(statbuf));
985 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
986 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
987 else
988 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
989 }
990 else if (status < 0)
991 strncpy (result, "-1 Internal error.\n", sizeof (result));
992 else
993 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
994 }
995 result[sizeof (result) - 1] = 0;
997 status = swrite (fd, result, strlen (result));
998 if (status < 0)
999 {
1000 status = errno;
1001 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1002 return (status);
1003 }
1005 return (0);
1006 } /* }}} int handle_request_flush */
1008 static int handle_request_update (int fd, /* {{{ */
1009 char *buffer, size_t buffer_size)
1010 {
1011 char *file;
1012 int values_num = 0;
1013 int status;
1015 time_t now;
1017 cache_item_t *ci;
1018 char answer[CMD_MAX];
1020 #define RRDD_UPDATE_SEND \
1021 answer[sizeof (answer) - 1] = 0; \
1022 status = swrite (fd, answer, strlen (answer)); \
1023 if (status < 0) \
1024 { \
1025 status = errno; \
1026 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1027 return (status); \
1028 }
1030 now = time (NULL);
1032 status = buffer_get_field (&buffer, &buffer_size, &file);
1033 if (status != 0)
1034 {
1035 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1036 sizeof (answer));
1037 RRDD_UPDATE_SEND;
1038 return (0);
1039 }
1041 pthread_mutex_lock(&stats_lock);
1042 stats_updates_received++;
1043 pthread_mutex_unlock(&stats_lock);
1045 pthread_mutex_lock (&cache_lock);
1047 ci = g_tree_lookup (cache_tree, file);
1048 if (ci == NULL) /* {{{ */
1049 {
1050 struct stat statbuf;
1052 memset (&statbuf, 0, sizeof (statbuf));
1053 status = stat (file, &statbuf);
1054 if (status != 0)
1055 {
1056 pthread_mutex_unlock (&cache_lock);
1057 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1059 status = errno;
1060 if (status == ENOENT)
1061 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1062 else
1063 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1064 status);
1065 RRDD_UPDATE_SEND;
1066 return (0);
1067 }
1068 if (!S_ISREG (statbuf.st_mode))
1069 {
1070 pthread_mutex_unlock (&cache_lock);
1072 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1073 RRDD_UPDATE_SEND;
1074 return (0);
1075 }
1076 if (access(file, R_OK|W_OK) != 0)
1077 {
1078 pthread_mutex_unlock (&cache_lock);
1080 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1081 file, rrd_strerror(errno));
1082 RRDD_UPDATE_SEND;
1083 return (0);
1084 }
1086 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1087 if (ci == NULL)
1088 {
1089 pthread_mutex_unlock (&cache_lock);
1090 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1092 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1093 RRDD_UPDATE_SEND;
1094 return (0);
1095 }
1096 memset (ci, 0, sizeof (cache_item_t));
1098 ci->file = strdup (file);
1099 if (ci->file == NULL)
1100 {
1101 pthread_mutex_unlock (&cache_lock);
1102 free (ci);
1103 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1105 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1106 RRDD_UPDATE_SEND;
1107 return (0);
1108 }
1110 _wipe_ci_values(ci, now);
1111 ci->flags = CI_FLAGS_IN_TREE;
1113 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1114 } /* }}} */
1115 assert (ci != NULL);
1117 while (buffer_size > 0)
1118 {
1119 char **temp;
1120 char *value;
1122 status = buffer_get_field (&buffer, &buffer_size, &value);
1123 if (status != 0)
1124 {
1125 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1126 break;
1127 }
1129 temp = (char **) realloc (ci->values,
1130 sizeof (char *) * (ci->values_num + 1));
1131 if (temp == NULL)
1132 {
1133 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1134 continue;
1135 }
1136 ci->values = temp;
1138 ci->values[ci->values_num] = strdup (value);
1139 if (ci->values[ci->values_num] == NULL)
1140 {
1141 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1142 continue;
1143 }
1144 ci->values_num++;
1146 values_num++;
1147 }
1149 if (((now - ci->last_flush_time) >= config_write_interval)
1150 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1151 && (ci->values_num > 0))
1152 {
1153 enqueue_cache_item (ci, TAIL);
1154 pthread_cond_signal (&cache_cond);
1155 }
1157 pthread_mutex_unlock (&cache_lock);
1159 if (values_num < 1)
1160 {
1161 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1162 }
1163 else
1164 {
1165 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1166 (values_num == 1) ? "" : "s");
1167 }
1168 RRDD_UPDATE_SEND;
1169 return (0);
1170 #undef RRDD_UPDATE_SEND
1171 } /* }}} int handle_request_update */
1173 /* we came across a "WROTE" entry during journal replay.
1174 * throw away any values that we have accumulated for this file
1175 */
1176 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1177 const char *buffer,
1178 size_t buffer_size __attribute__((unused)))
1179 {
1180 int i;
1181 cache_item_t *ci;
1182 const char *file = buffer;
1184 pthread_mutex_lock(&cache_lock);
1186 ci = g_tree_lookup(cache_tree, file);
1187 if (ci == NULL)
1188 {
1189 pthread_mutex_unlock(&cache_lock);
1190 return (0);
1191 }
1193 if (ci->values)
1194 {
1195 for (i=0; i < ci->values_num; i++)
1196 free(ci->values[i]);
1198 free(ci->values);
1199 }
1201 _wipe_ci_values(ci, time(NULL));
1203 pthread_mutex_unlock(&cache_lock);
1204 return (0);
1205 } /* }}} int handle_request_wrote */
1207 /* if fd < 0, we are in journal replay mode */
1208 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1209 {
1210 char *buffer_ptr;
1211 char *command;
1212 int status;
1214 assert (buffer[buffer_size - 1] == '\0');
1216 buffer_ptr = buffer;
1217 command = NULL;
1218 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1219 if (status != 0)
1220 {
1221 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1222 return (-1);
1223 }
1225 if (strcasecmp (command, "update") == 0)
1226 {
1227 /* don't re-write updates in replay mode */
1228 if (fd >= 0)
1229 journal_write(command, buffer_ptr);
1231 return (handle_request_update (fd, buffer_ptr, buffer_size));
1232 }
1233 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1234 {
1235 /* this is only valid in replay mode */
1236 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1237 }
1238 else if (strcasecmp (command, "flush") == 0)
1239 {
1240 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1241 }
1242 else if (strcasecmp (command, "stats") == 0)
1243 {
1244 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1245 }
1246 else if (strcasecmp (command, "help") == 0)
1247 {
1248 return (handle_request_help (fd, buffer_ptr, buffer_size));
1249 }
1250 else
1251 {
1252 char result[CMD_MAX];
1254 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1255 result[sizeof (result) - 1] = 0;
1257 status = swrite (fd, result, strlen (result));
1258 if (status < 0)
1259 {
1260 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1261 return (-1);
1262 }
1263 }
1265 return (0);
1266 } /* }}} int handle_request */
1268 /* MUST NOT hold journal_lock before calling this */
1269 static void journal_rotate(void) /* {{{ */
1270 {
1271 FILE *old_fh = NULL;
1273 if (journal_cur == NULL || journal_old == NULL)
1274 return;
1276 pthread_mutex_lock(&journal_lock);
1278 /* we rotate this way (rename before close) so that the we can release
1279 * the journal lock as fast as possible. Journal writes to the new
1280 * journal can proceed immediately after the new file is opened. The
1281 * fclose can then block without affecting new updates.
1282 */
1283 if (journal_fh != NULL)
1284 {
1285 old_fh = journal_fh;
1286 rename(journal_cur, journal_old);
1287 ++stats_journal_rotate;
1288 }
1290 journal_fh = fopen(journal_cur, "a");
1291 pthread_mutex_unlock(&journal_lock);
1293 if (old_fh != NULL)
1294 fclose(old_fh);
1296 if (journal_fh == NULL)
1297 RRDD_LOG(LOG_CRIT,
1298 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1299 journal_cur, rrd_strerror(errno));
1301 } /* }}} static void journal_rotate */
1303 static void journal_done(void) /* {{{ */
1304 {
1305 if (journal_cur == NULL)
1306 return;
1308 pthread_mutex_lock(&journal_lock);
1309 if (journal_fh != NULL)
1310 {
1311 fclose(journal_fh);
1312 journal_fh = NULL;
1313 }
1315 RRDD_LOG(LOG_INFO, "removing journals");
1317 unlink(journal_old);
1318 unlink(journal_cur);
1319 pthread_mutex_unlock(&journal_lock);
1321 } /* }}} static void journal_done */
1323 static int journal_write(char *cmd, char *args) /* {{{ */
1324 {
1325 int chars;
1327 if (journal_fh == NULL)
1328 return 0;
1330 pthread_mutex_lock(&journal_lock);
1331 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1332 pthread_mutex_unlock(&journal_lock);
1334 if (chars > 0)
1335 {
1336 pthread_mutex_lock(&stats_lock);
1337 stats_journal_bytes += chars;
1338 pthread_mutex_unlock(&stats_lock);
1339 }
1341 return chars;
1342 } /* }}} static int journal_write */
1344 static int journal_replay (const char *file) /* {{{ */
1345 {
1346 FILE *fh;
1347 int entry_cnt = 0;
1348 int fail_cnt = 0;
1349 uint64_t line = 0;
1350 char entry[CMD_MAX];
1352 if (file == NULL) return 0;
1354 fh = fopen(file, "r");
1355 if (fh == NULL)
1356 {
1357 if (errno != ENOENT)
1358 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1359 file, rrd_strerror(errno));
1360 return 0;
1361 }
1362 else
1363 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1365 while(!feof(fh))
1366 {
1367 size_t entry_len;
1369 ++line;
1370 fgets(entry, sizeof(entry), fh);
1371 entry_len = strlen(entry);
1373 /* check \n termination in case journal writing crashed mid-line */
1374 if (entry_len == 0)
1375 continue;
1376 else if (entry[entry_len - 1] != '\n')
1377 {
1378 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1379 ++fail_cnt;
1380 continue;
1381 }
1383 entry[entry_len - 1] = '\0';
1385 if (handle_request(-1, entry, entry_len) == 0)
1386 ++entry_cnt;
1387 else
1388 ++fail_cnt;
1389 }
1391 fclose(fh);
1393 if (entry_cnt > 0)
1394 {
1395 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1396 entry_cnt, fail_cnt);
1397 return 1;
1398 }
1399 else
1400 return 0;
1402 } /* }}} static int journal_replay */
1404 static void *connection_thread_main (void *args) /* {{{ */
1405 {
1406 pthread_t self;
1407 int i;
1408 int fd;
1410 fd = *((int *) args);
1411 free (args);
1413 pthread_mutex_lock (&connection_threads_lock);
1414 {
1415 pthread_t *temp;
1417 temp = (pthread_t *) realloc (connection_threads,
1418 sizeof (pthread_t) * (connection_threads_num + 1));
1419 if (temp == NULL)
1420 {
1421 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1422 }
1423 else
1424 {
1425 connection_threads = temp;
1426 connection_threads[connection_threads_num] = pthread_self ();
1427 connection_threads_num++;
1428 }
1429 }
1430 pthread_mutex_unlock (&connection_threads_lock);
1432 while (do_shutdown == 0)
1433 {
1434 char buffer[CMD_MAX];
1436 struct pollfd pollfd;
1437 int status;
1439 pollfd.fd = fd;
1440 pollfd.events = POLLIN | POLLPRI;
1441 pollfd.revents = 0;
1443 status = poll (&pollfd, 1, /* timeout = */ 500);
1444 if (status == 0) /* timeout */
1445 continue;
1446 else if (status < 0) /* error */
1447 {
1448 status = errno;
1449 if (status == EINTR)
1450 continue;
1451 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1452 continue;
1453 }
1455 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1456 {
1457 close (fd);
1458 break;
1459 }
1460 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1461 {
1462 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1463 "poll(2) returned something unexpected: %#04hx",
1464 pollfd.revents);
1465 close (fd);
1466 break;
1467 }
1469 status = (int) sread (fd, buffer, sizeof (buffer));
1470 if (status <= 0)
1471 {
1472 close (fd);
1474 if (status < 0)
1475 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1477 break;
1478 }
1480 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1481 if (status != 0)
1482 {
1483 close (fd);
1484 break;
1485 }
1486 }
1488 self = pthread_self ();
1489 /* Remove this thread from the connection threads list */
1490 pthread_mutex_lock (&connection_threads_lock);
1491 /* Find out own index in the array */
1492 for (i = 0; i < connection_threads_num; i++)
1493 if (pthread_equal (connection_threads[i], self) != 0)
1494 break;
1495 assert (i < connection_threads_num);
1497 /* Move the trailing threads forward. */
1498 if (i < (connection_threads_num - 1))
1499 {
1500 memmove (connection_threads + i,
1501 connection_threads + i + 1,
1502 sizeof (pthread_t) * (connection_threads_num - i - 1));
1503 }
1505 connection_threads_num--;
1506 pthread_mutex_unlock (&connection_threads_lock);
1508 return (NULL);
1509 } /* }}} void *connection_thread_main */
1511 static int open_listen_socket_unix (const char *path) /* {{{ */
1512 {
1513 int fd;
1514 struct sockaddr_un sa;
1515 listen_socket_t *temp;
1516 int status;
1518 temp = (listen_socket_t *) realloc (listen_fds,
1519 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1520 if (temp == NULL)
1521 {
1522 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1523 return (-1);
1524 }
1525 listen_fds = temp;
1526 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1528 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1529 if (fd < 0)
1530 {
1531 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1532 return (-1);
1533 }
1535 memset (&sa, 0, sizeof (sa));
1536 sa.sun_family = AF_UNIX;
1537 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1539 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1540 if (status != 0)
1541 {
1542 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1543 close (fd);
1544 unlink (path);
1545 return (-1);
1546 }
1548 status = listen (fd, /* backlog = */ 10);
1549 if (status != 0)
1550 {
1551 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1552 close (fd);
1553 unlink (path);
1554 return (-1);
1555 }
1557 listen_fds[listen_fds_num].fd = fd;
1558 snprintf (listen_fds[listen_fds_num].path,
1559 sizeof (listen_fds[listen_fds_num].path) - 1,
1560 "unix:%s", path);
1561 listen_fds_num++;
1563 return (0);
1564 } /* }}} int open_listen_socket_unix */
/*
 * Open listening socket(s) for `addr' and append them to listen_fds.
 *
 * Addresses starting with "unix:" or "/" are passed on to
 * open_listen_socket_unix().  Everything else is resolved with
 * getaddrinfo() using RRDCACHED_DEFAULT_PORT, and one socket is opened per
 * returned address; addresses for which realloc, socket or bind fail are
 * skipped.
 *
 * Returns -1 if resolution fails or listen(2) fails on a socket, the result
 * of open_listen_socket_unix() for UNIX addresses, and 0 otherwise.
 */
static int open_listen_socket (const char *addr) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  int status;
  int result = 0;

  assert (addr != NULL);

  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
    return (open_listen_socket_unix (addr + strlen ("unix:")));
  else if (addr[0] == '/')
    return (open_listen_socket_unix (addr));

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  ai_res = NULL;
  status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
      continue;
    }

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
      close (fd);
      /* give up on the remaining addresses, as before, but leave through
       * the common exit so the address list is released */
      result = -1;
      break;
    }

    listen_fds[listen_fds_num].fd = fd;
    /* the slot was zeroed above, so the copy stays NUL-terminated */
    strncpy (listen_fds[listen_fds_num].path, addr,
        sizeof (listen_fds[listen_fds_num].path) - 1);
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* the list returned by getaddrinfo() is owned by the caller */
  freeaddrinfo (ai_res);

  return (result);
} /* }}} int open_listen_socket */
1644 static int close_listen_sockets (void) /* {{{ */
1645 {
1646 size_t i;
1648 for (i = 0; i < listen_fds_num; i++)
1649 {
1650 close (listen_fds[i].fd);
1651 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1652 unlink (listen_fds[i].path + strlen ("unix:"));
1653 }
1655 free (listen_fds);
1656 listen_fds = NULL;
1657 listen_fds_num = 0;
1659 return (0);
1660 } /* }}} int close_listen_sockets */
1662 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1663 {
1664 struct pollfd *pollfds;
1665 int pollfds_num;
1666 int status;
1667 int i;
1669 for (i = 0; i < config_listen_address_list_len; i++)
1670 open_listen_socket (config_listen_address_list[i]);
1672 if (config_listen_address_list_len < 1)
1673 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1675 if (listen_fds_num < 1)
1676 {
1677 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1678 "could be opened. Sorry.");
1679 return (NULL);
1680 }
1682 pollfds_num = listen_fds_num;
1683 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1684 if (pollfds == NULL)
1685 {
1686 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1687 return (NULL);
1688 }
1689 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1691 RRDD_LOG(LOG_INFO, "listening for connections");
1693 while (do_shutdown == 0)
1694 {
1695 assert (pollfds_num == ((int) listen_fds_num));
1696 for (i = 0; i < pollfds_num; i++)
1697 {
1698 pollfds[i].fd = listen_fds[i].fd;
1699 pollfds[i].events = POLLIN | POLLPRI;
1700 pollfds[i].revents = 0;
1701 }
1703 status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1704 if (status < 1)
1705 {
1706 status = errno;
1707 if (status != EINTR)
1708 {
1709 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1710 }
1711 continue;
1712 }
1714 for (i = 0; i < pollfds_num; i++)
1715 {
1716 int *client_sd;
1717 struct sockaddr_storage client_sa;
1718 socklen_t client_sa_size;
1719 pthread_t tid;
1720 pthread_attr_t attr;
1722 if (pollfds[i].revents == 0)
1723 continue;
1725 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1726 {
1727 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1728 "poll(2) returned something unexpected for listen FD #%i.",
1729 pollfds[i].fd);
1730 continue;
1731 }
1733 client_sd = (int *) malloc (sizeof (int));
1734 if (client_sd == NULL)
1735 {
1736 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1737 continue;
1738 }
1740 client_sa_size = sizeof (client_sa);
1741 *client_sd = accept (pollfds[i].fd,
1742 (struct sockaddr *) &client_sa, &client_sa_size);
1743 if (*client_sd < 0)
1744 {
1745 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1746 continue;
1747 }
1749 pthread_attr_init (&attr);
1750 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1752 status = pthread_create (&tid, &attr, connection_thread_main,
1753 /* args = */ (void *) client_sd);
1754 if (status != 0)
1755 {
1756 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1757 close (*client_sd);
1758 free (client_sd);
1759 continue;
1760 }
1761 } /* for (pollfds_num) */
1762 } /* while (do_shutdown == 0) */
1764 RRDD_LOG(LOG_INFO, "starting shutdown");
1766 close_listen_sockets ();
1768 pthread_mutex_lock (&connection_threads_lock);
1769 while (connection_threads_num > 0)
1770 {
1771 pthread_t wait_for;
1773 wait_for = connection_threads[0];
1775 pthread_mutex_unlock (&connection_threads_lock);
1776 pthread_join (wait_for, /* retval = */ NULL);
1777 pthread_mutex_lock (&connection_threads_lock);
1778 }
1779 pthread_mutex_unlock (&connection_threads_lock);
1781 return (NULL);
1782 } /* }}} void *listen_thread_main */
1784 static int daemonize (void) /* {{{ */
1785 {
1786 int status;
1788 /* These structures are static, because `sigaction' behaves weird if the are
1789 * overwritten.. */
1790 static struct sigaction sa_int;
1791 static struct sigaction sa_term;
1792 static struct sigaction sa_pipe;
1794 if (!stay_foreground)
1795 {
1796 pid_t child;
1797 char *base_dir;
1799 child = fork ();
1800 if (child < 0)
1801 {
1802 fprintf (stderr, "daemonize: fork(2) failed.\n");
1803 return (-1);
1804 }
1805 else if (child > 0)
1806 {
1807 return (1);
1808 }
1810 /* Change into the /tmp directory. */
1811 base_dir = (config_base_dir != NULL)
1812 ? config_base_dir
1813 : "/tmp";
1814 status = chdir (base_dir);
1815 if (status != 0)
1816 {
1817 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1818 return (-1);
1819 }
1821 /* Become session leader */
1822 setsid ();
1824 /* Open the first three file descriptors to /dev/null */
1825 close (2);
1826 close (1);
1827 close (0);
1829 open ("/dev/null", O_RDWR);
1830 dup (0);
1831 dup (0);
1832 } /* if (!stay_foreground) */
1834 /* Install signal handlers */
1835 memset (&sa_int, 0, sizeof (sa_int));
1836 sa_int.sa_handler = sig_int_handler;
1837 sigaction (SIGINT, &sa_int, NULL);
1839 memset (&sa_term, 0, sizeof (sa_term));
1840 sa_term.sa_handler = sig_term_handler;
1841 sigaction (SIGTERM, &sa_term, NULL);
1843 memset (&sa_pipe, 0, sizeof (sa_pipe));
1844 sa_pipe.sa_handler = SIG_IGN;
1845 sigaction (SIGPIPE, &sa_pipe, NULL);
1847 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1848 RRDD_LOG(LOG_INFO, "starting up");
1850 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1851 if (cache_tree == NULL)
1852 {
1853 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1854 return (-1);
1855 }
1857 status = write_pidfile ();
1858 return status;
1859 } /* }}} int daemonize */
1861 static int cleanup (void) /* {{{ */
1862 {
1863 do_shutdown++;
1865 pthread_cond_signal (&cache_cond);
1866 pthread_join (queue_thread, /* return = */ NULL);
1868 remove_pidfile ();
1870 RRDD_LOG(LOG_INFO, "goodbye");
1871 closelog ();
1873 return (0);
1874 } /* }}} int cleanup */
1876 static int read_options (int argc, char **argv) /* {{{ */
1877 {
1878 int option;
1879 int status = 0;
1881 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1882 {
1883 switch (option)
1884 {
1885 case 'g':
1886 stay_foreground=1;
1887 break;
1889 case 'l':
1890 {
1891 char **temp;
1893 temp = (char **) realloc (config_listen_address_list,
1894 sizeof (char *) * (config_listen_address_list_len + 1));
1895 if (temp == NULL)
1896 {
1897 fprintf (stderr, "read_options: realloc failed.\n");
1898 return (2);
1899 }
1900 config_listen_address_list = temp;
1902 temp[config_listen_address_list_len] = strdup (optarg);
1903 if (temp[config_listen_address_list_len] == NULL)
1904 {
1905 fprintf (stderr, "read_options: strdup failed.\n");
1906 return (2);
1907 }
1908 config_listen_address_list_len++;
1909 }
1910 break;
1912 case 'f':
1913 {
1914 int temp;
1916 temp = atoi (optarg);
1917 if (temp > 0)
1918 config_flush_interval = temp;
1919 else
1920 {
1921 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1922 status = 3;
1923 }
1924 }
1925 break;
1927 case 'w':
1928 {
1929 int temp;
1931 temp = atoi (optarg);
1932 if (temp > 0)
1933 config_write_interval = temp;
1934 else
1935 {
1936 fprintf (stderr, "Invalid write interval: %s\n", optarg);
1937 status = 2;
1938 }
1939 }
1940 break;
1942 case 'z':
1943 {
1944 int temp;
1946 temp = atoi(optarg);
1947 if (temp > 0)
1948 config_write_jitter = temp;
1949 else
1950 {
1951 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1952 status = 2;
1953 }
1955 break;
1956 }
1958 case 'b':
1959 {
1960 size_t len;
1962 if (config_base_dir != NULL)
1963 free (config_base_dir);
1964 config_base_dir = strdup (optarg);
1965 if (config_base_dir == NULL)
1966 {
1967 fprintf (stderr, "read_options: strdup failed.\n");
1968 return (3);
1969 }
1971 len = strlen (config_base_dir);
1972 while ((len > 0) && (config_base_dir[len - 1] == '/'))
1973 {
1974 config_base_dir[len - 1] = 0;
1975 len--;
1976 }
1978 if (len < 1)
1979 {
1980 fprintf (stderr, "Invalid base directory: %s\n", optarg);
1981 return (4);
1982 }
1983 }
1984 break;
1986 case 'p':
1987 {
1988 if (config_pid_file != NULL)
1989 free (config_pid_file);
1990 config_pid_file = strdup (optarg);
1991 if (config_pid_file == NULL)
1992 {
1993 fprintf (stderr, "read_options: strdup failed.\n");
1994 return (3);
1995 }
1996 }
1997 break;
1999 case 'j':
2000 {
2001 struct stat statbuf;
2002 const char *dir = optarg;
2004 status = stat(dir, &statbuf);
2005 if (status != 0)
2006 {
2007 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2008 return 6;
2009 }
2011 if (!S_ISDIR(statbuf.st_mode)
2012 || access(dir, R_OK|W_OK|X_OK) != 0)
2013 {
2014 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2015 errno ? rrd_strerror(errno) : "");
2016 return 6;
2017 }
2019 journal_cur = malloc(PATH_MAX + 1);
2020 journal_old = malloc(PATH_MAX + 1);
2021 if (journal_cur == NULL || journal_old == NULL)
2022 {
2023 fprintf(stderr, "malloc failure for journal files\n");
2024 return 6;
2025 }
2026 else
2027 {
2028 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2029 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2030 }
2031 }
2032 break;
2034 case 'h':
2035 case '?':
2036 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2037 "\n"
2038 "Usage: rrdcached [options]\n"
2039 "\n"
2040 "Valid options are:\n"
2041 " -l <address> Socket address to listen to.\n"
2042 " -w <seconds> Interval in which to write data.\n"
2043 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2044 " -f <seconds> Interval in which to flush dead data.\n"
2045 " -p <file> Location of the PID-file.\n"
2046 " -b <dir> Base directory to change to.\n"
2047 " -g Do not fork and run in the foreground.\n"
2048 " -j <dir> Directory in which to create the journal files.\n"
2049 "\n"
2050 "For more information and a detailed description of all options "
2051 "please refer\n"
2052 "to the rrdcached(1) manual page.\n",
2053 VERSION);
2054 status = -1;
2055 break;
2056 } /* switch (option) */
2057 } /* while (getopt) */
2059 /* advise the user when values are not sane */
2060 if (config_flush_interval < 2 * config_write_interval)
2061 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2062 " 2x write interval (-w) !\n");
2063 if (config_write_jitter > config_write_interval)
2064 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2065 " write interval (-w) !\n");
2067 return (status);
2068 } /* }}} int read_options */
2070 int main (int argc, char **argv)
2071 {
2072 int status;
2074 status = read_options (argc, argv);
2075 if (status != 0)
2076 {
2077 if (status < 0)
2078 status = 0;
2079 return (status);
2080 }
2082 status = daemonize ();
2083 if (status == 1)
2084 {
2085 struct sigaction sigchld;
2087 memset (&sigchld, 0, sizeof (sigchld));
2088 sigchld.sa_handler = SIG_IGN;
2089 sigaction (SIGCHLD, &sigchld, NULL);
2091 return (0);
2092 }
2093 else if (status != 0)
2094 {
2095 fprintf (stderr, "daemonize failed, exiting.\n");
2096 return (1);
2097 }
2099 if (journal_cur != NULL)
2100 {
2101 int had_journal = 0;
2103 pthread_mutex_lock(&journal_lock);
2105 RRDD_LOG(LOG_INFO, "checking for journal files");
2107 had_journal += journal_replay(journal_old);
2108 had_journal += journal_replay(journal_cur);
2110 if (had_journal)
2111 flush_old_values(-1);
2113 pthread_mutex_unlock(&journal_lock);
2114 journal_rotate();
2116 RRDD_LOG(LOG_INFO, "journal processing complete");
2117 }
2119 /* start the queue thread */
2120 memset (&queue_thread, 0, sizeof (queue_thread));
2121 status = pthread_create (&queue_thread,
2122 NULL, /* attr */
2123 queue_thread_main,
2124 NULL); /* args */
2125 if (status != 0)
2126 {
2127 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2128 cleanup();
2129 return (1);
2130 }
2132 listen_thread_main (NULL);
2133 cleanup ();
2135 return (0);
2136 } /* int main */
2138 /*
2139 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2140 */