1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* Log a message through syslog(3). `severity' is passed through as the
 * syslog priority; the remaining arguments are the format and its values. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* When not compiling with a GCC-compatible compiler, define `__attribute__'
 * away so annotations like `__attribute__((unused))' expand to nothing. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
101 /*
102 * Types
103 */
/* Description of one listening socket (see `listen_fds'). */
struct listen_socket_s
{
  int fd;                  /* file descriptor of the socket */
  char path[PATH_MAX + 1]; /* presumably the socket's path/address, with room
                            * for a terminating NUL -- TODO confirm at the
                            * place where sockets are opened */
};
typedef struct listen_socket_s listen_socket_t;
/*
 * One cached RRD file: the pending update strings for it plus the links that
 * place it in the lookup tree (`cache_tree') and in the write queue
 * (`cache_queue_head' .. `cache_queue_tail').
 */
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;             /* heap copy of the file name; also used as the
                           * key pointer in `cache_tree' */
  char **values;          /* array of heap-allocated update strings */
  int values_num;         /* number of entries in `values' */
  time_t last_flush_time; /* set by `_wipe_ci_values' (optionally plus a
                           * random jitter) */
#define CI_FLAGS_IN_TREE (1<<0)  /* item has been inserted into `cache_tree' */
#define CI_FLAGS_IN_QUEUE (1<<1) /* item is linked into the write queue */
  int flags;              /* bitwise OR of the CI_FLAGS_* values above */
  pthread_cond_t flushed; /* broadcast by the queue thread after the item's
                           * values were handed to rrd_update_r() */
  cache_item_t *next;     /* next item in the write queue, or NULL */
};
/*
 * User data handed to `tree_callback_flush' while walking `cache_tree'.
 */
struct callback_flush_data_s
{
  time_t now;         /* time at which the walk was started */
  time_t abs_timeout; /* items with values whose `last_flush_time' is at or
                       * before this moment get enqueued */
  char **keys;        /* collected keys of idle, empty items; the pointers
                       * alias `ci->file' and are not owned by this array */
  size_t keys_num;    /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;
/* Selects the end of the write queue `enqueue_cache_item' operates on. */
enum queue_side_e
{
  HEAD, /* insert at (or move to) the front of the queue */
  TAIL  /* append at the end; items already queued are left where they are */
};
typedef enum queue_side_e queue_side_t;
/* max length of socket command or response */
#define CMD_MAX 4096

/*
 * Variables
 */
static int stay_foreground = 0;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Incremented by the SIGINT/SIGTERM handlers; the queue thread keeps running
 * while this is zero or the write queue is non-empty. */
static int do_shutdown = 0;

static pthread_t queue_thread;

static pthread_t *connection_threads = NULL;
static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
/* `cache_tree' maps file names to cache items; the queue pointers form a
 * singly linked list (via `cache_item_t.next') of items waiting to be
 * written.  `cache_lock' is held while either structure is accessed and
 * `cache_cond' is broadcast when an item is enqueued or a shutdown signal
 * arrives. */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;

/* Configuration; the initializers below are the built-in defaults. */
static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static char *config_pid_file = NULL;
static char *config_base_dir = NULL;

static char **config_listen_address_list = NULL;
static int config_listen_address_list_len = 0;

/* Counters reported by the STATS command.  `stats_lock' is taken around most
 * accesses to them. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
/* `journal_cur' / `journal_old' are the paths of the current and the rotated
 * journal file; `journal_fh' is the open stream for the current one (NULL
 * when no journal is open).  `journal_lock' serializes use of `journal_fh'. */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);
195 /*
196 * Functions
197 */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
199 {
200 RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201 do_shutdown++;
202 pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
206 {
207 RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208 do_shutdown++;
209 pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
212 static int open_pidfile(void) /* {{{ */
213 {
214 int fd;
215 char *file;
217 file = (config_pid_file != NULL)
218 ? config_pid_file
219 : LOCALSTATEDIR "/run/rrdcached.pid";
221 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
222 if (fd < 0)
223 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
224 file, rrd_strerror(errno));
226 return(fd);
227 }
/*
 * write_pidfile:
 *   Writes the PID of the calling process, followed by a newline, to the
 *   already opened descriptor `fd' and closes it.
 *
 *   Returns 0 on success and -1 if the descriptor could not be wrapped in a
 *   stdio stream or if writing/flushing the PID failed.  `fd' is closed in
 *   either case.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *fh;
  int status = 0;

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) getpid ()) < 0)
    status = -1;

  /* fclose() flushes the buffered data, so a write error (e.g. a full disk)
   * may only show up here. */
  if (fclose (fh) != 0)
    status = -1;

  if (status != 0)
    RRDD_LOG (LOG_ERR, "write_pidfile: writing the PID failed.");

  return (status);
} /* }}} int write_pidfile */
250 static int remove_pidfile (void) /* {{{ */
251 {
252 char *file;
253 int status;
255 file = (config_pid_file != NULL)
256 ? config_pid_file
257 : LOCALSTATEDIR "/run/rrdcached.pid";
259 status = unlink (file);
260 if (status == 0)
261 return (0);
262 return (errno);
263 } /* }}} int remove_pidfile */
/*
 * sread:
 *   Reads from `fd' into `buffer_void' until the last byte received is a
 *   newline or the buffer is full.  The trailing "\n" (or "\r\n") is replaced
 *   by a NUL byte.
 *
 *   Returns the number of bytes in the buffer including the terminating NUL,
 *   0 if read(2) reported end-of-file, or -1 on error.  `errno' is set to
 *   EINVAL for a NULL buffer and to ENOBUFS if the buffer has no room or was
 *   filled without a newline having been seen; otherwise it is left as set by
 *   read(2).
 */
static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
{
  char *buffer = (char *) buffer_void;
  size_t buffer_used = 0;
  size_t buffer_free = buffer_size;

  if (buffer == NULL)
  {
    errno = EINVAL;
    return (-1);
  }

  /* Without room for at least one byte no line can ever be stored.  Report
   * that as an error instead of running into the assertion below. */
  if (buffer_size == 0)
  {
    errno = ENOBUFS;
    return (-1);
  }

  while (buffer_free > 0)
  {
    ssize_t status = read (fd, buffer + buffer_used, buffer_free);

    if (status < 0)
    {
      /* Interrupted or would block: simply try again. */
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (-1);
    }

    if (status == 0)
      return (0);

    assert (buffer_free >= (size_t) status);

    buffer_free -= (size_t) status;
    buffer_used += (size_t) status;

    if (buffer[buffer_used - 1] == '\n')
      break;
  }

  assert (buffer_used > 0);

  /* The buffer filled up before the end of the line arrived. */
  if (buffer[buffer_used - 1] != '\n')
  {
    errno = ENOBUFS;
    return (-1);
  }

  buffer[buffer_used - 1] = 0;

  /* Fix network line endings. */
  if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
  {
    buffer_used--;
    buffer[buffer_used - 1] = 0;
  }

  return (buffer_used);
} /* }}} ssize_t sread */
/*
 * swrite:
 *   Writes all `count' bytes of `buf' to `fd', retrying after EAGAIN/EINTR
 *   and after short writes.  A negative `fd' (journal replay) makes this a
 *   no-op.  Returns 0 on success or the negative value from write(2).
 */
static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
{
  const char *cursor = (const char *) buf;
  size_t remaining = count;

  /* special case for journal replay */
  if (fd < 0)
    return (0);

  while (remaining > 0)
  {
    ssize_t written = write (fd, (const void *) cursor, remaining);

    if (written < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (written);
    }

    /* Skip over what has been sent and go on with the rest. */
    cursor += written;
    remaining -= written;
  }

  return (0);
} /* }}} ssize_t swrite */
346 static void _wipe_ci_values(cache_item_t *ci, time_t when)
347 {
348 ci->values = NULL;
349 ci->values_num = 0;
351 ci->last_flush_time = when;
352 if (config_write_jitter > 0)
353 ci->last_flush_time += (random() % config_write_jitter);
355 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
356 }
358 /*
359 * enqueue_cache_item:
360 * `cache_lock' must be acquired before calling this function!
361 */
362 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
363 queue_side_t side)
364 {
365 int did_insert = 0;
367 if (ci == NULL)
368 return (-1);
370 if (ci->values_num == 0)
371 return (0);
373 if (side == HEAD)
374 {
375 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
376 {
377 assert (ci->next == NULL);
378 ci->next = cache_queue_head;
379 cache_queue_head = ci;
381 if (cache_queue_tail == NULL)
382 cache_queue_tail = cache_queue_head;
384 did_insert = 1;
385 }
386 else if (cache_queue_head == ci)
387 {
388 /* do nothing */
389 }
390 else /* enqueued, but not first entry */
391 {
392 cache_item_t *prev;
394 /* find previous entry */
395 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
396 if (prev->next == ci)
397 break;
398 assert (prev != NULL);
400 /* move to the front */
401 prev->next = ci->next;
402 ci->next = cache_queue_head;
403 cache_queue_head = ci;
405 /* check if we need to adapt the tail */
406 if (cache_queue_tail == ci)
407 cache_queue_tail = prev;
408 }
409 }
410 else /* (side == TAIL) */
411 {
412 /* We don't move values back in the list.. */
413 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
414 return (0);
416 assert (ci->next == NULL);
418 if (cache_queue_tail == NULL)
419 cache_queue_head = ci;
420 else
421 cache_queue_tail->next = ci;
422 cache_queue_tail = ci;
424 did_insert = 1;
425 }
427 ci->flags |= CI_FLAGS_IN_QUEUE;
429 if (did_insert)
430 {
431 pthread_cond_broadcast(&cache_cond);
432 pthread_mutex_lock (&stats_lock);
433 stats_queue_length++;
434 pthread_mutex_unlock (&stats_lock);
435 }
437 return (0);
438 } /* }}} int enqueue_cache_item */
440 /*
441 * tree_callback_flush:
442 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
443 * while this is in progress.
444 */
445 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
446 gpointer data)
447 {
448 cache_item_t *ci;
449 callback_flush_data_t *cfd;
451 ci = (cache_item_t *) value;
452 cfd = (callback_flush_data_t *) data;
454 if ((ci->last_flush_time <= cfd->abs_timeout)
455 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
456 && (ci->values_num > 0))
457 {
458 enqueue_cache_item (ci, TAIL);
459 }
460 else if ((do_shutdown != 0)
461 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
462 && (ci->values_num > 0))
463 {
464 enqueue_cache_item (ci, TAIL);
465 }
466 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
467 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
468 && (ci->values_num <= 0))
469 {
470 char **temp;
472 temp = (char **) realloc (cfd->keys,
473 sizeof (char *) * (cfd->keys_num + 1));
474 if (temp == NULL)
475 {
476 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
477 return (FALSE);
478 }
479 cfd->keys = temp;
480 /* Make really sure this points to the _same_ place */
481 assert ((char *) key == ci->file);
482 cfd->keys[cfd->keys_num] = (char *) key;
483 cfd->keys_num++;
484 }
486 return (FALSE);
487 } /* }}} gboolean tree_callback_flush */
489 static int flush_old_values (int max_age)
490 {
491 callback_flush_data_t cfd;
492 size_t k;
494 memset (&cfd, 0, sizeof (cfd));
495 /* Pass the current time as user data so that we don't need to call
496 * `time' for each node. */
497 cfd.now = time (NULL);
498 cfd.keys = NULL;
499 cfd.keys_num = 0;
501 if (max_age > 0)
502 cfd.abs_timeout = cfd.now - max_age;
503 else
504 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
506 /* `tree_callback_flush' will return the keys of all values that haven't
507 * been touched in the last `config_flush_interval' seconds in `cfd'.
508 * The char*'s in this array point to the same memory as ci->file, so we
509 * don't need to free them separately. */
510 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
512 for (k = 0; k < cfd.keys_num; k++)
513 {
514 cache_item_t *ci;
516 /* This must not fail. */
517 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
518 assert (ci != NULL);
520 /* If we end up here with values available, something's seriously
521 * messed up. */
522 assert (ci->values_num == 0);
524 /* Remove the node from the tree */
525 g_tree_remove (cache_tree, cfd.keys[k]);
526 cfd.keys[k] = NULL;
528 /* Now free and clean up `ci'. */
529 free (ci->file);
530 ci->file = NULL;
531 free (ci);
532 ci = NULL;
533 } /* for (k = 0; k < cfd.keys_num; k++) */
535 if (cfd.keys != NULL)
536 {
537 free (cfd.keys);
538 cfd.keys = NULL;
539 }
541 return (0);
542 } /* int flush_old_values */
/*
 * queue_thread_main:
 *   Body of the writer thread.  It repeatedly takes the first item off the
 *   write queue and hands its values to rrd_update_r(); once per
 *   `config_flush_interval' it additionally enqueues old values, drops idle
 *   tree entries and rotates the journal.  The loop ends when `do_shutdown'
 *   is set and the queue has been drained; the journals are then removed.
 *
 *   `cache_lock' is held throughout, except while rotating the journal and
 *   while an update is being written to disk.  Always returns NULL.
 */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;

  /* Schedule the first periodic flush one interval from now. */
  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock (&cache_lock);
  while ((do_shutdown == 0) || (cache_queue_head != NULL))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    int values_num;
    int status;
    int i;

    /* First, check if it's time to do the cache flush. */
    gettimeofday (&now, NULL);
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* Determine the time of the next cache flush. */
      while (next_flush.tv_sec <= now.tv_sec)
        next_flush.tv_sec += config_flush_interval;

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* Now, check if there's something to store away. If not, wait until
     * something comes in or it's time to do the cache flush. */
    if (cache_queue_head == NULL)
    {
      status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_timedwait returned %i.", status);
      }
    }

    /* We're about to shut down, so lets flush the entire tree. */
    if ((do_shutdown != 0) && (cache_queue_head == NULL))
      flush_old_values (/* max age = */ -1);

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    /* NOTE(review): on strdup failure the item stays at the head of the
     * queue and the loop retries immediately with `cache_lock' still held. */
    file = strdup (ci->file);
    if (file == NULL)
    {
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* Take over ownership of the value list; `_wipe_ci_values' only resets
     * the item's fields, it does not free the list. */
    values = ci->values;
    values_num = ci->values_num;

    _wipe_ci_values(ci, time(NULL));

    /* Unlink the item from the head of the queue. */
    cache_queue_head = ci->next;
    if (cache_queue_head == NULL)
      cache_queue_tail = NULL;
    ci->next = NULL;

    pthread_mutex_lock (&stats_lock);
    assert (stats_queue_length > 0);
    stats_queue_length--;
    pthread_mutex_unlock (&stats_lock);

    /* The disk write below happens without the cache lock so that incoming
     * requests are not blocked by it. */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    /* NOTE(review): the "wrote" record is journaled and waiters are woken
     * regardless of whether the update succeeded. */
    journal_write("wrote", file);
    pthread_cond_broadcast(&ci->flushed);

    for (i = 0; i < values_num; i++)
      free (values[i]);

    free(values);
    free(file);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    pthread_mutex_lock (&cache_lock);

    /* We're about to shut down, so lets flush the entire tree. */
    if ((do_shutdown != 0) && (cache_queue_head == NULL))
      flush_old_values (/* max age = */ -1);
  } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
  pthread_mutex_unlock (&cache_lock);

  assert(cache_queue_head == NULL);
  RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
  journal_done();

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 *   Splits the next field off the NUL-terminated buffer `*buffer_ret' of
 *   `*buffer_size_ret' bytes.  A field ends at the first unescaped space or
 *   at the terminating NUL; a backslash makes the following character part of
 *   the field.  The field is unescaped and NUL-terminated in place.
 *
 *   On success 0 is returned, `*field_ret' points to the field and
 *   `*buffer_ret' / `*buffer_size_ret' describe the unparsed rest.  On
 *   failure -1 is returned and the three output arguments are left untouched
 *   (the buffer contents may already have been modified).
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *input = *buffer_ret;
  size_t input_size = *buffer_size_ret;
  char *out = input;    /* write position of the unescaped field */
  size_t pos = 0;       /* read position within `input' */
  int terminated = 0;

  if (input_size <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (input[input_size - 1] == '\0');

  while (!terminated && (pos < input_size))
  {
    char c = input[pos];

    if ((c == ' ') || (c == '\0'))
    {
      /* End-of-field or end-of-buffer: terminate and stop. */
      *out++ = 0;
      pos++;
      terminated = 1;
    }
    else
    {
      if (c == '\\')
      {
        /* An escape needs a character following it. */
        if (pos >= (input_size - 1))
          break;
        pos++;
        c = input[pos];
      }

      *out++ = c;
      pos++;
    }
  } /* while (!terminated && (pos < input_size)) */

  if (!terminated)
    return (-1);

  *buffer_ret = input + pos;
  *buffer_size_ret = input_size - pos;
  *field_ret = input;

  return (0);
} /* }}} int buffer_get_field */
739 static int flush_file (const char *filename) /* {{{ */
740 {
741 cache_item_t *ci;
743 pthread_mutex_lock (&cache_lock);
745 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
746 if (ci == NULL)
747 {
748 pthread_mutex_unlock (&cache_lock);
749 return (ENOENT);
750 }
752 /* Enqueue at head */
753 enqueue_cache_item (ci, HEAD);
755 pthread_cond_wait(&ci->flushed, &cache_lock);
756 pthread_mutex_unlock(&cache_lock);
758 return (0);
759 } /* }}} int flush_file */
/*
 * handle_request_help:
 *   Answers the HELP command.  The first field of `buffer' selects the topic
 *   ("update", "flush" or "stats", case-insensitive); without a field, or
 *   with an unknown one, the command overview is sent.
 *
 *   The number at the start of each text's first line is the count of lines
 *   that follow it.  Returns 0 on success or the `errno' value of a failed
 *   write to `fd'.
 */
static int handle_request_help (int fd, /* {{{ */
    char *buffer, size_t buffer_size)
{
  static const char *const help_help[] =
  {
    "4 Command overview\n",
    "FLUSH <filename>\n",
    "HELP [<command>]\n",
    "UPDATE <filename> <values> [<values> ...]\n",
    "STATS\n"
  };

  static const char *const help_flush[] =
  {
    "4 Help for FLUSH\n",
    "Usage: FLUSH <filename>\n",
    "\n",
    "Adds the given filename to the head of the update queue and returns\n",
    "after it has been dequeued.\n"
  };

  static const char *const help_update[] =
  {
    "9 Help for UPDATE\n",
    "Usage: UPDATE <filename> <values> [<values> ...]\n",
    "\n",
    "Adds the given file to the internal cache if it is not yet known and\n",
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
    "for details.\n",
    "\n",
    "Each <values> has the following form:\n",
    "  <values> = <time>:<value>[:<value>[...]]\n",
    "See the rrdupdate(1) manpage for details.\n"
  };

  static const char *const help_stats[] =
  {
    "4 Help for STATS\n",
    "Usage: STATS\n",
    "\n",
    "Returns some performance counters, see the rrdcached(1) manpage for\n",
    "a description of the values.\n"
  };

  /* The overview is the fallback for "no topic" and "unknown topic". */
  const char *const *help_text = help_help;
  size_t help_text_len = sizeof (help_help) / sizeof (help_help[0]);
  char *command;
  size_t i;

  if (buffer_get_field (&buffer, &buffer_size, &command) == 0)
  {
    if (strcasecmp (command, "update") == 0)
    {
      help_text = help_update;
      help_text_len = sizeof (help_update) / sizeof (help_update[0]);
    }
    else if (strcasecmp (command, "flush") == 0)
    {
      help_text = help_flush;
      help_text_len = sizeof (help_flush) / sizeof (help_flush[0]);
    }
    else if (strcasecmp (command, "stats") == 0)
    {
      help_text = help_stats;
      help_text_len = sizeof (help_stats) / sizeof (help_stats[0]);
    }
  }

  for (i = 0; i < help_text_len; i++)
  {
    if (swrite (fd, help_text[i], strlen (help_text[i])) < 0)
    {
      /* Save errno before logging, the log call may change it. */
      int status = errno;
      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
      return (status);
    }
  }

  return (0);
} /* }}} int handle_request_help */
859 static int handle_request_stats (int fd, /* {{{ */
860 char *buffer __attribute__((unused)),
861 size_t buffer_size __attribute__((unused)))
862 {
863 int status;
864 char outbuf[CMD_MAX];
866 uint64_t copy_queue_length;
867 uint64_t copy_updates_received;
868 uint64_t copy_flush_received;
869 uint64_t copy_updates_written;
870 uint64_t copy_data_sets_written;
871 uint64_t copy_journal_bytes;
872 uint64_t copy_journal_rotate;
874 uint64_t tree_nodes_number;
875 uint64_t tree_depth;
877 pthread_mutex_lock (&stats_lock);
878 copy_queue_length = stats_queue_length;
879 copy_updates_received = stats_updates_received;
880 copy_flush_received = stats_flush_received;
881 copy_updates_written = stats_updates_written;
882 copy_data_sets_written = stats_data_sets_written;
883 copy_journal_bytes = stats_journal_bytes;
884 copy_journal_rotate = stats_journal_rotate;
885 pthread_mutex_unlock (&stats_lock);
887 pthread_mutex_lock (&cache_lock);
888 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
889 tree_depth = (uint64_t) g_tree_height (cache_tree);
890 pthread_mutex_unlock (&cache_lock);
892 #define RRDD_STATS_SEND \
893 outbuf[sizeof (outbuf) - 1] = 0; \
894 status = swrite (fd, outbuf, strlen (outbuf)); \
895 if (status < 0) \
896 { \
897 status = errno; \
898 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
899 return (status); \
900 }
902 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
903 RRDD_STATS_SEND;
905 snprintf (outbuf, sizeof (outbuf),
906 "QueueLength: %"PRIu64"\n", copy_queue_length);
907 RRDD_STATS_SEND;
909 snprintf (outbuf, sizeof (outbuf),
910 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
911 RRDD_STATS_SEND;
913 snprintf (outbuf, sizeof (outbuf),
914 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
915 RRDD_STATS_SEND;
917 snprintf (outbuf, sizeof (outbuf),
918 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
919 RRDD_STATS_SEND;
921 snprintf (outbuf, sizeof (outbuf),
922 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
923 RRDD_STATS_SEND;
925 snprintf (outbuf, sizeof (outbuf),
926 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
927 RRDD_STATS_SEND;
929 snprintf (outbuf, sizeof (outbuf),
930 "TreeDepth: %"PRIu64"\n", tree_depth);
931 RRDD_STATS_SEND;
933 snprintf (outbuf, sizeof(outbuf),
934 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
935 RRDD_STATS_SEND;
937 snprintf (outbuf, sizeof(outbuf),
938 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
939 RRDD_STATS_SEND;
941 return (0);
942 #undef RRDD_STATS_SEND
943 } /* }}} int handle_request_stats */
945 static int handle_request_flush (int fd, /* {{{ */
946 char *buffer, size_t buffer_size)
947 {
948 char *file;
949 int status;
950 char result[CMD_MAX];
952 status = buffer_get_field (&buffer, &buffer_size, &file);
953 if (status != 0)
954 {
955 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
956 }
957 else
958 {
959 pthread_mutex_lock(&stats_lock);
960 stats_flush_received++;
961 pthread_mutex_unlock(&stats_lock);
963 status = flush_file (file);
964 if (status == 0)
965 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
966 else if (status == ENOENT)
967 {
968 /* no file in our tree; see whether it exists at all */
969 struct stat statbuf;
971 memset(&statbuf, 0, sizeof(statbuf));
972 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
973 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
974 else
975 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
976 }
977 else if (status < 0)
978 strncpy (result, "-1 Internal error.\n", sizeof (result));
979 else
980 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
981 }
982 result[sizeof (result) - 1] = 0;
984 status = swrite (fd, result, strlen (result));
985 if (status < 0)
986 {
987 status = errno;
988 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
989 return (status);
990 }
992 return (0);
993 } /* }}} int handle_request_flush */
995 static int handle_request_update (int fd, /* {{{ */
996 char *buffer, size_t buffer_size)
997 {
998 char *file;
999 int values_num = 0;
1000 int status;
1002 time_t now;
1004 cache_item_t *ci;
1005 char answer[CMD_MAX];
1007 #define RRDD_UPDATE_SEND \
1008 answer[sizeof (answer) - 1] = 0; \
1009 status = swrite (fd, answer, strlen (answer)); \
1010 if (status < 0) \
1011 { \
1012 status = errno; \
1013 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1014 return (status); \
1015 }
1017 now = time (NULL);
1019 status = buffer_get_field (&buffer, &buffer_size, &file);
1020 if (status != 0)
1021 {
1022 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1023 sizeof (answer));
1024 RRDD_UPDATE_SEND;
1025 return (0);
1026 }
1028 pthread_mutex_lock(&stats_lock);
1029 stats_updates_received++;
1030 pthread_mutex_unlock(&stats_lock);
1032 pthread_mutex_lock (&cache_lock);
1033 ci = g_tree_lookup (cache_tree, file);
1035 if (ci == NULL) /* {{{ */
1036 {
1037 struct stat statbuf;
1039 /* don't hold the lock while we setup; stat(2) might block */
1040 pthread_mutex_unlock(&cache_lock);
1042 memset (&statbuf, 0, sizeof (statbuf));
1043 status = stat (file, &statbuf);
1044 if (status != 0)
1045 {
1046 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1048 status = errno;
1049 if (status == ENOENT)
1050 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1051 else
1052 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1053 status);
1054 RRDD_UPDATE_SEND;
1055 return (0);
1056 }
1057 if (!S_ISREG (statbuf.st_mode))
1058 {
1059 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1060 RRDD_UPDATE_SEND;
1061 return (0);
1062 }
1063 if (access(file, R_OK|W_OK) != 0)
1064 {
1065 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1066 file, rrd_strerror(errno));
1067 RRDD_UPDATE_SEND;
1068 return (0);
1069 }
1071 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1072 if (ci == NULL)
1073 {
1074 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1076 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1077 RRDD_UPDATE_SEND;
1078 return (0);
1079 }
1080 memset (ci, 0, sizeof (cache_item_t));
1082 ci->file = strdup (file);
1083 if (ci->file == NULL)
1084 {
1085 free (ci);
1086 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1088 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1089 RRDD_UPDATE_SEND;
1090 return (0);
1091 }
1093 _wipe_ci_values(ci, now);
1094 ci->flags = CI_FLAGS_IN_TREE;
1096 pthread_mutex_lock(&cache_lock);
1097 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1098 } /* }}} */
1099 assert (ci != NULL);
1101 while (buffer_size > 0)
1102 {
1103 char **temp;
1104 char *value;
1106 status = buffer_get_field (&buffer, &buffer_size, &value);
1107 if (status != 0)
1108 {
1109 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1110 break;
1111 }
1113 temp = (char **) realloc (ci->values,
1114 sizeof (char *) * (ci->values_num + 1));
1115 if (temp == NULL)
1116 {
1117 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1118 continue;
1119 }
1120 ci->values = temp;
1122 ci->values[ci->values_num] = strdup (value);
1123 if (ci->values[ci->values_num] == NULL)
1124 {
1125 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1126 continue;
1127 }
1128 ci->values_num++;
1130 values_num++;
1131 }
1133 if (((now - ci->last_flush_time) >= config_write_interval)
1134 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1135 && (ci->values_num > 0))
1136 {
1137 enqueue_cache_item (ci, TAIL);
1138 }
1140 pthread_mutex_unlock (&cache_lock);
1142 if (values_num < 1)
1143 {
1144 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1145 }
1146 else
1147 {
1148 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1149 (values_num == 1) ? "" : "s");
1150 }
1151 RRDD_UPDATE_SEND;
1152 return (0);
1153 #undef RRDD_UPDATE_SEND
1154 } /* }}} int handle_request_update */
1156 /* we came across a "WROTE" entry during journal replay.
1157 * throw away any values that we have accumulated for this file
1158 */
1159 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1160 const char *buffer,
1161 size_t buffer_size __attribute__((unused)))
1162 {
1163 int i;
1164 cache_item_t *ci;
1165 const char *file = buffer;
1167 pthread_mutex_lock(&cache_lock);
1169 ci = g_tree_lookup(cache_tree, file);
1170 if (ci == NULL)
1171 {
1172 pthread_mutex_unlock(&cache_lock);
1173 return (0);
1174 }
1176 if (ci->values)
1177 {
1178 for (i=0; i < ci->values_num; i++)
1179 free(ci->values[i]);
1181 free(ci->values);
1182 }
1184 _wipe_ci_values(ci, time(NULL));
1186 pthread_mutex_unlock(&cache_lock);
1187 return (0);
1188 } /* }}} int handle_request_wrote */
1190 /* if fd < 0, we are in journal replay mode */
1191 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1192 {
1193 char *buffer_ptr;
1194 char *command;
1195 int status;
1197 assert (buffer[buffer_size - 1] == '\0');
1199 buffer_ptr = buffer;
1200 command = NULL;
1201 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1202 if (status != 0)
1203 {
1204 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1205 return (-1);
1206 }
1208 if (strcasecmp (command, "update") == 0)
1209 {
1210 /* don't re-write updates in replay mode */
1211 if (fd >= 0)
1212 journal_write(command, buffer_ptr);
1214 return (handle_request_update (fd, buffer_ptr, buffer_size));
1215 }
1216 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1217 {
1218 /* this is only valid in replay mode */
1219 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1220 }
1221 else if (strcasecmp (command, "flush") == 0)
1222 {
1223 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1224 }
1225 else if (strcasecmp (command, "stats") == 0)
1226 {
1227 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1228 }
1229 else if (strcasecmp (command, "help") == 0)
1230 {
1231 return (handle_request_help (fd, buffer_ptr, buffer_size));
1232 }
1233 else
1234 {
1235 char result[CMD_MAX];
1237 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1238 result[sizeof (result) - 1] = 0;
1240 status = swrite (fd, result, strlen (result));
1241 if (status < 0)
1242 {
1243 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1244 return (-1);
1245 }
1246 }
1248 return (0);
1249 } /* }}} int handle_request */
1251 /* MUST NOT hold journal_lock before calling this */
1252 static void journal_rotate(void) /* {{{ */
1253 {
1254 FILE *old_fh = NULL;
1256 if (journal_cur == NULL || journal_old == NULL)
1257 return;
1259 pthread_mutex_lock(&journal_lock);
1261 /* we rotate this way (rename before close) so that the we can release
1262 * the journal lock as fast as possible. Journal writes to the new
1263 * journal can proceed immediately after the new file is opened. The
1264 * fclose can then block without affecting new updates.
1265 */
1266 if (journal_fh != NULL)
1267 {
1268 old_fh = journal_fh;
1269 rename(journal_cur, journal_old);
1270 ++stats_journal_rotate;
1271 }
1273 journal_fh = fopen(journal_cur, "a");
1274 pthread_mutex_unlock(&journal_lock);
1276 if (old_fh != NULL)
1277 fclose(old_fh);
1279 if (journal_fh == NULL)
1280 RRDD_LOG(LOG_CRIT,
1281 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1282 journal_cur, rrd_strerror(errno));
1284 } /* }}} static void journal_rotate */
1286 static void journal_done(void) /* {{{ */
1287 {
1288 if (journal_cur == NULL)
1289 return;
1291 pthread_mutex_lock(&journal_lock);
1292 if (journal_fh != NULL)
1293 {
1294 fclose(journal_fh);
1295 journal_fh = NULL;
1296 }
1298 RRDD_LOG(LOG_INFO, "removing journals");
1300 unlink(journal_old);
1301 unlink(journal_cur);
1302 pthread_mutex_unlock(&journal_lock);
1304 } /* }}} static void journal_done */
1306 static int journal_write(char *cmd, char *args) /* {{{ */
1307 {
1308 int chars;
1310 if (journal_fh == NULL)
1311 return 0;
1313 pthread_mutex_lock(&journal_lock);
1314 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1315 pthread_mutex_unlock(&journal_lock);
1317 if (chars > 0)
1318 {
1319 pthread_mutex_lock(&stats_lock);
1320 stats_journal_bytes += chars;
1321 pthread_mutex_unlock(&stats_lock);
1322 }
1324 return chars;
1325 } /* }}} static int journal_write */
1327 static int journal_replay (const char *file) /* {{{ */
1328 {
1329 FILE *fh;
1330 int entry_cnt = 0;
1331 int fail_cnt = 0;
1332 uint64_t line = 0;
1333 char entry[CMD_MAX];
1335 if (file == NULL) return 0;
1337 fh = fopen(file, "r");
1338 if (fh == NULL)
1339 {
1340 if (errno != ENOENT)
1341 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1342 file, rrd_strerror(errno));
1343 return 0;
1344 }
1345 else
1346 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1348 while(!feof(fh))
1349 {
1350 size_t entry_len;
1352 ++line;
1353 fgets(entry, sizeof(entry), fh);
1354 entry_len = strlen(entry);
1356 /* check \n termination in case journal writing crashed mid-line */
1357 if (entry_len == 0)
1358 continue;
1359 else if (entry[entry_len - 1] != '\n')
1360 {
1361 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1362 ++fail_cnt;
1363 continue;
1364 }
1366 entry[entry_len - 1] = '\0';
1368 if (handle_request(-1, entry, entry_len) == 0)
1369 ++entry_cnt;
1370 else
1371 ++fail_cnt;
1372 }
1374 fclose(fh);
1376 if (entry_cnt > 0)
1377 {
1378 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1379 entry_cnt, fail_cnt);
1380 return 1;
1381 }
1382 else
1383 return 0;
1385 } /* }}} static int journal_replay */
1387 static void *connection_thread_main (void *args) /* {{{ */
1388 {
1389 pthread_t self;
1390 int i;
1391 int fd;
1393 fd = *((int *) args);
1394 free (args);
1396 pthread_mutex_lock (&connection_threads_lock);
1397 {
1398 pthread_t *temp;
1400 temp = (pthread_t *) realloc (connection_threads,
1401 sizeof (pthread_t) * (connection_threads_num + 1));
1402 if (temp == NULL)
1403 {
1404 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1405 }
1406 else
1407 {
1408 connection_threads = temp;
1409 connection_threads[connection_threads_num] = pthread_self ();
1410 connection_threads_num++;
1411 }
1412 }
1413 pthread_mutex_unlock (&connection_threads_lock);
1415 while (do_shutdown == 0)
1416 {
1417 char buffer[CMD_MAX];
1419 struct pollfd pollfd;
1420 int status;
1422 pollfd.fd = fd;
1423 pollfd.events = POLLIN | POLLPRI;
1424 pollfd.revents = 0;
1426 status = poll (&pollfd, 1, /* timeout = */ 500);
1427 if (status == 0) /* timeout */
1428 continue;
1429 else if (status < 0) /* error */
1430 {
1431 status = errno;
1432 if (status == EINTR)
1433 continue;
1434 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1435 continue;
1436 }
1438 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1439 {
1440 close (fd);
1441 break;
1442 }
1443 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1444 {
1445 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1446 "poll(2) returned something unexpected: %#04hx",
1447 pollfd.revents);
1448 close (fd);
1449 break;
1450 }
1452 status = (int) sread (fd, buffer, sizeof (buffer));
1453 if (status <= 0)
1454 {
1455 close (fd);
1457 if (status < 0)
1458 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1460 break;
1461 }
1463 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1464 if (status != 0)
1465 break;
1466 }
1468 close(fd);
1470 self = pthread_self ();
1471 /* Remove this thread from the connection threads list */
1472 pthread_mutex_lock (&connection_threads_lock);
1473 /* Find out own index in the array */
1474 for (i = 0; i < connection_threads_num; i++)
1475 if (pthread_equal (connection_threads[i], self) != 0)
1476 break;
1477 assert (i < connection_threads_num);
1479 /* Move the trailing threads forward. */
1480 if (i < (connection_threads_num - 1))
1481 {
1482 memmove (connection_threads + i,
1483 connection_threads + i + 1,
1484 sizeof (pthread_t) * (connection_threads_num - i - 1));
1485 }
1487 connection_threads_num--;
1488 pthread_mutex_unlock (&connection_threads_lock);
1490 return (NULL);
1491 } /* }}} void *connection_thread_main */
1493 static int open_listen_socket_unix (const char *path) /* {{{ */
1494 {
1495 int fd;
1496 struct sockaddr_un sa;
1497 listen_socket_t *temp;
1498 int status;
1500 temp = (listen_socket_t *) realloc (listen_fds,
1501 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1502 if (temp == NULL)
1503 {
1504 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1505 return (-1);
1506 }
1507 listen_fds = temp;
1508 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1510 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1511 if (fd < 0)
1512 {
1513 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1514 return (-1);
1515 }
1517 memset (&sa, 0, sizeof (sa));
1518 sa.sun_family = AF_UNIX;
1519 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1521 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1522 if (status != 0)
1523 {
1524 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1525 close (fd);
1526 unlink (path);
1527 return (-1);
1528 }
1530 status = listen (fd, /* backlog = */ 10);
1531 if (status != 0)
1532 {
1533 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1534 close (fd);
1535 unlink (path);
1536 return (-1);
1537 }
1539 listen_fds[listen_fds_num].fd = fd;
1540 snprintf (listen_fds[listen_fds_num].path,
1541 sizeof (listen_fds[listen_fds_num].path) - 1,
1542 "unix:%s", path);
1543 listen_fds_num++;
1545 return (0);
1546 } /* }}} int open_listen_socket_unix */
/*
 * Open listening socket(s) for one configured address.
 *
 * Accepted forms of `addr_orig':
 *   "unix:<path>" or "/<path>"  - handed to open_listen_socket_unix()
 *   "[<ipv6>]" / "[<ipv6>]:<port>"
 *   "<name>" / "<name>:<port>"  - the port is only split off when the
 *                                 string contains a '.'
 * Without an explicit port RRDCACHED_DEFAULT_PORT is used.
 *
 * Every address returned by getaddrinfo() that can be bound is appended to
 * `listen_fds'.
 *
 * Returns -1 on a malformed address, a resolver failure or a listen(2)
 * failure, 0 otherwise. Note that 0 does not guarantee that a socket was
 * opened: addresses that fail at socket(2) or bind(2) are only logged.
 */
static int open_listen_socket (const char *addr_orig) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  assert (addr_orig != NULL);

  strncpy (addr_copy, addr_orig, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
    return (open_listen_socket_unix (addr + strlen ("unix:")));
  else if (addr[0] == '/')
    return (open_listen_socket_unix (addr));

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
          addr_orig);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* split at the last ':' (strrchr is the standard spelling of rindex) */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
      close (fd);
      /* the resolver result must be released on this exit path, too */
      freeaddrinfo (ai_res);
      return (-1);
    }

    /* The new slot was zeroed above, so the stored name stays terminated. */
    listen_fds[listen_fds_num].fd = fd;
    strncpy (listen_fds[listen_fds_num].path, addr,
        sizeof (listen_fds[listen_fds_num].path) - 1);
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* getaddrinfo() allocates the result list; release it when done. */
  freeaddrinfo (ai_res);

  return (0);
} /* }}} int open_listen_socket */
1674 static int close_listen_sockets (void) /* {{{ */
1675 {
1676 size_t i;
1678 for (i = 0; i < listen_fds_num; i++)
1679 {
1680 close (listen_fds[i].fd);
1681 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1682 unlink (listen_fds[i].path + strlen ("unix:"));
1683 }
1685 free (listen_fds);
1686 listen_fds = NULL;
1687 listen_fds_num = 0;
1689 return (0);
1690 } /* }}} int close_listen_sockets */
1692 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1693 {
1694 struct pollfd *pollfds;
1695 int pollfds_num;
1696 int status;
1697 int i;
1699 for (i = 0; i < config_listen_address_list_len; i++)
1700 open_listen_socket (config_listen_address_list[i]);
1702 if (config_listen_address_list_len < 1)
1703 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1705 if (listen_fds_num < 1)
1706 {
1707 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1708 "could be opened. Sorry.");
1709 return (NULL);
1710 }
1712 pollfds_num = listen_fds_num;
1713 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1714 if (pollfds == NULL)
1715 {
1716 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1717 return (NULL);
1718 }
1719 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1721 RRDD_LOG(LOG_INFO, "listening for connections");
1723 while (do_shutdown == 0)
1724 {
1725 assert (pollfds_num == ((int) listen_fds_num));
1726 for (i = 0; i < pollfds_num; i++)
1727 {
1728 pollfds[i].fd = listen_fds[i].fd;
1729 pollfds[i].events = POLLIN | POLLPRI;
1730 pollfds[i].revents = 0;
1731 }
1733 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1734 if (status == 0)
1735 {
1736 continue; /* timeout */
1737 }
1738 else if (status < 0)
1739 {
1740 status = errno;
1741 if (status != EINTR)
1742 {
1743 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1744 }
1745 continue;
1746 }
1748 for (i = 0; i < pollfds_num; i++)
1749 {
1750 int *client_sd;
1751 struct sockaddr_storage client_sa;
1752 socklen_t client_sa_size;
1753 pthread_t tid;
1754 pthread_attr_t attr;
1756 if (pollfds[i].revents == 0)
1757 continue;
1759 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1760 {
1761 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1762 "poll(2) returned something unexpected for listen FD #%i.",
1763 pollfds[i].fd);
1764 continue;
1765 }
1767 client_sd = (int *) malloc (sizeof (int));
1768 if (client_sd == NULL)
1769 {
1770 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1771 continue;
1772 }
1774 client_sa_size = sizeof (client_sa);
1775 *client_sd = accept (pollfds[i].fd,
1776 (struct sockaddr *) &client_sa, &client_sa_size);
1777 if (*client_sd < 0)
1778 {
1779 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1780 continue;
1781 }
1783 pthread_attr_init (&attr);
1784 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1786 status = pthread_create (&tid, &attr, connection_thread_main,
1787 /* args = */ (void *) client_sd);
1788 if (status != 0)
1789 {
1790 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1791 close (*client_sd);
1792 free (client_sd);
1793 continue;
1794 }
1795 } /* for (pollfds_num) */
1796 } /* while (do_shutdown == 0) */
1798 RRDD_LOG(LOG_INFO, "starting shutdown");
1800 close_listen_sockets ();
1802 pthread_mutex_lock (&connection_threads_lock);
1803 while (connection_threads_num > 0)
1804 {
1805 pthread_t wait_for;
1807 wait_for = connection_threads[0];
1809 pthread_mutex_unlock (&connection_threads_lock);
1810 pthread_join (wait_for, /* retval = */ NULL);
1811 pthread_mutex_lock (&connection_threads_lock);
1812 }
1813 pthread_mutex_unlock (&connection_threads_lock);
1815 return (NULL);
1816 } /* }}} void *listen_thread_main */
1818 static int daemonize (void) /* {{{ */
1819 {
1820 int status;
1821 int fd;
1823 /* These structures are static, because `sigaction' behaves weird if the are
1824 * overwritten.. */
1825 static struct sigaction sa_int;
1826 static struct sigaction sa_term;
1827 static struct sigaction sa_pipe;
1829 fd = open_pidfile();
1830 if (fd < 0) return fd;
1832 if (!stay_foreground)
1833 {
1834 pid_t child;
1835 char *base_dir;
1837 child = fork ();
1838 if (child < 0)
1839 {
1840 fprintf (stderr, "daemonize: fork(2) failed.\n");
1841 return (-1);
1842 }
1843 else if (child > 0)
1844 {
1845 return (1);
1846 }
1848 /* Change into the /tmp directory. */
1849 base_dir = (config_base_dir != NULL)
1850 ? config_base_dir
1851 : "/tmp";
1852 status = chdir (base_dir);
1853 if (status != 0)
1854 {
1855 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1856 return (-1);
1857 }
1859 /* Become session leader */
1860 setsid ();
1862 /* Open the first three file descriptors to /dev/null */
1863 close (2);
1864 close (1);
1865 close (0);
1867 open ("/dev/null", O_RDWR);
1868 dup (0);
1869 dup (0);
1870 } /* if (!stay_foreground) */
1872 /* Install signal handlers */
1873 memset (&sa_int, 0, sizeof (sa_int));
1874 sa_int.sa_handler = sig_int_handler;
1875 sigaction (SIGINT, &sa_int, NULL);
1877 memset (&sa_term, 0, sizeof (sa_term));
1878 sa_term.sa_handler = sig_term_handler;
1879 sigaction (SIGTERM, &sa_term, NULL);
1881 memset (&sa_pipe, 0, sizeof (sa_pipe));
1882 sa_pipe.sa_handler = SIG_IGN;
1883 sigaction (SIGPIPE, &sa_pipe, NULL);
1885 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1886 RRDD_LOG(LOG_INFO, "starting up");
1888 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1889 if (cache_tree == NULL)
1890 {
1891 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1892 return (-1);
1893 }
1895 status = write_pidfile (fd);
1896 return status;
1897 } /* }}} int daemonize */
1899 static int cleanup (void) /* {{{ */
1900 {
1901 do_shutdown++;
1903 pthread_cond_signal (&cache_cond);
1904 pthread_join (queue_thread, /* return = */ NULL);
1906 remove_pidfile ();
1908 RRDD_LOG(LOG_INFO, "goodbye");
1909 closelog ();
1911 return (0);
1912 } /* }}} int cleanup */
1914 static int read_options (int argc, char **argv) /* {{{ */
1915 {
1916 int option;
1917 int status = 0;
1919 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1920 {
1921 switch (option)
1922 {
1923 case 'g':
1924 stay_foreground=1;
1925 break;
1927 case 'l':
1928 {
1929 char **temp;
1931 temp = (char **) realloc (config_listen_address_list,
1932 sizeof (char *) * (config_listen_address_list_len + 1));
1933 if (temp == NULL)
1934 {
1935 fprintf (stderr, "read_options: realloc failed.\n");
1936 return (2);
1937 }
1938 config_listen_address_list = temp;
1940 temp[config_listen_address_list_len] = strdup (optarg);
1941 if (temp[config_listen_address_list_len] == NULL)
1942 {
1943 fprintf (stderr, "read_options: strdup failed.\n");
1944 return (2);
1945 }
1946 config_listen_address_list_len++;
1947 }
1948 break;
1950 case 'f':
1951 {
1952 int temp;
1954 temp = atoi (optarg);
1955 if (temp > 0)
1956 config_flush_interval = temp;
1957 else
1958 {
1959 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1960 status = 3;
1961 }
1962 }
1963 break;
1965 case 'w':
1966 {
1967 int temp;
1969 temp = atoi (optarg);
1970 if (temp > 0)
1971 config_write_interval = temp;
1972 else
1973 {
1974 fprintf (stderr, "Invalid write interval: %s\n", optarg);
1975 status = 2;
1976 }
1977 }
1978 break;
1980 case 'z':
1981 {
1982 int temp;
1984 temp = atoi(optarg);
1985 if (temp > 0)
1986 config_write_jitter = temp;
1987 else
1988 {
1989 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
1990 status = 2;
1991 }
1993 break;
1994 }
1996 case 'b':
1997 {
1998 size_t len;
2000 if (config_base_dir != NULL)
2001 free (config_base_dir);
2002 config_base_dir = strdup (optarg);
2003 if (config_base_dir == NULL)
2004 {
2005 fprintf (stderr, "read_options: strdup failed.\n");
2006 return (3);
2007 }
2009 len = strlen (config_base_dir);
2010 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2011 {
2012 config_base_dir[len - 1] = 0;
2013 len--;
2014 }
2016 if (len < 1)
2017 {
2018 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2019 return (4);
2020 }
2021 }
2022 break;
2024 case 'p':
2025 {
2026 if (config_pid_file != NULL)
2027 free (config_pid_file);
2028 config_pid_file = strdup (optarg);
2029 if (config_pid_file == NULL)
2030 {
2031 fprintf (stderr, "read_options: strdup failed.\n");
2032 return (3);
2033 }
2034 }
2035 break;
2037 case 'j':
2038 {
2039 struct stat statbuf;
2040 const char *dir = optarg;
2042 status = stat(dir, &statbuf);
2043 if (status != 0)
2044 {
2045 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2046 return 6;
2047 }
2049 if (!S_ISDIR(statbuf.st_mode)
2050 || access(dir, R_OK|W_OK|X_OK) != 0)
2051 {
2052 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2053 errno ? rrd_strerror(errno) : "");
2054 return 6;
2055 }
2057 journal_cur = malloc(PATH_MAX + 1);
2058 journal_old = malloc(PATH_MAX + 1);
2059 if (journal_cur == NULL || journal_old == NULL)
2060 {
2061 fprintf(stderr, "malloc failure for journal files\n");
2062 return 6;
2063 }
2064 else
2065 {
2066 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2067 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2068 }
2069 }
2070 break;
2072 case 'h':
2073 case '?':
2074 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2075 "\n"
2076 "Usage: rrdcached [options]\n"
2077 "\n"
2078 "Valid options are:\n"
2079 " -l <address> Socket address to listen to.\n"
2080 " -w <seconds> Interval in which to write data.\n"
2081 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2082 " -f <seconds> Interval in which to flush dead data.\n"
2083 " -p <file> Location of the PID-file.\n"
2084 " -b <dir> Base directory to change to.\n"
2085 " -g Do not fork and run in the foreground.\n"
2086 " -j <dir> Directory in which to create the journal files.\n"
2087 "\n"
2088 "For more information and a detailed description of all options "
2089 "please refer\n"
2090 "to the rrdcached(1) manual page.\n",
2091 VERSION);
2092 status = -1;
2093 break;
2094 } /* switch (option) */
2095 } /* while (getopt) */
2097 /* advise the user when values are not sane */
2098 if (config_flush_interval < 2 * config_write_interval)
2099 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2100 " 2x write interval (-w) !\n");
2101 if (config_write_jitter > config_write_interval)
2102 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2103 " write interval (-w) !\n");
2105 return (status);
2106 } /* }}} int read_options */
2108 int main (int argc, char **argv)
2109 {
2110 int status;
2112 status = read_options (argc, argv);
2113 if (status != 0)
2114 {
2115 if (status < 0)
2116 status = 0;
2117 return (status);
2118 }
2120 status = daemonize ();
2121 if (status == 1)
2122 {
2123 struct sigaction sigchld;
2125 memset (&sigchld, 0, sizeof (sigchld));
2126 sigchld.sa_handler = SIG_IGN;
2127 sigaction (SIGCHLD, &sigchld, NULL);
2129 return (0);
2130 }
2131 else if (status != 0)
2132 {
2133 fprintf (stderr, "daemonize failed, exiting.\n");
2134 return (1);
2135 }
2137 if (journal_cur != NULL)
2138 {
2139 int had_journal = 0;
2141 pthread_mutex_lock(&journal_lock);
2143 RRDD_LOG(LOG_INFO, "checking for journal files");
2145 had_journal += journal_replay(journal_old);
2146 had_journal += journal_replay(journal_cur);
2148 if (had_journal)
2149 flush_old_values(-1);
2151 pthread_mutex_unlock(&journal_lock);
2152 journal_rotate();
2154 RRDD_LOG(LOG_INFO, "journal processing complete");
2155 }
2157 /* start the queue thread */
2158 memset (&queue_thread, 0, sizeof (queue_thread));
2159 status = pthread_create (&queue_thread,
2160 NULL, /* attr */
2161 queue_thread_main,
2162 NULL); /* args */
2163 if (status != 0)
2164 {
2165 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2166 cleanup();
2167 return (1);
2168 }
2170 listen_thread_main (NULL);
2171 cleanup ();
2173 return (0);
2174 } /* int main */
2176 /*
2177 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2178 */