abcb788348ca83314d515baecc9e91e909b6d8eb
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 typedef enum
105 {
106 PRIV_LOW,
107 PRIV_HIGH
108 } socket_privilege;
110 struct listen_socket_s
111 {
112 int fd;
113 char addr[PATH_MAX + 1];
114 int family;
115 socket_privilege privilege;
116 };
117 typedef struct listen_socket_s listen_socket_t;
119 struct cache_item_s;
120 typedef struct cache_item_s cache_item_t;
121 struct cache_item_s
122 {
123 char *file;
124 char **values;
125 int values_num;
126 time_t last_flush_time;
127 #define CI_FLAGS_IN_TREE (1<<0)
128 #define CI_FLAGS_IN_QUEUE (1<<1)
129 int flags;
130 pthread_cond_t flushed;
131 cache_item_t *next;
132 };
134 struct callback_flush_data_s
135 {
136 time_t now;
137 time_t abs_timeout;
138 char **keys;
139 size_t keys_num;
140 };
141 typedef struct callback_flush_data_s callback_flush_data_t;
143 enum queue_side_e
144 {
145 HEAD,
146 TAIL
147 };
148 typedef enum queue_side_e queue_side_t;
150 /* max length of socket command or response */
151 #define CMD_MAX 4096
153 /*
154 * Variables
155 */
156 static int stay_foreground = 0;
158 static listen_socket_t *listen_fds = NULL;
159 static size_t listen_fds_num = 0;
161 static int do_shutdown = 0;
163 static pthread_t queue_thread;
165 static pthread_t *connection_threads = NULL;
166 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
167 static int connection_threads_num = 0;
169 /* Cache stuff */
170 static GTree *cache_tree = NULL;
171 static cache_item_t *cache_queue_head = NULL;
172 static cache_item_t *cache_queue_tail = NULL;
173 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
174 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
176 static int config_write_interval = 300;
177 static int config_write_jitter = 0;
178 static int config_flush_interval = 3600;
179 static int config_flush_at_shutdown = 0;
180 static char *config_pid_file = NULL;
181 static char *config_base_dir = NULL;
183 static listen_socket_t **config_listen_address_list = NULL;
184 static int config_listen_address_list_len = 0;
186 static uint64_t stats_queue_length = 0;
187 static uint64_t stats_updates_received = 0;
188 static uint64_t stats_flush_received = 0;
189 static uint64_t stats_updates_written = 0;
190 static uint64_t stats_data_sets_written = 0;
191 static uint64_t stats_journal_bytes = 0;
192 static uint64_t stats_journal_rotate = 0;
193 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
195 /* Journaled updates */
196 static char *journal_cur = NULL;
197 static char *journal_old = NULL;
198 static FILE *journal_fh = NULL;
199 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
200 static int journal_write(char *cmd, char *args);
201 static void journal_done(void);
202 static void journal_rotate(void);
204 /*
205 * Functions
206 */
207 static void sig_common (const char *sig) /* {{{ */
208 {
209 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
210 do_shutdown++;
211 pthread_cond_broadcast(&cache_cond);
212 } /* }}} void sig_common */
214 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
215 {
216 sig_common("INT");
217 } /* }}} void sig_int_handler */
219 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
220 {
221 sig_common("TERM");
222 } /* }}} void sig_term_handler */
224 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
225 {
226 config_flush_at_shutdown = 1;
227 sig_common("USR1");
228 } /* }}} void sig_usr1_handler */
230 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
231 {
232 config_flush_at_shutdown = 0;
233 sig_common("USR2");
234 } /* }}} void sig_usr2_handler */
236 static void install_signal_handlers(void) /* {{{ */
237 {
238 /* These structures are static, because `sigaction' behaves weird if the are
239 * overwritten.. */
240 static struct sigaction sa_int;
241 static struct sigaction sa_term;
242 static struct sigaction sa_pipe;
243 static struct sigaction sa_usr1;
244 static struct sigaction sa_usr2;
246 /* Install signal handlers */
247 memset (&sa_int, 0, sizeof (sa_int));
248 sa_int.sa_handler = sig_int_handler;
249 sigaction (SIGINT, &sa_int, NULL);
251 memset (&sa_term, 0, sizeof (sa_term));
252 sa_term.sa_handler = sig_term_handler;
253 sigaction (SIGTERM, &sa_term, NULL);
255 memset (&sa_pipe, 0, sizeof (sa_pipe));
256 sa_pipe.sa_handler = SIG_IGN;
257 sigaction (SIGPIPE, &sa_pipe, NULL);
259 memset (&sa_pipe, 0, sizeof (sa_usr1));
260 sa_usr1.sa_handler = sig_usr1_handler;
261 sigaction (SIGUSR1, &sa_usr1, NULL);
263 memset (&sa_usr2, 0, sizeof (sa_usr2));
264 sa_usr2.sa_handler = sig_usr2_handler;
265 sigaction (SIGUSR2, &sa_usr2, NULL);
267 } /* }}} void install_signal_handlers */
269 static int open_pidfile(void) /* {{{ */
270 {
271 int fd;
272 char *file;
274 file = (config_pid_file != NULL)
275 ? config_pid_file
276 : LOCALSTATEDIR "/run/rrdcached.pid";
278 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
279 if (fd < 0)
280 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
281 file, rrd_strerror(errno));
283 return(fd);
284 } /* }}} static int open_pidfile */
286 static int write_pidfile (int fd) /* {{{ */
287 {
288 pid_t pid;
289 FILE *fh;
291 pid = getpid ();
293 fh = fdopen (fd, "w");
294 if (fh == NULL)
295 {
296 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
297 close(fd);
298 return (-1);
299 }
301 fprintf (fh, "%i\n", (int) pid);
302 fclose (fh);
304 return (0);
305 } /* }}} int write_pidfile */
307 static int remove_pidfile (void) /* {{{ */
308 {
309 char *file;
310 int status;
312 file = (config_pid_file != NULL)
313 ? config_pid_file
314 : LOCALSTATEDIR "/run/rrdcached.pid";
316 status = unlink (file);
317 if (status == 0)
318 return (0);
319 return (errno);
320 } /* }}} int remove_pidfile */
322 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
323 {
324 char *buffer;
325 size_t buffer_used;
326 size_t buffer_free;
327 ssize_t status;
329 buffer = (char *) buffer_void;
330 buffer_used = 0;
331 buffer_free = buffer_size;
333 while (buffer_free > 0)
334 {
335 status = read (fd, buffer + buffer_used, buffer_free);
336 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
337 continue;
339 if (status < 0)
340 return (-1);
342 if (status == 0)
343 return (0);
345 assert ((0 > status) || (buffer_free >= (size_t) status));
347 buffer_free = buffer_free - status;
348 buffer_used = buffer_used + status;
350 if (buffer[buffer_used - 1] == '\n')
351 break;
352 }
354 assert (buffer_used > 0);
356 if (buffer[buffer_used - 1] != '\n')
357 {
358 errno = ENOBUFS;
359 return (-1);
360 }
362 buffer[buffer_used - 1] = 0;
364 /* Fix network line endings. */
365 if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
366 {
367 buffer_used--;
368 buffer[buffer_used - 1] = 0;
369 }
371 return (buffer_used);
372 } /* }}} ssize_t sread */
374 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
375 {
376 const char *ptr;
377 size_t nleft;
378 ssize_t status;
380 /* special case for journal replay */
381 if (fd < 0) return 0;
383 ptr = (const char *) buf;
384 nleft = count;
386 while (nleft > 0)
387 {
388 status = write (fd, (const void *) ptr, nleft);
390 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
391 continue;
393 if (status < 0)
394 return (status);
396 nleft -= status;
397 ptr += status;
398 }
400 return (0);
401 } /* }}} ssize_t swrite */
403 static void _wipe_ci_values(cache_item_t *ci, time_t when)
404 {
405 ci->values = NULL;
406 ci->values_num = 0;
408 ci->last_flush_time = when;
409 if (config_write_jitter > 0)
410 ci->last_flush_time += (random() % config_write_jitter);
412 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
413 }
415 /*
416 * enqueue_cache_item:
417 * `cache_lock' must be acquired before calling this function!
418 */
419 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
420 queue_side_t side)
421 {
422 int did_insert = 0;
424 if (ci == NULL)
425 return (-1);
427 if (ci->values_num == 0)
428 return (0);
430 if (side == HEAD)
431 {
432 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
433 {
434 assert (ci->next == NULL);
435 ci->next = cache_queue_head;
436 cache_queue_head = ci;
438 if (cache_queue_tail == NULL)
439 cache_queue_tail = cache_queue_head;
441 did_insert = 1;
442 }
443 else if (cache_queue_head == ci)
444 {
445 /* do nothing */
446 }
447 else /* enqueued, but not first entry */
448 {
449 cache_item_t *prev;
451 /* find previous entry */
452 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
453 if (prev->next == ci)
454 break;
455 assert (prev != NULL);
457 /* move to the front */
458 prev->next = ci->next;
459 ci->next = cache_queue_head;
460 cache_queue_head = ci;
462 /* check if we need to adapt the tail */
463 if (cache_queue_tail == ci)
464 cache_queue_tail = prev;
465 }
466 }
467 else /* (side == TAIL) */
468 {
469 /* We don't move values back in the list.. */
470 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
471 return (0);
473 assert (ci->next == NULL);
475 if (cache_queue_tail == NULL)
476 cache_queue_head = ci;
477 else
478 cache_queue_tail->next = ci;
479 cache_queue_tail = ci;
481 did_insert = 1;
482 }
484 ci->flags |= CI_FLAGS_IN_QUEUE;
486 if (did_insert)
487 {
488 pthread_cond_broadcast(&cache_cond);
489 pthread_mutex_lock (&stats_lock);
490 stats_queue_length++;
491 pthread_mutex_unlock (&stats_lock);
492 }
494 return (0);
495 } /* }}} int enqueue_cache_item */
497 /*
498 * tree_callback_flush:
499 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
500 * while this is in progress.
501 */
502 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
503 gpointer data)
504 {
505 cache_item_t *ci;
506 callback_flush_data_t *cfd;
508 ci = (cache_item_t *) value;
509 cfd = (callback_flush_data_t *) data;
511 if ((ci->last_flush_time <= cfd->abs_timeout)
512 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
513 && (ci->values_num > 0))
514 {
515 enqueue_cache_item (ci, TAIL);
516 }
517 else if ((do_shutdown != 0)
518 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
519 && (ci->values_num > 0))
520 {
521 enqueue_cache_item (ci, TAIL);
522 }
523 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
524 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
525 && (ci->values_num <= 0))
526 {
527 char **temp;
529 temp = (char **) realloc (cfd->keys,
530 sizeof (char *) * (cfd->keys_num + 1));
531 if (temp == NULL)
532 {
533 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
534 return (FALSE);
535 }
536 cfd->keys = temp;
537 /* Make really sure this points to the _same_ place */
538 assert ((char *) key == ci->file);
539 cfd->keys[cfd->keys_num] = (char *) key;
540 cfd->keys_num++;
541 }
543 return (FALSE);
544 } /* }}} gboolean tree_callback_flush */
546 static int flush_old_values (int max_age)
547 {
548 callback_flush_data_t cfd;
549 size_t k;
551 memset (&cfd, 0, sizeof (cfd));
552 /* Pass the current time as user data so that we don't need to call
553 * `time' for each node. */
554 cfd.now = time (NULL);
555 cfd.keys = NULL;
556 cfd.keys_num = 0;
558 if (max_age > 0)
559 cfd.abs_timeout = cfd.now - max_age;
560 else
561 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
563 /* `tree_callback_flush' will return the keys of all values that haven't
564 * been touched in the last `config_flush_interval' seconds in `cfd'.
565 * The char*'s in this array point to the same memory as ci->file, so we
566 * don't need to free them separately. */
567 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
569 for (k = 0; k < cfd.keys_num; k++)
570 {
571 cache_item_t *ci;
573 /* This must not fail. */
574 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
575 assert (ci != NULL);
577 /* If we end up here with values available, something's seriously
578 * messed up. */
579 assert (ci->values_num == 0);
581 /* Remove the node from the tree */
582 g_tree_remove (cache_tree, cfd.keys[k]);
583 cfd.keys[k] = NULL;
585 /* Now free and clean up `ci'. */
586 free (ci->file);
587 ci->file = NULL;
588 free (ci);
589 ci = NULL;
590 } /* for (k = 0; k < cfd.keys_num; k++) */
592 if (cfd.keys != NULL)
593 {
594 free (cfd.keys);
595 cfd.keys = NULL;
596 }
598 return (0);
599 } /* int flush_old_values */
601 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
602 {
603 struct timeval now;
604 struct timespec next_flush;
605 int final_flush = 0; /* make sure we only flush once on shutdown */
607 gettimeofday (&now, NULL);
608 next_flush.tv_sec = now.tv_sec + config_flush_interval;
609 next_flush.tv_nsec = 1000 * now.tv_usec;
611 pthread_mutex_lock (&cache_lock);
612 while ((do_shutdown == 0) || (cache_queue_head != NULL))
613 {
614 cache_item_t *ci;
615 char *file;
616 char **values;
617 int values_num;
618 int status;
619 int i;
621 /* First, check if it's time to do the cache flush. */
622 gettimeofday (&now, NULL);
623 if ((now.tv_sec > next_flush.tv_sec)
624 || ((now.tv_sec == next_flush.tv_sec)
625 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
626 {
627 /* Flush all values that haven't been written in the last
628 * `config_write_interval' seconds. */
629 flush_old_values (config_write_interval);
631 /* Determine the time of the next cache flush. */
632 while (next_flush.tv_sec <= now.tv_sec)
633 next_flush.tv_sec += config_flush_interval;
635 /* unlock the cache while we rotate so we don't block incoming
636 * updates if the fsync() blocks on disk I/O */
637 pthread_mutex_unlock(&cache_lock);
638 journal_rotate();
639 pthread_mutex_lock(&cache_lock);
640 }
642 /* Now, check if there's something to store away. If not, wait until
643 * something comes in or it's time to do the cache flush. if we are
644 * shutting down, do not wait around. */
645 if (cache_queue_head == NULL && !do_shutdown)
646 {
647 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
648 if ((status != 0) && (status != ETIMEDOUT))
649 {
650 RRDD_LOG (LOG_ERR, "queue_thread_main: "
651 "pthread_cond_timedwait returned %i.", status);
652 }
653 }
655 /* We're about to shut down */
656 if (do_shutdown != 0 && !final_flush++)
657 {
658 if (config_flush_at_shutdown)
659 flush_old_values (-1); /* flush everything */
660 else
661 break;
662 }
664 /* Check if a value has arrived. This may be NULL if we timed out or there
665 * was an interrupt such as a signal. */
666 if (cache_queue_head == NULL)
667 continue;
669 ci = cache_queue_head;
671 /* copy the relevant parts */
672 file = strdup (ci->file);
673 if (file == NULL)
674 {
675 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
676 continue;
677 }
679 assert(ci->values != NULL);
680 assert(ci->values_num > 0);
682 values = ci->values;
683 values_num = ci->values_num;
685 _wipe_ci_values(ci, time(NULL));
687 cache_queue_head = ci->next;
688 if (cache_queue_head == NULL)
689 cache_queue_tail = NULL;
690 ci->next = NULL;
692 pthread_mutex_lock (&stats_lock);
693 assert (stats_queue_length > 0);
694 stats_queue_length--;
695 pthread_mutex_unlock (&stats_lock);
697 pthread_mutex_unlock (&cache_lock);
699 rrd_clear_error ();
700 status = rrd_update_r (file, NULL, values_num, (void *) values);
701 if (status != 0)
702 {
703 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
704 "rrd_update_r (%s) failed with status %i. (%s)",
705 file, status, rrd_get_error());
706 }
708 journal_write("wrote", file);
709 pthread_cond_broadcast(&ci->flushed);
711 for (i = 0; i < values_num; i++)
712 free (values[i]);
714 free(values);
715 free(file);
717 if (status == 0)
718 {
719 pthread_mutex_lock (&stats_lock);
720 stats_updates_written++;
721 stats_data_sets_written += values_num;
722 pthread_mutex_unlock (&stats_lock);
723 }
725 pthread_mutex_lock (&cache_lock);
727 /* We're about to shut down */
728 if (do_shutdown != 0 && !final_flush++)
729 {
730 if (config_flush_at_shutdown)
731 flush_old_values (-1); /* flush everything */
732 else
733 break;
734 }
735 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
736 pthread_mutex_unlock (&cache_lock);
738 if (config_flush_at_shutdown)
739 {
740 assert(cache_queue_head == NULL);
741 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
742 }
744 journal_done();
746 return (NULL);
747 } /* }}} void *queue_thread_main */
749 static int buffer_get_field (char **buffer_ret, /* {{{ */
750 size_t *buffer_size_ret, char **field_ret)
751 {
752 char *buffer;
753 size_t buffer_pos;
754 size_t buffer_size;
755 char *field;
756 size_t field_size;
757 int status;
759 buffer = *buffer_ret;
760 buffer_pos = 0;
761 buffer_size = *buffer_size_ret;
762 field = *buffer_ret;
763 field_size = 0;
765 if (buffer_size <= 0)
766 return (-1);
768 /* This is ensured by `handle_request'. */
769 assert (buffer[buffer_size - 1] == '\0');
771 status = -1;
772 while (buffer_pos < buffer_size)
773 {
774 /* Check for end-of-field or end-of-buffer */
775 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
776 {
777 field[field_size] = 0;
778 field_size++;
779 buffer_pos++;
780 status = 0;
781 break;
782 }
783 /* Handle escaped characters. */
784 else if (buffer[buffer_pos] == '\\')
785 {
786 if (buffer_pos >= (buffer_size - 1))
787 break;
788 buffer_pos++;
789 field[field_size] = buffer[buffer_pos];
790 field_size++;
791 buffer_pos++;
792 }
793 /* Normal operation */
794 else
795 {
796 field[field_size] = buffer[buffer_pos];
797 field_size++;
798 buffer_pos++;
799 }
800 } /* while (buffer_pos < buffer_size) */
802 if (status != 0)
803 return (status);
805 *buffer_ret = buffer + buffer_pos;
806 *buffer_size_ret = buffer_size - buffer_pos;
807 *field_ret = field;
809 return (0);
810 } /* }}} int buffer_get_field */
812 static int flush_file (const char *filename) /* {{{ */
813 {
814 cache_item_t *ci;
816 pthread_mutex_lock (&cache_lock);
818 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
819 if (ci == NULL)
820 {
821 pthread_mutex_unlock (&cache_lock);
822 return (ENOENT);
823 }
825 if (ci->values_num > 0)
826 {
827 /* Enqueue at head */
828 enqueue_cache_item (ci, HEAD);
829 pthread_cond_wait(&ci->flushed, &cache_lock);
830 }
832 pthread_mutex_unlock(&cache_lock);
834 return (0);
835 } /* }}} int flush_file */
837 static int handle_request_help (int fd, /* {{{ */
838 char *buffer, size_t buffer_size)
839 {
840 int status;
841 char **help_text;
842 size_t help_text_len;
843 char *command;
844 size_t i;
846 char *help_help[] =
847 {
848 "5 Command overview\n",
849 "FLUSH <filename>\n",
850 "FLUSHALL\n",
851 "HELP [<command>]\n",
852 "UPDATE <filename> <values> [<values> ...]\n",
853 "STATS\n"
854 };
855 size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
857 char *help_flush[] =
858 {
859 "4 Help for FLUSH\n",
860 "Usage: FLUSH <filename>\n",
861 "\n",
862 "Adds the given filename to the head of the update queue and returns\n",
863 "after is has been dequeued.\n"
864 };
865 size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
867 char *help_flushall[] =
868 {
869 "3 Help for FLUSHALL\n",
870 "Usage: FLUSHALL\n",
871 "\n",
872 "Triggers writing of all pending updates. Returns immediately.\n"
873 };
874 size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
876 char *help_update[] =
877 {
878 "9 Help for UPDATE\n",
879 "Usage: UPDATE <filename> <values> [<values> ...]\n"
880 "\n",
881 "Adds the given file to the internal cache if it is not yet known and\n",
882 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
883 "for details.\n",
884 "\n",
885 "Each <values> has the following form:\n",
886 " <values> = <time>:<value>[:<value>[...]]\n",
887 "See the rrdupdate(1) manpage for details.\n"
888 };
889 size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
891 char *help_stats[] =
892 {
893 "4 Help for STATS\n",
894 "Usage: STATS\n",
895 "\n",
896 "Returns some performance counters, see the rrdcached(1) manpage for\n",
897 "a description of the values.\n"
898 };
899 size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
901 status = buffer_get_field (&buffer, &buffer_size, &command);
902 if (status != 0)
903 {
904 help_text = help_help;
905 help_text_len = help_help_len;
906 }
907 else
908 {
909 if (strcasecmp (command, "update") == 0)
910 {
911 help_text = help_update;
912 help_text_len = help_update_len;
913 }
914 else if (strcasecmp (command, "flush") == 0)
915 {
916 help_text = help_flush;
917 help_text_len = help_flush_len;
918 }
919 else if (strcasecmp (command, "flushall") == 0)
920 {
921 help_text = help_flushall;
922 help_text_len = help_flushall_len;
923 }
924 else if (strcasecmp (command, "stats") == 0)
925 {
926 help_text = help_stats;
927 help_text_len = help_stats_len;
928 }
929 else
930 {
931 help_text = help_help;
932 help_text_len = help_help_len;
933 }
934 }
936 for (i = 0; i < help_text_len; i++)
937 {
938 status = swrite (fd, help_text[i], strlen (help_text[i]));
939 if (status < 0)
940 {
941 status = errno;
942 RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
943 return (status);
944 }
945 }
947 return (0);
948 } /* }}} int handle_request_help */
950 static int handle_request_stats (int fd, /* {{{ */
951 char *buffer __attribute__((unused)),
952 size_t buffer_size __attribute__((unused)))
953 {
954 int status;
955 char outbuf[CMD_MAX];
957 uint64_t copy_queue_length;
958 uint64_t copy_updates_received;
959 uint64_t copy_flush_received;
960 uint64_t copy_updates_written;
961 uint64_t copy_data_sets_written;
962 uint64_t copy_journal_bytes;
963 uint64_t copy_journal_rotate;
965 uint64_t tree_nodes_number;
966 uint64_t tree_depth;
968 pthread_mutex_lock (&stats_lock);
969 copy_queue_length = stats_queue_length;
970 copy_updates_received = stats_updates_received;
971 copy_flush_received = stats_flush_received;
972 copy_updates_written = stats_updates_written;
973 copy_data_sets_written = stats_data_sets_written;
974 copy_journal_bytes = stats_journal_bytes;
975 copy_journal_rotate = stats_journal_rotate;
976 pthread_mutex_unlock (&stats_lock);
978 pthread_mutex_lock (&cache_lock);
979 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
980 tree_depth = (uint64_t) g_tree_height (cache_tree);
981 pthread_mutex_unlock (&cache_lock);
983 #define RRDD_STATS_SEND \
984 outbuf[sizeof (outbuf) - 1] = 0; \
985 status = swrite (fd, outbuf, strlen (outbuf)); \
986 if (status < 0) \
987 { \
988 status = errno; \
989 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
990 return (status); \
991 }
993 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
994 RRDD_STATS_SEND;
996 snprintf (outbuf, sizeof (outbuf),
997 "QueueLength: %"PRIu64"\n", copy_queue_length);
998 RRDD_STATS_SEND;
1000 snprintf (outbuf, sizeof (outbuf),
1001 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1002 RRDD_STATS_SEND;
1004 snprintf (outbuf, sizeof (outbuf),
1005 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1006 RRDD_STATS_SEND;
1008 snprintf (outbuf, sizeof (outbuf),
1009 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1010 RRDD_STATS_SEND;
1012 snprintf (outbuf, sizeof (outbuf),
1013 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1014 RRDD_STATS_SEND;
1016 snprintf (outbuf, sizeof (outbuf),
1017 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1018 RRDD_STATS_SEND;
1020 snprintf (outbuf, sizeof (outbuf),
1021 "TreeDepth: %"PRIu64"\n", tree_depth);
1022 RRDD_STATS_SEND;
1024 snprintf (outbuf, sizeof(outbuf),
1025 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1026 RRDD_STATS_SEND;
1028 snprintf (outbuf, sizeof(outbuf),
1029 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1030 RRDD_STATS_SEND;
1032 return (0);
1033 #undef RRDD_STATS_SEND
1034 } /* }}} int handle_request_stats */
1036 static int handle_request_flush (int fd, /* {{{ */
1037 char *buffer, size_t buffer_size)
1038 {
1039 char *file;
1040 int status;
1041 char result[CMD_MAX];
1043 status = buffer_get_field (&buffer, &buffer_size, &file);
1044 if (status != 0)
1045 {
1046 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1047 }
1048 else
1049 {
1050 pthread_mutex_lock(&stats_lock);
1051 stats_flush_received++;
1052 pthread_mutex_unlock(&stats_lock);
1054 status = flush_file (file);
1055 if (status == 0)
1056 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1057 else if (status == ENOENT)
1058 {
1059 /* no file in our tree; see whether it exists at all */
1060 struct stat statbuf;
1062 memset(&statbuf, 0, sizeof(statbuf));
1063 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1064 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1065 else
1066 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1067 }
1068 else if (status < 0)
1069 strncpy (result, "-1 Internal error.\n", sizeof (result));
1070 else
1071 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1072 }
1073 result[sizeof (result) - 1] = 0;
1075 status = swrite (fd, result, strlen (result));
1076 if (status < 0)
1077 {
1078 status = errno;
1079 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1080 return (status);
1081 }
1083 return (0);
1084 } /* }}} int handle_request_flush */
1086 static int handle_request_flushall(int fd) /* {{{ */
1087 {
1088 int status;
1089 char answer[] ="0 Started flush.\n";
1091 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1093 pthread_mutex_lock(&cache_lock);
1094 flush_old_values(-1);
1095 pthread_mutex_unlock(&cache_lock);
1097 status = swrite(fd, answer, strlen(answer));
1098 if (status < 0)
1099 {
1100 status = errno;
1101 RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1102 }
1104 return (status);
1105 } /* }}} static int handle_request_flushall */
1107 static int handle_request_update (int fd, /* {{{ */
1108 char *buffer, size_t buffer_size)
1109 {
1110 char *file;
1111 int values_num = 0;
1112 int status;
1114 time_t now;
1116 cache_item_t *ci;
1117 char answer[CMD_MAX];
1119 #define RRDD_UPDATE_SEND \
1120 answer[sizeof (answer) - 1] = 0; \
1121 status = swrite (fd, answer, strlen (answer)); \
1122 if (status < 0) \
1123 { \
1124 status = errno; \
1125 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1126 return (status); \
1127 }
1129 now = time (NULL);
1131 status = buffer_get_field (&buffer, &buffer_size, &file);
1132 if (status != 0)
1133 {
1134 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1135 sizeof (answer));
1136 RRDD_UPDATE_SEND;
1137 return (0);
1138 }
1140 pthread_mutex_lock(&stats_lock);
1141 stats_updates_received++;
1142 pthread_mutex_unlock(&stats_lock);
1144 pthread_mutex_lock (&cache_lock);
1145 ci = g_tree_lookup (cache_tree, file);
1147 if (ci == NULL) /* {{{ */
1148 {
1149 struct stat statbuf;
1151 /* don't hold the lock while we setup; stat(2) might block */
1152 pthread_mutex_unlock(&cache_lock);
1154 memset (&statbuf, 0, sizeof (statbuf));
1155 status = stat (file, &statbuf);
1156 if (status != 0)
1157 {
1158 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1160 status = errno;
1161 if (status == ENOENT)
1162 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1163 else
1164 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1165 status);
1166 RRDD_UPDATE_SEND;
1167 return (0);
1168 }
1169 if (!S_ISREG (statbuf.st_mode))
1170 {
1171 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1172 RRDD_UPDATE_SEND;
1173 return (0);
1174 }
1175 if (access(file, R_OK|W_OK) != 0)
1176 {
1177 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1178 file, rrd_strerror(errno));
1179 RRDD_UPDATE_SEND;
1180 return (0);
1181 }
1183 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1184 if (ci == NULL)
1185 {
1186 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1188 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1189 RRDD_UPDATE_SEND;
1190 return (0);
1191 }
1192 memset (ci, 0, sizeof (cache_item_t));
1194 ci->file = strdup (file);
1195 if (ci->file == NULL)
1196 {
1197 free (ci);
1198 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1200 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1201 RRDD_UPDATE_SEND;
1202 return (0);
1203 }
1205 _wipe_ci_values(ci, now);
1206 ci->flags = CI_FLAGS_IN_TREE;
1208 pthread_mutex_lock(&cache_lock);
1209 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1210 } /* }}} */
1211 assert (ci != NULL);
1213 while (buffer_size > 0)
1214 {
1215 char **temp;
1216 char *value;
1218 status = buffer_get_field (&buffer, &buffer_size, &value);
1219 if (status != 0)
1220 {
1221 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1222 break;
1223 }
1225 temp = (char **) realloc (ci->values,
1226 sizeof (char *) * (ci->values_num + 1));
1227 if (temp == NULL)
1228 {
1229 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1230 continue;
1231 }
1232 ci->values = temp;
1234 ci->values[ci->values_num] = strdup (value);
1235 if (ci->values[ci->values_num] == NULL)
1236 {
1237 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1238 continue;
1239 }
1240 ci->values_num++;
1242 values_num++;
1243 }
1245 if (((now - ci->last_flush_time) >= config_write_interval)
1246 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1247 && (ci->values_num > 0))
1248 {
1249 enqueue_cache_item (ci, TAIL);
1250 }
1252 pthread_mutex_unlock (&cache_lock);
1254 if (values_num < 1)
1255 {
1256 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1257 }
1258 else
1259 {
1260 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1261 (values_num == 1) ? "" : "s");
1262 }
1263 RRDD_UPDATE_SEND;
1264 return (0);
1265 #undef RRDD_UPDATE_SEND
1266 } /* }}} int handle_request_update */
1268 /* we came across a "WROTE" entry during journal replay.
1269 * throw away any values that we have accumulated for this file
1270 */
1271 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1272 const char *buffer,
1273 size_t buffer_size __attribute__((unused)))
1274 {
1275 int i;
1276 cache_item_t *ci;
1277 const char *file = buffer;
1279 pthread_mutex_lock(&cache_lock);
1281 ci = g_tree_lookup(cache_tree, file);
1282 if (ci == NULL)
1283 {
1284 pthread_mutex_unlock(&cache_lock);
1285 return (0);
1286 }
1288 if (ci->values)
1289 {
1290 for (i=0; i < ci->values_num; i++)
1291 free(ci->values[i]);
1293 free(ci->values);
1294 }
1296 _wipe_ci_values(ci, time(NULL));
1298 pthread_mutex_unlock(&cache_lock);
1299 return (0);
1300 } /* }}} int handle_request_wrote */
1302 /* returns 1 if we have the required privilege level */
1303 static int has_privilege (socket_privilege priv, /* {{{ */
1304 socket_privilege required, int fd)
1305 {
1306 int status;
1307 char error[CMD_MAX];
1309 if (priv >= required)
1310 return 1;
1312 sprintf(error, "-1 %s\n", rrd_strerror(EACCES));
1313 status = swrite(fd, error, strlen(error));
1315 if (status < 0)
1316 return status;
1317 else
1318 return 0;
1319 } /* }}} static int has_privilege */
1321 /* if fd < 0, we are in journal replay mode */
1322 static int handle_request (int fd, socket_privilege privilege, /* {{{ */
1323 char *buffer, size_t buffer_size)
1324 {
1325 char *buffer_ptr;
1326 char *command;
1327 int status;
1329 assert (buffer[buffer_size - 1] == '\0');
1331 buffer_ptr = buffer;
1332 command = NULL;
1333 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1334 if (status != 0)
1335 {
1336 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1337 return (-1);
1338 }
1340 if (strcasecmp (command, "update") == 0)
1341 {
1342 /* don't re-write updates in replay mode */
1343 if (fd >= 0)
1344 journal_write(command, buffer_ptr);
1346 status = has_privilege(privilege, PRIV_HIGH, fd);
1347 if (status <= 0)
1348 return status;
1350 return (handle_request_update (fd, buffer_ptr, buffer_size));
1351 }
1352 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1353 {
1354 /* this is only valid in replay mode */
1355 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1356 }
1357 else if (strcasecmp (command, "flush") == 0)
1358 {
1359 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1360 }
1361 else if (strcasecmp (command, "flushall") == 0)
1362 {
1363 status = has_privilege(privilege, PRIV_HIGH, fd);
1364 if (status <= 0)
1365 return status;
1367 return (handle_request_flushall(fd));
1368 }
1369 else if (strcasecmp (command, "stats") == 0)
1370 {
1371 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1372 }
1373 else if (strcasecmp (command, "help") == 0)
1374 {
1375 return (handle_request_help (fd, buffer_ptr, buffer_size));
1376 }
1377 else
1378 {
1379 char result[CMD_MAX];
1381 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1382 result[sizeof (result) - 1] = 0;
1384 status = swrite (fd, result, strlen (result));
1385 if (status < 0)
1386 {
1387 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1388 return (-1);
1389 }
1390 }
1392 return (0);
1393 } /* }}} int handle_request */
1395 /* MUST NOT hold journal_lock before calling this */
1396 static void journal_rotate(void) /* {{{ */
1397 {
1398 FILE *old_fh = NULL;
1400 if (journal_cur == NULL || journal_old == NULL)
1401 return;
1403 pthread_mutex_lock(&journal_lock);
1405 /* we rotate this way (rename before close) so that the we can release
1406 * the journal lock as fast as possible. Journal writes to the new
1407 * journal can proceed immediately after the new file is opened. The
1408 * fclose can then block without affecting new updates.
1409 */
1410 if (journal_fh != NULL)
1411 {
1412 old_fh = journal_fh;
1413 rename(journal_cur, journal_old);
1414 ++stats_journal_rotate;
1415 }
1417 journal_fh = fopen(journal_cur, "a");
1418 pthread_mutex_unlock(&journal_lock);
1420 if (old_fh != NULL)
1421 fclose(old_fh);
1423 if (journal_fh == NULL)
1424 {
1425 RRDD_LOG(LOG_CRIT,
1426 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1427 journal_cur, rrd_strerror(errno));
1429 RRDD_LOG(LOG_ERR,
1430 "JOURNALING DISABLED: All values will be flushed at shutdown");
1431 config_flush_at_shutdown = 1;
1432 }
1434 } /* }}} static void journal_rotate */
1436 static void journal_done(void) /* {{{ */
1437 {
1438 if (journal_cur == NULL)
1439 return;
1441 pthread_mutex_lock(&journal_lock);
1442 if (journal_fh != NULL)
1443 {
1444 fclose(journal_fh);
1445 journal_fh = NULL;
1446 }
1448 if (config_flush_at_shutdown)
1449 {
1450 RRDD_LOG(LOG_INFO, "removing journals");
1451 unlink(journal_old);
1452 unlink(journal_cur);
1453 }
1454 else
1455 {
1456 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1457 "journals will be used at next startup");
1458 }
1460 pthread_mutex_unlock(&journal_lock);
1462 } /* }}} static void journal_done */
1464 static int journal_write(char *cmd, char *args) /* {{{ */
1465 {
1466 int chars;
1468 if (journal_fh == NULL)
1469 return 0;
1471 pthread_mutex_lock(&journal_lock);
1472 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1473 pthread_mutex_unlock(&journal_lock);
1475 if (chars > 0)
1476 {
1477 pthread_mutex_lock(&stats_lock);
1478 stats_journal_bytes += chars;
1479 pthread_mutex_unlock(&stats_lock);
1480 }
1482 return chars;
1483 } /* }}} static int journal_write */
1485 static int journal_replay (const char *file) /* {{{ */
1486 {
1487 FILE *fh;
1488 int entry_cnt = 0;
1489 int fail_cnt = 0;
1490 uint64_t line = 0;
1491 char entry[CMD_MAX];
1493 if (file == NULL) return 0;
1495 fh = fopen(file, "r");
1496 if (fh == NULL)
1497 {
1498 if (errno != ENOENT)
1499 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1500 file, rrd_strerror(errno));
1501 return 0;
1502 }
1503 else
1504 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1506 while(!feof(fh))
1507 {
1508 size_t entry_len;
1510 ++line;
1511 fgets(entry, sizeof(entry), fh);
1512 entry_len = strlen(entry);
1514 /* check \n termination in case journal writing crashed mid-line */
1515 if (entry_len == 0)
1516 continue;
1517 else if (entry[entry_len - 1] != '\n')
1518 {
1519 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1520 ++fail_cnt;
1521 continue;
1522 }
1524 entry[entry_len - 1] = '\0';
1526 if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
1527 ++entry_cnt;
1528 else
1529 ++fail_cnt;
1530 }
1532 fclose(fh);
1534 if (entry_cnt > 0)
1535 {
1536 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1537 entry_cnt, fail_cnt);
1538 return 1;
1539 }
1540 else
1541 return 0;
1543 } /* }}} static int journal_replay */
1545 static void *connection_thread_main (void *args) /* {{{ */
1546 {
1547 pthread_t self;
1548 listen_socket_t *sock;
1549 int i;
1550 int fd;
1552 sock = (listen_socket_t *) args;
1553 fd = sock->fd;
1555 pthread_mutex_lock (&connection_threads_lock);
1556 {
1557 pthread_t *temp;
1559 temp = (pthread_t *) realloc (connection_threads,
1560 sizeof (pthread_t) * (connection_threads_num + 1));
1561 if (temp == NULL)
1562 {
1563 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1564 }
1565 else
1566 {
1567 connection_threads = temp;
1568 connection_threads[connection_threads_num] = pthread_self ();
1569 connection_threads_num++;
1570 }
1571 }
1572 pthread_mutex_unlock (&connection_threads_lock);
1574 while (do_shutdown == 0)
1575 {
1576 char buffer[CMD_MAX];
1578 struct pollfd pollfd;
1579 int status;
1581 pollfd.fd = fd;
1582 pollfd.events = POLLIN | POLLPRI;
1583 pollfd.revents = 0;
1585 status = poll (&pollfd, 1, /* timeout = */ 500);
1586 if (do_shutdown)
1587 break;
1588 else if (status == 0) /* timeout */
1589 continue;
1590 else if (status < 0) /* error */
1591 {
1592 status = errno;
1593 if (status == EINTR)
1594 continue;
1595 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1596 continue;
1597 }
1599 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1600 {
1601 close (fd);
1602 break;
1603 }
1604 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1605 {
1606 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1607 "poll(2) returned something unexpected: %#04hx",
1608 pollfd.revents);
1609 close (fd);
1610 break;
1611 }
1613 status = (int) sread (fd, buffer, sizeof (buffer));
1614 if (status <= 0)
1615 {
1616 close (fd);
1618 if (status < 0)
1619 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1621 break;
1622 }
1624 status = handle_request (fd, sock->privilege, buffer, status);
1625 if (status != 0)
1626 break;
1627 }
1629 close(fd);
1630 free(args);
1632 self = pthread_self ();
1633 /* Remove this thread from the connection threads list */
1634 pthread_mutex_lock (&connection_threads_lock);
1635 /* Find out own index in the array */
1636 for (i = 0; i < connection_threads_num; i++)
1637 if (pthread_equal (connection_threads[i], self) != 0)
1638 break;
1639 assert (i < connection_threads_num);
1641 /* Move the trailing threads forward. */
1642 if (i < (connection_threads_num - 1))
1643 {
1644 memmove (connection_threads + i,
1645 connection_threads + i + 1,
1646 sizeof (pthread_t) * (connection_threads_num - i - 1));
1647 }
1649 connection_threads_num--;
1650 pthread_mutex_unlock (&connection_threads_lock);
1652 return (NULL);
1653 } /* }}} void *connection_thread_main */
1655 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1656 {
1657 int fd;
1658 struct sockaddr_un sa;
1659 listen_socket_t *temp;
1660 int status;
1661 const char *path;
1663 path = sock->addr;
1664 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1665 path += strlen("unix:");
1667 temp = (listen_socket_t *) realloc (listen_fds,
1668 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1669 if (temp == NULL)
1670 {
1671 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1672 return (-1);
1673 }
1674 listen_fds = temp;
1675 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1677 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1678 if (fd < 0)
1679 {
1680 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1681 return (-1);
1682 }
1684 memset (&sa, 0, sizeof (sa));
1685 sa.sun_family = AF_UNIX;
1686 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1688 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1689 if (status != 0)
1690 {
1691 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1692 close (fd);
1693 unlink (path);
1694 return (-1);
1695 }
1697 status = listen (fd, /* backlog = */ 10);
1698 if (status != 0)
1699 {
1700 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1701 close (fd);
1702 unlink (path);
1703 return (-1);
1704 }
1706 listen_fds[listen_fds_num].fd = fd;
1707 listen_fds[listen_fds_num].family = PF_UNIX;
1708 strncpy(listen_fds[listen_fds_num].addr, path,
1709 sizeof (listen_fds[listen_fds_num].addr) - 1);
1710 listen_fds_num++;
1712 return (0);
1713 } /* }}} int open_listen_socket_unix */
1715 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1716 {
1717 struct addrinfo ai_hints;
1718 struct addrinfo *ai_res;
1719 struct addrinfo *ai_ptr;
1720 char addr_copy[NI_MAXHOST];
1721 char *addr;
1722 char *port;
1723 int status;
1725 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1726 addr_copy[sizeof (addr_copy) - 1] = 0;
1727 addr = addr_copy;
1729 memset (&ai_hints, 0, sizeof (ai_hints));
1730 ai_hints.ai_flags = 0;
1731 #ifdef AI_ADDRCONFIG
1732 ai_hints.ai_flags |= AI_ADDRCONFIG;
1733 #endif
1734 ai_hints.ai_family = AF_UNSPEC;
1735 ai_hints.ai_socktype = SOCK_STREAM;
1737 port = NULL;
1738 if (*addr == '[') /* IPv6+port format */
1739 {
1740 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1741 addr++;
1743 port = strchr (addr, ']');
1744 if (port == NULL)
1745 {
1746 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1747 sock->addr);
1748 return (-1);
1749 }
1750 *port = 0;
1751 port++;
1753 if (*port == ':')
1754 port++;
1755 else if (*port == 0)
1756 port = NULL;
1757 else
1758 {
1759 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1760 port);
1761 return (-1);
1762 }
1763 } /* if (*addr = ']') */
1764 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1765 {
1766 port = rindex(addr, ':');
1767 if (port != NULL)
1768 {
1769 *port = 0;
1770 port++;
1771 }
1772 }
1773 ai_res = NULL;
1774 status = getaddrinfo (addr,
1775 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1776 &ai_hints, &ai_res);
1777 if (status != 0)
1778 {
1779 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1780 "%s", addr, gai_strerror (status));
1781 return (-1);
1782 }
1784 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1785 {
1786 int fd;
1787 listen_socket_t *temp;
1788 int one = 1;
1790 temp = (listen_socket_t *) realloc (listen_fds,
1791 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1792 if (temp == NULL)
1793 {
1794 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1795 continue;
1796 }
1797 listen_fds = temp;
1798 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1800 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1801 if (fd < 0)
1802 {
1803 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1804 continue;
1805 }
1807 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1809 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1810 if (status != 0)
1811 {
1812 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1813 close (fd);
1814 continue;
1815 }
1817 status = listen (fd, /* backlog = */ 10);
1818 if (status != 0)
1819 {
1820 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1821 close (fd);
1822 return (-1);
1823 }
1825 listen_fds[listen_fds_num].fd = fd;
1826 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1827 listen_fds_num++;
1828 } /* for (ai_ptr) */
1830 return (0);
1831 } /* }}} static int open_listen_socket_network */
1833 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1834 {
1835 assert(sock != NULL);
1836 assert(sock->addr != NULL);
1838 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1839 || sock->addr[0] == '/')
1840 return (open_listen_socket_unix(sock));
1841 else
1842 return (open_listen_socket_network(sock));
1843 } /* }}} int open_listen_socket */
1845 static int close_listen_sockets (void) /* {{{ */
1846 {
1847 size_t i;
1849 for (i = 0; i < listen_fds_num; i++)
1850 {
1851 close (listen_fds[i].fd);
1853 if (listen_fds[i].family == PF_UNIX)
1854 unlink(listen_fds[i].addr);
1855 }
1857 free (listen_fds);
1858 listen_fds = NULL;
1859 listen_fds_num = 0;
1861 return (0);
1862 } /* }}} int close_listen_sockets */
1864 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1865 {
1866 struct pollfd *pollfds;
1867 int pollfds_num;
1868 int status;
1869 int i;
1871 for (i = 0; i < config_listen_address_list_len; i++)
1872 open_listen_socket (config_listen_address_list[i]);
1874 if (config_listen_address_list_len < 1)
1875 {
1876 listen_socket_t sock;
1877 memset(&sock, 0, sizeof(sock));
1878 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1879 open_listen_socket (&sock);
1880 }
1882 if (listen_fds_num < 1)
1883 {
1884 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1885 "could be opened. Sorry.");
1886 return (NULL);
1887 }
1889 pollfds_num = listen_fds_num;
1890 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1891 if (pollfds == NULL)
1892 {
1893 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1894 return (NULL);
1895 }
1896 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1898 RRDD_LOG(LOG_INFO, "listening for connections");
1900 while (do_shutdown == 0)
1901 {
1902 assert (pollfds_num == ((int) listen_fds_num));
1903 for (i = 0; i < pollfds_num; i++)
1904 {
1905 pollfds[i].fd = listen_fds[i].fd;
1906 pollfds[i].events = POLLIN | POLLPRI;
1907 pollfds[i].revents = 0;
1908 }
1910 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1911 if (do_shutdown)
1912 break;
1913 else if (status == 0) /* timeout */
1914 continue;
1915 else if (status < 0) /* error */
1916 {
1917 status = errno;
1918 if (status != EINTR)
1919 {
1920 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1921 }
1922 continue;
1923 }
1925 for (i = 0; i < pollfds_num; i++)
1926 {
1927 listen_socket_t *client_sock;
1928 struct sockaddr_storage client_sa;
1929 socklen_t client_sa_size;
1930 pthread_t tid;
1931 pthread_attr_t attr;
1933 if (pollfds[i].revents == 0)
1934 continue;
1936 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1937 {
1938 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1939 "poll(2) returned something unexpected for listen FD #%i.",
1940 pollfds[i].fd);
1941 continue;
1942 }
1944 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1945 if (client_sock == NULL)
1946 {
1947 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1948 continue;
1949 }
1950 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1952 client_sa_size = sizeof (client_sa);
1953 client_sock->fd = accept (pollfds[i].fd,
1954 (struct sockaddr *) &client_sa, &client_sa_size);
1955 if (client_sock->fd < 0)
1956 {
1957 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1958 free(client_sock);
1959 continue;
1960 }
1962 pthread_attr_init (&attr);
1963 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1965 status = pthread_create (&tid, &attr, connection_thread_main,
1966 client_sock);
1967 if (status != 0)
1968 {
1969 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1970 close (client_sock->fd);
1971 free (client_sock);
1972 continue;
1973 }
1974 } /* for (pollfds_num) */
1975 } /* while (do_shutdown == 0) */
1977 RRDD_LOG(LOG_INFO, "starting shutdown");
1979 close_listen_sockets ();
1981 pthread_mutex_lock (&connection_threads_lock);
1982 while (connection_threads_num > 0)
1983 {
1984 pthread_t wait_for;
1986 wait_for = connection_threads[0];
1988 pthread_mutex_unlock (&connection_threads_lock);
1989 pthread_join (wait_for, /* retval = */ NULL);
1990 pthread_mutex_lock (&connection_threads_lock);
1991 }
1992 pthread_mutex_unlock (&connection_threads_lock);
1994 return (NULL);
1995 } /* }}} void *listen_thread_main */
1997 static int daemonize (void) /* {{{ */
1998 {
1999 int status;
2000 int fd;
2002 fd = open_pidfile();
2003 if (fd < 0) return fd;
2005 if (!stay_foreground)
2006 {
2007 pid_t child;
2008 char *base_dir;
2010 child = fork ();
2011 if (child < 0)
2012 {
2013 fprintf (stderr, "daemonize: fork(2) failed.\n");
2014 return (-1);
2015 }
2016 else if (child > 0)
2017 {
2018 return (1);
2019 }
2021 /* Change into the /tmp directory. */
2022 base_dir = (config_base_dir != NULL)
2023 ? config_base_dir
2024 : "/tmp";
2025 status = chdir (base_dir);
2026 if (status != 0)
2027 {
2028 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2029 return (-1);
2030 }
2032 /* Become session leader */
2033 setsid ();
2035 /* Open the first three file descriptors to /dev/null */
2036 close (2);
2037 close (1);
2038 close (0);
2040 open ("/dev/null", O_RDWR);
2041 dup (0);
2042 dup (0);
2043 } /* if (!stay_foreground) */
2045 install_signal_handlers();
2047 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2048 RRDD_LOG(LOG_INFO, "starting up");
2050 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2051 if (cache_tree == NULL)
2052 {
2053 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2054 return (-1);
2055 }
2057 status = write_pidfile (fd);
2058 return status;
2059 } /* }}} int daemonize */
2061 static int cleanup (void) /* {{{ */
2062 {
2063 do_shutdown++;
2065 pthread_cond_signal (&cache_cond);
2066 pthread_join (queue_thread, /* return = */ NULL);
2068 remove_pidfile ();
2070 RRDD_LOG(LOG_INFO, "goodbye");
2071 closelog ();
2073 return (0);
2074 } /* }}} int cleanup */
2076 static int read_options (int argc, char **argv) /* {{{ */
2077 {
2078 int option;
2079 int status = 0;
2081 while ((option = getopt(argc, argv, "gl:L:f:w:b:z:p:j:h?F")) != -1)
2082 {
2083 switch (option)
2084 {
2085 case 'g':
2086 stay_foreground=1;
2087 break;
2089 case 'L':
2090 case 'l':
2091 {
2092 listen_socket_t **temp;
2093 listen_socket_t *new;
2095 new = malloc(sizeof(listen_socket_t));
2096 if (new == NULL)
2097 {
2098 fprintf(stderr, "read_options: malloc failed.\n");
2099 return(2);
2100 }
2101 memset(new, 0, sizeof(listen_socket_t));
2103 temp = (listen_socket_t **) realloc (config_listen_address_list,
2104 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2105 if (temp == NULL)
2106 {
2107 fprintf (stderr, "read_options: realloc failed.\n");
2108 return (2);
2109 }
2110 config_listen_address_list = temp;
2112 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2113 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2115 temp[config_listen_address_list_len] = new;
2116 config_listen_address_list_len++;
2117 }
2118 break;
2120 case 'f':
2121 {
2122 int temp;
2124 temp = atoi (optarg);
2125 if (temp > 0)
2126 config_flush_interval = temp;
2127 else
2128 {
2129 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2130 status = 3;
2131 }
2132 }
2133 break;
2135 case 'w':
2136 {
2137 int temp;
2139 temp = atoi (optarg);
2140 if (temp > 0)
2141 config_write_interval = temp;
2142 else
2143 {
2144 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2145 status = 2;
2146 }
2147 }
2148 break;
2150 case 'z':
2151 {
2152 int temp;
2154 temp = atoi(optarg);
2155 if (temp > 0)
2156 config_write_jitter = temp;
2157 else
2158 {
2159 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2160 status = 2;
2161 }
2163 break;
2164 }
2166 case 'b':
2167 {
2168 size_t len;
2170 if (config_base_dir != NULL)
2171 free (config_base_dir);
2172 config_base_dir = strdup (optarg);
2173 if (config_base_dir == NULL)
2174 {
2175 fprintf (stderr, "read_options: strdup failed.\n");
2176 return (3);
2177 }
2179 len = strlen (config_base_dir);
2180 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2181 {
2182 config_base_dir[len - 1] = 0;
2183 len--;
2184 }
2186 if (len < 1)
2187 {
2188 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2189 return (4);
2190 }
2191 }
2192 break;
2194 case 'p':
2195 {
2196 if (config_pid_file != NULL)
2197 free (config_pid_file);
2198 config_pid_file = strdup (optarg);
2199 if (config_pid_file == NULL)
2200 {
2201 fprintf (stderr, "read_options: strdup failed.\n");
2202 return (3);
2203 }
2204 }
2205 break;
2207 case 'F':
2208 config_flush_at_shutdown = 1;
2209 break;
2211 case 'j':
2212 {
2213 struct stat statbuf;
2214 const char *dir = optarg;
2216 status = stat(dir, &statbuf);
2217 if (status != 0)
2218 {
2219 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2220 return 6;
2221 }
2223 if (!S_ISDIR(statbuf.st_mode)
2224 || access(dir, R_OK|W_OK|X_OK) != 0)
2225 {
2226 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2227 errno ? rrd_strerror(errno) : "");
2228 return 6;
2229 }
2231 journal_cur = malloc(PATH_MAX + 1);
2232 journal_old = malloc(PATH_MAX + 1);
2233 if (journal_cur == NULL || journal_old == NULL)
2234 {
2235 fprintf(stderr, "malloc failure for journal files\n");
2236 return 6;
2237 }
2238 else
2239 {
2240 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2241 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2242 }
2243 }
2244 break;
2246 case 'h':
2247 case '?':
2248 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2249 "\n"
2250 "Usage: rrdcached [options]\n"
2251 "\n"
2252 "Valid options are:\n"
2253 " -l <address> Socket address to listen to.\n"
2254 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2255 " -w <seconds> Interval in which to write data.\n"
2256 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2257 " -f <seconds> Interval in which to flush dead data.\n"
2258 " -p <file> Location of the PID-file.\n"
2259 " -b <dir> Base directory to change to.\n"
2260 " -g Do not fork and run in the foreground.\n"
2261 " -j <dir> Directory in which to create the journal files.\n"
2262 " -F Always flush all updates at shutdown\n"
2263 "\n"
2264 "For more information and a detailed description of all options "
2265 "please refer\n"
2266 "to the rrdcached(1) manual page.\n",
2267 VERSION);
2268 status = -1;
2269 break;
2270 } /* switch (option) */
2271 } /* while (getopt) */
2273 /* advise the user when values are not sane */
2274 if (config_flush_interval < 2 * config_write_interval)
2275 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2276 " 2x write interval (-w) !\n");
2277 if (config_write_jitter > config_write_interval)
2278 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2279 " write interval (-w) !\n");
2281 if (journal_cur == NULL)
2282 config_flush_at_shutdown = 1;
2284 return (status);
2285 } /* }}} int read_options */
2287 int main (int argc, char **argv)
2288 {
2289 int status;
2291 status = read_options (argc, argv);
2292 if (status != 0)
2293 {
2294 if (status < 0)
2295 status = 0;
2296 return (status);
2297 }
2299 status = daemonize ();
2300 if (status == 1)
2301 {
2302 struct sigaction sigchld;
2304 memset (&sigchld, 0, sizeof (sigchld));
2305 sigchld.sa_handler = SIG_IGN;
2306 sigaction (SIGCHLD, &sigchld, NULL);
2308 return (0);
2309 }
2310 else if (status != 0)
2311 {
2312 fprintf (stderr, "daemonize failed, exiting.\n");
2313 return (1);
2314 }
2316 if (journal_cur != NULL)
2317 {
2318 int had_journal = 0;
2320 pthread_mutex_lock(&journal_lock);
2322 RRDD_LOG(LOG_INFO, "checking for journal files");
2324 had_journal += journal_replay(journal_old);
2325 had_journal += journal_replay(journal_cur);
2327 if (had_journal)
2328 flush_old_values(-1);
2330 pthread_mutex_unlock(&journal_lock);
2331 journal_rotate();
2333 RRDD_LOG(LOG_INFO, "journal processing complete");
2334 }
2336 /* start the queue thread */
2337 memset (&queue_thread, 0, sizeof (queue_thread));
2338 status = pthread_create (&queue_thread,
2339 NULL, /* attr */
2340 queue_thread_main,
2341 NULL); /* args */
2342 if (status != 0)
2343 {
2344 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2345 cleanup();
2346 return (1);
2347 }
2349 listen_thread_main (NULL);
2350 cleanup ();
2352 return (0);
2353 } /* int main */
2355 /*
2356 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2357 */