a0e6bdbf9f629d62e8408548cffd39edc9843154
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 typedef enum
105 {
106 PRIV_LOW,
107 PRIV_HIGH
108 } socket_privilege;
110 struct listen_socket_s
111 {
112 int fd;
113 char addr[PATH_MAX + 1];
114 int family;
115 socket_privilege privilege;
116 };
117 typedef struct listen_socket_s listen_socket_t;
119 struct cache_item_s;
120 typedef struct cache_item_s cache_item_t;
121 struct cache_item_s
122 {
123 char *file;
124 char **values;
125 int values_num;
126 time_t last_flush_time;
127 #define CI_FLAGS_IN_TREE (1<<0)
128 #define CI_FLAGS_IN_QUEUE (1<<1)
129 int flags;
130 pthread_cond_t flushed;
131 cache_item_t *next;
132 };
134 struct callback_flush_data_s
135 {
136 time_t now;
137 time_t abs_timeout;
138 char **keys;
139 size_t keys_num;
140 };
141 typedef struct callback_flush_data_s callback_flush_data_t;
143 enum queue_side_e
144 {
145 HEAD,
146 TAIL
147 };
148 typedef enum queue_side_e queue_side_t;
150 /* max length of socket command or response */
151 #define CMD_MAX 4096
153 /*
154 * Variables
155 */
156 static int stay_foreground = 0;
158 static listen_socket_t *listen_fds = NULL;
159 static size_t listen_fds_num = 0;
161 static int do_shutdown = 0;
163 static pthread_t queue_thread;
165 static pthread_t *connection_threads = NULL;
166 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
167 static int connection_threads_num = 0;
169 /* Cache stuff */
170 static GTree *cache_tree = NULL;
171 static cache_item_t *cache_queue_head = NULL;
172 static cache_item_t *cache_queue_tail = NULL;
173 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
174 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
176 static int config_write_interval = 300;
177 static int config_write_jitter = 0;
178 static int config_flush_interval = 3600;
179 static int config_flush_at_shutdown = 0;
180 static char *config_pid_file = NULL;
181 static char *config_base_dir = NULL;
183 static listen_socket_t **config_listen_address_list = NULL;
184 static int config_listen_address_list_len = 0;
186 static uint64_t stats_queue_length = 0;
187 static uint64_t stats_updates_received = 0;
188 static uint64_t stats_flush_received = 0;
189 static uint64_t stats_updates_written = 0;
190 static uint64_t stats_data_sets_written = 0;
191 static uint64_t stats_journal_bytes = 0;
192 static uint64_t stats_journal_rotate = 0;
193 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
195 /* Journaled updates */
196 static char *journal_cur = NULL;
197 static char *journal_old = NULL;
198 static FILE *journal_fh = NULL;
199 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
200 static int journal_write(char *cmd, char *args);
201 static void journal_done(void);
202 static void journal_rotate(void);
204 /*
205 * Functions
206 */
207 static void sig_common (const char *sig) /* {{{ */
208 {
209 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
210 do_shutdown++;
211 pthread_cond_broadcast(&cache_cond);
212 } /* }}} void sig_common */
214 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
215 {
216 sig_common("INT");
217 } /* }}} void sig_int_handler */
219 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
220 {
221 sig_common("TERM");
222 } /* }}} void sig_term_handler */
224 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
225 {
226 config_flush_at_shutdown = 1;
227 sig_common("USR1");
228 } /* }}} void sig_usr1_handler */
230 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
231 {
232 config_flush_at_shutdown = 0;
233 sig_common("USR2");
234 } /* }}} void sig_usr2_handler */
236 static void install_signal_handlers(void) /* {{{ */
237 {
238 /* These structures are static, because `sigaction' behaves weird if the are
239 * overwritten.. */
240 static struct sigaction sa_int;
241 static struct sigaction sa_term;
242 static struct sigaction sa_pipe;
243 static struct sigaction sa_usr1;
244 static struct sigaction sa_usr2;
246 /* Install signal handlers */
247 memset (&sa_int, 0, sizeof (sa_int));
248 sa_int.sa_handler = sig_int_handler;
249 sigaction (SIGINT, &sa_int, NULL);
251 memset (&sa_term, 0, sizeof (sa_term));
252 sa_term.sa_handler = sig_term_handler;
253 sigaction (SIGTERM, &sa_term, NULL);
255 memset (&sa_pipe, 0, sizeof (sa_pipe));
256 sa_pipe.sa_handler = SIG_IGN;
257 sigaction (SIGPIPE, &sa_pipe, NULL);
259 memset (&sa_pipe, 0, sizeof (sa_usr1));
260 sa_usr1.sa_handler = sig_usr1_handler;
261 sigaction (SIGUSR1, &sa_usr1, NULL);
263 memset (&sa_usr2, 0, sizeof (sa_usr2));
264 sa_usr2.sa_handler = sig_usr2_handler;
265 sigaction (SIGUSR2, &sa_usr2, NULL);
267 } /* }}} void install_signal_handlers */
269 static int open_pidfile(void) /* {{{ */
270 {
271 int fd;
272 char *file;
274 file = (config_pid_file != NULL)
275 ? config_pid_file
276 : LOCALSTATEDIR "/run/rrdcached.pid";
278 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
279 if (fd < 0)
280 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
281 file, rrd_strerror(errno));
283 return(fd);
284 } /* }}} static int open_pidfile */
286 static int write_pidfile (int fd) /* {{{ */
287 {
288 pid_t pid;
289 FILE *fh;
291 pid = getpid ();
293 fh = fdopen (fd, "w");
294 if (fh == NULL)
295 {
296 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
297 close(fd);
298 return (-1);
299 }
301 fprintf (fh, "%i\n", (int) pid);
302 fclose (fh);
304 return (0);
305 } /* }}} int write_pidfile */
307 static int remove_pidfile (void) /* {{{ */
308 {
309 char *file;
310 int status;
312 file = (config_pid_file != NULL)
313 ? config_pid_file
314 : LOCALSTATEDIR "/run/rrdcached.pid";
316 status = unlink (file);
317 if (status == 0)
318 return (0);
319 return (errno);
320 } /* }}} int remove_pidfile */
322 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
323 {
324 char *buffer;
325 size_t buffer_used;
326 size_t buffer_free;
327 ssize_t status;
329 buffer = (char *) buffer_void;
330 buffer_used = 0;
331 buffer_free = buffer_size;
333 while (buffer_free > 0)
334 {
335 status = read (fd, buffer + buffer_used, buffer_free);
336 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
337 continue;
339 if (status < 0)
340 return (-1);
342 if (status == 0)
343 return (0);
345 assert ((0 > status) || (buffer_free >= (size_t) status));
347 buffer_free = buffer_free - status;
348 buffer_used = buffer_used + status;
350 if (buffer[buffer_used - 1] == '\n')
351 break;
352 }
354 assert (buffer_used > 0);
356 if (buffer[buffer_used - 1] != '\n')
357 {
358 errno = ENOBUFS;
359 return (-1);
360 }
362 buffer[buffer_used - 1] = 0;
364 /* Fix network line endings. */
365 if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
366 {
367 buffer_used--;
368 buffer[buffer_used - 1] = 0;
369 }
371 return (buffer_used);
372 } /* }}} ssize_t sread */
374 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
375 {
376 const char *ptr;
377 size_t nleft;
378 ssize_t status;
380 /* special case for journal replay */
381 if (fd < 0) return 0;
383 ptr = (const char *) buf;
384 nleft = count;
386 while (nleft > 0)
387 {
388 status = write (fd, (const void *) ptr, nleft);
390 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
391 continue;
393 if (status < 0)
394 return (status);
396 nleft -= status;
397 ptr += status;
398 }
400 return (0);
401 } /* }}} ssize_t swrite */
403 static void _wipe_ci_values(cache_item_t *ci, time_t when)
404 {
405 ci->values = NULL;
406 ci->values_num = 0;
408 ci->last_flush_time = when;
409 if (config_write_jitter > 0)
410 ci->last_flush_time += (random() % config_write_jitter);
412 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
413 }
415 /*
416 * enqueue_cache_item:
417 * `cache_lock' must be acquired before calling this function!
418 */
419 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
420 queue_side_t side)
421 {
422 int did_insert = 0;
424 if (ci == NULL)
425 return (-1);
427 if (ci->values_num == 0)
428 return (0);
430 if (side == HEAD)
431 {
432 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
433 {
434 assert (ci->next == NULL);
435 ci->next = cache_queue_head;
436 cache_queue_head = ci;
438 if (cache_queue_tail == NULL)
439 cache_queue_tail = cache_queue_head;
441 did_insert = 1;
442 }
443 else if (cache_queue_head == ci)
444 {
445 /* do nothing */
446 }
447 else /* enqueued, but not first entry */
448 {
449 cache_item_t *prev;
451 /* find previous entry */
452 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
453 if (prev->next == ci)
454 break;
455 assert (prev != NULL);
457 /* move to the front */
458 prev->next = ci->next;
459 ci->next = cache_queue_head;
460 cache_queue_head = ci;
462 /* check if we need to adapt the tail */
463 if (cache_queue_tail == ci)
464 cache_queue_tail = prev;
465 }
466 }
467 else /* (side == TAIL) */
468 {
469 /* We don't move values back in the list.. */
470 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
471 return (0);
473 assert (ci->next == NULL);
475 if (cache_queue_tail == NULL)
476 cache_queue_head = ci;
477 else
478 cache_queue_tail->next = ci;
479 cache_queue_tail = ci;
481 did_insert = 1;
482 }
484 ci->flags |= CI_FLAGS_IN_QUEUE;
486 if (did_insert)
487 {
488 pthread_cond_broadcast(&cache_cond);
489 pthread_mutex_lock (&stats_lock);
490 stats_queue_length++;
491 pthread_mutex_unlock (&stats_lock);
492 }
494 return (0);
495 } /* }}} int enqueue_cache_item */
497 /*
498 * tree_callback_flush:
499 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
500 * while this is in progress.
501 */
502 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
503 gpointer data)
504 {
505 cache_item_t *ci;
506 callback_flush_data_t *cfd;
508 ci = (cache_item_t *) value;
509 cfd = (callback_flush_data_t *) data;
511 if ((ci->last_flush_time <= cfd->abs_timeout)
512 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
513 && (ci->values_num > 0))
514 {
515 enqueue_cache_item (ci, TAIL);
516 }
517 else if ((do_shutdown != 0)
518 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
519 && (ci->values_num > 0))
520 {
521 enqueue_cache_item (ci, TAIL);
522 }
523 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
524 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
525 && (ci->values_num <= 0))
526 {
527 char **temp;
529 temp = (char **) realloc (cfd->keys,
530 sizeof (char *) * (cfd->keys_num + 1));
531 if (temp == NULL)
532 {
533 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
534 return (FALSE);
535 }
536 cfd->keys = temp;
537 /* Make really sure this points to the _same_ place */
538 assert ((char *) key == ci->file);
539 cfd->keys[cfd->keys_num] = (char *) key;
540 cfd->keys_num++;
541 }
543 return (FALSE);
544 } /* }}} gboolean tree_callback_flush */
546 static int flush_old_values (int max_age)
547 {
548 callback_flush_data_t cfd;
549 size_t k;
551 memset (&cfd, 0, sizeof (cfd));
552 /* Pass the current time as user data so that we don't need to call
553 * `time' for each node. */
554 cfd.now = time (NULL);
555 cfd.keys = NULL;
556 cfd.keys_num = 0;
558 if (max_age > 0)
559 cfd.abs_timeout = cfd.now - max_age;
560 else
561 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
563 /* `tree_callback_flush' will return the keys of all values that haven't
564 * been touched in the last `config_flush_interval' seconds in `cfd'.
565 * The char*'s in this array point to the same memory as ci->file, so we
566 * don't need to free them separately. */
567 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
569 for (k = 0; k < cfd.keys_num; k++)
570 {
571 cache_item_t *ci;
573 /* This must not fail. */
574 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
575 assert (ci != NULL);
577 /* If we end up here with values available, something's seriously
578 * messed up. */
579 assert (ci->values_num == 0);
581 /* Remove the node from the tree */
582 g_tree_remove (cache_tree, cfd.keys[k]);
583 cfd.keys[k] = NULL;
585 /* Now free and clean up `ci'. */
586 free (ci->file);
587 ci->file = NULL;
588 free (ci);
589 ci = NULL;
590 } /* for (k = 0; k < cfd.keys_num; k++) */
592 if (cfd.keys != NULL)
593 {
594 free (cfd.keys);
595 cfd.keys = NULL;
596 }
598 return (0);
599 } /* int flush_old_values */
601 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
602 {
603 struct timeval now;
604 struct timespec next_flush;
605 int final_flush = 0; /* make sure we only flush once on shutdown */
607 gettimeofday (&now, NULL);
608 next_flush.tv_sec = now.tv_sec + config_flush_interval;
609 next_flush.tv_nsec = 1000 * now.tv_usec;
611 pthread_mutex_lock (&cache_lock);
612 while ((do_shutdown == 0) || (cache_queue_head != NULL))
613 {
614 cache_item_t *ci;
615 char *file;
616 char **values;
617 int values_num;
618 int status;
619 int i;
621 /* First, check if it's time to do the cache flush. */
622 gettimeofday (&now, NULL);
623 if ((now.tv_sec > next_flush.tv_sec)
624 || ((now.tv_sec == next_flush.tv_sec)
625 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
626 {
627 /* Flush all values that haven't been written in the last
628 * `config_write_interval' seconds. */
629 flush_old_values (config_write_interval);
631 /* Determine the time of the next cache flush. */
632 while (next_flush.tv_sec <= now.tv_sec)
633 next_flush.tv_sec += config_flush_interval;
635 /* unlock the cache while we rotate so we don't block incoming
636 * updates if the fsync() blocks on disk I/O */
637 pthread_mutex_unlock(&cache_lock);
638 journal_rotate();
639 pthread_mutex_lock(&cache_lock);
640 }
642 /* Now, check if there's something to store away. If not, wait until
643 * something comes in or it's time to do the cache flush. if we are
644 * shutting down, do not wait around. */
645 if (cache_queue_head == NULL && !do_shutdown)
646 {
647 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
648 if ((status != 0) && (status != ETIMEDOUT))
649 {
650 RRDD_LOG (LOG_ERR, "queue_thread_main: "
651 "pthread_cond_timedwait returned %i.", status);
652 }
653 }
655 /* We're about to shut down */
656 if (do_shutdown != 0 && !final_flush++)
657 {
658 if (config_flush_at_shutdown)
659 flush_old_values (-1); /* flush everything */
660 else
661 break;
662 }
664 /* Check if a value has arrived. This may be NULL if we timed out or there
665 * was an interrupt such as a signal. */
666 if (cache_queue_head == NULL)
667 continue;
669 ci = cache_queue_head;
671 /* copy the relevant parts */
672 file = strdup (ci->file);
673 if (file == NULL)
674 {
675 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
676 continue;
677 }
679 assert(ci->values != NULL);
680 assert(ci->values_num > 0);
682 values = ci->values;
683 values_num = ci->values_num;
685 _wipe_ci_values(ci, time(NULL));
687 cache_queue_head = ci->next;
688 if (cache_queue_head == NULL)
689 cache_queue_tail = NULL;
690 ci->next = NULL;
692 pthread_mutex_lock (&stats_lock);
693 assert (stats_queue_length > 0);
694 stats_queue_length--;
695 pthread_mutex_unlock (&stats_lock);
697 pthread_mutex_unlock (&cache_lock);
699 rrd_clear_error ();
700 status = rrd_update_r (file, NULL, values_num, (void *) values);
701 if (status != 0)
702 {
703 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
704 "rrd_update_r (%s) failed with status %i. (%s)",
705 file, status, rrd_get_error());
706 }
708 journal_write("wrote", file);
709 pthread_cond_broadcast(&ci->flushed);
711 for (i = 0; i < values_num; i++)
712 free (values[i]);
714 free(values);
715 free(file);
717 if (status == 0)
718 {
719 pthread_mutex_lock (&stats_lock);
720 stats_updates_written++;
721 stats_data_sets_written += values_num;
722 pthread_mutex_unlock (&stats_lock);
723 }
725 pthread_mutex_lock (&cache_lock);
727 /* We're about to shut down */
728 if (do_shutdown != 0 && !final_flush++)
729 {
730 if (config_flush_at_shutdown)
731 flush_old_values (-1); /* flush everything */
732 else
733 break;
734 }
735 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
736 pthread_mutex_unlock (&cache_lock);
738 if (config_flush_at_shutdown)
739 {
740 assert(cache_queue_head == NULL);
741 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
742 }
744 journal_done();
746 return (NULL);
747 } /* }}} void *queue_thread_main */
749 static int buffer_get_field (char **buffer_ret, /* {{{ */
750 size_t *buffer_size_ret, char **field_ret)
751 {
752 char *buffer;
753 size_t buffer_pos;
754 size_t buffer_size;
755 char *field;
756 size_t field_size;
757 int status;
759 buffer = *buffer_ret;
760 buffer_pos = 0;
761 buffer_size = *buffer_size_ret;
762 field = *buffer_ret;
763 field_size = 0;
765 if (buffer_size <= 0)
766 return (-1);
768 /* This is ensured by `handle_request'. */
769 assert (buffer[buffer_size - 1] == '\0');
771 status = -1;
772 while (buffer_pos < buffer_size)
773 {
774 /* Check for end-of-field or end-of-buffer */
775 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
776 {
777 field[field_size] = 0;
778 field_size++;
779 buffer_pos++;
780 status = 0;
781 break;
782 }
783 /* Handle escaped characters. */
784 else if (buffer[buffer_pos] == '\\')
785 {
786 if (buffer_pos >= (buffer_size - 1))
787 break;
788 buffer_pos++;
789 field[field_size] = buffer[buffer_pos];
790 field_size++;
791 buffer_pos++;
792 }
793 /* Normal operation */
794 else
795 {
796 field[field_size] = buffer[buffer_pos];
797 field_size++;
798 buffer_pos++;
799 }
800 } /* while (buffer_pos < buffer_size) */
802 if (status != 0)
803 return (status);
805 *buffer_ret = buffer + buffer_pos;
806 *buffer_size_ret = buffer_size - buffer_pos;
807 *field_ret = field;
809 return (0);
810 } /* }}} int buffer_get_field */
812 static int flush_file (const char *filename) /* {{{ */
813 {
814 cache_item_t *ci;
816 pthread_mutex_lock (&cache_lock);
818 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
819 if (ci == NULL)
820 {
821 pthread_mutex_unlock (&cache_lock);
822 return (ENOENT);
823 }
825 if (ci->values_num > 0)
826 {
827 /* Enqueue at head */
828 enqueue_cache_item (ci, HEAD);
829 pthread_cond_wait(&ci->flushed, &cache_lock);
830 }
832 pthread_mutex_unlock(&cache_lock);
834 return (0);
835 } /* }}} int flush_file */
837 static int handle_request_help (int fd, /* {{{ */
838 char *buffer, size_t buffer_size)
839 {
840 int status;
841 char **help_text;
842 size_t help_text_len;
843 char *command;
844 size_t i;
846 char *help_help[] =
847 {
848 "5 Command overview\n",
849 "FLUSH <filename>\n",
850 "FLUSHALL\n",
851 "HELP [<command>]\n",
852 "UPDATE <filename> <values> [<values> ...]\n",
853 "STATS\n"
854 };
855 size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
857 char *help_flush[] =
858 {
859 "4 Help for FLUSH\n",
860 "Usage: FLUSH <filename>\n",
861 "\n",
862 "Adds the given filename to the head of the update queue and returns\n",
863 "after is has been dequeued.\n"
864 };
865 size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
867 char *help_flushall[] =
868 {
869 "3 Help for FLUSHALL\n",
870 "Usage: FLUSHALL\n",
871 "\n",
872 "Triggers writing of all pending updates. Returns immediately.\n"
873 };
874 size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
876 char *help_update[] =
877 {
878 "9 Help for UPDATE\n",
879 "Usage: UPDATE <filename> <values> [<values> ...]\n"
880 "\n",
881 "Adds the given file to the internal cache if it is not yet known and\n",
882 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
883 "for details.\n",
884 "\n",
885 "Each <values> has the following form:\n",
886 " <values> = <time>:<value>[:<value>[...]]\n",
887 "See the rrdupdate(1) manpage for details.\n"
888 };
889 size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
891 char *help_stats[] =
892 {
893 "4 Help for STATS\n",
894 "Usage: STATS\n",
895 "\n",
896 "Returns some performance counters, see the rrdcached(1) manpage for\n",
897 "a description of the values.\n"
898 };
899 size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
901 status = buffer_get_field (&buffer, &buffer_size, &command);
902 if (status != 0)
903 {
904 help_text = help_help;
905 help_text_len = help_help_len;
906 }
907 else
908 {
909 if (strcasecmp (command, "update") == 0)
910 {
911 help_text = help_update;
912 help_text_len = help_update_len;
913 }
914 else if (strcasecmp (command, "flush") == 0)
915 {
916 help_text = help_flush;
917 help_text_len = help_flush_len;
918 }
919 else if (strcasecmp (command, "flushall") == 0)
920 {
921 help_text = help_flushall;
922 help_text_len = help_flushall_len;
923 }
924 else if (strcasecmp (command, "stats") == 0)
925 {
926 help_text = help_stats;
927 help_text_len = help_stats_len;
928 }
929 else
930 {
931 help_text = help_help;
932 help_text_len = help_help_len;
933 }
934 }
936 for (i = 0; i < help_text_len; i++)
937 {
938 status = swrite (fd, help_text[i], strlen (help_text[i]));
939 if (status < 0)
940 {
941 status = errno;
942 RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
943 return (status);
944 }
945 }
947 return (0);
948 } /* }}} int handle_request_help */
950 static int handle_request_stats (int fd, /* {{{ */
951 char *buffer __attribute__((unused)),
952 size_t buffer_size __attribute__((unused)))
953 {
954 int status;
955 char outbuf[CMD_MAX];
957 uint64_t copy_queue_length;
958 uint64_t copy_updates_received;
959 uint64_t copy_flush_received;
960 uint64_t copy_updates_written;
961 uint64_t copy_data_sets_written;
962 uint64_t copy_journal_bytes;
963 uint64_t copy_journal_rotate;
965 uint64_t tree_nodes_number;
966 uint64_t tree_depth;
968 pthread_mutex_lock (&stats_lock);
969 copy_queue_length = stats_queue_length;
970 copy_updates_received = stats_updates_received;
971 copy_flush_received = stats_flush_received;
972 copy_updates_written = stats_updates_written;
973 copy_data_sets_written = stats_data_sets_written;
974 copy_journal_bytes = stats_journal_bytes;
975 copy_journal_rotate = stats_journal_rotate;
976 pthread_mutex_unlock (&stats_lock);
978 pthread_mutex_lock (&cache_lock);
979 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
980 tree_depth = (uint64_t) g_tree_height (cache_tree);
981 pthread_mutex_unlock (&cache_lock);
983 #define RRDD_STATS_SEND \
984 outbuf[sizeof (outbuf) - 1] = 0; \
985 status = swrite (fd, outbuf, strlen (outbuf)); \
986 if (status < 0) \
987 { \
988 status = errno; \
989 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
990 return (status); \
991 }
993 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
994 RRDD_STATS_SEND;
996 snprintf (outbuf, sizeof (outbuf),
997 "QueueLength: %"PRIu64"\n", copy_queue_length);
998 RRDD_STATS_SEND;
1000 snprintf (outbuf, sizeof (outbuf),
1001 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1002 RRDD_STATS_SEND;
1004 snprintf (outbuf, sizeof (outbuf),
1005 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1006 RRDD_STATS_SEND;
1008 snprintf (outbuf, sizeof (outbuf),
1009 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1010 RRDD_STATS_SEND;
1012 snprintf (outbuf, sizeof (outbuf),
1013 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1014 RRDD_STATS_SEND;
1016 snprintf (outbuf, sizeof (outbuf),
1017 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1018 RRDD_STATS_SEND;
1020 snprintf (outbuf, sizeof (outbuf),
1021 "TreeDepth: %"PRIu64"\n", tree_depth);
1022 RRDD_STATS_SEND;
1024 snprintf (outbuf, sizeof(outbuf),
1025 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1026 RRDD_STATS_SEND;
1028 snprintf (outbuf, sizeof(outbuf),
1029 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1030 RRDD_STATS_SEND;
1032 return (0);
1033 #undef RRDD_STATS_SEND
1034 } /* }}} int handle_request_stats */
1036 static int handle_request_flush (int fd, /* {{{ */
1037 char *buffer, size_t buffer_size)
1038 {
1039 char *file;
1040 int status;
1041 char result[CMD_MAX];
1043 status = buffer_get_field (&buffer, &buffer_size, &file);
1044 if (status != 0)
1045 {
1046 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1047 }
1048 else
1049 {
1050 pthread_mutex_lock(&stats_lock);
1051 stats_flush_received++;
1052 pthread_mutex_unlock(&stats_lock);
1054 status = flush_file (file);
1055 if (status == 0)
1056 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1057 else if (status == ENOENT)
1058 {
1059 /* no file in our tree; see whether it exists at all */
1060 struct stat statbuf;
1062 memset(&statbuf, 0, sizeof(statbuf));
1063 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1064 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1065 else
1066 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1067 }
1068 else if (status < 0)
1069 strncpy (result, "-1 Internal error.\n", sizeof (result));
1070 else
1071 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1072 }
1073 result[sizeof (result) - 1] = 0;
1075 status = swrite (fd, result, strlen (result));
1076 if (status < 0)
1077 {
1078 status = errno;
1079 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1080 return (status);
1081 }
1083 return (0);
1084 } /* }}} int handle_request_flush */
1086 static int handle_request_flushall(int fd) /* {{{ */
1087 {
1088 int status;
1089 char answer[] ="0 Started flush.\n";
1091 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1093 pthread_mutex_lock(&cache_lock);
1094 flush_old_values(-1);
1095 pthread_mutex_unlock(&cache_lock);
1097 status = swrite(fd, answer, strlen(answer));
1098 if (status < 0)
1099 {
1100 status = errno;
1101 RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1102 }
1104 return (status);
1105 } /* }}} static int handle_request_flushall */
1107 static int handle_request_update (int fd, /* {{{ */
1108 char *buffer, size_t buffer_size)
1109 {
1110 char *file;
1111 int values_num = 0;
1112 int status;
1114 time_t now;
1116 cache_item_t *ci;
1117 char answer[CMD_MAX];
1119 #define RRDD_UPDATE_SEND \
1120 answer[sizeof (answer) - 1] = 0; \
1121 status = swrite (fd, answer, strlen (answer)); \
1122 if (status < 0) \
1123 { \
1124 status = errno; \
1125 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1126 return (status); \
1127 }
1129 now = time (NULL);
1131 status = buffer_get_field (&buffer, &buffer_size, &file);
1132 if (status != 0)
1133 {
1134 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1135 sizeof (answer));
1136 RRDD_UPDATE_SEND;
1137 return (0);
1138 }
1140 pthread_mutex_lock(&stats_lock);
1141 stats_updates_received++;
1142 pthread_mutex_unlock(&stats_lock);
1144 pthread_mutex_lock (&cache_lock);
1145 ci = g_tree_lookup (cache_tree, file);
1147 if (ci == NULL) /* {{{ */
1148 {
1149 struct stat statbuf;
1151 /* don't hold the lock while we setup; stat(2) might block */
1152 pthread_mutex_unlock(&cache_lock);
1154 memset (&statbuf, 0, sizeof (statbuf));
1155 status = stat (file, &statbuf);
1156 if (status != 0)
1157 {
1158 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1160 status = errno;
1161 if (status == ENOENT)
1162 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1163 else
1164 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1165 status);
1166 RRDD_UPDATE_SEND;
1167 return (0);
1168 }
1169 if (!S_ISREG (statbuf.st_mode))
1170 {
1171 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1172 RRDD_UPDATE_SEND;
1173 return (0);
1174 }
1175 if (access(file, R_OK|W_OK) != 0)
1176 {
1177 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1178 file, rrd_strerror(errno));
1179 RRDD_UPDATE_SEND;
1180 return (0);
1181 }
1183 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1184 if (ci == NULL)
1185 {
1186 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1188 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1189 RRDD_UPDATE_SEND;
1190 return (0);
1191 }
1192 memset (ci, 0, sizeof (cache_item_t));
1194 ci->file = strdup (file);
1195 if (ci->file == NULL)
1196 {
1197 free (ci);
1198 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1200 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1201 RRDD_UPDATE_SEND;
1202 return (0);
1203 }
1205 _wipe_ci_values(ci, now);
1206 ci->flags = CI_FLAGS_IN_TREE;
1208 pthread_mutex_lock(&cache_lock);
1209 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1210 } /* }}} */
1211 assert (ci != NULL);
1213 while (buffer_size > 0)
1214 {
1215 char **temp;
1216 char *value;
1218 status = buffer_get_field (&buffer, &buffer_size, &value);
1219 if (status != 0)
1220 {
1221 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1222 break;
1223 }
1225 temp = (char **) realloc (ci->values,
1226 sizeof (char *) * (ci->values_num + 1));
1227 if (temp == NULL)
1228 {
1229 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1230 continue;
1231 }
1232 ci->values = temp;
1234 ci->values[ci->values_num] = strdup (value);
1235 if (ci->values[ci->values_num] == NULL)
1236 {
1237 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1238 continue;
1239 }
1240 ci->values_num++;
1242 values_num++;
1243 }
1245 if (((now - ci->last_flush_time) >= config_write_interval)
1246 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1247 && (ci->values_num > 0))
1248 {
1249 enqueue_cache_item (ci, TAIL);
1250 }
1252 pthread_mutex_unlock (&cache_lock);
1254 if (values_num < 1)
1255 {
1256 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1257 }
1258 else
1259 {
1260 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1261 (values_num == 1) ? "" : "s");
1262 }
1263 RRDD_UPDATE_SEND;
1264 return (0);
1265 #undef RRDD_UPDATE_SEND
1266 } /* }}} int handle_request_update */
1268 /* we came across a "WROTE" entry during journal replay.
1269 * throw away any values that we have accumulated for this file
1270 */
1271 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1272 const char *buffer,
1273 size_t buffer_size __attribute__((unused)))
1274 {
1275 int i;
1276 cache_item_t *ci;
1277 const char *file = buffer;
1279 pthread_mutex_lock(&cache_lock);
1281 ci = g_tree_lookup(cache_tree, file);
1282 if (ci == NULL)
1283 {
1284 pthread_mutex_unlock(&cache_lock);
1285 return (0);
1286 }
1288 if (ci->values)
1289 {
1290 for (i=0; i < ci->values_num; i++)
1291 free(ci->values[i]);
1293 free(ci->values);
1294 }
1296 _wipe_ci_values(ci, time(NULL));
1298 pthread_mutex_unlock(&cache_lock);
1299 return (0);
1300 } /* }}} int handle_request_wrote */
1302 /* returns 1 if we have the required privilege level */
1303 static int has_privilege (socket_privilege priv, /* {{{ */
1304 socket_privilege required, int fd)
1305 {
1306 int status;
1307 char error[CMD_MAX];
1309 if (priv >= required)
1310 return 1;
1312 sprintf(error, "-1 %s\n", rrd_strerror(EACCES));
1313 status = swrite(fd, error, strlen(error));
1315 if (status < 0)
1316 return status;
1317 else
1318 return 0;
1319 } /* }}} static int has_privilege */
1321 /* if fd < 0, we are in journal replay mode */
1322 static int handle_request (int fd, socket_privilege privilege, /* {{{ */
1323 char *buffer, size_t buffer_size)
1324 {
1325 char *buffer_ptr;
1326 char *command;
1327 int status;
1329 assert (buffer[buffer_size - 1] == '\0');
1331 buffer_ptr = buffer;
1332 command = NULL;
1333 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1334 if (status != 0)
1335 {
1336 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1337 return (-1);
1338 }
1340 if (strcasecmp (command, "update") == 0)
1341 {
1342 /* don't re-write updates in replay mode */
1343 if (fd >= 0)
1344 journal_write(command, buffer_ptr);
1346 status = has_privilege(privilege, PRIV_HIGH, fd);
1347 if (status <= 0)
1348 return status;
1350 return (handle_request_update (fd, buffer_ptr, buffer_size));
1351 }
1352 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1353 {
1354 /* this is only valid in replay mode */
1355 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1356 }
1357 else if (strcasecmp (command, "flush") == 0)
1358 {
1359 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1360 }
1361 else if (strcasecmp (command, "flushall") == 0)
1362 {
1363 status = has_privilege(privilege, PRIV_HIGH, fd);
1364 if (status <= 0)
1365 return status;
1367 return (handle_request_flushall(fd));
1368 }
1369 else if (strcasecmp (command, "stats") == 0)
1370 {
1371 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1372 }
1373 else if (strcasecmp (command, "help") == 0)
1374 {
1375 return (handle_request_help (fd, buffer_ptr, buffer_size));
1376 }
1377 else
1378 {
1379 char result[CMD_MAX];
1381 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1382 result[sizeof (result) - 1] = 0;
1384 status = swrite (fd, result, strlen (result));
1385 if (status < 0)
1386 {
1387 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1388 return (-1);
1389 }
1390 }
1392 return (0);
1393 } /* }}} int handle_request */
1395 /* MUST NOT hold journal_lock before calling this */
1396 static void journal_rotate(void) /* {{{ */
1397 {
1398 FILE *old_fh = NULL;
1400 if (journal_cur == NULL || journal_old == NULL)
1401 return;
1403 pthread_mutex_lock(&journal_lock);
1405 /* we rotate this way (rename before close) so that the we can release
1406 * the journal lock as fast as possible. Journal writes to the new
1407 * journal can proceed immediately after the new file is opened. The
1408 * fclose can then block without affecting new updates.
1409 */
1410 if (journal_fh != NULL)
1411 {
1412 old_fh = journal_fh;
1413 rename(journal_cur, journal_old);
1414 ++stats_journal_rotate;
1415 }
1417 journal_fh = fopen(journal_cur, "a");
1418 pthread_mutex_unlock(&journal_lock);
1420 if (old_fh != NULL)
1421 fclose(old_fh);
1423 if (journal_fh == NULL)
1424 {
1425 RRDD_LOG(LOG_CRIT,
1426 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1427 journal_cur, rrd_strerror(errno));
1429 RRDD_LOG(LOG_ERR,
1430 "JOURNALING DISABLED: All values will be flushed at shutdown");
1431 config_flush_at_shutdown = 1;
1432 }
1434 } /* }}} static void journal_rotate */
1436 static void journal_done(void) /* {{{ */
1437 {
1438 if (journal_cur == NULL)
1439 return;
1441 pthread_mutex_lock(&journal_lock);
1442 if (journal_fh != NULL)
1443 {
1444 fclose(journal_fh);
1445 journal_fh = NULL;
1446 }
1448 if (config_flush_at_shutdown)
1449 {
1450 RRDD_LOG(LOG_INFO, "removing journals");
1451 unlink(journal_old);
1452 unlink(journal_cur);
1453 }
1454 else
1455 {
1456 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1457 "journals will be used at next startup");
1458 }
1460 pthread_mutex_unlock(&journal_lock);
1462 } /* }}} static void journal_done */
1464 static int journal_write(char *cmd, char *args) /* {{{ */
1465 {
1466 int chars;
1468 if (journal_fh == NULL)
1469 return 0;
1471 pthread_mutex_lock(&journal_lock);
1472 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1473 pthread_mutex_unlock(&journal_lock);
1475 if (chars > 0)
1476 {
1477 pthread_mutex_lock(&stats_lock);
1478 stats_journal_bytes += chars;
1479 pthread_mutex_unlock(&stats_lock);
1480 }
1482 return chars;
1483 } /* }}} static int journal_write */
1485 static int journal_replay (const char *file) /* {{{ */
1486 {
1487 FILE *fh;
1488 int entry_cnt = 0;
1489 int fail_cnt = 0;
1490 uint64_t line = 0;
1491 char entry[CMD_MAX];
1493 if (file == NULL) return 0;
1495 fh = fopen(file, "r");
1496 if (fh == NULL)
1497 {
1498 if (errno != ENOENT)
1499 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1500 file, rrd_strerror(errno));
1501 return 0;
1502 }
1503 else
1504 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1506 while(!feof(fh))
1507 {
1508 size_t entry_len;
1510 ++line;
1511 if (fgets(entry, sizeof(entry), fh) == NULL)
1512 break;
1513 entry_len = strlen(entry);
1515 /* check \n termination in case journal writing crashed mid-line */
1516 if (entry_len == 0)
1517 continue;
1518 else if (entry[entry_len - 1] != '\n')
1519 {
1520 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1521 ++fail_cnt;
1522 continue;
1523 }
1525 entry[entry_len - 1] = '\0';
1527 if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
1528 ++entry_cnt;
1529 else
1530 ++fail_cnt;
1531 }
1533 fclose(fh);
1535 if (entry_cnt > 0)
1536 {
1537 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1538 entry_cnt, fail_cnt);
1539 return 1;
1540 }
1541 else
1542 return 0;
1544 } /* }}} static int journal_replay */
1546 static void *connection_thread_main (void *args) /* {{{ */
1547 {
1548 pthread_t self;
1549 listen_socket_t *sock;
1550 int i;
1551 int fd;
1553 sock = (listen_socket_t *) args;
1554 fd = sock->fd;
1556 pthread_mutex_lock (&connection_threads_lock);
1557 {
1558 pthread_t *temp;
1560 temp = (pthread_t *) realloc (connection_threads,
1561 sizeof (pthread_t) * (connection_threads_num + 1));
1562 if (temp == NULL)
1563 {
1564 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1565 }
1566 else
1567 {
1568 connection_threads = temp;
1569 connection_threads[connection_threads_num] = pthread_self ();
1570 connection_threads_num++;
1571 }
1572 }
1573 pthread_mutex_unlock (&connection_threads_lock);
1575 while (do_shutdown == 0)
1576 {
1577 char buffer[CMD_MAX];
1579 struct pollfd pollfd;
1580 int status;
1582 pollfd.fd = fd;
1583 pollfd.events = POLLIN | POLLPRI;
1584 pollfd.revents = 0;
1586 status = poll (&pollfd, 1, /* timeout = */ 500);
1587 if (do_shutdown)
1588 break;
1589 else if (status == 0) /* timeout */
1590 continue;
1591 else if (status < 0) /* error */
1592 {
1593 status = errno;
1594 if (status == EINTR)
1595 continue;
1596 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1597 continue;
1598 }
1600 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1601 {
1602 close (fd);
1603 break;
1604 }
1605 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1606 {
1607 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1608 "poll(2) returned something unexpected: %#04hx",
1609 pollfd.revents);
1610 close (fd);
1611 break;
1612 }
1614 status = (int) sread (fd, buffer, sizeof (buffer));
1615 if (status <= 0)
1616 {
1617 close (fd);
1619 if (status < 0)
1620 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1622 break;
1623 }
1625 status = handle_request (fd, sock->privilege, buffer, status);
1626 if (status != 0)
1627 break;
1628 }
1630 close(fd);
1631 free(args);
1633 self = pthread_self ();
1634 /* Remove this thread from the connection threads list */
1635 pthread_mutex_lock (&connection_threads_lock);
1636 /* Find out own index in the array */
1637 for (i = 0; i < connection_threads_num; i++)
1638 if (pthread_equal (connection_threads[i], self) != 0)
1639 break;
1640 assert (i < connection_threads_num);
1642 /* Move the trailing threads forward. */
1643 if (i < (connection_threads_num - 1))
1644 {
1645 memmove (connection_threads + i,
1646 connection_threads + i + 1,
1647 sizeof (pthread_t) * (connection_threads_num - i - 1));
1648 }
1650 connection_threads_num--;
1651 pthread_mutex_unlock (&connection_threads_lock);
1653 return (NULL);
1654 } /* }}} void *connection_thread_main */
1656 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1657 {
1658 int fd;
1659 struct sockaddr_un sa;
1660 listen_socket_t *temp;
1661 int status;
1662 const char *path;
1664 path = sock->addr;
1665 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1666 path += strlen("unix:");
1668 temp = (listen_socket_t *) realloc (listen_fds,
1669 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1670 if (temp == NULL)
1671 {
1672 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1673 return (-1);
1674 }
1675 listen_fds = temp;
1676 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1678 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1679 if (fd < 0)
1680 {
1681 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1682 return (-1);
1683 }
1685 memset (&sa, 0, sizeof (sa));
1686 sa.sun_family = AF_UNIX;
1687 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1689 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1690 if (status != 0)
1691 {
1692 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1693 close (fd);
1694 unlink (path);
1695 return (-1);
1696 }
1698 status = listen (fd, /* backlog = */ 10);
1699 if (status != 0)
1700 {
1701 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1702 close (fd);
1703 unlink (path);
1704 return (-1);
1705 }
1707 listen_fds[listen_fds_num].fd = fd;
1708 listen_fds[listen_fds_num].family = PF_UNIX;
1709 strncpy(listen_fds[listen_fds_num].addr, path,
1710 sizeof (listen_fds[listen_fds_num].addr) - 1);
1711 listen_fds_num++;
1713 return (0);
1714 } /* }}} int open_listen_socket_unix */
1716 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1717 {
1718 struct addrinfo ai_hints;
1719 struct addrinfo *ai_res;
1720 struct addrinfo *ai_ptr;
1721 char addr_copy[NI_MAXHOST];
1722 char *addr;
1723 char *port;
1724 int status;
1726 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1727 addr_copy[sizeof (addr_copy) - 1] = 0;
1728 addr = addr_copy;
1730 memset (&ai_hints, 0, sizeof (ai_hints));
1731 ai_hints.ai_flags = 0;
1732 #ifdef AI_ADDRCONFIG
1733 ai_hints.ai_flags |= AI_ADDRCONFIG;
1734 #endif
1735 ai_hints.ai_family = AF_UNSPEC;
1736 ai_hints.ai_socktype = SOCK_STREAM;
1738 port = NULL;
1739 if (*addr == '[') /* IPv6+port format */
1740 {
1741 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1742 addr++;
1744 port = strchr (addr, ']');
1745 if (port == NULL)
1746 {
1747 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1748 sock->addr);
1749 return (-1);
1750 }
1751 *port = 0;
1752 port++;
1754 if (*port == ':')
1755 port++;
1756 else if (*port == 0)
1757 port = NULL;
1758 else
1759 {
1760 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1761 port);
1762 return (-1);
1763 }
1764 } /* if (*addr = ']') */
1765 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1766 {
1767 port = rindex(addr, ':');
1768 if (port != NULL)
1769 {
1770 *port = 0;
1771 port++;
1772 }
1773 }
1774 ai_res = NULL;
1775 status = getaddrinfo (addr,
1776 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1777 &ai_hints, &ai_res);
1778 if (status != 0)
1779 {
1780 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1781 "%s", addr, gai_strerror (status));
1782 return (-1);
1783 }
1785 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1786 {
1787 int fd;
1788 listen_socket_t *temp;
1789 int one = 1;
1791 temp = (listen_socket_t *) realloc (listen_fds,
1792 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1793 if (temp == NULL)
1794 {
1795 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1796 continue;
1797 }
1798 listen_fds = temp;
1799 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1801 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1802 if (fd < 0)
1803 {
1804 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1805 continue;
1806 }
1808 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1810 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1811 if (status != 0)
1812 {
1813 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1814 close (fd);
1815 continue;
1816 }
1818 status = listen (fd, /* backlog = */ 10);
1819 if (status != 0)
1820 {
1821 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1822 close (fd);
1823 return (-1);
1824 }
1826 listen_fds[listen_fds_num].fd = fd;
1827 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1828 listen_fds_num++;
1829 } /* for (ai_ptr) */
1831 return (0);
1832 } /* }}} static int open_listen_socket_network */
1834 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1835 {
1836 assert(sock != NULL);
1837 assert(sock->addr != NULL);
1839 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1840 || sock->addr[0] == '/')
1841 return (open_listen_socket_unix(sock));
1842 else
1843 return (open_listen_socket_network(sock));
1844 } /* }}} int open_listen_socket */
1846 static int close_listen_sockets (void) /* {{{ */
1847 {
1848 size_t i;
1850 for (i = 0; i < listen_fds_num; i++)
1851 {
1852 close (listen_fds[i].fd);
1854 if (listen_fds[i].family == PF_UNIX)
1855 unlink(listen_fds[i].addr);
1856 }
1858 free (listen_fds);
1859 listen_fds = NULL;
1860 listen_fds_num = 0;
1862 return (0);
1863 } /* }}} int close_listen_sockets */
1865 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1866 {
1867 struct pollfd *pollfds;
1868 int pollfds_num;
1869 int status;
1870 int i;
1872 for (i = 0; i < config_listen_address_list_len; i++)
1873 open_listen_socket (config_listen_address_list[i]);
1875 if (config_listen_address_list_len < 1)
1876 {
1877 listen_socket_t sock;
1878 memset(&sock, 0, sizeof(sock));
1879 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1880 open_listen_socket (&sock);
1881 }
1883 if (listen_fds_num < 1)
1884 {
1885 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1886 "could be opened. Sorry.");
1887 return (NULL);
1888 }
1890 pollfds_num = listen_fds_num;
1891 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1892 if (pollfds == NULL)
1893 {
1894 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1895 return (NULL);
1896 }
1897 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1899 RRDD_LOG(LOG_INFO, "listening for connections");
1901 while (do_shutdown == 0)
1902 {
1903 assert (pollfds_num == ((int) listen_fds_num));
1904 for (i = 0; i < pollfds_num; i++)
1905 {
1906 pollfds[i].fd = listen_fds[i].fd;
1907 pollfds[i].events = POLLIN | POLLPRI;
1908 pollfds[i].revents = 0;
1909 }
1911 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1912 if (do_shutdown)
1913 break;
1914 else if (status == 0) /* timeout */
1915 continue;
1916 else if (status < 0) /* error */
1917 {
1918 status = errno;
1919 if (status != EINTR)
1920 {
1921 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1922 }
1923 continue;
1924 }
1926 for (i = 0; i < pollfds_num; i++)
1927 {
1928 listen_socket_t *client_sock;
1929 struct sockaddr_storage client_sa;
1930 socklen_t client_sa_size;
1931 pthread_t tid;
1932 pthread_attr_t attr;
1934 if (pollfds[i].revents == 0)
1935 continue;
1937 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1938 {
1939 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1940 "poll(2) returned something unexpected for listen FD #%i.",
1941 pollfds[i].fd);
1942 continue;
1943 }
1945 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1946 if (client_sock == NULL)
1947 {
1948 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1949 continue;
1950 }
1951 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1953 client_sa_size = sizeof (client_sa);
1954 client_sock->fd = accept (pollfds[i].fd,
1955 (struct sockaddr *) &client_sa, &client_sa_size);
1956 if (client_sock->fd < 0)
1957 {
1958 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1959 free(client_sock);
1960 continue;
1961 }
1963 pthread_attr_init (&attr);
1964 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1966 status = pthread_create (&tid, &attr, connection_thread_main,
1967 client_sock);
1968 if (status != 0)
1969 {
1970 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1971 close (client_sock->fd);
1972 free (client_sock);
1973 continue;
1974 }
1975 } /* for (pollfds_num) */
1976 } /* while (do_shutdown == 0) */
1978 RRDD_LOG(LOG_INFO, "starting shutdown");
1980 close_listen_sockets ();
1982 pthread_mutex_lock (&connection_threads_lock);
1983 while (connection_threads_num > 0)
1984 {
1985 pthread_t wait_for;
1987 wait_for = connection_threads[0];
1989 pthread_mutex_unlock (&connection_threads_lock);
1990 pthread_join (wait_for, /* retval = */ NULL);
1991 pthread_mutex_lock (&connection_threads_lock);
1992 }
1993 pthread_mutex_unlock (&connection_threads_lock);
1995 return (NULL);
1996 } /* }}} void *listen_thread_main */
1998 static int daemonize (void) /* {{{ */
1999 {
2000 int status;
2001 int fd;
2002 char *base_dir;
2004 fd = open_pidfile();
2005 if (fd < 0) return fd;
2007 if (!stay_foreground)
2008 {
2009 pid_t child;
2011 child = fork ();
2012 if (child < 0)
2013 {
2014 fprintf (stderr, "daemonize: fork(2) failed.\n");
2015 return (-1);
2016 }
2017 else if (child > 0)
2018 {
2019 return (1);
2020 }
2022 /* Become session leader */
2023 setsid ();
2025 /* Open the first three file descriptors to /dev/null */
2026 close (2);
2027 close (1);
2028 close (0);
2030 open ("/dev/null", O_RDWR);
2031 dup (0);
2032 dup (0);
2033 } /* if (!stay_foreground) */
2035 /* Change into the /tmp directory. */
2036 base_dir = (config_base_dir != NULL)
2037 ? config_base_dir
2038 : "/tmp";
2039 status = chdir (base_dir);
2040 if (status != 0)
2041 {
2042 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2043 return (-1);
2044 }
2046 install_signal_handlers();
2048 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2049 RRDD_LOG(LOG_INFO, "starting up");
2051 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2052 if (cache_tree == NULL)
2053 {
2054 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2055 return (-1);
2056 }
2058 status = write_pidfile (fd);
2059 return status;
2060 } /* }}} int daemonize */
2062 static int cleanup (void) /* {{{ */
2063 {
2064 do_shutdown++;
2066 pthread_cond_signal (&cache_cond);
2067 pthread_join (queue_thread, /* return = */ NULL);
2069 remove_pidfile ();
2071 RRDD_LOG(LOG_INFO, "goodbye");
2072 closelog ();
2074 return (0);
2075 } /* }}} int cleanup */
2077 static int read_options (int argc, char **argv) /* {{{ */
2078 {
2079 int option;
2080 int status = 0;
2082 while ((option = getopt(argc, argv, "gl:L:f:w:b:z:p:j:h?F")) != -1)
2083 {
2084 switch (option)
2085 {
2086 case 'g':
2087 stay_foreground=1;
2088 break;
2090 case 'L':
2091 case 'l':
2092 {
2093 listen_socket_t **temp;
2094 listen_socket_t *new;
2096 new = malloc(sizeof(listen_socket_t));
2097 if (new == NULL)
2098 {
2099 fprintf(stderr, "read_options: malloc failed.\n");
2100 return(2);
2101 }
2102 memset(new, 0, sizeof(listen_socket_t));
2104 temp = (listen_socket_t **) realloc (config_listen_address_list,
2105 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2106 if (temp == NULL)
2107 {
2108 fprintf (stderr, "read_options: realloc failed.\n");
2109 return (2);
2110 }
2111 config_listen_address_list = temp;
2113 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2114 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2116 temp[config_listen_address_list_len] = new;
2117 config_listen_address_list_len++;
2118 }
2119 break;
2121 case 'f':
2122 {
2123 int temp;
2125 temp = atoi (optarg);
2126 if (temp > 0)
2127 config_flush_interval = temp;
2128 else
2129 {
2130 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2131 status = 3;
2132 }
2133 }
2134 break;
2136 case 'w':
2137 {
2138 int temp;
2140 temp = atoi (optarg);
2141 if (temp > 0)
2142 config_write_interval = temp;
2143 else
2144 {
2145 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2146 status = 2;
2147 }
2148 }
2149 break;
2151 case 'z':
2152 {
2153 int temp;
2155 temp = atoi(optarg);
2156 if (temp > 0)
2157 config_write_jitter = temp;
2158 else
2159 {
2160 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2161 status = 2;
2162 }
2164 break;
2165 }
2167 case 'b':
2168 {
2169 size_t len;
2171 if (config_base_dir != NULL)
2172 free (config_base_dir);
2173 config_base_dir = strdup (optarg);
2174 if (config_base_dir == NULL)
2175 {
2176 fprintf (stderr, "read_options: strdup failed.\n");
2177 return (3);
2178 }
2180 len = strlen (config_base_dir);
2181 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2182 {
2183 config_base_dir[len - 1] = 0;
2184 len--;
2185 }
2187 if (len < 1)
2188 {
2189 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2190 return (4);
2191 }
2192 }
2193 break;
2195 case 'p':
2196 {
2197 if (config_pid_file != NULL)
2198 free (config_pid_file);
2199 config_pid_file = strdup (optarg);
2200 if (config_pid_file == NULL)
2201 {
2202 fprintf (stderr, "read_options: strdup failed.\n");
2203 return (3);
2204 }
2205 }
2206 break;
2208 case 'F':
2209 config_flush_at_shutdown = 1;
2210 break;
2212 case 'j':
2213 {
2214 struct stat statbuf;
2215 const char *dir = optarg;
2217 status = stat(dir, &statbuf);
2218 if (status != 0)
2219 {
2220 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2221 return 6;
2222 }
2224 if (!S_ISDIR(statbuf.st_mode)
2225 || access(dir, R_OK|W_OK|X_OK) != 0)
2226 {
2227 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2228 errno ? rrd_strerror(errno) : "");
2229 return 6;
2230 }
2232 journal_cur = malloc(PATH_MAX + 1);
2233 journal_old = malloc(PATH_MAX + 1);
2234 if (journal_cur == NULL || journal_old == NULL)
2235 {
2236 fprintf(stderr, "malloc failure for journal files\n");
2237 return 6;
2238 }
2239 else
2240 {
2241 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2242 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2243 }
2244 }
2245 break;
2247 case 'h':
2248 case '?':
2249 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2250 "\n"
2251 "Usage: rrdcached [options]\n"
2252 "\n"
2253 "Valid options are:\n"
2254 " -l <address> Socket address to listen to.\n"
2255 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2256 " -w <seconds> Interval in which to write data.\n"
2257 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2258 " -f <seconds> Interval in which to flush dead data.\n"
2259 " -p <file> Location of the PID-file.\n"
2260 " -b <dir> Base directory to change to.\n"
2261 " -g Do not fork and run in the foreground.\n"
2262 " -j <dir> Directory in which to create the journal files.\n"
2263 " -F Always flush all updates at shutdown\n"
2264 "\n"
2265 "For more information and a detailed description of all options "
2266 "please refer\n"
2267 "to the rrdcached(1) manual page.\n",
2268 VERSION);
2269 status = -1;
2270 break;
2271 } /* switch (option) */
2272 } /* while (getopt) */
2274 /* advise the user when values are not sane */
2275 if (config_flush_interval < 2 * config_write_interval)
2276 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2277 " 2x write interval (-w) !\n");
2278 if (config_write_jitter > config_write_interval)
2279 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2280 " write interval (-w) !\n");
2282 if (journal_cur == NULL)
2283 config_flush_at_shutdown = 1;
2285 return (status);
2286 } /* }}} int read_options */
2288 int main (int argc, char **argv)
2289 {
2290 int status;
2292 status = read_options (argc, argv);
2293 if (status != 0)
2294 {
2295 if (status < 0)
2296 status = 0;
2297 return (status);
2298 }
2300 status = daemonize ();
2301 if (status == 1)
2302 {
2303 struct sigaction sigchld;
2305 memset (&sigchld, 0, sizeof (sigchld));
2306 sigchld.sa_handler = SIG_IGN;
2307 sigaction (SIGCHLD, &sigchld, NULL);
2309 return (0);
2310 }
2311 else if (status != 0)
2312 {
2313 fprintf (stderr, "daemonize failed, exiting.\n");
2314 return (1);
2315 }
2317 if (journal_cur != NULL)
2318 {
2319 int had_journal = 0;
2321 pthread_mutex_lock(&journal_lock);
2323 RRDD_LOG(LOG_INFO, "checking for journal files");
2325 had_journal += journal_replay(journal_old);
2326 had_journal += journal_replay(journal_cur);
2328 if (had_journal)
2329 flush_old_values(-1);
2331 pthread_mutex_unlock(&journal_lock);
2332 journal_rotate();
2334 RRDD_LOG(LOG_INFO, "journal processing complete");
2335 }
2337 /* start the queue thread */
2338 memset (&queue_thread, 0, sizeof (queue_thread));
2339 status = pthread_create (&queue_thread,
2340 NULL, /* attr */
2341 queue_thread_main,
2342 NULL); /* args */
2343 if (status != 0)
2344 {
2345 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2346 cleanup();
2347 return (1);
2348 }
2350 listen_thread_main (NULL);
2351 cleanup ();
2353 return (0);
2354 } /* int main */
2356 /*
2357 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2358 */