f92de21210af6c829243395e4fb776b9caf7d01a
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
/*
 * Feature-test macros that would pin the build to C99, POSIX.1-2001
 * (_POSIX_C_SOURCE 200112L) and XPG5 (_XOPEN_SOURCE 500) interfaces.
 * NOTE: the whole section is disabled by the surrounding `#if 0', so none of
 * these definitions currently take effect.
 */
#if 0
/*
 * First tell the compiler to stick to the C99 and POSIX standards as close as
 * possible.
 */
#ifndef __STRICT_ANSI__ /* {{{ */
# define __STRICT_ANSI__
#endif

#ifndef _ISOC99_SOURCE
# define _ISOC99_SOURCE
#endif

#ifdef _POSIX_C_SOURCE
# undef _POSIX_C_SOURCE
#endif
#define _POSIX_C_SOURCE 200112L

/* Single UNIX needed for strdup. */
#ifdef _XOPEN_SOURCE
# undef _XOPEN_SOURCE
#endif
#define _XOPEN_SOURCE 500

#ifndef _REENTRANT
# define _REENTRANT
#endif

#ifndef _THREAD_SAFE
# define _THREAD_SAFE
#endif

#ifdef _GNU_SOURCE
# undef _GNU_SOURCE
#endif
/* }}} */
#endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* All daemon diagnostics go through syslog(3). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC-compatible ones do not know __attribute__;
 * expand it to nothing there. */
#if !defined(__GNUC__)
# define __attribute__(x) /**/
#endif
101 /*
102 * Types
103 */
104 struct listen_socket_s
105 {
106 int fd;
107 char path[PATH_MAX + 1];
108 };
109 typedef struct listen_socket_s listen_socket_t;
111 struct cache_item_s;
112 typedef struct cache_item_s cache_item_t;
113 struct cache_item_s
114 {
115 char *file;
116 char **values;
117 int values_num;
118 time_t last_flush_time;
119 #define CI_FLAGS_IN_TREE (1<<0)
120 #define CI_FLAGS_IN_QUEUE (1<<1)
121 int flags;
122 pthread_cond_t flushed;
123 cache_item_t *next;
124 };
126 struct callback_flush_data_s
127 {
128 time_t now;
129 time_t abs_timeout;
130 char **keys;
131 size_t keys_num;
132 };
133 typedef struct callback_flush_data_s callback_flush_data_t;
135 enum queue_side_e
136 {
137 HEAD,
138 TAIL
139 };
140 typedef enum queue_side_e queue_side_t;
142 /* max length of socket command or response */
143 #define CMD_MAX 4096
145 /*
146 * Variables
147 */
148 static int stay_foreground = 0;
150 static listen_socket_t *listen_fds = NULL;
151 static size_t listen_fds_num = 0;
153 static int do_shutdown = 0;
155 static pthread_t queue_thread;
157 static pthread_t *connection_threads = NULL;
158 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
159 static int connection_threads_num = 0;
161 /* Cache stuff */
162 static GTree *cache_tree = NULL;
163 static cache_item_t *cache_queue_head = NULL;
164 static cache_item_t *cache_queue_tail = NULL;
165 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
166 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
168 static int config_write_interval = 300;
169 static int config_write_jitter = 0;
170 static int config_flush_interval = 3600;
171 static int config_flush_at_shutdown = 0;
172 static char *config_pid_file = NULL;
173 static char *config_base_dir = NULL;
175 static char **config_listen_address_list = NULL;
176 static int config_listen_address_list_len = 0;
178 static uint64_t stats_queue_length = 0;
179 static uint64_t stats_updates_received = 0;
180 static uint64_t stats_flush_received = 0;
181 static uint64_t stats_updates_written = 0;
182 static uint64_t stats_data_sets_written = 0;
183 static uint64_t stats_journal_bytes = 0;
184 static uint64_t stats_journal_rotate = 0;
185 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
187 /* Journaled updates */
188 static char *journal_cur = NULL;
189 static char *journal_old = NULL;
190 static FILE *journal_fh = NULL;
191 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
192 static int journal_write(char *cmd, char *args);
193 static void journal_done(void);
194 static void journal_rotate(void);
196 /*
197 * Functions
198 */
199 static void sig_common (const char *sig) /* {{{ */
200 {
201 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
202 do_shutdown++;
203 pthread_cond_broadcast(&cache_cond);
204 } /* }}} void sig_common */
206 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
207 {
208 sig_common("INT");
209 } /* }}} void sig_int_handler */
211 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
212 {
213 sig_common("TERM");
214 } /* }}} void sig_term_handler */
216 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
217 {
218 config_flush_at_shutdown = 1;
219 sig_common("USR1");
220 } /* }}} void sig_usr1_handler */
222 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
223 {
224 config_flush_at_shutdown = 0;
225 sig_common("USR2");
226 } /* }}} void sig_usr2_handler */
228 static void install_signal_handlers(void) /* {{{ */
229 {
230 /* These structures are static, because `sigaction' behaves weird if the are
231 * overwritten.. */
232 static struct sigaction sa_int;
233 static struct sigaction sa_term;
234 static struct sigaction sa_pipe;
235 static struct sigaction sa_usr1;
236 static struct sigaction sa_usr2;
238 /* Install signal handlers */
239 memset (&sa_int, 0, sizeof (sa_int));
240 sa_int.sa_handler = sig_int_handler;
241 sigaction (SIGINT, &sa_int, NULL);
243 memset (&sa_term, 0, sizeof (sa_term));
244 sa_term.sa_handler = sig_term_handler;
245 sigaction (SIGTERM, &sa_term, NULL);
247 memset (&sa_pipe, 0, sizeof (sa_pipe));
248 sa_pipe.sa_handler = SIG_IGN;
249 sigaction (SIGPIPE, &sa_pipe, NULL);
251 memset (&sa_pipe, 0, sizeof (sa_usr1));
252 sa_usr1.sa_handler = sig_usr1_handler;
253 sigaction (SIGUSR1, &sa_usr1, NULL);
255 memset (&sa_usr2, 0, sizeof (sa_usr2));
256 sa_usr2.sa_handler = sig_usr2_handler;
257 sigaction (SIGUSR2, &sa_usr2, NULL);
259 } /* }}} void install_signal_handlers */
261 static int open_pidfile(void) /* {{{ */
262 {
263 int fd;
264 char *file;
266 file = (config_pid_file != NULL)
267 ? config_pid_file
268 : LOCALSTATEDIR "/run/rrdcached.pid";
270 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
271 if (fd < 0)
272 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
273 file, rrd_strerror(errno));
275 return(fd);
276 } /* }}} static int open_pidfile */
/*
 * write_pidfile:
 * Writes the current process ID followed by a newline to `fd' (as returned
 * by open_pidfile()). Ownership of `fd' passes to this function: it is
 * always closed before returning.
 *
 * Returns 0 on success, -1 if the descriptor could not be wrapped or the
 * PID could not be written and flushed (each failure is logged).
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *fh;
  int status = 0;

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) getpid ()) < 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fprintf() failed.");
    status = -1;
  }

  /* fclose() flushes the stdio buffer; if it fails the PID may never have
   * reached the file. It also closes `fd'. */
  if (fclose (fh) != 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fclose() failed.");
    status = -1;
  }

  return (status);
} /* }}} int write_pidfile */
299 static int remove_pidfile (void) /* {{{ */
300 {
301 char *file;
302 int status;
304 file = (config_pid_file != NULL)
305 ? config_pid_file
306 : LOCALSTATEDIR "/run/rrdcached.pid";
308 status = unlink (file);
309 if (status == 0)
310 return (0);
311 return (errno);
312 } /* }}} int remove_pidfile */
/*
 * sread:
 * Reads from `fd' into `buffer_void' until the data read so far ends with a
 * newline, then strips the trailing "\n" (or "\r\n") and NUL-terminates the
 * line. read() calls interrupted with EINTR or failing with EAGAIN are
 * retried.
 *
 * Returns:
 *   > 0  length of the line INCLUDING the terminating NUL byte
 *     0  end of file (any partially read line is discarded)
 *    -1  error with errno set; ENOBUFS means the buffer filled up (or has
 *        zero size) before a newline arrived
 *
 * NOTE: there is no read-ahead buffering. If one read() returns several
 * lines at once, they are all returned together as a single "line".
 */
static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
{
  char *buffer;
  size_t buffer_used;
  size_t buffer_free;
  ssize_t status;

  /* A zero-sized buffer cannot hold even the terminating NUL; without this
   * check the code below would access buffer[-1]. */
  if (buffer_size == 0)
  {
    errno = ENOBUFS;
    return (-1);
  }

  buffer = (char *) buffer_void;
  buffer_used = 0;
  buffer_free = buffer_size;

  while (buffer_free > 0)
  {
    status = read (fd, buffer + buffer_used, buffer_free);
    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
      continue;

    if (status < 0)
      return (-1);

    if (status == 0)
      return (0);

    assert (buffer_free >= (size_t) status);

    buffer_free -= (size_t) status;
    buffer_used += (size_t) status;

    if (buffer[buffer_used - 1] == '\n')
      break;
  }

  /* The loop only ends after at least one successful, non-empty read. */
  assert (buffer_used > 0);

  if (buffer[buffer_used - 1] != '\n')
  {
    errno = ENOBUFS;
    return (-1);
  }

  buffer[buffer_used - 1] = 0;

  /* Fix network line endings. */
  if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
  {
    buffer_used--;
    buffer[buffer_used - 1] = 0;
  }

  return ((ssize_t) buffer_used);
} /* }}} ssize_t sread */
/*
 * swrite:
 * Writes all `count' bytes of `buf' to `fd', retrying after EINTR/EAGAIN
 * and continuing after short writes. A negative `fd' (journal replay mode,
 * where there is no client) makes it a no-op that reports success.
 *
 * Returns 0 once everything was written, or the negative write() result
 * (with errno set) on error.
 */
static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
{
  const char *cursor;
  size_t remaining;

  if (fd < 0)
    return 0;

  cursor = (const char *) buf;
  remaining = count;

  while (remaining > 0)
  {
    ssize_t written = write (fd, (const void *) cursor, remaining);

    if (written < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (written);
    }

    remaining -= written;
    cursor += written;
  }

  return (0);
} /* }}} ssize_t swrite */
395 static void _wipe_ci_values(cache_item_t *ci, time_t when)
396 {
397 ci->values = NULL;
398 ci->values_num = 0;
400 ci->last_flush_time = when;
401 if (config_write_jitter > 0)
402 ci->last_flush_time += (random() % config_write_jitter);
404 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
405 }
/*
 * enqueue_cache_item:
 * `cache_lock' must be acquired before calling this function!
 *
 * Puts `ci' into the write queue (a singly linked list from
 * `cache_queue_head' to `cache_queue_tail', linked through `ci->next').
 *   HEAD: insert at the front; if `ci' is already queued elsewhere, move it
 *         to the front.
 *   TAIL: append at the end; items that are already queued are left where
 *         they are.
 * Items without pending values are never queued. Only a new insertion
 * wakes waiters on `cache_cond' and increments `stats_queue_length';
 * moving an already queued item does not.
 *
 * Returns -1 if `ci' is NULL, otherwise 0.
 */
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
    queue_side_t side)
{
  int did_insert = 0;

  if (ci == NULL)
    return (-1);

  /* Nothing to write, nothing to queue. */
  if (ci->values_num == 0)
    return (0);

  if (side == HEAD)
  {
    if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
    {
      assert (ci->next == NULL);
      ci->next = cache_queue_head;
      cache_queue_head = ci;

      /* The queue was empty: `ci' is also the last element. */
      if (cache_queue_tail == NULL)
        cache_queue_tail = cache_queue_head;

      did_insert = 1;
    }
    else if (cache_queue_head == ci)
    {
      /* do nothing */
    }
    else /* enqueued, but not first entry */
    {
      cache_item_t *prev;

      /* find previous entry */
      for (prev = cache_queue_head; prev != NULL; prev = prev->next)
        if (prev->next == ci)
          break;
      assert (prev != NULL);

      /* move to the front */
      prev->next = ci->next;
      ci->next = cache_queue_head;
      cache_queue_head = ci;

      /* check if we need to adapt the tail */
      if (cache_queue_tail == ci)
        cache_queue_tail = prev;
    }
  }
  else /* (side == TAIL) */
  {
    /* We don't move values back in the list.. */
    if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
      return (0);

    assert (ci->next == NULL);

    if (cache_queue_tail == NULL)
      cache_queue_head = ci;
    else
      cache_queue_tail->next = ci;
    cache_queue_tail = ci;

    did_insert = 1;
  }

  ci->flags |= CI_FLAGS_IN_QUEUE;

  /* Only a genuinely new queue entry changes the queue length and needs to
   * wake the queue thread. */
  if (did_insert)
  {
    pthread_cond_broadcast(&cache_cond);
    pthread_mutex_lock (&stats_lock);
    stats_queue_length++;
    pthread_mutex_unlock (&stats_lock);
  }

  return (0);
} /* }}} int enqueue_cache_item */
489 /*
490 * tree_callback_flush:
491 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
492 * while this is in progress.
493 */
494 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
495 gpointer data)
496 {
497 cache_item_t *ci;
498 callback_flush_data_t *cfd;
500 ci = (cache_item_t *) value;
501 cfd = (callback_flush_data_t *) data;
503 if ((ci->last_flush_time <= cfd->abs_timeout)
504 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
505 && (ci->values_num > 0))
506 {
507 enqueue_cache_item (ci, TAIL);
508 }
509 else if ((do_shutdown != 0)
510 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
511 && (ci->values_num > 0))
512 {
513 enqueue_cache_item (ci, TAIL);
514 }
515 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
516 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
517 && (ci->values_num <= 0))
518 {
519 char **temp;
521 temp = (char **) realloc (cfd->keys,
522 sizeof (char *) * (cfd->keys_num + 1));
523 if (temp == NULL)
524 {
525 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
526 return (FALSE);
527 }
528 cfd->keys = temp;
529 /* Make really sure this points to the _same_ place */
530 assert ((char *) key == ci->file);
531 cfd->keys[cfd->keys_num] = (char *) key;
532 cfd->keys_num++;
533 }
535 return (FALSE);
536 } /* }}} gboolean tree_callback_flush */
538 static int flush_old_values (int max_age)
539 {
540 callback_flush_data_t cfd;
541 size_t k;
543 memset (&cfd, 0, sizeof (cfd));
544 /* Pass the current time as user data so that we don't need to call
545 * `time' for each node. */
546 cfd.now = time (NULL);
547 cfd.keys = NULL;
548 cfd.keys_num = 0;
550 if (max_age > 0)
551 cfd.abs_timeout = cfd.now - max_age;
552 else
553 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
555 /* `tree_callback_flush' will return the keys of all values that haven't
556 * been touched in the last `config_flush_interval' seconds in `cfd'.
557 * The char*'s in this array point to the same memory as ci->file, so we
558 * don't need to free them separately. */
559 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
561 for (k = 0; k < cfd.keys_num; k++)
562 {
563 cache_item_t *ci;
565 /* This must not fail. */
566 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
567 assert (ci != NULL);
569 /* If we end up here with values available, something's seriously
570 * messed up. */
571 assert (ci->values_num == 0);
573 /* Remove the node from the tree */
574 g_tree_remove (cache_tree, cfd.keys[k]);
575 cfd.keys[k] = NULL;
577 /* Now free and clean up `ci'. */
578 free (ci->file);
579 ci->file = NULL;
580 free (ci);
581 ci = NULL;
582 } /* for (k = 0; k < cfd.keys_num; k++) */
584 if (cfd.keys != NULL)
585 {
586 free (cfd.keys);
587 cfd.keys = NULL;
588 }
590 return (0);
591 } /* int flush_old_values */
/*
 * queue_thread_main:
 * Body of the writer thread. On each pass it takes the item at the head of
 * the write queue and detaches its values from the item. With `cache_lock'
 * released it then passes those values to rrd_update_r(), records a "wrote"
 * journal entry and broadcasts `ci->flushed' to wake flush_file() waiters.
 *
 * Whenever `next_flush' has passed (every `config_flush_interval' seconds)
 * it also calls flush_old_values(config_write_interval) and rotates the
 * journal.
 *
 * Shutdown: once `do_shutdown' is set, the final-flush block runs only once
 * (guarded by `final_flush').
 *   - With `config_flush_at_shutdown' set, every item with pending values is
 *     enqueued and the loop keeps going until the queue is empty.
 *   - Otherwise the loop is left at once and the remaining queued values are
 *     not written.
 * journal_done() is called before the thread returns.
 */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int final_flush = 0; /* make sure we only flush once on shutdown */

  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock (&cache_lock);
  while ((do_shutdown == 0) || (cache_queue_head != NULL))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    int values_num;
    int status;
    int i;

    /* First, check if it's time to do the cache flush. */
    gettimeofday (&now, NULL);
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* Determine the time of the next cache flush. */
      while (next_flush.tv_sec <= now.tv_sec)
        next_flush.tv_sec += config_flush_interval;

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* Now, check if there's something to store away. If not, wait until
     * something comes in or it's time to do the cache flush. if we are
     * shutting down, do not wait around. */
    if (cache_queue_head == NULL && !do_shutdown)
    {
      status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_timedwait returned %i.", status);
      }
    }

    /* We're about to shut down. `!final_flush++' is true only the first
     * time this is reached. */
    if (do_shutdown != 0 && !final_flush++)
    {
      if (config_flush_at_shutdown)
        flush_old_values (-1); /* flush everything */
      else
        break;
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      /* NOTE(review): `ci' stays at the head of the queue and the loop
       * retries at once while still holding `cache_lock'. A persistent
       * strdup() failure therefore spins this thread. */
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* Take over the values array; after the wipe, new updates for this file
     * start a fresh array. */
    values = ci->values;
    values_num = ci->values_num;

    _wipe_ci_values(ci, time(NULL));

    /* Unlink `ci' from the head of the queue. */
    cache_queue_head = ci->next;
    if (cache_queue_head == NULL)
      cache_queue_tail = NULL;
    ci->next = NULL;

    pthread_mutex_lock (&stats_lock);
    assert (stats_queue_length > 0);
    stats_queue_length--;
    pthread_mutex_unlock (&stats_lock);

    /* The (possibly slow) disk update runs without `cache_lock'. */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    /* The "wrote" entry is journaled whether or not the update succeeded.
     * NOTE(review): `ci' is used here without `cache_lock'. This relies on
     * nothing freeing it meanwhile; flush_old_values() only frees items
     * idle for `config_flush_interval', and `ci' was just wiped. */
    journal_write("wrote", file);
    pthread_cond_broadcast(&ci->flushed);

    for (i = 0; i < values_num; i++)
      free (values[i]);

    free(values);
    free(file);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    pthread_mutex_lock (&cache_lock);

    /* We're about to shut down */
    if (do_shutdown != 0 && !final_flush++)
    {
      if (config_flush_at_shutdown)
        flush_old_values (-1); /* flush everything */
      else
        break;
    }
  } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
  pthread_mutex_unlock (&cache_lock);

  if (config_flush_at_shutdown)
  {
    assert(cache_queue_head == NULL);
    RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
  }

  journal_done();

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Splits the next field off the NUL-terminated command buffer
 * `*buffer_ret' of `*buffer_size_ret' bytes (the size includes the final
 * NUL). Fields are separated by single spaces. A backslash makes the
 * following character literal, so "a\ b" yields the field "a b". Unescaping
 * happens in place.
 *
 * On success returns 0. `*field_ret' then points to the NUL-terminated
 * field inside the original buffer, and `*buffer_ret'/`*buffer_size_ret'
 * are advanced past the separator. Two adjacent spaces yield an empty
 * field.
 * Returns -1 when the buffer is empty or ends inside a field or escape
 * sequence. The output parameters are then left unchanged, although the
 * buffer contents may already have been rewritten.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t len = *buffer_size_ret;
  char *dst = src;   /* unescaped field is written over the input */
  size_t in = 0;
  size_t out = 0;

  if (len == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[len - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= len)
      return (-1);

    c = src[in++];

    /* End of field or end of buffer. */
    if ((c == ' ') || (c == '\0'))
    {
      dst[out] = '\0';
      break;
    }

    /* Escaped character: take the next byte literally. */
    if (c == '\\')
    {
      if (in >= len)
        return (-1);
      c = src[in++];
    }

    dst[out++] = c;
  }

  *buffer_ret = src + in;
  *buffer_size_ret = len - in;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
804 static int flush_file (const char *filename) /* {{{ */
805 {
806 cache_item_t *ci;
808 pthread_mutex_lock (&cache_lock);
810 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
811 if (ci == NULL)
812 {
813 pthread_mutex_unlock (&cache_lock);
814 return (ENOENT);
815 }
817 if (ci->values_num > 0)
818 {
819 /* Enqueue at head */
820 enqueue_cache_item (ci, HEAD);
821 pthread_cond_wait(&ci->flushed, &cache_lock);
822 }
824 pthread_mutex_unlock(&cache_lock);
826 return (0);
827 } /* }}} int flush_file */
/*
 * handle_request_help:
 * Implements "HELP [<command>]". Writes the help text for <command> to `fd',
 * or the command overview if no known command was given. The first line of
 * each text is "<n> <title>", where <n> is the number of lines that follow.
 *
 * Returns 0 on success or the errno value of a failed write.
 */
static int handle_request_help (int fd, /* {{{ */
    char *buffer, size_t buffer_size)
{
  int status;
  const char *const *help_text;
  size_t help_text_len;
  char *command;
  size_t i;

  static const char *const help_help[] =
  {
    "5 Command overview\n",
    "FLUSH <filename>\n",
    "FLUSHALL\n",
    "HELP [<command>]\n",
    "UPDATE <filename> <values> [<values> ...]\n",
    "STATS\n"
  };
  size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);

  static const char *const help_flush[] =
  {
    "4 Help for FLUSH\n",
    "Usage: FLUSH <filename>\n",
    "\n",
    "Adds the given filename to the head of the update queue and returns\n",
    "after it has been dequeued.\n"
  };
  size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);

  static const char *const help_flushall[] =
  {
    "3 Help for FLUSHALL\n",
    "Usage: FLUSHALL\n",
    "\n",
    "Triggers writing of all pending updates. Returns immediately.\n"
  };
  size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);

  /* One array element per output line; the header promises 9 lines. */
  static const char *const help_update[] =
  {
    "9 Help for UPDATE\n",
    "Usage: UPDATE <filename> <values> [<values> ...]\n",
    "\n",
    "Adds the given file to the internal cache if it is not yet known and\n",
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
    "for details.\n",
    "\n",
    "Each <values> has the following form:\n",
    "  <values> = <time>:<value>[:<value>[...]]\n",
    "See the rrdupdate(1) manpage for details.\n"
  };
  size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);

  static const char *const help_stats[] =
  {
    "4 Help for STATS\n",
    "Usage: STATS\n",
    "\n",
    "Returns some performance counters, see the rrdcached(1) manpage for\n",
    "a description of the values.\n"
  };
  size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);

  status = buffer_get_field (&buffer, &buffer_size, &command);
  if (status != 0)
  {
    help_text = help_help;
    help_text_len = help_help_len;
  }
  else if (strcasecmp (command, "update") == 0)
  {
    help_text = help_update;
    help_text_len = help_update_len;
  }
  else if (strcasecmp (command, "flush") == 0)
  {
    help_text = help_flush;
    help_text_len = help_flush_len;
  }
  else if (strcasecmp (command, "flushall") == 0)
  {
    help_text = help_flushall;
    help_text_len = help_flushall_len;
  }
  else if (strcasecmp (command, "stats") == 0)
  {
    help_text = help_stats;
    help_text_len = help_stats_len;
  }
  else
  {
    help_text = help_help;
    help_text_len = help_help_len;
  }

  for (i = 0; i < help_text_len; i++)
  {
    status = swrite (fd, help_text[i], strlen (help_text[i]));
    if (status < 0)
    {
      status = errno;
      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
      return (status);
    }
  }

  return (0);
} /* }}} int handle_request_help */
942 static int handle_request_stats (int fd, /* {{{ */
943 char *buffer __attribute__((unused)),
944 size_t buffer_size __attribute__((unused)))
945 {
946 int status;
947 char outbuf[CMD_MAX];
949 uint64_t copy_queue_length;
950 uint64_t copy_updates_received;
951 uint64_t copy_flush_received;
952 uint64_t copy_updates_written;
953 uint64_t copy_data_sets_written;
954 uint64_t copy_journal_bytes;
955 uint64_t copy_journal_rotate;
957 uint64_t tree_nodes_number;
958 uint64_t tree_depth;
960 pthread_mutex_lock (&stats_lock);
961 copy_queue_length = stats_queue_length;
962 copy_updates_received = stats_updates_received;
963 copy_flush_received = stats_flush_received;
964 copy_updates_written = stats_updates_written;
965 copy_data_sets_written = stats_data_sets_written;
966 copy_journal_bytes = stats_journal_bytes;
967 copy_journal_rotate = stats_journal_rotate;
968 pthread_mutex_unlock (&stats_lock);
970 pthread_mutex_lock (&cache_lock);
971 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
972 tree_depth = (uint64_t) g_tree_height (cache_tree);
973 pthread_mutex_unlock (&cache_lock);
975 #define RRDD_STATS_SEND \
976 outbuf[sizeof (outbuf) - 1] = 0; \
977 status = swrite (fd, outbuf, strlen (outbuf)); \
978 if (status < 0) \
979 { \
980 status = errno; \
981 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
982 return (status); \
983 }
985 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
986 RRDD_STATS_SEND;
988 snprintf (outbuf, sizeof (outbuf),
989 "QueueLength: %"PRIu64"\n", copy_queue_length);
990 RRDD_STATS_SEND;
992 snprintf (outbuf, sizeof (outbuf),
993 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
994 RRDD_STATS_SEND;
996 snprintf (outbuf, sizeof (outbuf),
997 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
998 RRDD_STATS_SEND;
1000 snprintf (outbuf, sizeof (outbuf),
1001 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1002 RRDD_STATS_SEND;
1004 snprintf (outbuf, sizeof (outbuf),
1005 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1006 RRDD_STATS_SEND;
1008 snprintf (outbuf, sizeof (outbuf),
1009 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1010 RRDD_STATS_SEND;
1012 snprintf (outbuf, sizeof (outbuf),
1013 "TreeDepth: %"PRIu64"\n", tree_depth);
1014 RRDD_STATS_SEND;
1016 snprintf (outbuf, sizeof(outbuf),
1017 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1018 RRDD_STATS_SEND;
1020 snprintf (outbuf, sizeof(outbuf),
1021 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1022 RRDD_STATS_SEND;
1024 return (0);
1025 #undef RRDD_STATS_SEND
1026 } /* }}} int handle_request_stats */
1028 static int handle_request_flush (int fd, /* {{{ */
1029 char *buffer, size_t buffer_size)
1030 {
1031 char *file;
1032 int status;
1033 char result[CMD_MAX];
1035 status = buffer_get_field (&buffer, &buffer_size, &file);
1036 if (status != 0)
1037 {
1038 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1039 }
1040 else
1041 {
1042 pthread_mutex_lock(&stats_lock);
1043 stats_flush_received++;
1044 pthread_mutex_unlock(&stats_lock);
1046 status = flush_file (file);
1047 if (status == 0)
1048 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1049 else if (status == ENOENT)
1050 {
1051 /* no file in our tree; see whether it exists at all */
1052 struct stat statbuf;
1054 memset(&statbuf, 0, sizeof(statbuf));
1055 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1056 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1057 else
1058 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1059 }
1060 else if (status < 0)
1061 strncpy (result, "-1 Internal error.\n", sizeof (result));
1062 else
1063 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1064 }
1065 result[sizeof (result) - 1] = 0;
1067 status = swrite (fd, result, strlen (result));
1068 if (status < 0)
1069 {
1070 status = errno;
1071 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1072 return (status);
1073 }
1075 return (0);
1076 } /* }}} int handle_request_flush */
1078 static int handle_request_flushall(int fd) /* {{{ */
1079 {
1080 int status;
1081 char answer[] ="0 Started flush.\n";
1083 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1085 pthread_mutex_lock(&cache_lock);
1086 flush_old_values(-1);
1087 pthread_mutex_unlock(&cache_lock);
1089 status = swrite(fd, answer, strlen(answer));
1090 if (status < 0)
1091 {
1092 status = errno;
1093 RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1094 }
1096 return (status);
1097 } /* }}} static int handle_request_flushall */
1099 static int handle_request_update (int fd, /* {{{ */
1100 char *buffer, size_t buffer_size)
1101 {
1102 char *file;
1103 int values_num = 0;
1104 int status;
1106 time_t now;
1108 cache_item_t *ci;
1109 char answer[CMD_MAX];
1111 #define RRDD_UPDATE_SEND \
1112 answer[sizeof (answer) - 1] = 0; \
1113 status = swrite (fd, answer, strlen (answer)); \
1114 if (status < 0) \
1115 { \
1116 status = errno; \
1117 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1118 return (status); \
1119 }
1121 now = time (NULL);
1123 status = buffer_get_field (&buffer, &buffer_size, &file);
1124 if (status != 0)
1125 {
1126 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1127 sizeof (answer));
1128 RRDD_UPDATE_SEND;
1129 return (0);
1130 }
1132 pthread_mutex_lock(&stats_lock);
1133 stats_updates_received++;
1134 pthread_mutex_unlock(&stats_lock);
1136 pthread_mutex_lock (&cache_lock);
1137 ci = g_tree_lookup (cache_tree, file);
1139 if (ci == NULL) /* {{{ */
1140 {
1141 struct stat statbuf;
1143 /* don't hold the lock while we setup; stat(2) might block */
1144 pthread_mutex_unlock(&cache_lock);
1146 memset (&statbuf, 0, sizeof (statbuf));
1147 status = stat (file, &statbuf);
1148 if (status != 0)
1149 {
1150 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1152 status = errno;
1153 if (status == ENOENT)
1154 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1155 else
1156 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1157 status);
1158 RRDD_UPDATE_SEND;
1159 return (0);
1160 }
1161 if (!S_ISREG (statbuf.st_mode))
1162 {
1163 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1164 RRDD_UPDATE_SEND;
1165 return (0);
1166 }
1167 if (access(file, R_OK|W_OK) != 0)
1168 {
1169 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1170 file, rrd_strerror(errno));
1171 RRDD_UPDATE_SEND;
1172 return (0);
1173 }
1175 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1176 if (ci == NULL)
1177 {
1178 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1180 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1181 RRDD_UPDATE_SEND;
1182 return (0);
1183 }
1184 memset (ci, 0, sizeof (cache_item_t));
1186 ci->file = strdup (file);
1187 if (ci->file == NULL)
1188 {
1189 free (ci);
1190 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1192 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1193 RRDD_UPDATE_SEND;
1194 return (0);
1195 }
1197 _wipe_ci_values(ci, now);
1198 ci->flags = CI_FLAGS_IN_TREE;
1200 pthread_mutex_lock(&cache_lock);
1201 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1202 } /* }}} */
1203 assert (ci != NULL);
1205 while (buffer_size > 0)
1206 {
1207 char **temp;
1208 char *value;
1210 status = buffer_get_field (&buffer, &buffer_size, &value);
1211 if (status != 0)
1212 {
1213 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1214 break;
1215 }
1217 temp = (char **) realloc (ci->values,
1218 sizeof (char *) * (ci->values_num + 1));
1219 if (temp == NULL)
1220 {
1221 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1222 continue;
1223 }
1224 ci->values = temp;
1226 ci->values[ci->values_num] = strdup (value);
1227 if (ci->values[ci->values_num] == NULL)
1228 {
1229 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1230 continue;
1231 }
1232 ci->values_num++;
1234 values_num++;
1235 }
1237 if (((now - ci->last_flush_time) >= config_write_interval)
1238 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1239 && (ci->values_num > 0))
1240 {
1241 enqueue_cache_item (ci, TAIL);
1242 }
1244 pthread_mutex_unlock (&cache_lock);
1246 if (values_num < 1)
1247 {
1248 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1249 }
1250 else
1251 {
1252 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1253 (values_num == 1) ? "" : "s");
1254 }
1255 RRDD_UPDATE_SEND;
1256 return (0);
1257 #undef RRDD_UPDATE_SEND
1258 } /* }}} int handle_request_update */
1260 /* we came across a "WROTE" entry during journal replay.
1261 * throw away any values that we have accumulated for this file
1262 */
1263 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1264 const char *buffer,
1265 size_t buffer_size __attribute__((unused)))
1266 {
1267 int i;
1268 cache_item_t *ci;
1269 const char *file = buffer;
1271 pthread_mutex_lock(&cache_lock);
1273 ci = g_tree_lookup(cache_tree, file);
1274 if (ci == NULL)
1275 {
1276 pthread_mutex_unlock(&cache_lock);
1277 return (0);
1278 }
1280 if (ci->values)
1281 {
1282 for (i=0; i < ci->values_num; i++)
1283 free(ci->values[i]);
1285 free(ci->values);
1286 }
1288 _wipe_ci_values(ci, time(NULL));
1290 pthread_mutex_unlock(&cache_lock);
1291 return (0);
1292 } /* }}} int handle_request_wrote */
1294 /* if fd < 0, we are in journal replay mode */
1295 static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
1296 {
1297 char *buffer_ptr;
1298 char *command;
1299 int status;
1301 assert (buffer[buffer_size - 1] == '\0');
1303 buffer_ptr = buffer;
1304 command = NULL;
1305 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1306 if (status != 0)
1307 {
1308 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1309 return (-1);
1310 }
1312 if (strcasecmp (command, "update") == 0)
1313 {
1314 /* don't re-write updates in replay mode */
1315 if (fd >= 0)
1316 journal_write(command, buffer_ptr);
1318 return (handle_request_update (fd, buffer_ptr, buffer_size));
1319 }
1320 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1321 {
1322 /* this is only valid in replay mode */
1323 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1324 }
1325 else if (strcasecmp (command, "flush") == 0)
1326 {
1327 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1328 }
1329 else if (strcasecmp (command, "flushall") == 0)
1330 {
1331 return (handle_request_flushall(fd));
1332 }
1333 else if (strcasecmp (command, "stats") == 0)
1334 {
1335 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1336 }
1337 else if (strcasecmp (command, "help") == 0)
1338 {
1339 return (handle_request_help (fd, buffer_ptr, buffer_size));
1340 }
1341 else
1342 {
1343 char result[CMD_MAX];
1345 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1346 result[sizeof (result) - 1] = 0;
1348 status = swrite (fd, result, strlen (result));
1349 if (status < 0)
1350 {
1351 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1352 return (-1);
1353 }
1354 }
1356 return (0);
1357 } /* }}} int handle_request */
1359 /* MUST NOT hold journal_lock before calling this */
1360 static void journal_rotate(void) /* {{{ */
1361 {
1362 FILE *old_fh = NULL;
1364 if (journal_cur == NULL || journal_old == NULL)
1365 return;
1367 pthread_mutex_lock(&journal_lock);
1369 /* we rotate this way (rename before close) so that the we can release
1370 * the journal lock as fast as possible. Journal writes to the new
1371 * journal can proceed immediately after the new file is opened. The
1372 * fclose can then block without affecting new updates.
1373 */
1374 if (journal_fh != NULL)
1375 {
1376 old_fh = journal_fh;
1377 rename(journal_cur, journal_old);
1378 ++stats_journal_rotate;
1379 }
1381 journal_fh = fopen(journal_cur, "a");
1382 pthread_mutex_unlock(&journal_lock);
1384 if (old_fh != NULL)
1385 fclose(old_fh);
1387 if (journal_fh == NULL)
1388 {
1389 RRDD_LOG(LOG_CRIT,
1390 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1391 journal_cur, rrd_strerror(errno));
1393 RRDD_LOG(LOG_ERR,
1394 "JOURNALING DISABLED: All values will be flushed at shutdown");
1395 config_flush_at_shutdown = 1;
1396 }
1398 } /* }}} static void journal_rotate */
1400 static void journal_done(void) /* {{{ */
1401 {
1402 if (journal_cur == NULL)
1403 return;
1405 pthread_mutex_lock(&journal_lock);
1406 if (journal_fh != NULL)
1407 {
1408 fclose(journal_fh);
1409 journal_fh = NULL;
1410 }
1412 if (config_flush_at_shutdown)
1413 {
1414 RRDD_LOG(LOG_INFO, "removing journals");
1415 unlink(journal_old);
1416 unlink(journal_cur);
1417 }
1418 else
1419 {
1420 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1421 "journals will be used at next startup");
1422 }
1424 pthread_mutex_unlock(&journal_lock);
1426 } /* }}} static void journal_done */
1428 static int journal_write(char *cmd, char *args) /* {{{ */
1429 {
1430 int chars;
1432 if (journal_fh == NULL)
1433 return 0;
1435 pthread_mutex_lock(&journal_lock);
1436 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1437 pthread_mutex_unlock(&journal_lock);
1439 if (chars > 0)
1440 {
1441 pthread_mutex_lock(&stats_lock);
1442 stats_journal_bytes += chars;
1443 pthread_mutex_unlock(&stats_lock);
1444 }
1446 return chars;
1447 } /* }}} static int journal_write */
1449 static int journal_replay (const char *file) /* {{{ */
1450 {
1451 FILE *fh;
1452 int entry_cnt = 0;
1453 int fail_cnt = 0;
1454 uint64_t line = 0;
1455 char entry[CMD_MAX];
1457 if (file == NULL) return 0;
1459 fh = fopen(file, "r");
1460 if (fh == NULL)
1461 {
1462 if (errno != ENOENT)
1463 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1464 file, rrd_strerror(errno));
1465 return 0;
1466 }
1467 else
1468 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1470 while(!feof(fh))
1471 {
1472 size_t entry_len;
1474 ++line;
1475 fgets(entry, sizeof(entry), fh);
1476 entry_len = strlen(entry);
1478 /* check \n termination in case journal writing crashed mid-line */
1479 if (entry_len == 0)
1480 continue;
1481 else if (entry[entry_len - 1] != '\n')
1482 {
1483 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1484 ++fail_cnt;
1485 continue;
1486 }
1488 entry[entry_len - 1] = '\0';
1490 if (handle_request(-1, entry, entry_len) == 0)
1491 ++entry_cnt;
1492 else
1493 ++fail_cnt;
1494 }
1496 fclose(fh);
1498 if (entry_cnt > 0)
1499 {
1500 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1501 entry_cnt, fail_cnt);
1502 return 1;
1503 }
1504 else
1505 return 0;
1507 } /* }}} static int journal_replay */
1509 static void *connection_thread_main (void *args) /* {{{ */
1510 {
1511 pthread_t self;
1512 int i;
1513 int fd;
1515 fd = *((int *) args);
1516 free (args);
1518 pthread_mutex_lock (&connection_threads_lock);
1519 {
1520 pthread_t *temp;
1522 temp = (pthread_t *) realloc (connection_threads,
1523 sizeof (pthread_t) * (connection_threads_num + 1));
1524 if (temp == NULL)
1525 {
1526 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1527 }
1528 else
1529 {
1530 connection_threads = temp;
1531 connection_threads[connection_threads_num] = pthread_self ();
1532 connection_threads_num++;
1533 }
1534 }
1535 pthread_mutex_unlock (&connection_threads_lock);
1537 while (do_shutdown == 0)
1538 {
1539 char buffer[CMD_MAX];
1541 struct pollfd pollfd;
1542 int status;
1544 pollfd.fd = fd;
1545 pollfd.events = POLLIN | POLLPRI;
1546 pollfd.revents = 0;
1548 status = poll (&pollfd, 1, /* timeout = */ 500);
1549 if (do_shutdown)
1550 break;
1551 else if (status == 0) /* timeout */
1552 continue;
1553 else if (status < 0) /* error */
1554 {
1555 status = errno;
1556 if (status == EINTR)
1557 continue;
1558 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1559 continue;
1560 }
1562 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1563 {
1564 close (fd);
1565 break;
1566 }
1567 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1568 {
1569 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1570 "poll(2) returned something unexpected: %#04hx",
1571 pollfd.revents);
1572 close (fd);
1573 break;
1574 }
1576 status = (int) sread (fd, buffer, sizeof (buffer));
1577 if (status <= 0)
1578 {
1579 close (fd);
1581 if (status < 0)
1582 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1584 break;
1585 }
1587 status = handle_request (fd, buffer, /*buffer_size=*/ status);
1588 if (status != 0)
1589 break;
1590 }
1592 close(fd);
1594 self = pthread_self ();
1595 /* Remove this thread from the connection threads list */
1596 pthread_mutex_lock (&connection_threads_lock);
1597 /* Find out own index in the array */
1598 for (i = 0; i < connection_threads_num; i++)
1599 if (pthread_equal (connection_threads[i], self) != 0)
1600 break;
1601 assert (i < connection_threads_num);
1603 /* Move the trailing threads forward. */
1604 if (i < (connection_threads_num - 1))
1605 {
1606 memmove (connection_threads + i,
1607 connection_threads + i + 1,
1608 sizeof (pthread_t) * (connection_threads_num - i - 1));
1609 }
1611 connection_threads_num--;
1612 pthread_mutex_unlock (&connection_threads_lock);
1614 return (NULL);
1615 } /* }}} void *connection_thread_main */
1617 static int open_listen_socket_unix (const char *path) /* {{{ */
1618 {
1619 int fd;
1620 struct sockaddr_un sa;
1621 listen_socket_t *temp;
1622 int status;
1624 temp = (listen_socket_t *) realloc (listen_fds,
1625 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1626 if (temp == NULL)
1627 {
1628 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1629 return (-1);
1630 }
1631 listen_fds = temp;
1632 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1634 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1635 if (fd < 0)
1636 {
1637 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1638 return (-1);
1639 }
1641 memset (&sa, 0, sizeof (sa));
1642 sa.sun_family = AF_UNIX;
1643 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1645 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1646 if (status != 0)
1647 {
1648 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1649 close (fd);
1650 unlink (path);
1651 return (-1);
1652 }
1654 status = listen (fd, /* backlog = */ 10);
1655 if (status != 0)
1656 {
1657 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1658 close (fd);
1659 unlink (path);
1660 return (-1);
1661 }
1663 listen_fds[listen_fds_num].fd = fd;
1664 snprintf (listen_fds[listen_fds_num].path,
1665 sizeof (listen_fds[listen_fds_num].path) - 1,
1666 "unix:%s", path);
1667 listen_fds_num++;
1669 return (0);
1670 } /* }}} int open_listen_socket_unix */
/*
 * Open listening socket(s) for one configured address.
 *
 * Accepted forms of ADDR_ORIG:
 *   "unix:/path" or "/path"          UNIX socket (open_listen_socket_unix)
 *   "[v6addr]" or "[v6addr]:port"    bracketed IPv6 literal
 *   "host" or "host:port"            exactly one ':' separates the port
 *   bare IPv6 literal, e.g. "::1"    two or more ':', no port part
 * A missing or empty port means RRDCACHED_DEFAULT_PORT. Each address that
 * getaddrinfo(3) returns gets its own socket appended to listen_fds; a
 * failure on one address is logged and that address skipped.
 *
 * Returns -1 on a malformed address or getaddrinfo(3) failure, the result
 * of open_listen_socket_unix() for UNIX sockets, and 0 otherwise (even if
 * no socket could be opened -- the caller checks listen_fds_num).
 */
static int open_listen_socket (const char *addr_orig) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  assert (addr_orig != NULL);

  snprintf (addr_copy, sizeof (addr_copy), "%s", addr_orig);
  addr = addr_copy;

  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
    return (open_listen_socket_unix (addr + strlen ("unix:")));
  else if (addr[0] == '/')
    return (open_listen_socket_unix (addr));

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Malformed address: %s",
          addr_orig);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Exactly one ':' means "host:port" (hostname or IPv4). Two or more
     * mean a bare IPv6 literal (including "::ffff:a.b.c.d"), which has no
     * port part and is passed to getaddrinfo(3) unchanged. */
    char *colon = strchr (addr, ':');

    if ((colon != NULL) && (strchr (colon + 1, ':') == NULL))
    {
      *colon = 0;
      port = colon + 1;
    }
  }

  /* "host:" or "[addr]:" -- fall back to the default port. */
  if ((port != NULL) && (*port == 0))
    port = NULL;

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
      close (fd);
      continue;
    }

    /* Like bind(2) above: skip this address, keep trying the others (and
     * do not leak ai_res by returning from inside the loop). */
    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
      close (fd);
      continue;
    }

    listen_fds[listen_fds_num].fd = fd;
    strncpy (listen_fds[listen_fds_num].path, addr,
        sizeof (listen_fds[listen_fds_num].path) - 1);
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo (ai_res);

  return (0);
} /* }}} int open_listen_socket */
1798 static int close_listen_sockets (void) /* {{{ */
1799 {
1800 size_t i;
1802 for (i = 0; i < listen_fds_num; i++)
1803 {
1804 close (listen_fds[i].fd);
1805 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1806 unlink (listen_fds[i].path + strlen ("unix:"));
1807 }
1809 free (listen_fds);
1810 listen_fds = NULL;
1811 listen_fds_num = 0;
1813 return (0);
1814 } /* }}} int close_listen_sockets */
1816 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1817 {
1818 struct pollfd *pollfds;
1819 int pollfds_num;
1820 int status;
1821 int i;
1823 for (i = 0; i < config_listen_address_list_len; i++)
1824 open_listen_socket (config_listen_address_list[i]);
1826 if (config_listen_address_list_len < 1)
1827 open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1829 if (listen_fds_num < 1)
1830 {
1831 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1832 "could be opened. Sorry.");
1833 return (NULL);
1834 }
1836 pollfds_num = listen_fds_num;
1837 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1838 if (pollfds == NULL)
1839 {
1840 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1841 return (NULL);
1842 }
1843 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1845 RRDD_LOG(LOG_INFO, "listening for connections");
1847 while (do_shutdown == 0)
1848 {
1849 assert (pollfds_num == ((int) listen_fds_num));
1850 for (i = 0; i < pollfds_num; i++)
1851 {
1852 pollfds[i].fd = listen_fds[i].fd;
1853 pollfds[i].events = POLLIN | POLLPRI;
1854 pollfds[i].revents = 0;
1855 }
1857 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1858 if (do_shutdown)
1859 break;
1860 else if (status == 0) /* timeout */
1861 continue;
1862 else if (status < 0) /* error */
1863 {
1864 status = errno;
1865 if (status != EINTR)
1866 {
1867 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1868 }
1869 continue;
1870 }
1872 for (i = 0; i < pollfds_num; i++)
1873 {
1874 int *client_sd;
1875 struct sockaddr_storage client_sa;
1876 socklen_t client_sa_size;
1877 pthread_t tid;
1878 pthread_attr_t attr;
1880 if (pollfds[i].revents == 0)
1881 continue;
1883 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1884 {
1885 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1886 "poll(2) returned something unexpected for listen FD #%i.",
1887 pollfds[i].fd);
1888 continue;
1889 }
1891 client_sd = (int *) malloc (sizeof (int));
1892 if (client_sd == NULL)
1893 {
1894 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1895 continue;
1896 }
1898 client_sa_size = sizeof (client_sa);
1899 *client_sd = accept (pollfds[i].fd,
1900 (struct sockaddr *) &client_sa, &client_sa_size);
1901 if (*client_sd < 0)
1902 {
1903 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1904 continue;
1905 }
1907 pthread_attr_init (&attr);
1908 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1910 status = pthread_create (&tid, &attr, connection_thread_main,
1911 /* args = */ (void *) client_sd);
1912 if (status != 0)
1913 {
1914 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1915 close (*client_sd);
1916 free (client_sd);
1917 continue;
1918 }
1919 } /* for (pollfds_num) */
1920 } /* while (do_shutdown == 0) */
1922 RRDD_LOG(LOG_INFO, "starting shutdown");
1924 close_listen_sockets ();
1926 pthread_mutex_lock (&connection_threads_lock);
1927 while (connection_threads_num > 0)
1928 {
1929 pthread_t wait_for;
1931 wait_for = connection_threads[0];
1933 pthread_mutex_unlock (&connection_threads_lock);
1934 pthread_join (wait_for, /* retval = */ NULL);
1935 pthread_mutex_lock (&connection_threads_lock);
1936 }
1937 pthread_mutex_unlock (&connection_threads_lock);
1939 return (NULL);
1940 } /* }}} void *listen_thread_main */
1942 static int daemonize (void) /* {{{ */
1943 {
1944 int status;
1945 int fd;
1947 fd = open_pidfile();
1948 if (fd < 0) return fd;
1950 if (!stay_foreground)
1951 {
1952 pid_t child;
1953 char *base_dir;
1955 child = fork ();
1956 if (child < 0)
1957 {
1958 fprintf (stderr, "daemonize: fork(2) failed.\n");
1959 return (-1);
1960 }
1961 else if (child > 0)
1962 {
1963 return (1);
1964 }
1966 /* Change into the /tmp directory. */
1967 base_dir = (config_base_dir != NULL)
1968 ? config_base_dir
1969 : "/tmp";
1970 status = chdir (base_dir);
1971 if (status != 0)
1972 {
1973 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1974 return (-1);
1975 }
1977 /* Become session leader */
1978 setsid ();
1980 /* Open the first three file descriptors to /dev/null */
1981 close (2);
1982 close (1);
1983 close (0);
1985 open ("/dev/null", O_RDWR);
1986 dup (0);
1987 dup (0);
1988 } /* if (!stay_foreground) */
1990 install_signal_handlers();
1992 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1993 RRDD_LOG(LOG_INFO, "starting up");
1995 cache_tree = g_tree_new ((GCompareFunc) strcmp);
1996 if (cache_tree == NULL)
1997 {
1998 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1999 return (-1);
2000 }
2002 status = write_pidfile (fd);
2003 return status;
2004 } /* }}} int daemonize */
2006 static int cleanup (void) /* {{{ */
2007 {
2008 do_shutdown++;
2010 pthread_cond_signal (&cache_cond);
2011 pthread_join (queue_thread, /* return = */ NULL);
2013 remove_pidfile ();
2015 RRDD_LOG(LOG_INFO, "goodbye");
2016 closelog ();
2018 return (0);
2019 } /* }}} int cleanup */
2021 static int read_options (int argc, char **argv) /* {{{ */
2022 {
2023 int option;
2024 int status = 0;
2026 while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?F")) != -1)
2027 {
2028 switch (option)
2029 {
2030 case 'g':
2031 stay_foreground=1;
2032 break;
2034 case 'l':
2035 {
2036 char **temp;
2038 temp = (char **) realloc (config_listen_address_list,
2039 sizeof (char *) * (config_listen_address_list_len + 1));
2040 if (temp == NULL)
2041 {
2042 fprintf (stderr, "read_options: realloc failed.\n");
2043 return (2);
2044 }
2045 config_listen_address_list = temp;
2047 temp[config_listen_address_list_len] = strdup (optarg);
2048 if (temp[config_listen_address_list_len] == NULL)
2049 {
2050 fprintf (stderr, "read_options: strdup failed.\n");
2051 return (2);
2052 }
2053 config_listen_address_list_len++;
2054 }
2055 break;
2057 case 'f':
2058 {
2059 int temp;
2061 temp = atoi (optarg);
2062 if (temp > 0)
2063 config_flush_interval = temp;
2064 else
2065 {
2066 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2067 status = 3;
2068 }
2069 }
2070 break;
2072 case 'w':
2073 {
2074 int temp;
2076 temp = atoi (optarg);
2077 if (temp > 0)
2078 config_write_interval = temp;
2079 else
2080 {
2081 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2082 status = 2;
2083 }
2084 }
2085 break;
2087 case 'z':
2088 {
2089 int temp;
2091 temp = atoi(optarg);
2092 if (temp > 0)
2093 config_write_jitter = temp;
2094 else
2095 {
2096 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2097 status = 2;
2098 }
2100 break;
2101 }
2103 case 'b':
2104 {
2105 size_t len;
2107 if (config_base_dir != NULL)
2108 free (config_base_dir);
2109 config_base_dir = strdup (optarg);
2110 if (config_base_dir == NULL)
2111 {
2112 fprintf (stderr, "read_options: strdup failed.\n");
2113 return (3);
2114 }
2116 len = strlen (config_base_dir);
2117 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2118 {
2119 config_base_dir[len - 1] = 0;
2120 len--;
2121 }
2123 if (len < 1)
2124 {
2125 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2126 return (4);
2127 }
2128 }
2129 break;
2131 case 'p':
2132 {
2133 if (config_pid_file != NULL)
2134 free (config_pid_file);
2135 config_pid_file = strdup (optarg);
2136 if (config_pid_file == NULL)
2137 {
2138 fprintf (stderr, "read_options: strdup failed.\n");
2139 return (3);
2140 }
2141 }
2142 break;
2144 case 'F':
2145 config_flush_at_shutdown = 1;
2146 break;
2148 case 'j':
2149 {
2150 struct stat statbuf;
2151 const char *dir = optarg;
2153 status = stat(dir, &statbuf);
2154 if (status != 0)
2155 {
2156 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2157 return 6;
2158 }
2160 if (!S_ISDIR(statbuf.st_mode)
2161 || access(dir, R_OK|W_OK|X_OK) != 0)
2162 {
2163 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2164 errno ? rrd_strerror(errno) : "");
2165 return 6;
2166 }
2168 journal_cur = malloc(PATH_MAX + 1);
2169 journal_old = malloc(PATH_MAX + 1);
2170 if (journal_cur == NULL || journal_old == NULL)
2171 {
2172 fprintf(stderr, "malloc failure for journal files\n");
2173 return 6;
2174 }
2175 else
2176 {
2177 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2178 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2179 }
2180 }
2181 break;
2183 case 'h':
2184 case '?':
2185 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2186 "\n"
2187 "Usage: rrdcached [options]\n"
2188 "\n"
2189 "Valid options are:\n"
2190 " -l <address> Socket address to listen to.\n"
2191 " -w <seconds> Interval in which to write data.\n"
2192 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2193 " -f <seconds> Interval in which to flush dead data.\n"
2194 " -p <file> Location of the PID-file.\n"
2195 " -b <dir> Base directory to change to.\n"
2196 " -g Do not fork and run in the foreground.\n"
2197 " -j <dir> Directory in which to create the journal files.\n"
2198 " -F Always flush all updates at shutdown\n"
2199 "\n"
2200 "For more information and a detailed description of all options "
2201 "please refer\n"
2202 "to the rrdcached(1) manual page.\n",
2203 VERSION);
2204 status = -1;
2205 break;
2206 } /* switch (option) */
2207 } /* while (getopt) */
2209 /* advise the user when values are not sane */
2210 if (config_flush_interval < 2 * config_write_interval)
2211 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2212 " 2x write interval (-w) !\n");
2213 if (config_write_jitter > config_write_interval)
2214 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2215 " write interval (-w) !\n");
2217 if (journal_cur == NULL)
2218 config_flush_at_shutdown = 1;
2220 return (status);
2221 } /* }}} int read_options */
2223 int main (int argc, char **argv)
2224 {
2225 int status;
2227 status = read_options (argc, argv);
2228 if (status != 0)
2229 {
2230 if (status < 0)
2231 status = 0;
2232 return (status);
2233 }
2235 status = daemonize ();
2236 if (status == 1)
2237 {
2238 struct sigaction sigchld;
2240 memset (&sigchld, 0, sizeof (sigchld));
2241 sigchld.sa_handler = SIG_IGN;
2242 sigaction (SIGCHLD, &sigchld, NULL);
2244 return (0);
2245 }
2246 else if (status != 0)
2247 {
2248 fprintf (stderr, "daemonize failed, exiting.\n");
2249 return (1);
2250 }
2252 if (journal_cur != NULL)
2253 {
2254 int had_journal = 0;
2256 pthread_mutex_lock(&journal_lock);
2258 RRDD_LOG(LOG_INFO, "checking for journal files");
2260 had_journal += journal_replay(journal_old);
2261 had_journal += journal_replay(journal_cur);
2263 if (had_journal)
2264 flush_old_values(-1);
2266 pthread_mutex_unlock(&journal_lock);
2267 journal_rotate();
2269 RRDD_LOG(LOG_INFO, "journal processing complete");
2270 }
2272 /* start the queue thread */
2273 memset (&queue_thread, 0, sizeof (queue_thread));
2274 status = pthread_create (&queue_thread,
2275 NULL, /* attr */
2276 queue_thread_main,
2277 NULL); /* args */
2278 if (status != 0)
2279 {
2280 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2281 cleanup();
2282 return (1);
2283 }
2285 listen_thread_main (NULL);
2286 cleanup ();
2288 return (0);
2289 } /* int main */
2291 /*
2292 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2293 */