0e29f131ab1387fa657ae0160e2e6d956de4bab7
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* Every diagnostic of the daemon is routed to syslog(3). */
#define RRDD_LOG(prio, ...) syslog ((prio), __VA_ARGS__)

/* On compilers other than GCC/Clang, let `__attribute__((...))' annotations
 * expand to nothing. */
#if !defined(__GNUC__)
# define __attribute__(x)
#endif
101 /*
102 * Types
103 */
/*
 * listen_socket_t: one listening socket.
 * NOTE(review): presumably `fd' is the socket descriptor and `path' the
 * address the socket was created from (PATH_MAX + 1 bytes so a full path
 * plus its terminating NUL fits) -- confirm against the listener code,
 * which is not visible in this part of the file.
 */
104 struct listen_socket_s
105 {
106 int fd;
107 char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
/*
 * cache_item_t: the cached state of one RRD file.
 * Entries are stored in `cache_tree' keyed by `file' (the tree key is the
 * very same pointer as `file'), and while they have pending values they may
 * additionally be linked into the write queue through `next'.
 */
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
/* File name; also the key under which the entry is stored in `cache_tree'. */
115 char *file;
/* Pending update strings (each one strdup'ed) and how many there are. */
116 char **values;
117 int values_num;
/* Set when the entry is created and whenever its values are taken for
 * writing; may be pushed into the future by a random write jitter. */
118 time_t last_flush_time;
/* Bits for `flags': entry is stored in the tree / entry is linked into the
 * write queue. */
119 #define CI_FLAGS_IN_TREE (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121 int flags;
/* Broadcast by the queue thread after it has handed this entry's values to
 * rrd_update_r(); flush_file() waits on it. */
122 pthread_cond_t flushed;
/* Next entry in the write queue; NULL for the tail and for unqueued entries. */
123 cache_item_t *next;
124 };
/*
 * callback_flush_data_t: state shared between flush_old_values() and
 * tree_callback_flush(). `now' is the reference time; entries whose
 * last_flush_time is <= `abs_timeout' are queued; `keys'/`keys_num' collect
 * the keys of idle, value-less entries that the caller removes afterwards.
 */
126 struct callback_flush_data_s
127 {
128 time_t now;
129 time_t abs_timeout;
130 char **keys;
131 size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
/* Which end of the write queue enqueue_cache_item() should use. */
135 enum queue_side_e
136 {
137 HEAD,
138 TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146 * Variables
147 */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
/* Incremented by the SIGINT/SIGTERM handlers; non-zero makes the queue thread
 * write out everything and exit.
 * NOTE(review): shared between a signal handler and other threads without a
 * lock; `volatile sig_atomic_t' (or an atomic) would be the portable type. */
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
/* `cache_lock' guards `cache_tree' and the write queue (head/tail and the
 * `next' links). `cache_cond' wakes the queue thread when work arrives or a
 * shutdown is requested. */
161 /* Cache stuff */
162 static GTree *cache_tree = NULL;
163 static cache_item_t *cache_queue_head = NULL;
164 static cache_item_t *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
/* config_write_interval: an entry is queued once `now - last_flush_time'
 *   reaches this many seconds (compared against time(NULL)).
 * config_write_jitter: if > 0, a random offset in [0, jitter) is added to
 *   last_flush_time after each write.
 * config_flush_interval: period, in seconds, of the periodic flush in the
 *   queue thread; idle entries without values older than this are dropped.
 * config_pid_file: NULL means LOCALSTATEDIR "/run/rrdcached.pid". */
168 static int config_write_interval = 300;
169 static int config_write_jitter = 0;
170 static int config_flush_interval = 3600;
171 static char *config_pid_file = NULL;
172 static char *config_base_dir = NULL;
174 static char **config_listen_address_list = NULL;
175 static int config_listen_address_list_len = 0;
/* Counters reported by the STATS command, which reads them under `stats_lock'.
 * NOTE(review): journal_rotate() increments stats_journal_rotate while
 * holding only `journal_lock'. */
177 static uint64_t stats_queue_length = 0;
178 static uint64_t stats_updates_received = 0;
179 static uint64_t stats_flush_received = 0;
180 static uint64_t stats_updates_written = 0;
181 static uint64_t stats_data_sets_written = 0;
182 static uint64_t stats_journal_bytes = 0;
183 static uint64_t stats_journal_rotate = 0;
184 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
/* Journal file names (current and rotated) and the open stream, which is
 * guarded by `journal_lock'. */
186 /* Journaled updates */
187 static char *journal_cur = NULL;
188 static char *journal_old = NULL;
189 static FILE *journal_fh = NULL;
190 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
191 static int journal_write(char *cmd, char *args);
192 static void journal_done(void);
193 static void journal_rotate(void);
195 /*
196 * Functions
197 */
/*
 * SIGINT handler: logs the signal, increments `do_shutdown' and wakes every
 * thread waiting on `cache_cond' (queue_thread_main() waits on it).
 *
 * NOTE(review): syslog() and pthread_cond_broadcast() are not on the POSIX
 * list of async-signal-safe functions, and `do_shutdown' is a plain int.
 * Consider setting only a `volatile sig_atomic_t' flag here and doing the
 * logging/wake-up from regular thread context.
 */
198 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
199 {
200 RRDD_LOG(LOG_NOTICE, "caught SIGINT");
201 do_shutdown++;
202 pthread_cond_broadcast(&cache_cond);
203 } /* }}} void sig_int_handler */
/*
 * SIGTERM handler: same as the SIGINT handler -- logs, increments
 * `do_shutdown' and broadcasts `cache_cond'.
 *
 * NOTE(review): same async-signal-safety concern: syslog() and
 * pthread_cond_broadcast() are not async-signal-safe.
 */
205 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
206 {
207 RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
208 do_shutdown++;
209 pthread_cond_broadcast(&cache_cond);
210 } /* }}} void sig_term_handler */
212 static int open_pidfile(void) /* {{{ */
213 {
214 int fd;
215 char *file;
217 file = (config_pid_file != NULL)
218 ? config_pid_file
219 : LOCALSTATEDIR "/run/rrdcached.pid";
221 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
222 if (fd < 0)
223 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
224 file, rrd_strerror(errno));
226 return(fd);
227 }
/*
 * write_pidfile:
 * Writes the current process id, followed by a newline, to `fd' (as
 * returned by open_pidfile()). Takes ownership of `fd': it is closed on
 * every path.
 * Returns 0 on success, -1 if the stream could not be opened, written or
 * flushed.
 */
static int write_pidfile (int fd) /* {{{ */
{
  pid_t pid;
  FILE *fh;
  int status = 0;

  pid = getpid ();

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) pid) < 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fprintf() failed.");
    status = -1;
  }

  /* fclose() flushes the buffered pid; if that fails (e.g. disk full) the
   * file does not contain the pid, so report it instead of returning 0. */
  if (fclose (fh) != 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fclose() failed.");
    status = -1;
  }

  return (status);
} /* }}} int write_pidfile */
250 static int remove_pidfile (void) /* {{{ */
251 {
252 char *file;
253 int status;
255 file = (config_pid_file != NULL)
256 ? config_pid_file
257 : LOCALSTATEDIR "/run/rrdcached.pid";
259 status = unlink (file);
260 if (status == 0)
261 return (0);
262 return (errno);
263 } /* }}} int remove_pidfile */
/*
 * sread:
 * Reads from `fd' into `buffer_void' until the most recently read chunk ends
 * with '\n' or the buffer is full. EAGAIN/EINTR are retried.
 *
 * On success the trailing "\n" (or "\r\n") is replaced by a NUL terminator
 * and the returned value is the terminator's offset plus one, i.e.
 * strlen(buffer) + 1.
 * Returns 0 on end-of-file (even if some bytes were already read), and -1
 * on a read error or -- with errno set to ENOBUFS -- when the buffer filled
 * up without a newline.
 *
 * NOTE: only the last byte of each read() is inspected, so if one read()
 * returns several lines they all stay in the buffer as one "line".
 * `buffer_size' must be non-zero (asserted).
 */
static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
{
  char *buf = (char *) buffer_void;
  size_t have = 0;

  while (have < buffer_size)
  {
    ssize_t got = read (fd, buf + have, buffer_size - have);

    if (got < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (-1);
    }

    if (got == 0)
      return (0);

    assert ((buffer_size - have) >= (size_t) got);
    have += (size_t) got;

    if (buf[have - 1] == '\n')
      break;
  }

  assert (have > 0);

  if (buf[have - 1] != '\n')
  {
    errno = ENOBUFS;
    return (-1);
  }

  /* Terminate the string where the newline was ... */
  buf[have - 1] = 0;

  /* ... and also swallow a carriage return sent by network clients. */
  if ((have > 1) && (buf[have - 2] == '\r'))
  {
    have--;
    buf[have - 1] = 0;
  }

  return ((ssize_t) have);
} /* }}} ssize_t sread */
/*
 * swrite:
 * Writes all `count' bytes of `buf' to `fd', retrying on EAGAIN/EINTR and on
 * short writes.
 * A negative `fd' (used during journal replay) discards the data and
 * succeeds. Returns 0 on success or the negative write() result on error.
 */
static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
{
  const char *cursor = (const char *) buf;
  size_t remaining = count;

  if (fd < 0)
    return 0;

  while (remaining > 0)
  {
    ssize_t n = write (fd, (const void *) cursor, remaining);

    if (n >= 0)
    {
      remaining -= (size_t) n;
      cursor += n;
    }
    else if ((errno != EAGAIN) && (errno != EINTR))
    {
      return (n);
    }
  }

  return (0);
} /* }}} ssize_t swrite */
346 static void _wipe_ci_values(cache_item_t *ci, time_t when)
347 {
348 ci->values = NULL;
349 ci->values_num = 0;
351 ci->last_flush_time = when;
352 if (config_write_jitter > 0)
353 ci->last_flush_time += (random() % config_write_jitter);
355 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
356 }
358 /*
359 * enqueue_cache_item:
360 * `cache_lock' must be acquired before calling this function!
361 */
362 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
363 queue_side_t side)
364 {
365 int did_insert = 0;
367 if (ci == NULL)
368 return (-1);
370 if (ci->values_num == 0)
371 return (0);
373 if (side == HEAD)
374 {
375 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
376 {
377 assert (ci->next == NULL);
378 ci->next = cache_queue_head;
379 cache_queue_head = ci;
381 if (cache_queue_tail == NULL)
382 cache_queue_tail = cache_queue_head;
384 did_insert = 1;
385 }
386 else if (cache_queue_head == ci)
387 {
388 /* do nothing */
389 }
390 else /* enqueued, but not first entry */
391 {
392 cache_item_t *prev;
394 /* find previous entry */
395 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
396 if (prev->next == ci)
397 break;
398 assert (prev != NULL);
400 /* move to the front */
401 prev->next = ci->next;
402 ci->next = cache_queue_head;
403 cache_queue_head = ci;
405 /* check if we need to adapt the tail */
406 if (cache_queue_tail == ci)
407 cache_queue_tail = prev;
408 }
409 }
410 else /* (side == TAIL) */
411 {
412 /* We don't move values back in the list.. */
413 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
414 return (0);
416 assert (ci->next == NULL);
418 if (cache_queue_tail == NULL)
419 cache_queue_head = ci;
420 else
421 cache_queue_tail->next = ci;
422 cache_queue_tail = ci;
424 did_insert = 1;
425 }
427 ci->flags |= CI_FLAGS_IN_QUEUE;
429 if (did_insert)
430 {
431 pthread_cond_broadcast(&cache_cond);
432 pthread_mutex_lock (&stats_lock);
433 stats_queue_length++;
434 pthread_mutex_unlock (&stats_lock);
435 }
437 return (0);
438 } /* }}} int enqueue_cache_item */
440 /*
441 * tree_callback_flush:
442 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
443 * while this is in progress.
444 */
445 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
446 gpointer data)
447 {
448 cache_item_t *ci;
449 callback_flush_data_t *cfd;
451 ci = (cache_item_t *) value;
452 cfd = (callback_flush_data_t *) data;
454 if ((ci->last_flush_time <= cfd->abs_timeout)
455 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
456 && (ci->values_num > 0))
457 {
458 enqueue_cache_item (ci, TAIL);
459 }
460 else if ((do_shutdown != 0)
461 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
462 && (ci->values_num > 0))
463 {
464 enqueue_cache_item (ci, TAIL);
465 }
466 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
467 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
468 && (ci->values_num <= 0))
469 {
470 char **temp;
472 temp = (char **) realloc (cfd->keys,
473 sizeof (char *) * (cfd->keys_num + 1));
474 if (temp == NULL)
475 {
476 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
477 return (FALSE);
478 }
479 cfd->keys = temp;
480 /* Make really sure this points to the _same_ place */
481 assert ((char *) key == ci->file);
482 cfd->keys[cfd->keys_num] = (char *) key;
483 cfd->keys_num++;
484 }
486 return (FALSE);
487 } /* }}} gboolean tree_callback_flush */
489 static int flush_old_values (int max_age)
490 {
491 callback_flush_data_t cfd;
492 size_t k;
494 memset (&cfd, 0, sizeof (cfd));
495 /* Pass the current time as user data so that we don't need to call
496 * `time' for each node. */
497 cfd.now = time (NULL);
498 cfd.keys = NULL;
499 cfd.keys_num = 0;
501 if (max_age > 0)
502 cfd.abs_timeout = cfd.now - max_age;
503 else
504 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
506 /* `tree_callback_flush' will return the keys of all values that haven't
507 * been touched in the last `config_flush_interval' seconds in `cfd'.
508 * The char*'s in this array point to the same memory as ci->file, so we
509 * don't need to free them separately. */
510 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
512 for (k = 0; k < cfd.keys_num; k++)
513 {
514 cache_item_t *ci;
516 /* This must not fail. */
517 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
518 assert (ci != NULL);
520 /* If we end up here with values available, something's seriously
521 * messed up. */
522 assert (ci->values_num == 0);
524 /* Remove the node from the tree */
525 g_tree_remove (cache_tree, cfd.keys[k]);
526 cfd.keys[k] = NULL;
528 /* Now free and clean up `ci'. */
529 free (ci->file);
530 ci->file = NULL;
531 free (ci);
532 ci = NULL;
533 } /* for (k = 0; k < cfd.keys_num; k++) */
535 if (cfd.keys != NULL)
536 {
537 free (cfd.keys);
538 cfd.keys = NULL;
539 }
541 return (0);
542 } /* int flush_old_values */
/*
 * queue_thread_main:
 * Body of the writer thread. It holds `cache_lock' except around disk I/O and
 * loops until a shutdown was requested AND the write queue is empty:
 *  - every `config_flush_interval' seconds it calls
 *    flush_old_values(config_write_interval) and then journal_rotate() with
 *    the lock released;
 *  - while the queue is empty it waits on `cache_cond' until the next flush
 *    time;
 *  - otherwise it pops the queue head, passes its values to rrd_update_r()
 *    with the lock released, calls journal_write("wrote", file) (regardless
 *    of the update's result), broadcasts the entry's `flushed' condition and
 *    frees the values.
 * Once `do_shutdown' is non-zero and the queue is empty it calls
 * flush_old_values(-1), which queues every entry that still has values. On
 * exit it calls journal_done(). Always returns NULL.
 */
544 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
545 {
546 struct timeval now;
547 struct timespec next_flush;
/* First periodic flush happens config_flush_interval seconds from now. */
549 gettimeofday (&now, NULL);
550 next_flush.tv_sec = now.tv_sec + config_flush_interval;
551 next_flush.tv_nsec = 1000 * now.tv_usec;
553 pthread_mutex_lock (&cache_lock);
554 while ((do_shutdown == 0) || (cache_queue_head != NULL))
555 {
556 cache_item_t *ci;
557 char *file;
558 char **values;
559 int values_num;
560 int status;
561 int i;
563 /* First, check if it's time to do the cache flush. */
564 gettimeofday (&now, NULL);
565 if ((now.tv_sec > next_flush.tv_sec)
566 || ((now.tv_sec == next_flush.tv_sec)
567 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
568 {
569 /* Flush all values that haven't been written in the last
570 * `config_write_interval' seconds. */
571 flush_old_values (config_write_interval);
573 /* Determine the time of the next cache flush. */
574 while (next_flush.tv_sec <= now.tv_sec)
575 next_flush.tv_sec += config_flush_interval;
577 /* unlock the cache while we rotate so we don't block incoming
578 * updates if the fsync() blocks on disk I/O */
579 pthread_mutex_unlock(&cache_lock);
580 journal_rotate();
581 pthread_mutex_lock(&cache_lock);
582 }
584 /* Now, check if there's something to store away. If not, wait until
585 * something comes in or it's time to do the cache flush. */
586 if (cache_queue_head == NULL)
587 {
588 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
589 if ((status != 0) && (status != ETIMEDOUT))
590 {
591 RRDD_LOG (LOG_ERR, "queue_thread_main: "
592 "pthread_cond_timedwait returned %i.", status);
593 }
594 }
596 /* We're about to shut down, so lets flush the entire tree. */
597 if ((do_shutdown != 0) && (cache_queue_head == NULL))
598 flush_old_values (/* max age = */ -1);
600 /* Check if a value has arrived. This may be NULL if we timed out or there
601 * was an interrupt such as a signal. */
602 if (cache_queue_head == NULL)
603 continue;
605 ci = cache_queue_head;
607 /* copy the relevant parts */
608 file = strdup (ci->file);
/* NOTE(review): on strdup failure the entry stays at the head and the loop
 * retries right away, which can spin while memory remains exhausted. */
609 if (file == NULL)
610 {
611 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
612 continue;
613 }
615 assert(ci->values != NULL);
616 assert(ci->values_num > 0);
/* Take ownership of the values array; _wipe_ci_values() only drops the
 * pointers and clears CI_FLAGS_IN_QUEUE. */
618 values = ci->values;
619 values_num = ci->values_num;
621 _wipe_ci_values(ci, time(NULL));
/* Unlink the head entry from the queue. */
623 cache_queue_head = ci->next;
624 if (cache_queue_head == NULL)
625 cache_queue_tail = NULL;
626 ci->next = NULL;
628 pthread_mutex_lock (&stats_lock);
629 assert (stats_queue_length > 0);
630 stats_queue_length--;
631 pthread_mutex_unlock (&stats_lock);
/* The RRD update itself runs without `cache_lock' so clients can keep
 * submitting updates meanwhile. */
633 pthread_mutex_unlock (&cache_lock);
635 rrd_clear_error ();
636 status = rrd_update_r (file, NULL, values_num, (void *) values);
637 if (status != 0)
638 {
639 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
640 "rrd_update_r (%s) failed with status %i. (%s)",
641 file, status, rrd_get_error());
642 }
644 journal_write("wrote", file);
/* NOTE(review): `ci' is used here without `cache_lock'. This appears to rely
 * on the entry not being freed meanwhile (flush_old_values() only frees
 * value-less entries idle for config_flush_interval, and last_flush_time was
 * just reset) -- confirm. Waiters are woken even if the update failed. */
645 pthread_cond_broadcast(&ci->flushed);
647 for (i = 0; i < values_num; i++)
648 free (values[i]);
650 free(values);
651 free(file);
653 if (status == 0)
654 {
655 pthread_mutex_lock (&stats_lock);
656 stats_updates_written++;
657 stats_data_sets_written += values_num;
658 pthread_mutex_unlock (&stats_lock);
659 }
661 pthread_mutex_lock (&cache_lock);
663 /* We're about to shut down, so lets flush the entire tree. */
664 if ((do_shutdown != 0) && (cache_queue_head == NULL))
665 flush_old_values (/* max age = */ -1);
666 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
667 pthread_mutex_unlock (&cache_lock);
669 assert(cache_queue_head == NULL);
670 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
671 journal_done();
673 return (NULL);
674 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Extracts the next space-delimited field from *buffer_ret (size
 * *buffer_size_ret, which must end in '\0'). A backslash escapes the
 * following character. The unescaped field is written in place at the start
 * of the buffer and NUL-terminated.
 *
 * On success returns 0, stores the field in *field_ret and advances
 * *buffer_ret / *buffer_size_ret past the field and its delimiter.
 * Returns -1 if the buffer is empty or no delimiter was found (e.g. a
 * trailing backslash).
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t len = *buffer_size_ret;
  char *dst = src;     /* fields are unescaped in place */
  size_t in = 0;
  size_t out = 0;

  if (len == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[len - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= len)
      return (-1);

    c = src[in];

    /* A space or the terminator ends the field. */
    if ((c == ' ') || (c == '\0'))
    {
      dst[out] = 0;
      in++;
      break;
    }

    /* Escaped character: take the next byte literally. */
    if (c == '\\')
    {
      if (in >= (len - 1))
        return (-1);
      in++;
      c = src[in];
    }

    dst[out] = c;
    out++;
    in++;
  }

  *buffer_ret = src + in;
  *buffer_size_ret = len - in;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
739 static int flush_file (const char *filename) /* {{{ */
740 {
741 cache_item_t *ci;
743 pthread_mutex_lock (&cache_lock);
745 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
746 if (ci == NULL)
747 {
748 pthread_mutex_unlock (&cache_lock);
749 return (ENOENT);
750 }
752 /* Enqueue at head */
753 enqueue_cache_item (ci, HEAD);
755 pthread_cond_wait(&ci->flushed, &cache_lock);
756 pthread_mutex_unlock(&cache_lock);
758 return (0);
759 } /* }}} int flush_file */
/*
 * handle_request_help:
 * Answers "HELP [<command>]" on `fd'. If `buffer' names a known command
 * (case-insensitive), that command's help is sent; otherwise the command
 * overview is sent.
 *
 * In every text below the first line is "<N> <title>" where N equals the
 * number of lines that follow it.
 *
 * Returns 0 on success, or the errno of a failed write.
 */
static int handle_request_help (int fd, /* {{{ */
    char *buffer, size_t buffer_size)
{
  static const char *const help_help[] =
  {
    "5 Command overview\n",
    "FLUSH <filename>\n",
    "FLUSHALL\n",
    "HELP [<command>]\n",
    "UPDATE <filename> <values> [<values> ...]\n",
    "STATS\n"
  };

  static const char *const help_flush[] =
  {
    "4 Help for FLUSH\n",
    "Usage: FLUSH <filename>\n",
    "\n",
    "Adds the given filename to the head of the update queue and returns\n",
    "after it has been dequeued.\n"
  };

  static const char *const help_flushall[] =
  {
    "3 Help for FLUSHALL\n",
    "Usage: FLUSHALL\n",
    "\n",
    "Triggers writing of all pending updates. Returns immediately.\n"
  };

  /* Note the comma after the "Usage:" line: without it the two literals were
   * silently concatenated into one array element. */
  static const char *const help_update[] =
  {
    "9 Help for UPDATE\n",
    "Usage: UPDATE <filename> <values> [<values> ...]\n",
    "\n",
    "Adds the given file to the internal cache if it is not yet known and\n",
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
    "for details.\n",
    "\n",
    "Each <values> has the following form:\n",
    " <values> = <time>:<value>[:<value>[...]]\n",
    "See the rrdupdate(1) manpage for details.\n"
  };

  static const char *const help_stats[] =
  {
    "4 Help for STATS\n",
    "Usage: STATS\n",
    "\n",
    "Returns some performance counters, see the rrdcached(1) manpage for\n",
    "a description of the values.\n"
  };

  const char *const *help_text = help_help;
  size_t help_text_len = sizeof (help_help) / sizeof (help_help[0]);
  char *command;
  size_t i;
  int status;

  status = buffer_get_field (&buffer, &buffer_size, &command);
  if (status == 0)
  {
    if (strcasecmp (command, "update") == 0)
    {
      help_text = help_update;
      help_text_len = sizeof (help_update) / sizeof (help_update[0]);
    }
    else if (strcasecmp (command, "flush") == 0)
    {
      help_text = help_flush;
      help_text_len = sizeof (help_flush) / sizeof (help_flush[0]);
    }
    else if (strcasecmp (command, "flushall") == 0)
    {
      help_text = help_flushall;
      help_text_len = sizeof (help_flushall) / sizeof (help_flushall[0]);
    }
    else if (strcasecmp (command, "stats") == 0)
    {
      help_text = help_stats;
      help_text_len = sizeof (help_stats) / sizeof (help_stats[0]);
    }
    /* anything else: keep the overview */
  }

  for (i = 0; i < help_text_len; i++)
  {
    status = swrite (fd, help_text[i], strlen (help_text[i]));
    if (status < 0)
    {
      status = errno;
      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
      return (status);
    }
  }

  return (0);
} /* }}} int handle_request_help */
874 static int handle_request_stats (int fd, /* {{{ */
875 char *buffer __attribute__((unused)),
876 size_t buffer_size __attribute__((unused)))
877 {
878 int status;
879 char outbuf[CMD_MAX];
881 uint64_t copy_queue_length;
882 uint64_t copy_updates_received;
883 uint64_t copy_flush_received;
884 uint64_t copy_updates_written;
885 uint64_t copy_data_sets_written;
886 uint64_t copy_journal_bytes;
887 uint64_t copy_journal_rotate;
889 uint64_t tree_nodes_number;
890 uint64_t tree_depth;
892 pthread_mutex_lock (&stats_lock);
893 copy_queue_length = stats_queue_length;
894 copy_updates_received = stats_updates_received;
895 copy_flush_received = stats_flush_received;
896 copy_updates_written = stats_updates_written;
897 copy_data_sets_written = stats_data_sets_written;
898 copy_journal_bytes = stats_journal_bytes;
899 copy_journal_rotate = stats_journal_rotate;
900 pthread_mutex_unlock (&stats_lock);
902 pthread_mutex_lock (&cache_lock);
903 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
904 tree_depth = (uint64_t) g_tree_height (cache_tree);
905 pthread_mutex_unlock (&cache_lock);
907 #define RRDD_STATS_SEND \
908 outbuf[sizeof (outbuf) - 1] = 0; \
909 status = swrite (fd, outbuf, strlen (outbuf)); \
910 if (status < 0) \
911 { \
912 status = errno; \
913 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
914 return (status); \
915 }
917 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
918 RRDD_STATS_SEND;
920 snprintf (outbuf, sizeof (outbuf),
921 "QueueLength: %"PRIu64"\n", copy_queue_length);
922 RRDD_STATS_SEND;
924 snprintf (outbuf, sizeof (outbuf),
925 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
926 RRDD_STATS_SEND;
928 snprintf (outbuf, sizeof (outbuf),
929 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
930 RRDD_STATS_SEND;
932 snprintf (outbuf, sizeof (outbuf),
933 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
934 RRDD_STATS_SEND;
936 snprintf (outbuf, sizeof (outbuf),
937 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
938 RRDD_STATS_SEND;
940 snprintf (outbuf, sizeof (outbuf),
941 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
942 RRDD_STATS_SEND;
944 snprintf (outbuf, sizeof (outbuf),
945 "TreeDepth: %"PRIu64"\n", tree_depth);
946 RRDD_STATS_SEND;
948 snprintf (outbuf, sizeof(outbuf),
949 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
950 RRDD_STATS_SEND;
952 snprintf (outbuf, sizeof(outbuf),
953 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
954 RRDD_STATS_SEND;
956 return (0);
957 #undef RRDD_STATS_SEND
958 } /* }}} int handle_request_stats */
960 static int handle_request_flush (int fd, /* {{{ */
961 char *buffer, size_t buffer_size)
962 {
963 char *file;
964 int status;
965 char result[CMD_MAX];
967 status = buffer_get_field (&buffer, &buffer_size, &file);
968 if (status != 0)
969 {
970 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
971 }
972 else
973 {
974 pthread_mutex_lock(&stats_lock);
975 stats_flush_received++;
976 pthread_mutex_unlock(&stats_lock);
978 status = flush_file (file);
979 if (status == 0)
980 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
981 else if (status == ENOENT)
982 {
983 /* no file in our tree; see whether it exists at all */
984 struct stat statbuf;
986 memset(&statbuf, 0, sizeof(statbuf));
987 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
988 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
989 else
990 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
991 }
992 else if (status < 0)
993 strncpy (result, "-1 Internal error.\n", sizeof (result));
994 else
995 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
996 }
997 result[sizeof (result) - 1] = 0;
999 status = swrite (fd, result, strlen (result));
1000 if (status < 0)
1001 {
1002 status = errno;
1003 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1004 return (status);
1005 }
1007 return (0);
1008 } /* }}} int handle_request_flush */
1010 static int handle_request_flushall(int fd) /* {{{ */
1011 {
1012 int status;
1013 char answer[] ="0 Started flush.\n";
1015 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1017 pthread_mutex_lock(&cache_lock);
1018 flush_old_values(-1);
1019 pthread_mutex_unlock(&cache_lock);
1021 status = swrite(fd, answer, strlen(answer));
1022 if (status < 0)
1023 {
1024 status = errno;
1025 RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1026 }
1028 return (status);
1029 }
1031 static int handle_request_update (int fd, /* {{{ */
1032 char *buffer, size_t buffer_size)
1033 {
1034 char *file;
1035 int values_num = 0;
1036 int status;
1038 time_t now;
1040 cache_item_t *ci;
1041 char answer[CMD_MAX];
1043 #define RRDD_UPDATE_SEND \
1044 answer[sizeof (answer) - 1] = 0; \
1045 status = swrite (fd, answer, strlen (answer)); \
1046 if (status < 0) \
1047 { \
1048 status = errno; \
1049 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1050 return (status); \
1051 }
1053 now = time (NULL);
1055 status = buffer_get_field (&buffer, &buffer_size, &file);
1056 if (status != 0)
1057 {
1058 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1059 sizeof (answer));
1060 RRDD_UPDATE_SEND;
1061 return (0);
1062 }
1064 pthread_mutex_lock(&stats_lock);
1065 stats_updates_received++;
1066 pthread_mutex_unlock(&stats_lock);
1068 pthread_mutex_lock (&cache_lock);
1069 ci = g_tree_lookup (cache_tree, file);
1071 if (ci == NULL) /* {{{ */
1072 {
1073 struct stat statbuf;
1075 /* don't hold the lock while we setup; stat(2) might block */
1076 pthread_mutex_unlock(&cache_lock);
1078 memset (&statbuf, 0, sizeof (statbuf));
1079 status = stat (file, &statbuf);
1080 if (status != 0)
1081 {
1082 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1084 status = errno;
1085 if (status == ENOENT)
1086 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1087 else
1088 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1089 status);
1090 RRDD_UPDATE_SEND;
1091 return (0);
1092 }
1093 if (!S_ISREG (statbuf.st_mode))
1094 {
1095 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1096 RRDD_UPDATE_SEND;
1097 return (0);
1098 }
1099 if (access(file, R_OK|W_OK) != 0)
1100 {
1101 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1102 file, rrd_strerror(errno));
1103 RRDD_UPDATE_SEND;
1104 return (0);
1105 }
1107 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1108 if (ci == NULL)
1109 {
1110 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1112 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1113 RRDD_UPDATE_SEND;
1114 return (0);
1115 }
1116 memset (ci, 0, sizeof (cache_item_t));
1118 ci->file = strdup (file);
1119 if (ci->file == NULL)
1120 {
1121 free (ci);
1122 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1124 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1125 RRDD_UPDATE_SEND;
1126 return (0);
1127 }
1129 _wipe_ci_values(ci, now);
1130 ci->flags = CI_FLAGS_IN_TREE;
1132 pthread_mutex_lock(&cache_lock);
1133 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1134 } /* }}} */
1135 assert (ci != NULL);
1137 while (buffer_size > 0)
1138 {
1139 char **temp;
1140 char *value;
1142 status = buffer_get_field (&buffer, &buffer_size, &value);
1143 if (status != 0)
1144 {
1145 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1146 break;
1147 }
1149 temp = (char **) realloc (ci->values,
1150 sizeof (char *) * (ci->values_num + 1));
1151 if (temp == NULL)
1152 {
1153 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1154 continue;
1155 }
1156 ci->values = temp;
1158 ci->values[ci->values_num] = strdup (value);
1159 if (ci->values[ci->values_num] == NULL)
1160 {
1161 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1162 continue;
1163 }
1164 ci->values_num++;
1166 values_num++;
1167 }
1169 if (((now - ci->last_flush_time) >= config_write_interval)
1170 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1171 && (ci->values_num > 0))
1172 {
1173 enqueue_cache_item (ci, TAIL);
1174 }
1176 pthread_mutex_unlock (&cache_lock);
1178 if (values_num < 1)
1179 {
1180 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1181 }
1182 else
1183 {
1184 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1185 (values_num == 1) ? "" : "s");
1186 }
1187 RRDD_UPDATE_SEND;
1188 return (0);
1189 #undef RRDD_UPDATE_SEND
1190 } /* }}} int handle_request_update */
1192 /* we came across a "WROTE" entry during journal replay.
1193 * throw away any values that we have accumulated for this file
1194 */
1195 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1196 const char *buffer,
1197 size_t buffer_size __attribute__((unused)))
1198 {
1199 int i;
1200 cache_item_t *ci;
1201 const char *file = buffer;
1203 pthread_mutex_lock(&cache_lock);
1205 ci = g_tree_lookup(cache_tree, file);
1206 if (ci == NULL)
1207 {
1208 pthread_mutex_unlock(&cache_lock);
1209 return (0);
1210 }
1212 if (ci->values)
1213 {
1214 for (i=0; i < ci->values_num; i++)
1215 free(ci->values[i]);
1217 free(ci->values);
1218 }
1220 _wipe_ci_values(ci, time(NULL));
1222 pthread_mutex_unlock(&cache_lock);
1223 return (0);
1224 } /* }}} int handle_request_wrote */
1226 /* if fd < 0, we are in journal replay mode */
1227 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1228 {
1229 char *buffer_ptr;
1230 char *command;
1231 int status;
1233 assert (buffer[buffer_size - 1] == '\0');
1235 buffer_ptr = buffer;
1236 command = NULL;
1237 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1238 if (status != 0)
1239 {
1240 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1241 return (-1);
1242 }
1244 if (strcasecmp (command, "update") == 0)
1245 {
1246 /* don't re-write updates in replay mode */
1247 if (fd >= 0)
1248 journal_write(command, buffer_ptr);
1250 return (handle_request_update (fd, buffer_ptr, buffer_size));
1251 }
1252 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1253 {
1254 /* this is only valid in replay mode */
1255 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1256 }
1257 else if (strcasecmp (command, "flush") == 0)
1258 {
1259 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1260 }
1261 else if (strcasecmp (command, "flushall") == 0)
1262 {
1263 return (handle_request_flushall(fd));
1264 }
1265 else if (strcasecmp (command, "stats") == 0)
1266 {
1267 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1268 }
1269 else if (strcasecmp (command, "help") == 0)
1270 {
1271 return (handle_request_help (fd, buffer_ptr, buffer_size));
1272 }
1273 else
1274 {
1275 char result[CMD_MAX];
1277 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1278 result[sizeof (result) - 1] = 0;
1280 status = swrite (fd, result, strlen (result));
1281 if (status < 0)
1282 {
1283 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1284 return (-1);
1285 }
1286 }
1288 return (0);
1289 } /* }}} int handle_request */
1291 /* MUST NOT hold journal_lock before calling this */
1292 static void journal_rotate(void) /* {{{ */
1293 {
1294 FILE *old_fh = NULL;
1296 if (journal_cur == NULL || journal_old == NULL)
1297 return;
1299 pthread_mutex_lock(&journal_lock);
1301 /* we rotate this way (rename before close) so that the we can release
1302 * the journal lock as fast as possible. Journal writes to the new
1303 * journal can proceed immediately after the new file is opened. The
1304 * fclose can then block without affecting new updates.
1305 */
1306 if (journal_fh != NULL)
1307 {
1308 old_fh = journal_fh;
1309 rename(journal_cur, journal_old);
1310 ++stats_journal_rotate;
1311 }
1313 journal_fh = fopen(journal_cur, "a");
1314 pthread_mutex_unlock(&journal_lock);
1316 if (old_fh != NULL)
1317 fclose(old_fh);
1319 if (journal_fh == NULL)
1320 RRDD_LOG(LOG_CRIT,
1321 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1322 journal_cur, rrd_strerror(errno));
1324 } /* }}} static void journal_rotate */
1326 static void journal_done(void) /* {{{ */
1327 {
1328 if (journal_cur == NULL)
1329 return;
1331 pthread_mutex_lock(&journal_lock);
1332 if (journal_fh != NULL)
1333 {
1334 fclose(journal_fh);
1335 journal_fh = NULL;
1336 }
1338 RRDD_LOG(LOG_INFO, "removing journals");
1340 unlink(journal_old);
1341 unlink(journal_cur);
1342 pthread_mutex_unlock(&journal_lock);
1344 } /* }}} static void journal_done */
1346 static int journal_write(char *cmd, char *args) /* {{{ */
1347 {
1348 int chars;
1350 if (journal_fh == NULL)
1351 return 0;
1353 pthread_mutex_lock(&journal_lock);
1354 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1355 pthread_mutex_unlock(&journal_lock);
1357 if (chars > 0)
1358 {
1359 pthread_mutex_lock(&stats_lock);
1360 stats_journal_bytes += chars;
1361 pthread_mutex_unlock(&stats_lock);
1362 }
1364 return chars;
1365 } /* }}} static int journal_write */
1367 static int journal_replay (const char *file) /* {{{ */
1368 {
1369 FILE *fh;
1370 int entry_cnt = 0;
1371 int fail_cnt = 0;
1372 uint64_t line = 0;
1373 char entry[CMD_MAX];
1375 if (file == NULL) return 0;
1377 fh = fopen(file, "r");
1378 if (fh == NULL)
1379 {
1380 if (errno != ENOENT)
1381 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1382 file, rrd_strerror(errno));
1383 return 0;
1384 }
1385 else
1386 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1388 while(!feof(fh))
1389 {
1390 size_t entry_len;
1392 ++line;
1393 fgets(entry, sizeof(entry), fh);
1394 entry_len = strlen(entry);
1396 /* check \n termination in case journal writing crashed mid-line */
1397 if (entry_len == 0)
1398 continue;
1399 else if (entry[entry_len - 1] != '\n')
1400 {
1401 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1402 ++fail_cnt;
1403 continue;
1404 }
1406 entry[entry_len - 1] = '\0';
1408 if (handle_request(-1, entry, entry_len) == 0)
1409 ++entry_cnt;
1410 else
1411 ++fail_cnt;
1412 }
1414 fclose(fh);
1416 if (entry_cnt > 0)
1417 {
1418 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1419 entry_cnt, fail_cnt);
1420 return 1;
1421 }
1422 else
1423 return 0;
1425 } /* }}} static int journal_replay */
1427 static void *connection_thread_main (void *args) /* {{{ */
1428 {
1429 pthread_t self;
1430 int i;
1431 int fd;
1433 fd = *((int *) args);
1434 free (args);
1436 pthread_mutex_lock (&connection_threads_lock);
1437 {
1438 pthread_t *temp;
1440 temp = (pthread_t *) realloc (connection_threads,
1441 sizeof (pthread_t) * (connection_threads_num + 1));
1442 if (temp == NULL)
1443 {
1444 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1445 }
1446 else
1447 {
1448 connection_threads = temp;
1449 connection_threads[connection_threads_num] = pthread_self ();
1450 connection_threads_num++;
1451 }
1452 }
1453 pthread_mutex_unlock (&connection_threads_lock);
1455 while (do_shutdown == 0)
1456 {
1457 char buffer[CMD_MAX];
1459 struct pollfd pollfd;
1460 int status;
1462 pollfd.fd = fd;
1463 pollfd.events = POLLIN | POLLPRI;
1464 pollfd.revents = 0;
1466 status = poll (&pollfd, 1, /* timeout = */ 500);
1467 if (status == 0) /* timeout */
1468 continue;
1469 else if (status < 0) /* error */
1470 {
1471 status = errno;
1472 if (status == EINTR)
1473 continue;
1474 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1475 continue;
1476 }
1478 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1479 {
1480 close (fd);
1481 break;
1482 }
1483 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1484 {
1485 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1486 "poll(2) returned something unexpected: %#04hx",
1487 pollfd.revents);
1488 close (fd);
1489 break;
1490 }
1492 status = (int) sread (fd, buffer, sizeof (buffer));
1493 if (status <= 0)
1494 {
1495 close (fd);
1497 if (status < 0)
1498 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1500 break;
1501 }
1503 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1504 if (status != 0)
1505 break;
1506 }
1508 close(fd);
1510 self = pthread_self ();
1511 /* Remove this thread from the connection threads list */
1512 pthread_mutex_lock (&connection_threads_lock);
1513 /* Find out own index in the array */
1514 for (i = 0; i < connection_threads_num; i++)
1515 if (pthread_equal (connection_threads[i], self) != 0)
1516 break;
1517 assert (i < connection_threads_num);
1519 /* Move the trailing threads forward. */
1520 if (i < (connection_threads_num - 1))
1521 {
1522 memmove (connection_threads + i,
1523 connection_threads + i + 1,
1524 sizeof (pthread_t) * (connection_threads_num - i - 1));
1525 }
1527 connection_threads_num--;
1528 pthread_mutex_unlock (&connection_threads_lock);
1530 return (NULL);
1531 } /* }}} void *connection_thread_main */
1533 static int open_listen_socket_unix (const char *path) /* {{{ */
1534 {
1535 int fd;
1536 struct sockaddr_un sa;
1537 listen_socket_t *temp;
1538 int status;
1540 temp = (listen_socket_t *) realloc (listen_fds,
1541 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1542 if (temp == NULL)
1543 {
1544 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1545 return (-1);
1546 }
1547 listen_fds = temp;
1548 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1550 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1551 if (fd < 0)
1552 {
1553 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1554 return (-1);
1555 }
1557 memset (&sa, 0, sizeof (sa));
1558 sa.sun_family = AF_UNIX;
1559 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1561 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1562 if (status != 0)
1563 {
1564 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1565 close (fd);
1566 unlink (path);
1567 return (-1);
1568 }
1570 status = listen (fd, /* backlog = */ 10);
1571 if (status != 0)
1572 {
1573 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1574 close (fd);
1575 unlink (path);
1576 return (-1);
1577 }
1579 listen_fds[listen_fds_num].fd = fd;
1580 snprintf (listen_fds[listen_fds_num].path,
1581 sizeof (listen_fds[listen_fds_num].path) - 1,
1582 "unix:%s", path);
1583 listen_fds_num++;
1585 return (0);
1586 } /* }}} int open_listen_socket_unix */
/* Open listening socket(s) for ADDR_ORIG and append them to listen_fds.
 *
 * Accepted forms:
 *   "unix:/path" or "/path"             UNIX-domain socket
 *   "[v6addr]" or "[v6addr]:port"       IPv6 literal, optional port
 *   "host", "host:port", "a.b.c.d:port" hostname or IPv4, optional port
 *   bare IPv6 literal (several colons)  passed through unchanged
 * RRDCACHED_DEFAULT_PORT is used when no port is given.  Every address
 * returned by getaddrinfo() is tried; ones that fail are logged and skipped.
 *
 * Returns -1 on malformed input or resolver failure, otherwise 0 (check
 * listen_fds_num to see whether anything was opened).
 */
static int open_listen_socket (const char *addr_orig) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  assert (addr_orig != NULL);

  snprintf (addr_copy, sizeof (addr_copy), "%s", addr_orig);
  addr = addr_copy;

  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
    return (open_listen_socket_unix (addr + strlen ("unix:")));
  else if (addr[0] == '/')
    return (open_listen_socket_unix (addr));

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
          addr_orig);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Hostname or IPv4 address, optionally followed by ":port".  Split only
     * when there is exactly one colon; an unbracketed IPv6 address has
     * several and is handed to getaddrinfo() as-is. */
    char *colon = strchr (addr, ':');

    if ((colon != NULL) && (colon == strrchr (addr, ':')))
    {
      *colon = 0;
      port = colon + 1;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      /* Skip this address like a bind failure instead of returning early,
       * which also leaked ai_res. */
      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
      close (fd);
      continue;
    }

    listen_fds[listen_fds_num].fd = fd;
    strncpy (listen_fds[listen_fds_num].path, addr,
        sizeof (listen_fds[listen_fds_num].path) - 1);
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo (ai_res);

  return (0);
} /* }}} int open_listen_socket */
1714 static int close_listen_sockets (void) /* {{{ */
1715 {
1716 size_t i;
1718 for (i = 0; i < listen_fds_num; i++)
1719 {
1720 close (listen_fds[i].fd);
1721 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1722 unlink (listen_fds[i].path + strlen ("unix:"));
1723 }
1725 free (listen_fds);
1726 listen_fds = NULL;
1727 listen_fds_num = 0;
1729 return (0);
1730 } /* }}} int close_listen_sockets */
1732 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1733 {
1734 struct pollfd *pollfds;
1735 int pollfds_num;
1736 int status;
1737 int i;
1739 for (i = 0; i < config_listen_address_list_len; i++)
1740 open_listen_socket (config_listen_address_list[i]);
1742 if (config_listen_address_list_len < 1)
1743 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1745 if (listen_fds_num < 1)
1746 {
1747 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1748 "could be opened. Sorry.");
1749 return (NULL);
1750 }
1752 pollfds_num = listen_fds_num;
1753 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1754 if (pollfds == NULL)
1755 {
1756 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1757 return (NULL);
1758 }
1759 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1761 RRDD_LOG(LOG_INFO, "listening for connections");
1763 while (do_shutdown == 0)
1764 {
1765 assert (pollfds_num == ((int) listen_fds_num));
1766 for (i = 0; i < pollfds_num; i++)
1767 {
1768 pollfds[i].fd = listen_fds[i].fd;
1769 pollfds[i].events = POLLIN | POLLPRI;
1770 pollfds[i].revents = 0;
1771 }
1773 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1774 if (status == 0)
1775 {
1776 continue; /* timeout */
1777 }
1778 else if (status < 0)
1779 {
1780 status = errno;
1781 if (status != EINTR)
1782 {
1783 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1784 }
1785 continue;
1786 }
1788 for (i = 0; i < pollfds_num; i++)
1789 {
1790 int *client_sd;
1791 struct sockaddr_storage client_sa;
1792 socklen_t client_sa_size;
1793 pthread_t tid;
1794 pthread_attr_t attr;
1796 if (pollfds[i].revents == 0)
1797 continue;
1799 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1800 {
1801 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1802 "poll(2) returned something unexpected for listen FD #%i.",
1803 pollfds[i].fd);
1804 continue;
1805 }
1807 client_sd = (int *) malloc (sizeof (int));
1808 if (client_sd == NULL)
1809 {
1810 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1811 continue;
1812 }
1814 client_sa_size = sizeof (client_sa);
1815 *client_sd = accept (pollfds[i].fd,
1816 (struct sockaddr *) &client_sa, &client_sa_size);
1817 if (*client_sd < 0)
1818 {
1819 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1820 continue;
1821 }
1823 pthread_attr_init (&attr);
1824 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1826 status = pthread_create (&tid, &attr, connection_thread_main,
1827 /* args = */ (void *) client_sd);
1828 if (status != 0)
1829 {
1830 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1831 close (*client_sd);
1832 free (client_sd);
1833 continue;
1834 }
1835 } /* for (pollfds_num) */
1836 } /* while (do_shutdown == 0) */
1838 RRDD_LOG(LOG_INFO, "starting shutdown");
1840 close_listen_sockets ();
1842 pthread_mutex_lock (&connection_threads_lock);
1843 while (connection_threads_num > 0)
1844 {
1845 pthread_t wait_for;
1847 wait_for = connection_threads[0];
1849 pthread_mutex_unlock (&connection_threads_lock);
1850 pthread_join (wait_for, /* retval = */ NULL);
1851 pthread_mutex_lock (&connection_threads_lock);
1852 }
1853 pthread_mutex_unlock (&connection_threads_lock);
1855 return (NULL);
1856 } /* }}} void *listen_thread_main */
1858 static int daemonize (void) /* {{{ */
1859 {
1860 int status;
1861 int fd;
1863 /* These structures are static, because `sigaction' behaves weird if the are
1864 * overwritten.. */
1865 static struct sigaction sa_int;
1866 static struct sigaction sa_term;
1867 static struct sigaction sa_pipe;
1869 fd = open_pidfile();
1870 if (fd < 0) return fd;
1872 if (!stay_foreground)
1873 {
1874 pid_t child;
1875 char *base_dir;
1877 child = fork ();
1878 if (child < 0)
1879 {
1880 fprintf (stderr, "daemonize: fork(2) failed.\n");
1881 return (-1);
1882 }
1883 else if (child > 0)
1884 {
1885 return (1);
1886 }
1888 /* Change into the /tmp directory. */
1889 base_dir = (config_base_dir != NULL)
1890 ? config_base_dir
1891 : "/tmp";
1892 status = chdir (base_dir);
1893 if (status != 0)
1894 {
1895 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1896 return (-1);
1897 }
1899 /* Become session leader */
1900 setsid ();
1902 /* Open the first three file descriptors to /dev/null */
1903 close (2);
1904 close (1);
1905 close (0);
1907 open ("/dev/null", O_RDWR);
1908 dup (0);
1909 dup (0);
1910 } /* if (!stay_foreground) */
1912 /* Install signal handlers */
1913 memset (&sa_int, 0, sizeof (sa_int));
1914 sa_int.sa_handler = sig_int_handler;
1915 sigaction (SIGINT, &sa_int, NULL);
1917 memset (&sa_term, 0, sizeof (sa_term));
1918 sa_term.sa_handler = sig_term_handler;
1919 sigaction (SIGTERM, &sa_term, NULL);
1921 memset (&sa_pipe, 0, sizeof (sa_pipe));
1922 sa_pipe.sa_handler = SIG_IGN;
1923 sigaction (SIGPIPE, &sa_pipe, NULL);
1925 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1926 RRDD_LOG(LOG_INFO, "starting up");
1928 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1929 if (cache_tree == NULL)
1930 {
1931 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1932 return (-1);
1933 }
1935 status = write_pidfile (fd);
1936 return status;
1937 } /* }}} int daemonize */
1939 static int cleanup (void) /* {{{ */
1940 {
1941 do_shutdown++;
1943 pthread_cond_signal (&cache_cond);
1944 pthread_join (queue_thread, /* return = */ NULL);
1946 remove_pidfile ();
1948 RRDD_LOG(LOG_INFO, "goodbye");
1949 closelog ();
1951 return (0);
1952 } /* }}} int cleanup */
1954 static int read_options (int argc, char **argv) /* {{{ */
1955 {
1956 int option;
1957 int status = 0;
1959 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
1960 {
1961 switch (option)
1962 {
1963 case 'g':
1964 stay_foreground=1;
1965 break;
1967 case 'l':
1968 {
1969 char **temp;
1971 temp = (char **) realloc (config_listen_address_list,
1972 sizeof (char *) * (config_listen_address_list_len + 1));
1973 if (temp == NULL)
1974 {
1975 fprintf (stderr, "read_options: realloc failed.\n");
1976 return (2);
1977 }
1978 config_listen_address_list = temp;
1980 temp[config_listen_address_list_len] = strdup (optarg);
1981 if (temp[config_listen_address_list_len] == NULL)
1982 {
1983 fprintf (stderr, "read_options: strdup failed.\n");
1984 return (2);
1985 }
1986 config_listen_address_list_len++;
1987 }
1988 break;
1990 case 'f':
1991 {
1992 int temp;
1994 temp = atoi (optarg);
1995 if (temp > 0)
1996 config_flush_interval = temp;
1997 else
1998 {
1999 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2000 status = 3;
2001 }
2002 }
2003 break;
2005 case 'w':
2006 {
2007 int temp;
2009 temp = atoi (optarg);
2010 if (temp > 0)
2011 config_write_interval = temp;
2012 else
2013 {
2014 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2015 status = 2;
2016 }
2017 }
2018 break;
2020 case 'z':
2021 {
2022 int temp;
2024 temp = atoi(optarg);
2025 if (temp > 0)
2026 config_write_jitter = temp;
2027 else
2028 {
2029 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2030 status = 2;
2031 }
2033 break;
2034 }
2036 case 'b':
2037 {
2038 size_t len;
2040 if (config_base_dir != NULL)
2041 free (config_base_dir);
2042 config_base_dir = strdup (optarg);
2043 if (config_base_dir == NULL)
2044 {
2045 fprintf (stderr, "read_options: strdup failed.\n");
2046 return (3);
2047 }
2049 len = strlen (config_base_dir);
2050 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2051 {
2052 config_base_dir[len - 1] = 0;
2053 len--;
2054 }
2056 if (len < 1)
2057 {
2058 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2059 return (4);
2060 }
2061 }
2062 break;
2064 case 'p':
2065 {
2066 if (config_pid_file != NULL)
2067 free (config_pid_file);
2068 config_pid_file = strdup (optarg);
2069 if (config_pid_file == NULL)
2070 {
2071 fprintf (stderr, "read_options: strdup failed.\n");
2072 return (3);
2073 }
2074 }
2075 break;
2077 case 'j':
2078 {
2079 struct stat statbuf;
2080 const char *dir = optarg;
2082 status = stat(dir, &statbuf);
2083 if (status != 0)
2084 {
2085 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2086 return 6;
2087 }
2089 if (!S_ISDIR(statbuf.st_mode)
2090 || access(dir, R_OK|W_OK|X_OK) != 0)
2091 {
2092 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2093 errno ? rrd_strerror(errno) : "");
2094 return 6;
2095 }
2097 journal_cur = malloc(PATH_MAX + 1);
2098 journal_old = malloc(PATH_MAX + 1);
2099 if (journal_cur == NULL || journal_old == NULL)
2100 {
2101 fprintf(stderr, "malloc failure for journal files\n");
2102 return 6;
2103 }
2104 else
2105 {
2106 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2107 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2108 }
2109 }
2110 break;
2112 case 'h':
2113 case '?':
2114 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2115 "\n"
2116 "Usage: rrdcached [options]\n"
2117 "\n"
2118 "Valid options are:\n"
2119 " -l <address> Socket address to listen to.\n"
2120 " -w <seconds> Interval in which to write data.\n"
2121 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2122 " -f <seconds> Interval in which to flush dead data.\n"
2123 " -p <file> Location of the PID-file.\n"
2124 " -b <dir> Base directory to change to.\n"
2125 " -g Do not fork and run in the foreground.\n"
2126 " -j <dir> Directory in which to create the journal files.\n"
2127 "\n"
2128 "For more information and a detailed description of all options "
2129 "please refer\n"
2130 "to the rrdcached(1) manual page.\n",
2131 VERSION);
2132 status = -1;
2133 break;
2134 } /* switch (option) */
2135 } /* while (getopt) */
2137 /* advise the user when values are not sane */
2138 if (config_flush_interval < 2 * config_write_interval)
2139 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2140 " 2x write interval (-w) !\n");
2141 if (config_write_jitter > config_write_interval)
2142 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2143 " write interval (-w) !\n");
2145 return (status);
2146 } /* }}} int read_options */
2148 int main (int argc, char **argv)
2149 {
2150 int status;
2152 status = read_options (argc, argv);
2153 if (status != 0)
2154 {
2155 if (status < 0)
2156 status = 0;
2157 return (status);
2158 }
2160 status = daemonize ();
2161 if (status == 1)
2162 {
2163 struct sigaction sigchld;
2165 memset (&sigchld, 0, sizeof (sigchld));
2166 sigchld.sa_handler = SIG_IGN;
2167 sigaction (SIGCHLD, &sigchld, NULL);
2169 return (0);
2170 }
2171 else if (status != 0)
2172 {
2173 fprintf (stderr, "daemonize failed, exiting.\n");
2174 return (1);
2175 }
2177 if (journal_cur != NULL)
2178 {
2179 int had_journal = 0;
2181 pthread_mutex_lock(&journal_lock);
2183 RRDD_LOG(LOG_INFO, "checking for journal files");
2185 had_journal += journal_replay(journal_old);
2186 had_journal += journal_replay(journal_cur);
2188 if (had_journal)
2189 flush_old_values(-1);
2191 pthread_mutex_unlock(&journal_lock);
2192 journal_rotate();
2194 RRDD_LOG(LOG_INFO, "journal processing complete");
2195 }
2197 /* start the queue thread */
2198 memset (&queue_thread, 0, sizeof (queue_thread));
2199 status = pthread_create (&queue_thread,
2200 NULL, /* attr */
2201 queue_thread_main,
2202 NULL); /* args */
2203 if (status != 0)
2204 {
2205 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2206 cleanup();
2207 return (1);
2208 }
2210 listen_thread_main (NULL);
2211 cleanup ();
2213 return (0);
2214 } /* int main */
2216 /*
2217 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2218 */