e869a86d613508dfaf928db539086642e99b28d8
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
 * First tell the compiler to stick to the C99 and POSIX standards as closely
 * as possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* Log a message through syslog(3). `severity' is passed on as the syslog
 * priority (callers in this file use LOG_ERR, LOG_NOTICE, LOG_INFO and
 * LOG_DEBUG); the remaining arguments are a printf-style format string
 * followed by its arguments. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* `__attribute__' is a GCC extension; for compilers that do not define
 * __GNUC__ make it expand to nothing so the annotations used below vanish. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
101 /*
102 * Types
103 */
/* Privilege level stored with each listening socket (see the `privilege'
 * member of `struct listen_socket_s'). */
typedef enum
{
  PRIV_LOW = 0,
  PRIV_HIGH = 1
} socket_privilege;
110 struct listen_socket_s
111 {
112 int fd;
113 char addr[PATH_MAX + 1];
114 int family;
115 socket_privilege privilege;
116 };
117 typedef struct listen_socket_s listen_socket_t;
typedef struct cache_item_s cache_item_t;

/* Bits used in the `flags' member of a cache item. */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/* One cached RRD file together with the update values collected for it. */
struct cache_item_s
{
  char *file;               /* file name; also used as the key in the tree */
  char **values;            /* array of `values_num' pending update strings */
  int values_num;
  time_t last_flush_time;   /* set whenever the values are wiped */
  int flags;                /* combination of the CI_FLAGS_* bits */
  pthread_cond_t flushed;   /* broadcast after the item has been written */
  cache_item_t *next;       /* next item in the update queue */
};
/* User data handed to the tree traversal callback while flushing. */
typedef struct callback_flush_data_s
{
  time_t now;           /* time at which the traversal was started */
  time_t abs_timeout;   /* items last flushed at or before this are queued */
  char **keys;          /* keys of the items that are to be removed */
  size_t keys_num;      /* number of entries in `keys' */
} callback_flush_data_t;
/* Selects the end of the update queue at which an item is inserted. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* max length of socket command or response */
#define CMD_MAX 4096

/*
 * Variables
 */
static int stay_foreground = 0;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Incremented by the signal handlers; any non-zero value asks the queue
 * thread to finish. */
static int do_shutdown = 0;

static pthread_t queue_thread;

static pthread_t *connection_threads = NULL;
static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
/* Maps a file name to its cache_item_t. */
static GTree *cache_tree = NULL;
/* Singly linked list (via cache_item_t.next) of the items waiting to be
 * written to disk. */
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
/* `cache_lock' is held around every access to the tree and the queue;
 * `cache_cond' is broadcast when an item is queued and when a shutdown
 * signal arrives. */
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;

/* Age in seconds after which pending values of a file are queued. */
static int config_write_interval = 300;
/* If positive, a random offset below this value is added to an item's
 * `last_flush_time' each time its values are wiped. */
static int config_write_jitter = 0;
/* Seconds between two passes over the whole tree; also the idle time after
 * which an item without values is dropped from the tree. */
static int config_flush_interval = 3600;
/* Set to 1 by SIGUSR1 and to 0 by SIGUSR2; when non-zero all pending values
 * are written before the queue thread exits. */
static int config_flush_at_shutdown = 0;
/* Path of the pid file; NULL selects LOCALSTATEDIR "/run/rrdcached.pid". */
static char *config_pid_file = NULL;
/* Base directory, its length, and the switch that restricts client supplied
 * absolute paths to that directory (see `check_file_access'). */
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static int config_listen_address_list_len = 0;

/* Counters reported by the STATS command; all guarded by `stats_lock'. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
/* Forward declarations; the definitions are not part of this section. */
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);
206 /*
207 * Functions
208 */
209 static void sig_common (const char *sig) /* {{{ */
210 {
211 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
212 do_shutdown++;
213 pthread_cond_broadcast(&cache_cond);
214 } /* }}} void sig_common */
/* SIGINT handler: requests a shutdown. The signal number is not used. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "INT";

  sig_common (name);
} /* }}} void sig_int_handler */
/* SIGTERM handler: requests a shutdown. The signal number is not used. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "TERM";

  sig_common (name);
} /* }}} void sig_term_handler */
226 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
227 {
228 config_flush_at_shutdown = 1;
229 sig_common("USR1");
230 } /* }}} void sig_usr1_handler */
232 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
233 {
234 config_flush_at_shutdown = 0;
235 sig_common("USR2");
236 } /* }}} void sig_usr2_handler */
238 static void install_signal_handlers(void) /* {{{ */
239 {
240 /* These structures are static, because `sigaction' behaves weird if the are
241 * overwritten.. */
242 static struct sigaction sa_int;
243 static struct sigaction sa_term;
244 static struct sigaction sa_pipe;
245 static struct sigaction sa_usr1;
246 static struct sigaction sa_usr2;
248 /* Install signal handlers */
249 memset (&sa_int, 0, sizeof (sa_int));
250 sa_int.sa_handler = sig_int_handler;
251 sigaction (SIGINT, &sa_int, NULL);
253 memset (&sa_term, 0, sizeof (sa_term));
254 sa_term.sa_handler = sig_term_handler;
255 sigaction (SIGTERM, &sa_term, NULL);
257 memset (&sa_pipe, 0, sizeof (sa_pipe));
258 sa_pipe.sa_handler = SIG_IGN;
259 sigaction (SIGPIPE, &sa_pipe, NULL);
261 memset (&sa_pipe, 0, sizeof (sa_usr1));
262 sa_usr1.sa_handler = sig_usr1_handler;
263 sigaction (SIGUSR1, &sa_usr1, NULL);
265 memset (&sa_usr2, 0, sizeof (sa_usr2));
266 sa_usr2.sa_handler = sig_usr2_handler;
267 sigaction (SIGUSR2, &sa_usr2, NULL);
269 } /* }}} void install_signal_handlers */
271 static int open_pidfile(void) /* {{{ */
272 {
273 int fd;
274 char *file;
276 file = (config_pid_file != NULL)
277 ? config_pid_file
278 : LOCALSTATEDIR "/run/rrdcached.pid";
280 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
281 if (fd < 0)
282 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
283 file, rrd_strerror(errno));
285 return(fd);
286 } /* }}} static int open_pidfile */
/*
 * write_pidfile:
 * Writes the process id of the caller, followed by a newline, to the
 * descriptor `fd' and closes it. `fd' is closed on every path.
 * Returns 0 on success and -1 if the stream could not be set up or the pid
 * could not be written out completely.
 */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *fh;
  int status = 0;

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) getpid ()) < 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fprintf() failed.");
    status = -1;
  }

  /* fclose() flushes the buffer, so a write error may only show up here. */
  if (fclose (fh) != 0)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fclose() failed.");
    status = -1;
  }

  return (status);
} /* }}} int write_pidfile */
309 static int remove_pidfile (void) /* {{{ */
310 {
311 char *file;
312 int status;
314 file = (config_pid_file != NULL)
315 ? config_pid_file
316 : LOCALSTATEDIR "/run/rrdcached.pid";
318 status = unlink (file);
319 if (status == 0)
320 return (0);
321 return (errno);
322 } /* }}} int remove_pidfile */
/*
 * sread:
 * Reads from `fd' into `buffer_void' until the data read so far ends in a
 * newline or `buffer_size' bytes have been stored. The trailing "\n" (or
 * "\r\n") is replaced by a terminating null byte.
 *
 * Returns the number of bytes in the buffer including the terminating null
 * byte, 0 if read() reported end-of-file, or -1 on error. errno is ENOBUFS
 * if the buffer filled up without a trailing newline and EINVAL if no usable
 * buffer was passed; otherwise it is whatever read() left behind.
 */
static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
{
  char *buffer;
  size_t buffer_used;
  size_t buffer_free;
  ssize_t status;

  /* Without room for a single byte the loop below would never run and the
   * `buffer_used > 0' assertion would abort the daemon. */
  if ((buffer_void == NULL) || (buffer_size == 0))
  {
    errno = EINVAL;
    return (-1);
  }

  buffer = (char *) buffer_void;
  buffer_used = 0;
  buffer_free = buffer_size;

  while (buffer_free > 0)
  {
    status = read (fd, buffer + buffer_used, buffer_free);
    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
      continue;

    if (status < 0)
      return (-1);

    if (status == 0)
      return (0);

    assert (buffer_free >= (size_t) status);

    buffer_free = buffer_free - status;
    buffer_used = buffer_used + status;

    if (buffer[buffer_used - 1] == '\n')
      break;
  }

  assert (buffer_used > 0);

  if (buffer[buffer_used - 1] != '\n')
  {
    errno = ENOBUFS;
    return (-1);
  }

  buffer[buffer_used - 1] = 0;

  /* Fix network line endings. */
  if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
  {
    buffer_used--;
    buffer[buffer_used - 1] = 0;
  }

  return (buffer_used);
} /* }}} ssize_t sread */
/*
 * swrite:
 * Writes all `count' bytes of `buf' to `fd', retrying after EAGAIN and
 * EINTR and continuing after short writes. A negative `fd' (used during
 * journal replay) turns the call into a no-op.
 * Returns 0 on success and the negative result of write() on failure.
 */
static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
{
  const char *cursor = (const char *) buf;
  size_t remaining = count;

  /* special case for journal replay */
  if (fd < 0)
    return 0;

  while (remaining > 0)
  {
    ssize_t written = write (fd, (const void *) cursor, remaining);

    if (written < 0)
    {
      if ((errno == EAGAIN) || (errno == EINTR))
        continue;
      return (written);
    }

    remaining -= written;
    cursor += written;
  }

  return (0);
} /* }}} ssize_t swrite */
405 static void _wipe_ci_values(cache_item_t *ci, time_t when)
406 {
407 ci->values = NULL;
408 ci->values_num = 0;
410 ci->last_flush_time = when;
411 if (config_write_jitter > 0)
412 ci->last_flush_time += (random() % config_write_jitter);
414 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
415 }
417 /*
418 * enqueue_cache_item:
419 * `cache_lock' must be acquired before calling this function!
420 */
421 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
422 queue_side_t side)
423 {
424 int did_insert = 0;
426 if (ci == NULL)
427 return (-1);
429 if (ci->values_num == 0)
430 return (0);
432 if (side == HEAD)
433 {
434 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
435 {
436 assert (ci->next == NULL);
437 ci->next = cache_queue_head;
438 cache_queue_head = ci;
440 if (cache_queue_tail == NULL)
441 cache_queue_tail = cache_queue_head;
443 did_insert = 1;
444 }
445 else if (cache_queue_head == ci)
446 {
447 /* do nothing */
448 }
449 else /* enqueued, but not first entry */
450 {
451 cache_item_t *prev;
453 /* find previous entry */
454 for (prev = cache_queue_head; prev != NULL; prev = prev->next)
455 if (prev->next == ci)
456 break;
457 assert (prev != NULL);
459 /* move to the front */
460 prev->next = ci->next;
461 ci->next = cache_queue_head;
462 cache_queue_head = ci;
464 /* check if we need to adapt the tail */
465 if (cache_queue_tail == ci)
466 cache_queue_tail = prev;
467 }
468 }
469 else /* (side == TAIL) */
470 {
471 /* We don't move values back in the list.. */
472 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
473 return (0);
475 assert (ci->next == NULL);
477 if (cache_queue_tail == NULL)
478 cache_queue_head = ci;
479 else
480 cache_queue_tail->next = ci;
481 cache_queue_tail = ci;
483 did_insert = 1;
484 }
486 ci->flags |= CI_FLAGS_IN_QUEUE;
488 if (did_insert)
489 {
490 pthread_cond_broadcast(&cache_cond);
491 pthread_mutex_lock (&stats_lock);
492 stats_queue_length++;
493 pthread_mutex_unlock (&stats_lock);
494 }
496 return (0);
497 } /* }}} int enqueue_cache_item */
499 /*
500 * tree_callback_flush:
501 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
502 * while this is in progress.
503 */
504 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
505 gpointer data)
506 {
507 cache_item_t *ci;
508 callback_flush_data_t *cfd;
510 ci = (cache_item_t *) value;
511 cfd = (callback_flush_data_t *) data;
513 if ((ci->last_flush_time <= cfd->abs_timeout)
514 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
515 && (ci->values_num > 0))
516 {
517 enqueue_cache_item (ci, TAIL);
518 }
519 else if ((do_shutdown != 0)
520 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
521 && (ci->values_num > 0))
522 {
523 enqueue_cache_item (ci, TAIL);
524 }
525 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
526 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
527 && (ci->values_num <= 0))
528 {
529 char **temp;
531 temp = (char **) realloc (cfd->keys,
532 sizeof (char *) * (cfd->keys_num + 1));
533 if (temp == NULL)
534 {
535 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
536 return (FALSE);
537 }
538 cfd->keys = temp;
539 /* Make really sure this points to the _same_ place */
540 assert ((char *) key == ci->file);
541 cfd->keys[cfd->keys_num] = (char *) key;
542 cfd->keys_num++;
543 }
545 return (FALSE);
546 } /* }}} gboolean tree_callback_flush */
548 static int flush_old_values (int max_age)
549 {
550 callback_flush_data_t cfd;
551 size_t k;
553 memset (&cfd, 0, sizeof (cfd));
554 /* Pass the current time as user data so that we don't need to call
555 * `time' for each node. */
556 cfd.now = time (NULL);
557 cfd.keys = NULL;
558 cfd.keys_num = 0;
560 if (max_age > 0)
561 cfd.abs_timeout = cfd.now - max_age;
562 else
563 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
565 /* `tree_callback_flush' will return the keys of all values that haven't
566 * been touched in the last `config_flush_interval' seconds in `cfd'.
567 * The char*'s in this array point to the same memory as ci->file, so we
568 * don't need to free them separately. */
569 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
571 for (k = 0; k < cfd.keys_num; k++)
572 {
573 cache_item_t *ci;
575 /* This must not fail. */
576 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
577 assert (ci != NULL);
579 /* If we end up here with values available, something's seriously
580 * messed up. */
581 assert (ci->values_num == 0);
583 /* Remove the node from the tree */
584 g_tree_remove (cache_tree, cfd.keys[k]);
585 cfd.keys[k] = NULL;
587 /* Now free and clean up `ci'. */
588 free (ci->file);
589 ci->file = NULL;
590 free (ci);
591 ci = NULL;
592 } /* for (k = 0; k < cfd.keys_num; k++) */
594 if (cfd.keys != NULL)
595 {
596 free (cfd.keys);
597 cfd.keys = NULL;
598 }
600 return (0);
601 } /* int flush_old_values */
/*
 * queue_thread_main:
 * Body of the thread that writes queued updates to disk. It loops until a
 * shutdown has been requested and the update queue is empty. Each round it
 *   - runs `flush_old_values' and rotates the journal once `next_flush' has
 *     passed,
 *   - waits on `cache_cond' (at most until `next_flush') if the queue is
 *     empty and no shutdown is pending,
 *   - takes the item at the head of the queue, passes its values to
 *     `rrd_update_r', writes a "wrote" journal entry and broadcasts the
 *     item's `flushed' condition.
 * `cache_lock' is held all the time except while the journal is rotated and
 * while the update is written. `args' is unused; the return value is always
 * NULL.
 */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
  struct timeval now;
  struct timespec next_flush;
  int final_flush = 0; /* make sure we only flush once on shutdown */

  gettimeofday (&now, NULL);
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
  next_flush.tv_nsec = 1000 * now.tv_usec;

  pthread_mutex_lock (&cache_lock);
  while ((do_shutdown == 0) || (cache_queue_head != NULL))
  {
    cache_item_t *ci;
    char *file;
    char **values;
    int values_num;
    int status;
    int i;

    /* First, check if it's time to do the cache flush. */
    gettimeofday (&now, NULL);
    if ((now.tv_sec > next_flush.tv_sec)
        || ((now.tv_sec == next_flush.tv_sec)
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
    {
      /* Flush all values that haven't been written in the last
       * `config_write_interval' seconds. */
      flush_old_values (config_write_interval);

      /* Determine the time of the next cache flush. */
      while (next_flush.tv_sec <= now.tv_sec)
        next_flush.tv_sec += config_flush_interval;

      /* unlock the cache while we rotate so we don't block incoming
       * updates if the fsync() blocks on disk I/O */
      pthread_mutex_unlock(&cache_lock);
      journal_rotate();
      pthread_mutex_lock(&cache_lock);
    }

    /* Now, check if there's something to store away. If not, wait until
     * something comes in or it's time to do the cache flush. if we are
     * shutting down, do not wait around. */
    if (cache_queue_head == NULL && !do_shutdown)
    {
      status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
      if ((status != 0) && (status != ETIMEDOUT))
      {
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
            "pthread_cond_timedwait returned %i.", status);
      }
    }

    /* We're about to shut down. `final_flush' is post-incremented, so this
     * body runs only the first time a shutdown request is seen: either
     * everything is queued for writing, or the loop is left right away. */
    if (do_shutdown != 0 && !final_flush++)
    {
      if (config_flush_at_shutdown)
        flush_old_values (-1); /* flush everything */
      else
        break;
    }

    /* Check if a value has arrived. This may be NULL if we timed out or there
     * was an interrupt such as a signal. */
    if (cache_queue_head == NULL)
      continue;

    ci = cache_queue_head;

    /* copy the relevant parts */
    file = strdup (ci->file);
    if (file == NULL)
    {
      /* NOTE(review): the item stays at the head of the queue and the lock
       * stays held, so this retries immediately for as long as strdup keeps
       * failing. */
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
      continue;
    }

    assert(ci->values != NULL);
    assert(ci->values_num > 0);

    /* Take over the value array; `_wipe_ci_values' below only drops the
     * item's pointer to it. It is freed further down. */
    values = ci->values;
    values_num = ci->values_num;

    _wipe_ci_values(ci, time(NULL));

    /* Unlink the item from the head of the queue. */
    cache_queue_head = ci->next;
    if (cache_queue_head == NULL)
      cache_queue_tail = NULL;
    ci->next = NULL;

    pthread_mutex_lock (&stats_lock);
    assert (stats_queue_length > 0);
    stats_queue_length--;
    pthread_mutex_unlock (&stats_lock);

    /* The disk write is done without the cache lock. */
    pthread_mutex_unlock (&cache_lock);

    rrd_clear_error ();
    status = rrd_update_r (file, NULL, values_num, (void *) values);
    if (status != 0)
    {
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
          "rrd_update_r (%s) failed with status %i. (%s)",
          file, status, rrd_get_error());
    }

    /* The journal entry is written and waiters are woken whether or not the
     * update succeeded. */
    journal_write("wrote", file);
    pthread_cond_broadcast(&ci->flushed);

    for (i = 0; i < values_num; i++)
      free (values[i]);

    free(values);
    free(file);

    if (status == 0)
    {
      pthread_mutex_lock (&stats_lock);
      stats_updates_written++;
      stats_data_sets_written += values_num;
      pthread_mutex_unlock (&stats_lock);
    }

    pthread_mutex_lock (&cache_lock);

    /* We're about to shut down. Same check as above, repeated here because
     * a shutdown may have been requested while the update was written. */
    if (do_shutdown != 0 && !final_flush++)
    {
      if (config_flush_at_shutdown)
        flush_old_values (-1); /* flush everything */
      else
        break;
    }
  } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
  pthread_mutex_unlock (&cache_lock);

  if (config_flush_at_shutdown)
  {
    assert(cache_queue_head == NULL);
    RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
  }

  journal_done();

  return (NULL);
} /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Splits the next field off the front of a null-terminated buffer. Fields
 * are separated by a single space; a backslash makes the following
 * character part of the field. The field is unescaped and terminated in
 * place, i.e. the buffer is modified.
 *
 * On success 0 is returned, `*field_ret' points to the field and
 * `*buffer_ret' / `*buffer_size_ret' describe what is left of the buffer.
 * On failure (empty buffer, or no terminator found because the buffer ends
 * in an escape) -1 is returned and the three outputs are left untouched.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  char *dst = *buffer_ret;
  size_t total = *buffer_size_ret;
  size_t rd = 0;
  size_t wr = 0;
  int terminated = 0;

  if (total == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[total - 1] == '\0');

  while (rd < total)
  {
    char c = src[rd];

    /* End-of-field or end-of-buffer */
    if ((c == ' ') || (c == '\0'))
    {
      dst[wr] = 0;
      rd++;
      terminated = 1;
      break;
    }

    /* An escape takes the next character literally; a backslash in the
     * very last position has nothing to escape. */
    if (c == '\\')
    {
      if (rd >= (total - 1))
        break;
      rd++;
      c = src[rd];
    }

    dst[wr] = c;
    wr++;
    rd++;
  } /* while (rd < total) */

  if (!terminated)
    return (-1);

  *buffer_ret = src + rd;
  *buffer_size_ret = total - rd;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
814 /* if we're restricting writes to the base directory,
815 * check whether the file falls within the dir
816 * returns 1 if OK, otherwise 0
817 */
818 static int check_file_access (const char *file, int fd) /* {{{ */
819 {
820 char error[CMD_MAX];
821 assert(file != NULL);
823 if (!config_write_base_only
824 || fd < 0 /* journal replay */
825 || config_base_dir == NULL)
826 return 1;
828 if (strstr(file, "../") != NULL) goto err;
830 /* relative paths without "../" are ok */
831 if (*file != '/') return 1;
833 /* file must be of the format base + "/" + <1+ char filename> */
834 if (strlen(file) < _config_base_dir_len + 2) goto err;
835 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
836 if (*(file + _config_base_dir_len) != '/') goto err;
838 return 1;
840 err:
841 snprintf(error, sizeof(error)-1, "-1 %s\n", rrd_strerror(EACCES));
842 swrite(fd, error, strlen(error));
843 return 0;
844 } /* }}} static int check_file_access */
846 static int flush_file (const char *filename) /* {{{ */
847 {
848 cache_item_t *ci;
850 pthread_mutex_lock (&cache_lock);
852 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
853 if (ci == NULL)
854 {
855 pthread_mutex_unlock (&cache_lock);
856 return (ENOENT);
857 }
859 if (ci->values_num > 0)
860 {
861 /* Enqueue at head */
862 enqueue_cache_item (ci, HEAD);
863 pthread_cond_wait(&ci->flushed, &cache_lock);
864 }
866 pthread_mutex_unlock(&cache_lock);
868 return (0);
869 } /* }}} int flush_file */
/*
 * handle_request_help:
 * Answers the HELP command. The first field of `buffer' selects the topic
 * ("update", "flush", "flushall" or "stats", compared case-insensitively);
 * without a field, or with an unknown one, the command overview is sent.
 * In every table the number at the start of the first line equals the
 * number of lines that follow it.
 * Returns 0 on success or the errno value of the failed write.
 */
static int handle_request_help (int fd, /* {{{ */
    char *buffer, size_t buffer_size)
{
  int status;
  const char * const *help_text;
  size_t help_text_len;
  char *command;
  size_t i;

  static const char * const help_help[] =
  {
    "5 Command overview\n",
    "FLUSH <filename>\n",
    "FLUSHALL\n",
    "HELP [<command>]\n",
    "UPDATE <filename> <values> [<values> ...]\n",
    "STATS\n"
  };
  const size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);

  static const char * const help_flush[] =
  {
    "4 Help for FLUSH\n",
    "Usage: FLUSH <filename>\n",
    "\n",
    "Adds the given filename to the head of the update queue and returns\n",
    "after it has been dequeued.\n"
  };
  const size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);

  static const char * const help_flushall[] =
  {
    "3 Help for FLUSHALL\n",
    "Usage: FLUSHALL\n",
    "\n",
    "Triggers writing of all pending updates.  Returns immediately.\n"
  };
  const size_t help_flushall_len =
    sizeof (help_flushall) / sizeof (help_flushall[0]);

  /* One line per element. A comma was missing after the "Usage" line, which
   * silently glued it to the empty line that follows. */
  static const char * const help_update[] =
  {
    "9 Help for UPDATE\n",
    "Usage: UPDATE <filename> <values> [<values> ...]\n",
    "\n",
    "Adds the given file to the internal cache if it is not yet known and\n",
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
    "for details.\n",
    "\n",
    "Each <values> has the following form:\n",
    "  <values> = <time>:<value>[:<value>[...]]\n",
    "See the rrdupdate(1) manpage for details.\n"
  };
  const size_t help_update_len =
    sizeof (help_update) / sizeof (help_update[0]);

  static const char * const help_stats[] =
  {
    "4 Help for STATS\n",
    "Usage: STATS\n",
    "\n",
    "Returns some performance counters, see the rrdcached(1) manpage for\n",
    "a description of the values.\n"
  };
  const size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);

  /* Default: the overview. */
  help_text = help_help;
  help_text_len = help_help_len;

  status = buffer_get_field (&buffer, &buffer_size, &command);
  if (status == 0)
  {
    if (strcasecmp (command, "update") == 0)
    {
      help_text = help_update;
      help_text_len = help_update_len;
    }
    else if (strcasecmp (command, "flush") == 0)
    {
      help_text = help_flush;
      help_text_len = help_flush_len;
    }
    else if (strcasecmp (command, "flushall") == 0)
    {
      help_text = help_flushall;
      help_text_len = help_flushall_len;
    }
    else if (strcasecmp (command, "stats") == 0)
    {
      help_text = help_stats;
      help_text_len = help_stats_len;
    }
  }

  for (i = 0; i < help_text_len; i++)
  {
    status = swrite (fd, help_text[i], strlen (help_text[i]));
    if (status < 0)
    {
      status = errno;
      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
      return (status);
    }
  }

  return (0);
} /* }}} int handle_request_help */
984 static int handle_request_stats (int fd, /* {{{ */
985 char *buffer __attribute__((unused)),
986 size_t buffer_size __attribute__((unused)))
987 {
988 int status;
989 char outbuf[CMD_MAX];
991 uint64_t copy_queue_length;
992 uint64_t copy_updates_received;
993 uint64_t copy_flush_received;
994 uint64_t copy_updates_written;
995 uint64_t copy_data_sets_written;
996 uint64_t copy_journal_bytes;
997 uint64_t copy_journal_rotate;
999 uint64_t tree_nodes_number;
1000 uint64_t tree_depth;
1002 pthread_mutex_lock (&stats_lock);
1003 copy_queue_length = stats_queue_length;
1004 copy_updates_received = stats_updates_received;
1005 copy_flush_received = stats_flush_received;
1006 copy_updates_written = stats_updates_written;
1007 copy_data_sets_written = stats_data_sets_written;
1008 copy_journal_bytes = stats_journal_bytes;
1009 copy_journal_rotate = stats_journal_rotate;
1010 pthread_mutex_unlock (&stats_lock);
1012 pthread_mutex_lock (&cache_lock);
1013 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1014 tree_depth = (uint64_t) g_tree_height (cache_tree);
1015 pthread_mutex_unlock (&cache_lock);
1017 #define RRDD_STATS_SEND \
1018 outbuf[sizeof (outbuf) - 1] = 0; \
1019 status = swrite (fd, outbuf, strlen (outbuf)); \
1020 if (status < 0) \
1021 { \
1022 status = errno; \
1023 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
1024 return (status); \
1025 }
1027 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
1028 RRDD_STATS_SEND;
1030 snprintf (outbuf, sizeof (outbuf),
1031 "QueueLength: %"PRIu64"\n", copy_queue_length);
1032 RRDD_STATS_SEND;
1034 snprintf (outbuf, sizeof (outbuf),
1035 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1036 RRDD_STATS_SEND;
1038 snprintf (outbuf, sizeof (outbuf),
1039 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1040 RRDD_STATS_SEND;
1042 snprintf (outbuf, sizeof (outbuf),
1043 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1044 RRDD_STATS_SEND;
1046 snprintf (outbuf, sizeof (outbuf),
1047 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1048 RRDD_STATS_SEND;
1050 snprintf (outbuf, sizeof (outbuf),
1051 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1052 RRDD_STATS_SEND;
1054 snprintf (outbuf, sizeof (outbuf),
1055 "TreeDepth: %"PRIu64"\n", tree_depth);
1056 RRDD_STATS_SEND;
1058 snprintf (outbuf, sizeof(outbuf),
1059 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1060 RRDD_STATS_SEND;
1062 snprintf (outbuf, sizeof(outbuf),
1063 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1064 RRDD_STATS_SEND;
1066 return (0);
1067 #undef RRDD_STATS_SEND
1068 } /* }}} int handle_request_stats */
1070 static int handle_request_flush (int fd, /* {{{ */
1071 char *buffer, size_t buffer_size)
1072 {
1073 char *file;
1074 int status;
1075 char result[CMD_MAX];
1077 status = buffer_get_field (&buffer, &buffer_size, &file);
1078 if (status != 0)
1079 {
1080 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1081 }
1082 else
1083 {
1084 pthread_mutex_lock(&stats_lock);
1085 stats_flush_received++;
1086 pthread_mutex_unlock(&stats_lock);
1088 if (!check_file_access(file, fd)) return 0;
1090 status = flush_file (file);
1091 if (status == 0)
1092 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1093 else if (status == ENOENT)
1094 {
1095 /* no file in our tree; see whether it exists at all */
1096 struct stat statbuf;
1098 memset(&statbuf, 0, sizeof(statbuf));
1099 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1100 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1101 else
1102 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1103 }
1104 else if (status < 0)
1105 strncpy (result, "-1 Internal error.\n", sizeof (result));
1106 else
1107 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1108 }
1109 result[sizeof (result) - 1] = 0;
1111 status = swrite (fd, result, strlen (result));
1112 if (status < 0)
1113 {
1114 status = errno;
1115 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1116 return (status);
1117 }
1119 return (0);
1120 } /* }}} int handle_request_flush */
1122 static int handle_request_flushall(int fd) /* {{{ */
1123 {
1124 int status;
1125 char answer[] ="0 Started flush.\n";
1127 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1129 pthread_mutex_lock(&cache_lock);
1130 flush_old_values(-1);
1131 pthread_mutex_unlock(&cache_lock);
1133 status = swrite(fd, answer, strlen(answer));
1134 if (status < 0)
1135 {
1136 status = errno;
1137 RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1138 }
1140 return (status);
1141 } /* }}} static int handle_request_flushall */
1143 static int handle_request_update (int fd, /* {{{ */
1144 char *buffer, size_t buffer_size)
1145 {
1146 char *file;
1147 int values_num = 0;
1148 int status;
1150 time_t now;
1152 cache_item_t *ci;
1153 char answer[CMD_MAX];
1155 #define RRDD_UPDATE_SEND \
1156 answer[sizeof (answer) - 1] = 0; \
1157 status = swrite (fd, answer, strlen (answer)); \
1158 if (status < 0) \
1159 { \
1160 status = errno; \
1161 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1162 return (status); \
1163 }
1165 now = time (NULL);
1167 status = buffer_get_field (&buffer, &buffer_size, &file);
1168 if (status != 0)
1169 {
1170 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1171 sizeof (answer));
1172 RRDD_UPDATE_SEND;
1173 return (0);
1174 }
1176 pthread_mutex_lock(&stats_lock);
1177 stats_updates_received++;
1178 pthread_mutex_unlock(&stats_lock);
1180 if (!check_file_access(file, fd)) return 0;
1182 pthread_mutex_lock (&cache_lock);
1183 ci = g_tree_lookup (cache_tree, file);
1185 if (ci == NULL) /* {{{ */
1186 {
1187 struct stat statbuf;
1189 /* don't hold the lock while we setup; stat(2) might block */
1190 pthread_mutex_unlock(&cache_lock);
1192 memset (&statbuf, 0, sizeof (statbuf));
1193 status = stat (file, &statbuf);
1194 if (status != 0)
1195 {
1196 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1198 status = errno;
1199 if (status == ENOENT)
1200 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1201 else
1202 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1203 status);
1204 RRDD_UPDATE_SEND;
1205 return (0);
1206 }
1207 if (!S_ISREG (statbuf.st_mode))
1208 {
1209 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1210 RRDD_UPDATE_SEND;
1211 return (0);
1212 }
1213 if (access(file, R_OK|W_OK) != 0)
1214 {
1215 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1216 file, rrd_strerror(errno));
1217 RRDD_UPDATE_SEND;
1218 return (0);
1219 }
1221 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1222 if (ci == NULL)
1223 {
1224 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1226 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1227 RRDD_UPDATE_SEND;
1228 return (0);
1229 }
1230 memset (ci, 0, sizeof (cache_item_t));
1232 ci->file = strdup (file);
1233 if (ci->file == NULL)
1234 {
1235 free (ci);
1236 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1238 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1239 RRDD_UPDATE_SEND;
1240 return (0);
1241 }
1243 _wipe_ci_values(ci, now);
1244 ci->flags = CI_FLAGS_IN_TREE;
1246 pthread_mutex_lock(&cache_lock);
1247 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1248 } /* }}} */
1249 assert (ci != NULL);
1251 while (buffer_size > 0)
1252 {
1253 char **temp;
1254 char *value;
1256 status = buffer_get_field (&buffer, &buffer_size, &value);
1257 if (status != 0)
1258 {
1259 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1260 break;
1261 }
1263 temp = (char **) realloc (ci->values,
1264 sizeof (char *) * (ci->values_num + 1));
1265 if (temp == NULL)
1266 {
1267 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1268 continue;
1269 }
1270 ci->values = temp;
1272 ci->values[ci->values_num] = strdup (value);
1273 if (ci->values[ci->values_num] == NULL)
1274 {
1275 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1276 continue;
1277 }
1278 ci->values_num++;
1280 values_num++;
1281 }
1283 if (((now - ci->last_flush_time) >= config_write_interval)
1284 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1285 && (ci->values_num > 0))
1286 {
1287 enqueue_cache_item (ci, TAIL);
1288 }
1290 pthread_mutex_unlock (&cache_lock);
1292 if (values_num < 1)
1293 {
1294 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1295 }
1296 else
1297 {
1298 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1299 (values_num == 1) ? "" : "s");
1300 }
1301 RRDD_UPDATE_SEND;
1302 return (0);
1303 #undef RRDD_UPDATE_SEND
1304 } /* }}} int handle_request_update */
1306 /* we came across a "WROTE" entry during journal replay.
1307 * throw away any values that we have accumulated for this file
1308 */
1309 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1310 const char *buffer,
1311 size_t buffer_size __attribute__((unused)))
1312 {
1313 int i;
1314 cache_item_t *ci;
1315 const char *file = buffer;
1317 pthread_mutex_lock(&cache_lock);
1319 ci = g_tree_lookup(cache_tree, file);
1320 if (ci == NULL)
1321 {
1322 pthread_mutex_unlock(&cache_lock);
1323 return (0);
1324 }
1326 if (ci->values)
1327 {
1328 for (i=0; i < ci->values_num; i++)
1329 free(ci->values[i]);
1331 free(ci->values);
1332 }
1334 _wipe_ci_values(ci, time(NULL));
1336 pthread_mutex_unlock(&cache_lock);
1337 return (0);
1338 } /* }}} int handle_request_wrote */
1340 /* returns 1 if we have the required privilege level */
1341 static int has_privilege (socket_privilege priv, /* {{{ */
1342 socket_privilege required, int fd)
1343 {
1344 int status;
1345 char error[CMD_MAX];
1347 if (priv >= required)
1348 return 1;
1350 sprintf(error, "-1 %s\n", rrd_strerror(EACCES));
1351 status = swrite(fd, error, strlen(error));
1353 if (status < 0)
1354 return status;
1355 else
1356 return 0;
1357 } /* }}} static int has_privilege */
1359 /* if fd < 0, we are in journal replay mode */
1360 static int handle_request (int fd, socket_privilege privilege, /* {{{ */
1361 char *buffer, size_t buffer_size)
1362 {
1363 char *buffer_ptr;
1364 char *command;
1365 int status;
1367 assert (buffer[buffer_size - 1] == '\0');
1369 buffer_ptr = buffer;
1370 command = NULL;
1371 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1372 if (status != 0)
1373 {
1374 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1375 return (-1);
1376 }
1378 if (strcasecmp (command, "update") == 0)
1379 {
1380 status = has_privilege(privilege, PRIV_HIGH, fd);
1381 if (status <= 0)
1382 return status;
1384 /* don't re-write updates in replay mode */
1385 if (fd >= 0)
1386 journal_write(command, buffer_ptr);
1388 return (handle_request_update (fd, buffer_ptr, buffer_size));
1389 }
1390 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1391 {
1392 /* this is only valid in replay mode */
1393 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1394 }
1395 else if (strcasecmp (command, "flush") == 0)
1396 {
1397 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1398 }
1399 else if (strcasecmp (command, "flushall") == 0)
1400 {
1401 status = has_privilege(privilege, PRIV_HIGH, fd);
1402 if (status <= 0)
1403 return status;
1405 return (handle_request_flushall(fd));
1406 }
1407 else if (strcasecmp (command, "stats") == 0)
1408 {
1409 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1410 }
1411 else if (strcasecmp (command, "help") == 0)
1412 {
1413 return (handle_request_help (fd, buffer_ptr, buffer_size));
1414 }
1415 else
1416 {
1417 char result[CMD_MAX];
1419 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1420 result[sizeof (result) - 1] = 0;
1422 status = swrite (fd, result, strlen (result));
1423 if (status < 0)
1424 {
1425 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1426 return (-1);
1427 }
1428 }
1430 return (0);
1431 } /* }}} int handle_request */
1433 /* MUST NOT hold journal_lock before calling this */
1434 static void journal_rotate(void) /* {{{ */
1435 {
1436 FILE *old_fh = NULL;
1438 if (journal_cur == NULL || journal_old == NULL)
1439 return;
1441 pthread_mutex_lock(&journal_lock);
1443 /* we rotate this way (rename before close) so that the we can release
1444 * the journal lock as fast as possible. Journal writes to the new
1445 * journal can proceed immediately after the new file is opened. The
1446 * fclose can then block without affecting new updates.
1447 */
1448 if (journal_fh != NULL)
1449 {
1450 old_fh = journal_fh;
1451 rename(journal_cur, journal_old);
1452 ++stats_journal_rotate;
1453 }
1455 journal_fh = fopen(journal_cur, "a");
1456 pthread_mutex_unlock(&journal_lock);
1458 if (old_fh != NULL)
1459 fclose(old_fh);
1461 if (journal_fh == NULL)
1462 {
1463 RRDD_LOG(LOG_CRIT,
1464 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1465 journal_cur, rrd_strerror(errno));
1467 RRDD_LOG(LOG_ERR,
1468 "JOURNALING DISABLED: All values will be flushed at shutdown");
1469 config_flush_at_shutdown = 1;
1470 }
1472 } /* }}} static void journal_rotate */
1474 static void journal_done(void) /* {{{ */
1475 {
1476 if (journal_cur == NULL)
1477 return;
1479 pthread_mutex_lock(&journal_lock);
1480 if (journal_fh != NULL)
1481 {
1482 fclose(journal_fh);
1483 journal_fh = NULL;
1484 }
1486 if (config_flush_at_shutdown)
1487 {
1488 RRDD_LOG(LOG_INFO, "removing journals");
1489 unlink(journal_old);
1490 unlink(journal_cur);
1491 }
1492 else
1493 {
1494 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1495 "journals will be used at next startup");
1496 }
1498 pthread_mutex_unlock(&journal_lock);
1500 } /* }}} static void journal_done */
1502 static int journal_write(char *cmd, char *args) /* {{{ */
1503 {
1504 int chars;
1506 if (journal_fh == NULL)
1507 return 0;
1509 pthread_mutex_lock(&journal_lock);
1510 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1511 pthread_mutex_unlock(&journal_lock);
1513 if (chars > 0)
1514 {
1515 pthread_mutex_lock(&stats_lock);
1516 stats_journal_bytes += chars;
1517 pthread_mutex_unlock(&stats_lock);
1518 }
1520 return chars;
1521 } /* }}} static int journal_write */
1523 static int journal_replay (const char *file) /* {{{ */
1524 {
1525 FILE *fh;
1526 int entry_cnt = 0;
1527 int fail_cnt = 0;
1528 uint64_t line = 0;
1529 char entry[CMD_MAX];
1531 if (file == NULL) return 0;
1533 fh = fopen(file, "r");
1534 if (fh == NULL)
1535 {
1536 if (errno != ENOENT)
1537 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1538 file, rrd_strerror(errno));
1539 return 0;
1540 }
1541 else
1542 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1544 while(!feof(fh))
1545 {
1546 size_t entry_len;
1548 ++line;
1549 if (fgets(entry, sizeof(entry), fh) == NULL)
1550 break;
1551 entry_len = strlen(entry);
1553 /* check \n termination in case journal writing crashed mid-line */
1554 if (entry_len == 0)
1555 continue;
1556 else if (entry[entry_len - 1] != '\n')
1557 {
1558 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1559 ++fail_cnt;
1560 continue;
1561 }
1563 entry[entry_len - 1] = '\0';
1565 if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
1566 ++entry_cnt;
1567 else
1568 ++fail_cnt;
1569 }
1571 fclose(fh);
1573 if (entry_cnt > 0)
1574 {
1575 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1576 entry_cnt, fail_cnt);
1577 return 1;
1578 }
1579 else
1580 return 0;
1582 } /* }}} static int journal_replay */
1584 static void *connection_thread_main (void *args) /* {{{ */
1585 {
1586 pthread_t self;
1587 listen_socket_t *sock;
1588 int i;
1589 int fd;
1591 sock = (listen_socket_t *) args;
1592 fd = sock->fd;
1594 pthread_mutex_lock (&connection_threads_lock);
1595 {
1596 pthread_t *temp;
1598 temp = (pthread_t *) realloc (connection_threads,
1599 sizeof (pthread_t) * (connection_threads_num + 1));
1600 if (temp == NULL)
1601 {
1602 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1603 }
1604 else
1605 {
1606 connection_threads = temp;
1607 connection_threads[connection_threads_num] = pthread_self ();
1608 connection_threads_num++;
1609 }
1610 }
1611 pthread_mutex_unlock (&connection_threads_lock);
1613 while (do_shutdown == 0)
1614 {
1615 char buffer[CMD_MAX];
1617 struct pollfd pollfd;
1618 int status;
1620 pollfd.fd = fd;
1621 pollfd.events = POLLIN | POLLPRI;
1622 pollfd.revents = 0;
1624 status = poll (&pollfd, 1, /* timeout = */ 500);
1625 if (do_shutdown)
1626 break;
1627 else if (status == 0) /* timeout */
1628 continue;
1629 else if (status < 0) /* error */
1630 {
1631 status = errno;
1632 if (status == EINTR)
1633 continue;
1634 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1635 continue;
1636 }
1638 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1639 {
1640 close (fd);
1641 break;
1642 }
1643 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1644 {
1645 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1646 "poll(2) returned something unexpected: %#04hx",
1647 pollfd.revents);
1648 close (fd);
1649 break;
1650 }
1652 status = (int) sread (fd, buffer, sizeof (buffer));
1653 if (status <= 0)
1654 {
1655 close (fd);
1657 if (status < 0)
1658 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1660 break;
1661 }
1663 status = handle_request (fd, sock->privilege, buffer, status);
1664 if (status != 0)
1665 break;
1666 }
1668 close(fd);
1669 free(args);
1671 self = pthread_self ();
1672 /* Remove this thread from the connection threads list */
1673 pthread_mutex_lock (&connection_threads_lock);
1674 /* Find out own index in the array */
1675 for (i = 0; i < connection_threads_num; i++)
1676 if (pthread_equal (connection_threads[i], self) != 0)
1677 break;
1678 assert (i < connection_threads_num);
1680 /* Move the trailing threads forward. */
1681 if (i < (connection_threads_num - 1))
1682 {
1683 memmove (connection_threads + i,
1684 connection_threads + i + 1,
1685 sizeof (pthread_t) * (connection_threads_num - i - 1));
1686 }
1688 connection_threads_num--;
1689 pthread_mutex_unlock (&connection_threads_lock);
1691 return (NULL);
1692 } /* }}} void *connection_thread_main */
1694 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1695 {
1696 int fd;
1697 struct sockaddr_un sa;
1698 listen_socket_t *temp;
1699 int status;
1700 const char *path;
1702 path = sock->addr;
1703 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1704 path += strlen("unix:");
1706 temp = (listen_socket_t *) realloc (listen_fds,
1707 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1708 if (temp == NULL)
1709 {
1710 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1711 return (-1);
1712 }
1713 listen_fds = temp;
1714 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1716 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1717 if (fd < 0)
1718 {
1719 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1720 return (-1);
1721 }
1723 memset (&sa, 0, sizeof (sa));
1724 sa.sun_family = AF_UNIX;
1725 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1727 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1728 if (status != 0)
1729 {
1730 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1731 close (fd);
1732 unlink (path);
1733 return (-1);
1734 }
1736 status = listen (fd, /* backlog = */ 10);
1737 if (status != 0)
1738 {
1739 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1740 close (fd);
1741 unlink (path);
1742 return (-1);
1743 }
1745 listen_fds[listen_fds_num].fd = fd;
1746 listen_fds[listen_fds_num].family = PF_UNIX;
1747 strncpy(listen_fds[listen_fds_num].addr, path,
1748 sizeof (listen_fds[listen_fds_num].addr) - 1);
1749 listen_fds_num++;
1751 return (0);
1752 } /* }}} int open_listen_socket_unix */
/* Resolve sock->addr and open a listening TCP socket for every address
 * returned by getaddrinfo(), appending each one to listen_fds.
 *
 * Accepted address forms: "[ipv6]" or "[ipv6]:port", and for names that
 * contain a dot "host" or "host:port".  Anything else is handed to the
 * resolver unchanged.  RRDCACHED_DEFAULT_PORT is used when no port is
 * given.
 *
 * Returns -1 for a malformed address, a resolver failure or a listen(2)
 * failure; 0 otherwise.  Addresses for which socket(2) or bind(2) fail
 * are skipped, so 0 does not guarantee that a socket was opened.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
          sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
          port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* the last colon separates the port */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
        "%s", addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
      close (fd);
      freeaddrinfo (ai_res); /* do not leak the resolver result */
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo (ai_res);

  return (0);
} /* }}} static int open_listen_socket_network */
1872 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1873 {
1874 assert(sock != NULL);
1875 assert(sock->addr != NULL);
1877 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1878 || sock->addr[0] == '/')
1879 return (open_listen_socket_unix(sock));
1880 else
1881 return (open_listen_socket_network(sock));
1882 } /* }}} int open_listen_socket */
1884 static int close_listen_sockets (void) /* {{{ */
1885 {
1886 size_t i;
1888 for (i = 0; i < listen_fds_num; i++)
1889 {
1890 close (listen_fds[i].fd);
1892 if (listen_fds[i].family == PF_UNIX)
1893 unlink(listen_fds[i].addr);
1894 }
1896 free (listen_fds);
1897 listen_fds = NULL;
1898 listen_fds_num = 0;
1900 return (0);
1901 } /* }}} int close_listen_sockets */
1903 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1904 {
1905 struct pollfd *pollfds;
1906 int pollfds_num;
1907 int status;
1908 int i;
1910 for (i = 0; i < config_listen_address_list_len; i++)
1911 open_listen_socket (config_listen_address_list[i]);
1913 if (config_listen_address_list_len < 1)
1914 {
1915 listen_socket_t sock;
1916 memset(&sock, 0, sizeof(sock));
1917 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1918 open_listen_socket (&sock);
1919 }
1921 if (listen_fds_num < 1)
1922 {
1923 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1924 "could be opened. Sorry.");
1925 return (NULL);
1926 }
1928 pollfds_num = listen_fds_num;
1929 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1930 if (pollfds == NULL)
1931 {
1932 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1933 return (NULL);
1934 }
1935 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1937 RRDD_LOG(LOG_INFO, "listening for connections");
1939 while (do_shutdown == 0)
1940 {
1941 assert (pollfds_num == ((int) listen_fds_num));
1942 for (i = 0; i < pollfds_num; i++)
1943 {
1944 pollfds[i].fd = listen_fds[i].fd;
1945 pollfds[i].events = POLLIN | POLLPRI;
1946 pollfds[i].revents = 0;
1947 }
1949 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1950 if (do_shutdown)
1951 break;
1952 else if (status == 0) /* timeout */
1953 continue;
1954 else if (status < 0) /* error */
1955 {
1956 status = errno;
1957 if (status != EINTR)
1958 {
1959 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1960 }
1961 continue;
1962 }
1964 for (i = 0; i < pollfds_num; i++)
1965 {
1966 listen_socket_t *client_sock;
1967 struct sockaddr_storage client_sa;
1968 socklen_t client_sa_size;
1969 pthread_t tid;
1970 pthread_attr_t attr;
1972 if (pollfds[i].revents == 0)
1973 continue;
1975 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1976 {
1977 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1978 "poll(2) returned something unexpected for listen FD #%i.",
1979 pollfds[i].fd);
1980 continue;
1981 }
1983 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1984 if (client_sock == NULL)
1985 {
1986 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1987 continue;
1988 }
1989 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1991 client_sa_size = sizeof (client_sa);
1992 client_sock->fd = accept (pollfds[i].fd,
1993 (struct sockaddr *) &client_sa, &client_sa_size);
1994 if (client_sock->fd < 0)
1995 {
1996 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1997 free(client_sock);
1998 continue;
1999 }
2001 pthread_attr_init (&attr);
2002 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2004 status = pthread_create (&tid, &attr, connection_thread_main,
2005 client_sock);
2006 if (status != 0)
2007 {
2008 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2009 close (client_sock->fd);
2010 free (client_sock);
2011 continue;
2012 }
2013 } /* for (pollfds_num) */
2014 } /* while (do_shutdown == 0) */
2016 RRDD_LOG(LOG_INFO, "starting shutdown");
2018 close_listen_sockets ();
2020 pthread_mutex_lock (&connection_threads_lock);
2021 while (connection_threads_num > 0)
2022 {
2023 pthread_t wait_for;
2025 wait_for = connection_threads[0];
2027 pthread_mutex_unlock (&connection_threads_lock);
2028 pthread_join (wait_for, /* retval = */ NULL);
2029 pthread_mutex_lock (&connection_threads_lock);
2030 }
2031 pthread_mutex_unlock (&connection_threads_lock);
2033 return (NULL);
2034 } /* }}} void *listen_thread_main */
2036 static int daemonize (void) /* {{{ */
2037 {
2038 int status;
2039 int fd;
2040 char *base_dir;
2042 fd = open_pidfile();
2043 if (fd < 0) return fd;
2045 if (!stay_foreground)
2046 {
2047 pid_t child;
2049 child = fork ();
2050 if (child < 0)
2051 {
2052 fprintf (stderr, "daemonize: fork(2) failed.\n");
2053 return (-1);
2054 }
2055 else if (child > 0)
2056 {
2057 return (1);
2058 }
2060 /* Become session leader */
2061 setsid ();
2063 /* Open the first three file descriptors to /dev/null */
2064 close (2);
2065 close (1);
2066 close (0);
2068 open ("/dev/null", O_RDWR);
2069 dup (0);
2070 dup (0);
2071 } /* if (!stay_foreground) */
2073 /* Change into the /tmp directory. */
2074 base_dir = (config_base_dir != NULL)
2075 ? config_base_dir
2076 : "/tmp";
2077 status = chdir (base_dir);
2078 if (status != 0)
2079 {
2080 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2081 return (-1);
2082 }
2084 install_signal_handlers();
2086 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2087 RRDD_LOG(LOG_INFO, "starting up");
2089 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2090 if (cache_tree == NULL)
2091 {
2092 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2093 return (-1);
2094 }
2096 status = write_pidfile (fd);
2097 return status;
2098 } /* }}} int daemonize */
2100 static int cleanup (void) /* {{{ */
2101 {
2102 do_shutdown++;
2104 pthread_cond_signal (&cache_cond);
2105 pthread_join (queue_thread, /* return = */ NULL);
2107 remove_pidfile ();
2109 RRDD_LOG(LOG_INFO, "goodbye");
2110 closelog ();
2112 return (0);
2113 } /* }}} int cleanup */
2115 static int read_options (int argc, char **argv) /* {{{ */
2116 {
2117 int option;
2118 int status = 0;
2120 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2121 {
2122 switch (option)
2123 {
2124 case 'g':
2125 stay_foreground=1;
2126 break;
2128 case 'L':
2129 case 'l':
2130 {
2131 listen_socket_t **temp;
2132 listen_socket_t *new;
2134 new = malloc(sizeof(listen_socket_t));
2135 if (new == NULL)
2136 {
2137 fprintf(stderr, "read_options: malloc failed.\n");
2138 return(2);
2139 }
2140 memset(new, 0, sizeof(listen_socket_t));
2142 temp = (listen_socket_t **) realloc (config_listen_address_list,
2143 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2144 if (temp == NULL)
2145 {
2146 fprintf (stderr, "read_options: realloc failed.\n");
2147 return (2);
2148 }
2149 config_listen_address_list = temp;
2151 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2152 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2154 temp[config_listen_address_list_len] = new;
2155 config_listen_address_list_len++;
2156 }
2157 break;
2159 case 'f':
2160 {
2161 int temp;
2163 temp = atoi (optarg);
2164 if (temp > 0)
2165 config_flush_interval = temp;
2166 else
2167 {
2168 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2169 status = 3;
2170 }
2171 }
2172 break;
2174 case 'w':
2175 {
2176 int temp;
2178 temp = atoi (optarg);
2179 if (temp > 0)
2180 config_write_interval = temp;
2181 else
2182 {
2183 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2184 status = 2;
2185 }
2186 }
2187 break;
2189 case 'z':
2190 {
2191 int temp;
2193 temp = atoi(optarg);
2194 if (temp > 0)
2195 config_write_jitter = temp;
2196 else
2197 {
2198 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2199 status = 2;
2200 }
2202 break;
2203 }
2205 case 'B':
2206 config_write_base_only = 1;
2207 break;
2209 case 'b':
2210 {
2211 size_t len;
2213 if (config_base_dir != NULL)
2214 free (config_base_dir);
2215 config_base_dir = strdup (optarg);
2216 if (config_base_dir == NULL)
2217 {
2218 fprintf (stderr, "read_options: strdup failed.\n");
2219 return (3);
2220 }
2222 len = strlen (config_base_dir);
2223 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2224 {
2225 config_base_dir[len - 1] = 0;
2226 len--;
2227 }
2229 if (len < 1)
2230 {
2231 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2232 return (4);
2233 }
2235 _config_base_dir_len = len;
2236 }
2237 break;
2239 case 'p':
2240 {
2241 if (config_pid_file != NULL)
2242 free (config_pid_file);
2243 config_pid_file = strdup (optarg);
2244 if (config_pid_file == NULL)
2245 {
2246 fprintf (stderr, "read_options: strdup failed.\n");
2247 return (3);
2248 }
2249 }
2250 break;
2252 case 'F':
2253 config_flush_at_shutdown = 1;
2254 break;
2256 case 'j':
2257 {
2258 struct stat statbuf;
2259 const char *dir = optarg;
2261 status = stat(dir, &statbuf);
2262 if (status != 0)
2263 {
2264 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2265 return 6;
2266 }
2268 if (!S_ISDIR(statbuf.st_mode)
2269 || access(dir, R_OK|W_OK|X_OK) != 0)
2270 {
2271 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2272 errno ? rrd_strerror(errno) : "");
2273 return 6;
2274 }
2276 journal_cur = malloc(PATH_MAX + 1);
2277 journal_old = malloc(PATH_MAX + 1);
2278 if (journal_cur == NULL || journal_old == NULL)
2279 {
2280 fprintf(stderr, "malloc failure for journal files\n");
2281 return 6;
2282 }
2283 else
2284 {
2285 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2286 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2287 }
2288 }
2289 break;
2291 case 'h':
2292 case '?':
2293 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2294 "\n"
2295 "Usage: rrdcached [options]\n"
2296 "\n"
2297 "Valid options are:\n"
2298 " -l <address> Socket address to listen to.\n"
2299 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2300 " -w <seconds> Interval in which to write data.\n"
2301 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2302 " -f <seconds> Interval in which to flush dead data.\n"
2303 " -p <file> Location of the PID-file.\n"
2304 " -b <dir> Base directory to change to.\n"
2305 " -B Restrict file access to paths within -b <dir>\n"
2306 " -g Do not fork and run in the foreground.\n"
2307 " -j <dir> Directory in which to create the journal files.\n"
2308 " -F Always flush all updates at shutdown\n"
2309 "\n"
2310 "For more information and a detailed description of all options "
2311 "please refer\n"
2312 "to the rrdcached(1) manual page.\n",
2313 VERSION);
2314 status = -1;
2315 break;
2316 } /* switch (option) */
2317 } /* while (getopt) */
2319 /* advise the user when values are not sane */
2320 if (config_flush_interval < 2 * config_write_interval)
2321 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2322 " 2x write interval (-w) !\n");
2323 if (config_write_jitter > config_write_interval)
2324 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2325 " write interval (-w) !\n");
2327 if (config_write_base_only && config_base_dir == NULL)
2328 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2329 " Consult the rrdcached documentation\n");
2331 if (journal_cur == NULL)
2332 config_flush_at_shutdown = 1;
2334 return (status);
2335 } /* }}} int read_options */
2337 int main (int argc, char **argv)
2338 {
2339 int status;
2341 status = read_options (argc, argv);
2342 if (status != 0)
2343 {
2344 if (status < 0)
2345 status = 0;
2346 return (status);
2347 }
2349 status = daemonize ();
2350 if (status == 1)
2351 {
2352 struct sigaction sigchld;
2354 memset (&sigchld, 0, sizeof (sigchld));
2355 sigchld.sa_handler = SIG_IGN;
2356 sigaction (SIGCHLD, &sigchld, NULL);
2358 return (0);
2359 }
2360 else if (status != 0)
2361 {
2362 fprintf (stderr, "daemonize failed, exiting.\n");
2363 return (1);
2364 }
2366 if (journal_cur != NULL)
2367 {
2368 int had_journal = 0;
2370 pthread_mutex_lock(&journal_lock);
2372 RRDD_LOG(LOG_INFO, "checking for journal files");
2374 had_journal += journal_replay(journal_old);
2375 had_journal += journal_replay(journal_cur);
2377 if (had_journal)
2378 flush_old_values(-1);
2380 pthread_mutex_unlock(&journal_lock);
2381 journal_rotate();
2383 RRDD_LOG(LOG_INFO, "journal processing complete");
2384 }
2386 /* start the queue thread */
2387 memset (&queue_thread, 0, sizeof (queue_thread));
2388 status = pthread_create (&queue_thread,
2389 NULL, /* attr */
2390 queue_thread_main,
2391 NULL); /* args */
2392 if (status != 0)
2393 {
2394 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2395 cleanup();
2396 return (1);
2397 }
2399 listen_thread_main (NULL);
2400 cleanup ();
2402 return (0);
2403 } /* int main */
2405 /*
2406 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2407 */