8c4f04226eebf648bda21f45c6051dbb0c940563
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
101 /*
102 * Types
103 */
104 typedef enum
105 {
106 PRIV_LOW,
107 PRIV_HIGH
108 } socket_privilege;
110 struct listen_socket_s
111 {
112 int fd;
113 char addr[PATH_MAX + 1];
114 int family;
115 socket_privilege privilege;
116 };
117 typedef struct listen_socket_s listen_socket_t;
119 struct cache_item_s;
120 typedef struct cache_item_s cache_item_t;
121 struct cache_item_s
122 {
123 char *file;
124 char **values;
125 int values_num;
126 time_t last_flush_time;
127 #define CI_FLAGS_IN_TREE (1<<0)
128 #define CI_FLAGS_IN_QUEUE (1<<1)
129 int flags;
130 pthread_cond_t flushed;
131 cache_item_t *prev;
132 cache_item_t *next;
133 };
135 struct callback_flush_data_s
136 {
137 time_t now;
138 time_t abs_timeout;
139 char **keys;
140 size_t keys_num;
141 };
142 typedef struct callback_flush_data_s callback_flush_data_t;
144 enum queue_side_e
145 {
146 HEAD,
147 TAIL
148 };
149 typedef enum queue_side_e queue_side_t;
151 /* max length of socket command or response */
152 #define CMD_MAX 4096
154 /*
155 * Variables
156 */
157 static int stay_foreground = 0;
159 static listen_socket_t *listen_fds = NULL;
160 static size_t listen_fds_num = 0;
162 static int do_shutdown = 0;
164 static pthread_t queue_thread;
166 static pthread_t *connection_threads = NULL;
167 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
168 static int connection_threads_num = 0;
170 /* Cache stuff */
171 static GTree *cache_tree = NULL;
172 static cache_item_t *cache_queue_head = NULL;
173 static cache_item_t *cache_queue_tail = NULL;
174 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
175 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
177 static int config_write_interval = 300;
178 static int config_write_jitter = 0;
179 static int config_flush_interval = 3600;
180 static int config_flush_at_shutdown = 0;
181 static char *config_pid_file = NULL;
182 static char *config_base_dir = NULL;
183 static size_t _config_base_dir_len = 0;
184 static int config_write_base_only = 0;
186 static listen_socket_t **config_listen_address_list = NULL;
187 static int config_listen_address_list_len = 0;
189 static uint64_t stats_queue_length = 0;
190 static uint64_t stats_updates_received = 0;
191 static uint64_t stats_flush_received = 0;
192 static uint64_t stats_updates_written = 0;
193 static uint64_t stats_data_sets_written = 0;
194 static uint64_t stats_journal_bytes = 0;
195 static uint64_t stats_journal_rotate = 0;
196 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
198 /* Journaled updates */
199 static char *journal_cur = NULL;
200 static char *journal_old = NULL;
201 static FILE *journal_fh = NULL;
202 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
203 static int journal_write(char *cmd, char *args);
204 static void journal_done(void);
205 static void journal_rotate(void);
207 /*
208 * Functions
209 */
210 static void sig_common (const char *sig) /* {{{ */
211 {
212 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
213 do_shutdown++;
214 pthread_cond_broadcast(&cache_cond);
215 } /* }}} void sig_common */
217 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
218 {
219 sig_common("INT");
220 } /* }}} void sig_int_handler */
222 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
223 {
224 sig_common("TERM");
225 } /* }}} void sig_term_handler */
227 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
228 {
229 config_flush_at_shutdown = 1;
230 sig_common("USR1");
231 } /* }}} void sig_usr1_handler */
233 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
234 {
235 config_flush_at_shutdown = 0;
236 sig_common("USR2");
237 } /* }}} void sig_usr2_handler */
239 static void install_signal_handlers(void) /* {{{ */
240 {
241 /* These structures are static, because `sigaction' behaves weird if the are
242 * overwritten.. */
243 static struct sigaction sa_int;
244 static struct sigaction sa_term;
245 static struct sigaction sa_pipe;
246 static struct sigaction sa_usr1;
247 static struct sigaction sa_usr2;
249 /* Install signal handlers */
250 memset (&sa_int, 0, sizeof (sa_int));
251 sa_int.sa_handler = sig_int_handler;
252 sigaction (SIGINT, &sa_int, NULL);
254 memset (&sa_term, 0, sizeof (sa_term));
255 sa_term.sa_handler = sig_term_handler;
256 sigaction (SIGTERM, &sa_term, NULL);
258 memset (&sa_pipe, 0, sizeof (sa_pipe));
259 sa_pipe.sa_handler = SIG_IGN;
260 sigaction (SIGPIPE, &sa_pipe, NULL);
262 memset (&sa_pipe, 0, sizeof (sa_usr1));
263 sa_usr1.sa_handler = sig_usr1_handler;
264 sigaction (SIGUSR1, &sa_usr1, NULL);
266 memset (&sa_usr2, 0, sizeof (sa_usr2));
267 sa_usr2.sa_handler = sig_usr2_handler;
268 sigaction (SIGUSR2, &sa_usr2, NULL);
270 } /* }}} void install_signal_handlers */
272 static int open_pidfile(void) /* {{{ */
273 {
274 int fd;
275 char *file;
277 file = (config_pid_file != NULL)
278 ? config_pid_file
279 : LOCALSTATEDIR "/run/rrdcached.pid";
281 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
282 if (fd < 0)
283 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
284 file, rrd_strerror(errno));
286 return(fd);
287 } /* }}} static int open_pidfile */
289 static int write_pidfile (int fd) /* {{{ */
290 {
291 pid_t pid;
292 FILE *fh;
294 pid = getpid ();
296 fh = fdopen (fd, "w");
297 if (fh == NULL)
298 {
299 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
300 close(fd);
301 return (-1);
302 }
304 fprintf (fh, "%i\n", (int) pid);
305 fclose (fh);
307 return (0);
308 } /* }}} int write_pidfile */
310 static int remove_pidfile (void) /* {{{ */
311 {
312 char *file;
313 int status;
315 file = (config_pid_file != NULL)
316 ? config_pid_file
317 : LOCALSTATEDIR "/run/rrdcached.pid";
319 status = unlink (file);
320 if (status == 0)
321 return (0);
322 return (errno);
323 } /* }}} int remove_pidfile */
325 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
326 {
327 char *buffer;
328 size_t buffer_used;
329 size_t buffer_free;
330 ssize_t status;
332 buffer = (char *) buffer_void;
333 buffer_used = 0;
334 buffer_free = buffer_size;
336 while (buffer_free > 0)
337 {
338 status = read (fd, buffer + buffer_used, buffer_free);
339 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
340 continue;
342 if (status < 0)
343 return (-1);
345 if (status == 0)
346 return (0);
348 assert ((0 > status) || (buffer_free >= (size_t) status));
350 buffer_free = buffer_free - status;
351 buffer_used = buffer_used + status;
353 if (buffer[buffer_used - 1] == '\n')
354 break;
355 }
357 assert (buffer_used > 0);
359 if (buffer[buffer_used - 1] != '\n')
360 {
361 errno = ENOBUFS;
362 return (-1);
363 }
365 buffer[buffer_used - 1] = 0;
367 /* Fix network line endings. */
368 if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
369 {
370 buffer_used--;
371 buffer[buffer_used - 1] = 0;
372 }
374 return (buffer_used);
375 } /* }}} ssize_t sread */
377 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
378 {
379 const char *ptr;
380 size_t nleft;
381 ssize_t status;
383 /* special case for journal replay */
384 if (fd < 0) return 0;
386 ptr = (const char *) buf;
387 nleft = count;
389 while (nleft > 0)
390 {
391 status = write (fd, (const void *) ptr, nleft);
393 if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
394 continue;
396 if (status < 0)
397 return (status);
399 nleft -= status;
400 ptr += status;
401 }
403 return (0);
404 } /* }}} ssize_t swrite */
406 static void wipe_ci_values(cache_item_t *ci, time_t when)
407 {
408 ci->values = NULL;
409 ci->values_num = 0;
411 ci->last_flush_time = when;
412 if (config_write_jitter > 0)
413 ci->last_flush_time += (random() % config_write_jitter);
414 }
416 /* remove_from_queue
417 * remove a "cache_item_t" item from the queue.
418 * must hold 'cache_lock' when calling this
419 */
420 static void remove_from_queue(cache_item_t *ci) /* {{{ */
421 {
422 if (ci == NULL) return;
424 if (ci->prev == NULL)
425 cache_queue_head = ci->next; /* reset head */
426 else
427 ci->prev->next = ci->next;
429 if (ci->next == NULL)
430 cache_queue_tail = ci->prev; /* reset the tail */
431 else
432 ci->next->prev = ci->prev;
434 ci->next = ci->prev = NULL;
435 ci->flags &= ~CI_FLAGS_IN_QUEUE;
436 } /* }}} static void remove_from_queue */
438 /*
439 * enqueue_cache_item:
440 * `cache_lock' must be acquired before calling this function!
441 */
442 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
443 queue_side_t side)
444 {
445 if (ci == NULL)
446 return (-1);
448 if (ci->values_num == 0)
449 return (0);
451 if (side == HEAD)
452 {
453 if (cache_queue_head == ci)
454 return 0;
456 /* remove from the double linked list */
457 if (ci->flags & CI_FLAGS_IN_QUEUE)
458 remove_from_queue(ci);
460 ci->prev = NULL;
461 ci->next = cache_queue_head;
462 if (ci->next != NULL)
463 ci->next->prev = ci;
464 cache_queue_head = ci;
466 if (cache_queue_tail == NULL)
467 cache_queue_tail = cache_queue_head;
468 }
469 else /* (side == TAIL) */
470 {
471 /* We don't move values back in the list.. */
472 if (ci->flags & CI_FLAGS_IN_QUEUE)
473 return (0);
475 assert (ci->next == NULL);
476 assert (ci->prev == NULL);
478 ci->prev = cache_queue_tail;
480 if (cache_queue_tail == NULL)
481 cache_queue_head = ci;
482 else
483 cache_queue_tail->next = ci;
485 cache_queue_tail = ci;
486 }
488 ci->flags |= CI_FLAGS_IN_QUEUE;
490 pthread_cond_broadcast(&cache_cond);
491 pthread_mutex_lock (&stats_lock);
492 stats_queue_length++;
493 pthread_mutex_unlock (&stats_lock);
495 return (0);
496 } /* }}} int enqueue_cache_item */
498 /*
499 * tree_callback_flush:
500 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
501 * while this is in progress.
502 */
503 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
504 gpointer data)
505 {
506 cache_item_t *ci;
507 callback_flush_data_t *cfd;
509 ci = (cache_item_t *) value;
510 cfd = (callback_flush_data_t *) data;
512 if ((ci->last_flush_time <= cfd->abs_timeout)
513 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
514 && (ci->values_num > 0))
515 {
516 enqueue_cache_item (ci, TAIL);
517 }
518 else if ((do_shutdown != 0)
519 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
520 && (ci->values_num > 0))
521 {
522 enqueue_cache_item (ci, TAIL);
523 }
524 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
525 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
526 && (ci->values_num <= 0))
527 {
528 char **temp;
530 temp = (char **) realloc (cfd->keys,
531 sizeof (char *) * (cfd->keys_num + 1));
532 if (temp == NULL)
533 {
534 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
535 return (FALSE);
536 }
537 cfd->keys = temp;
538 /* Make really sure this points to the _same_ place */
539 assert ((char *) key == ci->file);
540 cfd->keys[cfd->keys_num] = (char *) key;
541 cfd->keys_num++;
542 }
544 return (FALSE);
545 } /* }}} gboolean tree_callback_flush */
547 static int flush_old_values (int max_age)
548 {
549 callback_flush_data_t cfd;
550 size_t k;
552 memset (&cfd, 0, sizeof (cfd));
553 /* Pass the current time as user data so that we don't need to call
554 * `time' for each node. */
555 cfd.now = time (NULL);
556 cfd.keys = NULL;
557 cfd.keys_num = 0;
559 if (max_age > 0)
560 cfd.abs_timeout = cfd.now - max_age;
561 else
562 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
564 /* `tree_callback_flush' will return the keys of all values that haven't
565 * been touched in the last `config_flush_interval' seconds in `cfd'.
566 * The char*'s in this array point to the same memory as ci->file, so we
567 * don't need to free them separately. */
568 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
570 for (k = 0; k < cfd.keys_num; k++)
571 {
572 cache_item_t *ci;
574 /* This must not fail. */
575 ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
576 assert (ci != NULL);
578 /* If we end up here with values available, something's seriously
579 * messed up. */
580 assert (ci->values_num == 0);
582 /* Remove the node from the tree */
583 g_tree_remove (cache_tree, cfd.keys[k]);
584 cfd.keys[k] = NULL;
586 /* Now free and clean up `ci'. */
587 free (ci->file);
588 ci->file = NULL;
589 free (ci);
590 ci = NULL;
591 } /* for (k = 0; k < cfd.keys_num; k++) */
593 if (cfd.keys != NULL)
594 {
595 free (cfd.keys);
596 cfd.keys = NULL;
597 }
599 return (0);
600 } /* int flush_old_values */
602 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
603 {
604 struct timeval now;
605 struct timespec next_flush;
606 int final_flush = 0; /* make sure we only flush once on shutdown */
608 gettimeofday (&now, NULL);
609 next_flush.tv_sec = now.tv_sec + config_flush_interval;
610 next_flush.tv_nsec = 1000 * now.tv_usec;
612 pthread_mutex_lock (&cache_lock);
613 while ((do_shutdown == 0) || (cache_queue_head != NULL))
614 {
615 cache_item_t *ci;
616 char *file;
617 char **values;
618 int values_num;
619 int status;
620 int i;
622 /* First, check if it's time to do the cache flush. */
623 gettimeofday (&now, NULL);
624 if ((now.tv_sec > next_flush.tv_sec)
625 || ((now.tv_sec == next_flush.tv_sec)
626 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
627 {
628 /* Flush all values that haven't been written in the last
629 * `config_write_interval' seconds. */
630 flush_old_values (config_write_interval);
632 /* Determine the time of the next cache flush. */
633 while (next_flush.tv_sec <= now.tv_sec)
634 next_flush.tv_sec += config_flush_interval;
636 /* unlock the cache while we rotate so we don't block incoming
637 * updates if the fsync() blocks on disk I/O */
638 pthread_mutex_unlock(&cache_lock);
639 journal_rotate();
640 pthread_mutex_lock(&cache_lock);
641 }
643 /* Now, check if there's something to store away. If not, wait until
644 * something comes in or it's time to do the cache flush. if we are
645 * shutting down, do not wait around. */
646 if (cache_queue_head == NULL && !do_shutdown)
647 {
648 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
649 if ((status != 0) && (status != ETIMEDOUT))
650 {
651 RRDD_LOG (LOG_ERR, "queue_thread_main: "
652 "pthread_cond_timedwait returned %i.", status);
653 }
654 }
656 /* We're about to shut down */
657 if (do_shutdown != 0 && !final_flush++)
658 {
659 if (config_flush_at_shutdown)
660 flush_old_values (-1); /* flush everything */
661 else
662 break;
663 }
665 /* Check if a value has arrived. This may be NULL if we timed out or there
666 * was an interrupt such as a signal. */
667 if (cache_queue_head == NULL)
668 continue;
670 ci = cache_queue_head;
672 /* copy the relevant parts */
673 file = strdup (ci->file);
674 if (file == NULL)
675 {
676 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
677 continue;
678 }
680 assert(ci->values != NULL);
681 assert(ci->values_num > 0);
683 values = ci->values;
684 values_num = ci->values_num;
686 wipe_ci_values(ci, time(NULL));
687 remove_from_queue(ci);
689 pthread_mutex_lock (&stats_lock);
690 assert (stats_queue_length > 0);
691 stats_queue_length--;
692 pthread_mutex_unlock (&stats_lock);
694 pthread_mutex_unlock (&cache_lock);
696 rrd_clear_error ();
697 status = rrd_update_r (file, NULL, values_num, (void *) values);
698 if (status != 0)
699 {
700 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
701 "rrd_update_r (%s) failed with status %i. (%s)",
702 file, status, rrd_get_error());
703 }
705 journal_write("wrote", file);
706 pthread_cond_broadcast(&ci->flushed);
708 for (i = 0; i < values_num; i++)
709 free (values[i]);
711 free(values);
712 free(file);
714 if (status == 0)
715 {
716 pthread_mutex_lock (&stats_lock);
717 stats_updates_written++;
718 stats_data_sets_written += values_num;
719 pthread_mutex_unlock (&stats_lock);
720 }
722 pthread_mutex_lock (&cache_lock);
724 /* We're about to shut down */
725 if (do_shutdown != 0 && !final_flush++)
726 {
727 if (config_flush_at_shutdown)
728 flush_old_values (-1); /* flush everything */
729 else
730 break;
731 }
732 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
733 pthread_mutex_unlock (&cache_lock);
735 if (config_flush_at_shutdown)
736 {
737 assert(cache_queue_head == NULL);
738 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
739 }
741 journal_done();
743 return (NULL);
744 } /* }}} void *queue_thread_main */
746 static int buffer_get_field (char **buffer_ret, /* {{{ */
747 size_t *buffer_size_ret, char **field_ret)
748 {
749 char *buffer;
750 size_t buffer_pos;
751 size_t buffer_size;
752 char *field;
753 size_t field_size;
754 int status;
756 buffer = *buffer_ret;
757 buffer_pos = 0;
758 buffer_size = *buffer_size_ret;
759 field = *buffer_ret;
760 field_size = 0;
762 if (buffer_size <= 0)
763 return (-1);
765 /* This is ensured by `handle_request'. */
766 assert (buffer[buffer_size - 1] == '\0');
768 status = -1;
769 while (buffer_pos < buffer_size)
770 {
771 /* Check for end-of-field or end-of-buffer */
772 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
773 {
774 field[field_size] = 0;
775 field_size++;
776 buffer_pos++;
777 status = 0;
778 break;
779 }
780 /* Handle escaped characters. */
781 else if (buffer[buffer_pos] == '\\')
782 {
783 if (buffer_pos >= (buffer_size - 1))
784 break;
785 buffer_pos++;
786 field[field_size] = buffer[buffer_pos];
787 field_size++;
788 buffer_pos++;
789 }
790 /* Normal operation */
791 else
792 {
793 field[field_size] = buffer[buffer_pos];
794 field_size++;
795 buffer_pos++;
796 }
797 } /* while (buffer_pos < buffer_size) */
799 if (status != 0)
800 return (status);
802 *buffer_ret = buffer + buffer_pos;
803 *buffer_size_ret = buffer_size - buffer_pos;
804 *field_ret = field;
806 return (0);
807 } /* }}} int buffer_get_field */
809 /* if we're restricting writes to the base directory,
810 * check whether the file falls within the dir
811 * returns 1 if OK, otherwise 0
812 */
813 static int check_file_access (const char *file, int fd) /* {{{ */
814 {
815 char error[CMD_MAX];
816 assert(file != NULL);
818 if (!config_write_base_only
819 || fd < 0 /* journal replay */
820 || config_base_dir == NULL)
821 return 1;
823 if (strstr(file, "../") != NULL) goto err;
825 /* relative paths without "../" are ok */
826 if (*file != '/') return 1;
828 /* file must be of the format base + "/" + <1+ char filename> */
829 if (strlen(file) < _config_base_dir_len + 2) goto err;
830 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
831 if (*(file + _config_base_dir_len) != '/') goto err;
833 return 1;
835 err:
836 snprintf(error, sizeof(error)-1, "-1 %s\n", rrd_strerror(EACCES));
837 swrite(fd, error, strlen(error));
838 return 0;
839 } /* }}} static int check_file_access */
841 static int flush_file (const char *filename) /* {{{ */
842 {
843 cache_item_t *ci;
845 pthread_mutex_lock (&cache_lock);
847 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
848 if (ci == NULL)
849 {
850 pthread_mutex_unlock (&cache_lock);
851 return (ENOENT);
852 }
854 if (ci->values_num > 0)
855 {
856 /* Enqueue at head */
857 enqueue_cache_item (ci, HEAD);
858 pthread_cond_wait(&ci->flushed, &cache_lock);
859 }
861 pthread_mutex_unlock(&cache_lock);
863 return (0);
864 } /* }}} int flush_file */
866 static int handle_request_help (int fd, /* {{{ */
867 char *buffer, size_t buffer_size)
868 {
869 int status;
870 char **help_text;
871 size_t help_text_len;
872 char *command;
873 size_t i;
875 char *help_help[] =
876 {
877 "5 Command overview\n",
878 "FLUSH <filename>\n",
879 "FLUSHALL\n",
880 "HELP [<command>]\n",
881 "UPDATE <filename> <values> [<values> ...]\n",
882 "STATS\n"
883 };
884 size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
886 char *help_flush[] =
887 {
888 "4 Help for FLUSH\n",
889 "Usage: FLUSH <filename>\n",
890 "\n",
891 "Adds the given filename to the head of the update queue and returns\n",
892 "after is has been dequeued.\n"
893 };
894 size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
896 char *help_flushall[] =
897 {
898 "3 Help for FLUSHALL\n",
899 "Usage: FLUSHALL\n",
900 "\n",
901 "Triggers writing of all pending updates. Returns immediately.\n"
902 };
903 size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
905 char *help_update[] =
906 {
907 "9 Help for UPDATE\n",
908 "Usage: UPDATE <filename> <values> [<values> ...]\n"
909 "\n",
910 "Adds the given file to the internal cache if it is not yet known and\n",
911 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
912 "for details.\n",
913 "\n",
914 "Each <values> has the following form:\n",
915 " <values> = <time>:<value>[:<value>[...]]\n",
916 "See the rrdupdate(1) manpage for details.\n"
917 };
918 size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
920 char *help_stats[] =
921 {
922 "4 Help for STATS\n",
923 "Usage: STATS\n",
924 "\n",
925 "Returns some performance counters, see the rrdcached(1) manpage for\n",
926 "a description of the values.\n"
927 };
928 size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
930 status = buffer_get_field (&buffer, &buffer_size, &command);
931 if (status != 0)
932 {
933 help_text = help_help;
934 help_text_len = help_help_len;
935 }
936 else
937 {
938 if (strcasecmp (command, "update") == 0)
939 {
940 help_text = help_update;
941 help_text_len = help_update_len;
942 }
943 else if (strcasecmp (command, "flush") == 0)
944 {
945 help_text = help_flush;
946 help_text_len = help_flush_len;
947 }
948 else if (strcasecmp (command, "flushall") == 0)
949 {
950 help_text = help_flushall;
951 help_text_len = help_flushall_len;
952 }
953 else if (strcasecmp (command, "stats") == 0)
954 {
955 help_text = help_stats;
956 help_text_len = help_stats_len;
957 }
958 else
959 {
960 help_text = help_help;
961 help_text_len = help_help_len;
962 }
963 }
965 for (i = 0; i < help_text_len; i++)
966 {
967 status = swrite (fd, help_text[i], strlen (help_text[i]));
968 if (status < 0)
969 {
970 status = errno;
971 RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
972 return (status);
973 }
974 }
976 return (0);
977 } /* }}} int handle_request_help */
979 static int handle_request_stats (int fd, /* {{{ */
980 char *buffer __attribute__((unused)),
981 size_t buffer_size __attribute__((unused)))
982 {
983 int status;
984 char outbuf[CMD_MAX];
986 uint64_t copy_queue_length;
987 uint64_t copy_updates_received;
988 uint64_t copy_flush_received;
989 uint64_t copy_updates_written;
990 uint64_t copy_data_sets_written;
991 uint64_t copy_journal_bytes;
992 uint64_t copy_journal_rotate;
994 uint64_t tree_nodes_number;
995 uint64_t tree_depth;
997 pthread_mutex_lock (&stats_lock);
998 copy_queue_length = stats_queue_length;
999 copy_updates_received = stats_updates_received;
1000 copy_flush_received = stats_flush_received;
1001 copy_updates_written = stats_updates_written;
1002 copy_data_sets_written = stats_data_sets_written;
1003 copy_journal_bytes = stats_journal_bytes;
1004 copy_journal_rotate = stats_journal_rotate;
1005 pthread_mutex_unlock (&stats_lock);
1007 pthread_mutex_lock (&cache_lock);
1008 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1009 tree_depth = (uint64_t) g_tree_height (cache_tree);
1010 pthread_mutex_unlock (&cache_lock);
1012 #define RRDD_STATS_SEND \
1013 outbuf[sizeof (outbuf) - 1] = 0; \
1014 status = swrite (fd, outbuf, strlen (outbuf)); \
1015 if (status < 0) \
1016 { \
1017 status = errno; \
1018 RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
1019 return (status); \
1020 }
1022 strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
1023 RRDD_STATS_SEND;
1025 snprintf (outbuf, sizeof (outbuf),
1026 "QueueLength: %"PRIu64"\n", copy_queue_length);
1027 RRDD_STATS_SEND;
1029 snprintf (outbuf, sizeof (outbuf),
1030 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1031 RRDD_STATS_SEND;
1033 snprintf (outbuf, sizeof (outbuf),
1034 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1035 RRDD_STATS_SEND;
1037 snprintf (outbuf, sizeof (outbuf),
1038 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1039 RRDD_STATS_SEND;
1041 snprintf (outbuf, sizeof (outbuf),
1042 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1043 RRDD_STATS_SEND;
1045 snprintf (outbuf, sizeof (outbuf),
1046 "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1047 RRDD_STATS_SEND;
1049 snprintf (outbuf, sizeof (outbuf),
1050 "TreeDepth: %"PRIu64"\n", tree_depth);
1051 RRDD_STATS_SEND;
1053 snprintf (outbuf, sizeof(outbuf),
1054 "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1055 RRDD_STATS_SEND;
1057 snprintf (outbuf, sizeof(outbuf),
1058 "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1059 RRDD_STATS_SEND;
1061 return (0);
1062 #undef RRDD_STATS_SEND
1063 } /* }}} int handle_request_stats */
1065 static int handle_request_flush (int fd, /* {{{ */
1066 char *buffer, size_t buffer_size)
1067 {
1068 char *file;
1069 int status;
1070 char result[CMD_MAX];
1072 status = buffer_get_field (&buffer, &buffer_size, &file);
1073 if (status != 0)
1074 {
1075 strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1076 }
1077 else
1078 {
1079 pthread_mutex_lock(&stats_lock);
1080 stats_flush_received++;
1081 pthread_mutex_unlock(&stats_lock);
1083 if (!check_file_access(file, fd)) return 0;
1085 status = flush_file (file);
1086 if (status == 0)
1087 snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1088 else if (status == ENOENT)
1089 {
1090 /* no file in our tree; see whether it exists at all */
1091 struct stat statbuf;
1093 memset(&statbuf, 0, sizeof(statbuf));
1094 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1095 snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1096 else
1097 snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1098 }
1099 else if (status < 0)
1100 strncpy (result, "-1 Internal error.\n", sizeof (result));
1101 else
1102 snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1103 }
1104 result[sizeof (result) - 1] = 0;
1106 status = swrite (fd, result, strlen (result));
1107 if (status < 0)
1108 {
1109 status = errno;
1110 RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1111 return (status);
1112 }
1114 return (0);
1115 } /* }}} int handle_request_slurp */
1117 static int handle_request_flushall(int fd) /* {{{ */
1118 {
1119 int status;
1120 char answer[] ="0 Started flush.\n";
1122 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1124 pthread_mutex_lock(&cache_lock);
1125 flush_old_values(-1);
1126 pthread_mutex_unlock(&cache_lock);
1128 status = swrite(fd, answer, strlen(answer));
1129 if (status < 0)
1130 {
1131 status = errno;
1132 RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1133 }
1135 return (status);
1136 } /* }}} static int handle_request_flushall */
1138 static int handle_request_update (int fd, /* {{{ */
1139 char *buffer, size_t buffer_size)
1140 {
1141 char *file;
1142 int values_num = 0;
1143 int status;
1145 time_t now;
1147 cache_item_t *ci;
1148 char answer[CMD_MAX];
1150 #define RRDD_UPDATE_SEND \
1151 answer[sizeof (answer) - 1] = 0; \
1152 status = swrite (fd, answer, strlen (answer)); \
1153 if (status < 0) \
1154 { \
1155 status = errno; \
1156 RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1157 return (status); \
1158 }
1160 now = time (NULL);
1162 status = buffer_get_field (&buffer, &buffer_size, &file);
1163 if (status != 0)
1164 {
1165 strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1166 sizeof (answer));
1167 RRDD_UPDATE_SEND;
1168 return (0);
1169 }
1171 pthread_mutex_lock(&stats_lock);
1172 stats_updates_received++;
1173 pthread_mutex_unlock(&stats_lock);
1175 if (!check_file_access(file, fd)) return 0;
1177 pthread_mutex_lock (&cache_lock);
1178 ci = g_tree_lookup (cache_tree, file);
1180 if (ci == NULL) /* {{{ */
1181 {
1182 struct stat statbuf;
1184 /* don't hold the lock while we setup; stat(2) might block */
1185 pthread_mutex_unlock(&cache_lock);
1187 memset (&statbuf, 0, sizeof (statbuf));
1188 status = stat (file, &statbuf);
1189 if (status != 0)
1190 {
1191 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1193 status = errno;
1194 if (status == ENOENT)
1195 snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1196 else
1197 snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1198 status);
1199 RRDD_UPDATE_SEND;
1200 return (0);
1201 }
1202 if (!S_ISREG (statbuf.st_mode))
1203 {
1204 snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1205 RRDD_UPDATE_SEND;
1206 return (0);
1207 }
1208 if (access(file, R_OK|W_OK) != 0)
1209 {
1210 snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1211 file, rrd_strerror(errno));
1212 RRDD_UPDATE_SEND;
1213 return (0);
1214 }
1216 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1217 if (ci == NULL)
1218 {
1219 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1221 strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1222 RRDD_UPDATE_SEND;
1223 return (0);
1224 }
1225 memset (ci, 0, sizeof (cache_item_t));
1227 ci->file = strdup (file);
1228 if (ci->file == NULL)
1229 {
1230 free (ci);
1231 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1233 strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1234 RRDD_UPDATE_SEND;
1235 return (0);
1236 }
1238 wipe_ci_values(ci, now);
1239 ci->flags = CI_FLAGS_IN_TREE;
1241 pthread_mutex_lock(&cache_lock);
1242 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1243 } /* }}} */
1244 assert (ci != NULL);
1246 while (buffer_size > 0)
1247 {
1248 char **temp;
1249 char *value;
1251 status = buffer_get_field (&buffer, &buffer_size, &value);
1252 if (status != 0)
1253 {
1254 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1255 break;
1256 }
1258 temp = (char **) realloc (ci->values,
1259 sizeof (char *) * (ci->values_num + 1));
1260 if (temp == NULL)
1261 {
1262 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1263 continue;
1264 }
1265 ci->values = temp;
1267 ci->values[ci->values_num] = strdup (value);
1268 if (ci->values[ci->values_num] == NULL)
1269 {
1270 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1271 continue;
1272 }
1273 ci->values_num++;
1275 values_num++;
1276 }
1278 if (((now - ci->last_flush_time) >= config_write_interval)
1279 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1280 && (ci->values_num > 0))
1281 {
1282 enqueue_cache_item (ci, TAIL);
1283 }
1285 pthread_mutex_unlock (&cache_lock);
1287 if (values_num < 1)
1288 {
1289 strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1290 }
1291 else
1292 {
1293 snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1294 (values_num == 1) ? "" : "s");
1295 }
1296 RRDD_UPDATE_SEND;
1297 return (0);
1298 #undef RRDD_UPDATE_SEND
1299 } /* }}} int handle_request_update */
1301 /* we came across a "WROTE" entry during journal replay.
1302 * throw away any values that we have accumulated for this file
1303 */
1304 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1305 const char *buffer,
1306 size_t buffer_size __attribute__((unused)))
1307 {
1308 int i;
1309 cache_item_t *ci;
1310 const char *file = buffer;
1312 pthread_mutex_lock(&cache_lock);
1314 ci = g_tree_lookup(cache_tree, file);
1315 if (ci == NULL)
1316 {
1317 pthread_mutex_unlock(&cache_lock);
1318 return (0);
1319 }
1321 if (ci->values)
1322 {
1323 for (i=0; i < ci->values_num; i++)
1324 free(ci->values[i]);
1326 free(ci->values);
1327 }
1329 wipe_ci_values(ci, time(NULL));
1330 remove_from_queue(ci);
1332 pthread_mutex_unlock(&cache_lock);
1333 return (0);
1334 } /* }}} int handle_request_wrote */
1336 /* returns 1 if we have the required privilege level */
1337 static int has_privilege (socket_privilege priv, /* {{{ */
1338 socket_privilege required, int fd)
1339 {
1340 int status;
1341 char error[CMD_MAX];
1343 if (priv >= required)
1344 return 1;
1346 sprintf(error, "-1 %s\n", rrd_strerror(EACCES));
1347 status = swrite(fd, error, strlen(error));
1349 if (status < 0)
1350 return status;
1351 else
1352 return 0;
1353 } /* }}} static int has_privilege */
1355 /* if fd < 0, we are in journal replay mode */
1356 static int handle_request (int fd, socket_privilege privilege, /* {{{ */
1357 char *buffer, size_t buffer_size)
1358 {
1359 char *buffer_ptr;
1360 char *command;
1361 int status;
1363 assert (buffer[buffer_size - 1] == '\0');
1365 buffer_ptr = buffer;
1366 command = NULL;
1367 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1368 if (status != 0)
1369 {
1370 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1371 return (-1);
1372 }
1374 if (strcasecmp (command, "update") == 0)
1375 {
1376 status = has_privilege(privilege, PRIV_HIGH, fd);
1377 if (status <= 0)
1378 return status;
1380 /* don't re-write updates in replay mode */
1381 if (fd >= 0)
1382 journal_write(command, buffer_ptr);
1384 return (handle_request_update (fd, buffer_ptr, buffer_size));
1385 }
1386 else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1387 {
1388 /* this is only valid in replay mode */
1389 return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1390 }
1391 else if (strcasecmp (command, "flush") == 0)
1392 {
1393 return (handle_request_flush (fd, buffer_ptr, buffer_size));
1394 }
1395 else if (strcasecmp (command, "flushall") == 0)
1396 {
1397 status = has_privilege(privilege, PRIV_HIGH, fd);
1398 if (status <= 0)
1399 return status;
1401 return (handle_request_flushall(fd));
1402 }
1403 else if (strcasecmp (command, "stats") == 0)
1404 {
1405 return (handle_request_stats (fd, buffer_ptr, buffer_size));
1406 }
1407 else if (strcasecmp (command, "help") == 0)
1408 {
1409 return (handle_request_help (fd, buffer_ptr, buffer_size));
1410 }
1411 else
1412 {
1413 char result[CMD_MAX];
1415 snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1416 result[sizeof (result) - 1] = 0;
1418 status = swrite (fd, result, strlen (result));
1419 if (status < 0)
1420 {
1421 RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1422 return (-1);
1423 }
1424 }
1426 return (0);
1427 } /* }}} int handle_request */
1429 /* MUST NOT hold journal_lock before calling this */
1430 static void journal_rotate(void) /* {{{ */
1431 {
1432 FILE *old_fh = NULL;
1434 if (journal_cur == NULL || journal_old == NULL)
1435 return;
1437 pthread_mutex_lock(&journal_lock);
1439 /* we rotate this way (rename before close) so that the we can release
1440 * the journal lock as fast as possible. Journal writes to the new
1441 * journal can proceed immediately after the new file is opened. The
1442 * fclose can then block without affecting new updates.
1443 */
1444 if (journal_fh != NULL)
1445 {
1446 old_fh = journal_fh;
1447 rename(journal_cur, journal_old);
1448 ++stats_journal_rotate;
1449 }
1451 journal_fh = fopen(journal_cur, "a");
1452 pthread_mutex_unlock(&journal_lock);
1454 if (old_fh != NULL)
1455 fclose(old_fh);
1457 if (journal_fh == NULL)
1458 {
1459 RRDD_LOG(LOG_CRIT,
1460 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1461 journal_cur, rrd_strerror(errno));
1463 RRDD_LOG(LOG_ERR,
1464 "JOURNALING DISABLED: All values will be flushed at shutdown");
1465 config_flush_at_shutdown = 1;
1466 }
1468 } /* }}} static void journal_rotate */
1470 static void journal_done(void) /* {{{ */
1471 {
1472 if (journal_cur == NULL)
1473 return;
1475 pthread_mutex_lock(&journal_lock);
1476 if (journal_fh != NULL)
1477 {
1478 fclose(journal_fh);
1479 journal_fh = NULL;
1480 }
1482 if (config_flush_at_shutdown)
1483 {
1484 RRDD_LOG(LOG_INFO, "removing journals");
1485 unlink(journal_old);
1486 unlink(journal_cur);
1487 }
1488 else
1489 {
1490 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1491 "journals will be used at next startup");
1492 }
1494 pthread_mutex_unlock(&journal_lock);
1496 } /* }}} static void journal_done */
1498 static int journal_write(char *cmd, char *args) /* {{{ */
1499 {
1500 int chars;
1502 if (journal_fh == NULL)
1503 return 0;
1505 pthread_mutex_lock(&journal_lock);
1506 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1507 pthread_mutex_unlock(&journal_lock);
1509 if (chars > 0)
1510 {
1511 pthread_mutex_lock(&stats_lock);
1512 stats_journal_bytes += chars;
1513 pthread_mutex_unlock(&stats_lock);
1514 }
1516 return chars;
1517 } /* }}} static int journal_write */
1519 static int journal_replay (const char *file) /* {{{ */
1520 {
1521 FILE *fh;
1522 int entry_cnt = 0;
1523 int fail_cnt = 0;
1524 uint64_t line = 0;
1525 char entry[CMD_MAX];
1527 if (file == NULL) return 0;
1529 fh = fopen(file, "r");
1530 if (fh == NULL)
1531 {
1532 if (errno != ENOENT)
1533 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1534 file, rrd_strerror(errno));
1535 return 0;
1536 }
1537 else
1538 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1540 while(!feof(fh))
1541 {
1542 size_t entry_len;
1544 ++line;
1545 if (fgets(entry, sizeof(entry), fh) == NULL)
1546 break;
1547 entry_len = strlen(entry);
1549 /* check \n termination in case journal writing crashed mid-line */
1550 if (entry_len == 0)
1551 continue;
1552 else if (entry[entry_len - 1] != '\n')
1553 {
1554 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1555 ++fail_cnt;
1556 continue;
1557 }
1559 entry[entry_len - 1] = '\0';
1561 if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
1562 ++entry_cnt;
1563 else
1564 ++fail_cnt;
1565 }
1567 fclose(fh);
1569 if (entry_cnt > 0)
1570 {
1571 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1572 entry_cnt, fail_cnt);
1573 return 1;
1574 }
1575 else
1576 return 0;
1578 } /* }}} static int journal_replay */
1580 static void *connection_thread_main (void *args) /* {{{ */
1581 {
1582 pthread_t self;
1583 listen_socket_t *sock;
1584 int i;
1585 int fd;
1587 sock = (listen_socket_t *) args;
1588 fd = sock->fd;
1590 pthread_mutex_lock (&connection_threads_lock);
1591 {
1592 pthread_t *temp;
1594 temp = (pthread_t *) realloc (connection_threads,
1595 sizeof (pthread_t) * (connection_threads_num + 1));
1596 if (temp == NULL)
1597 {
1598 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1599 }
1600 else
1601 {
1602 connection_threads = temp;
1603 connection_threads[connection_threads_num] = pthread_self ();
1604 connection_threads_num++;
1605 }
1606 }
1607 pthread_mutex_unlock (&connection_threads_lock);
1609 while (do_shutdown == 0)
1610 {
1611 char buffer[CMD_MAX];
1613 struct pollfd pollfd;
1614 int status;
1616 pollfd.fd = fd;
1617 pollfd.events = POLLIN | POLLPRI;
1618 pollfd.revents = 0;
1620 status = poll (&pollfd, 1, /* timeout = */ 500);
1621 if (do_shutdown)
1622 break;
1623 else if (status == 0) /* timeout */
1624 continue;
1625 else if (status < 0) /* error */
1626 {
1627 status = errno;
1628 if (status == EINTR)
1629 continue;
1630 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1631 continue;
1632 }
1634 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1635 {
1636 close (fd);
1637 break;
1638 }
1639 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1640 {
1641 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1642 "poll(2) returned something unexpected: %#04hx",
1643 pollfd.revents);
1644 close (fd);
1645 break;
1646 }
1648 status = (int) sread (fd, buffer, sizeof (buffer));
1649 if (status <= 0)
1650 {
1651 close (fd);
1653 if (status < 0)
1654 RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1656 break;
1657 }
1659 status = handle_request (fd, sock->privilege, buffer, status);
1660 if (status != 0)
1661 break;
1662 }
1664 close(fd);
1665 free(args);
1667 self = pthread_self ();
1668 /* Remove this thread from the connection threads list */
1669 pthread_mutex_lock (&connection_threads_lock);
1670 /* Find out own index in the array */
1671 for (i = 0; i < connection_threads_num; i++)
1672 if (pthread_equal (connection_threads[i], self) != 0)
1673 break;
1674 assert (i < connection_threads_num);
1676 /* Move the trailing threads forward. */
1677 if (i < (connection_threads_num - 1))
1678 {
1679 memmove (connection_threads + i,
1680 connection_threads + i + 1,
1681 sizeof (pthread_t) * (connection_threads_num - i - 1));
1682 }
1684 connection_threads_num--;
1685 pthread_mutex_unlock (&connection_threads_lock);
1687 return (NULL);
1688 } /* }}} void *connection_thread_main */
1690 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1691 {
1692 int fd;
1693 struct sockaddr_un sa;
1694 listen_socket_t *temp;
1695 int status;
1696 const char *path;
1698 path = sock->addr;
1699 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1700 path += strlen("unix:");
1702 temp = (listen_socket_t *) realloc (listen_fds,
1703 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1704 if (temp == NULL)
1705 {
1706 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1707 return (-1);
1708 }
1709 listen_fds = temp;
1710 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1712 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1713 if (fd < 0)
1714 {
1715 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1716 return (-1);
1717 }
1719 memset (&sa, 0, sizeof (sa));
1720 sa.sun_family = AF_UNIX;
1721 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1723 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1724 if (status != 0)
1725 {
1726 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1727 close (fd);
1728 unlink (path);
1729 return (-1);
1730 }
1732 status = listen (fd, /* backlog = */ 10);
1733 if (status != 0)
1734 {
1735 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1736 close (fd);
1737 unlink (path);
1738 return (-1);
1739 }
1741 listen_fds[listen_fds_num].fd = fd;
1742 listen_fds[listen_fds_num].family = PF_UNIX;
1743 strncpy(listen_fds[listen_fds_num].addr, path,
1744 sizeof (listen_fds[listen_fds_num].addr) - 1);
1745 listen_fds_num++;
1747 return (0);
1748 } /* }}} int open_listen_socket_unix */
1750 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1751 {
1752 struct addrinfo ai_hints;
1753 struct addrinfo *ai_res;
1754 struct addrinfo *ai_ptr;
1755 char addr_copy[NI_MAXHOST];
1756 char *addr;
1757 char *port;
1758 int status;
1760 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1761 addr_copy[sizeof (addr_copy) - 1] = 0;
1762 addr = addr_copy;
1764 memset (&ai_hints, 0, sizeof (ai_hints));
1765 ai_hints.ai_flags = 0;
1766 #ifdef AI_ADDRCONFIG
1767 ai_hints.ai_flags |= AI_ADDRCONFIG;
1768 #endif
1769 ai_hints.ai_family = AF_UNSPEC;
1770 ai_hints.ai_socktype = SOCK_STREAM;
1772 port = NULL;
1773 if (*addr == '[') /* IPv6+port format */
1774 {
1775 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1776 addr++;
1778 port = strchr (addr, ']');
1779 if (port == NULL)
1780 {
1781 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1782 sock->addr);
1783 return (-1);
1784 }
1785 *port = 0;
1786 port++;
1788 if (*port == ':')
1789 port++;
1790 else if (*port == 0)
1791 port = NULL;
1792 else
1793 {
1794 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1795 port);
1796 return (-1);
1797 }
1798 } /* if (*addr = ']') */
1799 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1800 {
1801 port = rindex(addr, ':');
1802 if (port != NULL)
1803 {
1804 *port = 0;
1805 port++;
1806 }
1807 }
1808 ai_res = NULL;
1809 status = getaddrinfo (addr,
1810 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1811 &ai_hints, &ai_res);
1812 if (status != 0)
1813 {
1814 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1815 "%s", addr, gai_strerror (status));
1816 return (-1);
1817 }
1819 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1820 {
1821 int fd;
1822 listen_socket_t *temp;
1823 int one = 1;
1825 temp = (listen_socket_t *) realloc (listen_fds,
1826 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1827 if (temp == NULL)
1828 {
1829 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1830 continue;
1831 }
1832 listen_fds = temp;
1833 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1835 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1836 if (fd < 0)
1837 {
1838 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1839 continue;
1840 }
1842 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1844 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1845 if (status != 0)
1846 {
1847 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1848 close (fd);
1849 continue;
1850 }
1852 status = listen (fd, /* backlog = */ 10);
1853 if (status != 0)
1854 {
1855 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1856 close (fd);
1857 return (-1);
1858 }
1860 listen_fds[listen_fds_num].fd = fd;
1861 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1862 listen_fds_num++;
1863 } /* for (ai_ptr) */
1865 return (0);
1866 } /* }}} static int open_listen_socket_network */
1868 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1869 {
1870 assert(sock != NULL);
1871 assert(sock->addr != NULL);
1873 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1874 || sock->addr[0] == '/')
1875 return (open_listen_socket_unix(sock));
1876 else
1877 return (open_listen_socket_network(sock));
1878 } /* }}} int open_listen_socket */
1880 static int close_listen_sockets (void) /* {{{ */
1881 {
1882 size_t i;
1884 for (i = 0; i < listen_fds_num; i++)
1885 {
1886 close (listen_fds[i].fd);
1888 if (listen_fds[i].family == PF_UNIX)
1889 unlink(listen_fds[i].addr);
1890 }
1892 free (listen_fds);
1893 listen_fds = NULL;
1894 listen_fds_num = 0;
1896 return (0);
1897 } /* }}} int close_listen_sockets */
1899 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1900 {
1901 struct pollfd *pollfds;
1902 int pollfds_num;
1903 int status;
1904 int i;
1906 for (i = 0; i < config_listen_address_list_len; i++)
1907 open_listen_socket (config_listen_address_list[i]);
1909 if (config_listen_address_list_len < 1)
1910 {
1911 listen_socket_t sock;
1912 memset(&sock, 0, sizeof(sock));
1913 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1914 open_listen_socket (&sock);
1915 }
1917 if (listen_fds_num < 1)
1918 {
1919 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1920 "could be opened. Sorry.");
1921 return (NULL);
1922 }
1924 pollfds_num = listen_fds_num;
1925 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1926 if (pollfds == NULL)
1927 {
1928 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1929 return (NULL);
1930 }
1931 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1933 RRDD_LOG(LOG_INFO, "listening for connections");
1935 while (do_shutdown == 0)
1936 {
1937 assert (pollfds_num == ((int) listen_fds_num));
1938 for (i = 0; i < pollfds_num; i++)
1939 {
1940 pollfds[i].fd = listen_fds[i].fd;
1941 pollfds[i].events = POLLIN | POLLPRI;
1942 pollfds[i].revents = 0;
1943 }
1945 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1946 if (do_shutdown)
1947 break;
1948 else if (status == 0) /* timeout */
1949 continue;
1950 else if (status < 0) /* error */
1951 {
1952 status = errno;
1953 if (status != EINTR)
1954 {
1955 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1956 }
1957 continue;
1958 }
1960 for (i = 0; i < pollfds_num; i++)
1961 {
1962 listen_socket_t *client_sock;
1963 struct sockaddr_storage client_sa;
1964 socklen_t client_sa_size;
1965 pthread_t tid;
1966 pthread_attr_t attr;
1968 if (pollfds[i].revents == 0)
1969 continue;
1971 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1972 {
1973 RRDD_LOG (LOG_ERR, "listen_thread_main: "
1974 "poll(2) returned something unexpected for listen FD #%i.",
1975 pollfds[i].fd);
1976 continue;
1977 }
1979 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1980 if (client_sock == NULL)
1981 {
1982 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1983 continue;
1984 }
1985 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1987 client_sa_size = sizeof (client_sa);
1988 client_sock->fd = accept (pollfds[i].fd,
1989 (struct sockaddr *) &client_sa, &client_sa_size);
1990 if (client_sock->fd < 0)
1991 {
1992 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1993 free(client_sock);
1994 continue;
1995 }
1997 pthread_attr_init (&attr);
1998 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2000 status = pthread_create (&tid, &attr, connection_thread_main,
2001 client_sock);
2002 if (status != 0)
2003 {
2004 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2005 close (client_sock->fd);
2006 free (client_sock);
2007 continue;
2008 }
2009 } /* for (pollfds_num) */
2010 } /* while (do_shutdown == 0) */
2012 RRDD_LOG(LOG_INFO, "starting shutdown");
2014 close_listen_sockets ();
2016 pthread_mutex_lock (&connection_threads_lock);
2017 while (connection_threads_num > 0)
2018 {
2019 pthread_t wait_for;
2021 wait_for = connection_threads[0];
2023 pthread_mutex_unlock (&connection_threads_lock);
2024 pthread_join (wait_for, /* retval = */ NULL);
2025 pthread_mutex_lock (&connection_threads_lock);
2026 }
2027 pthread_mutex_unlock (&connection_threads_lock);
2029 return (NULL);
2030 } /* }}} void *listen_thread_main */
2032 static int daemonize (void) /* {{{ */
2033 {
2034 int status;
2035 int fd;
2036 char *base_dir;
2038 fd = open_pidfile();
2039 if (fd < 0) return fd;
2041 if (!stay_foreground)
2042 {
2043 pid_t child;
2045 child = fork ();
2046 if (child < 0)
2047 {
2048 fprintf (stderr, "daemonize: fork(2) failed.\n");
2049 return (-1);
2050 }
2051 else if (child > 0)
2052 {
2053 return (1);
2054 }
2056 /* Become session leader */
2057 setsid ();
2059 /* Open the first three file descriptors to /dev/null */
2060 close (2);
2061 close (1);
2062 close (0);
2064 open ("/dev/null", O_RDWR);
2065 dup (0);
2066 dup (0);
2067 } /* if (!stay_foreground) */
2069 /* Change into the /tmp directory. */
2070 base_dir = (config_base_dir != NULL)
2071 ? config_base_dir
2072 : "/tmp";
2073 status = chdir (base_dir);
2074 if (status != 0)
2075 {
2076 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2077 return (-1);
2078 }
2080 install_signal_handlers();
2082 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2083 RRDD_LOG(LOG_INFO, "starting up");
2085 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2086 if (cache_tree == NULL)
2087 {
2088 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2089 return (-1);
2090 }
2092 status = write_pidfile (fd);
2093 return status;
2094 } /* }}} int daemonize */
2096 static int cleanup (void) /* {{{ */
2097 {
2098 do_shutdown++;
2100 pthread_cond_signal (&cache_cond);
2101 pthread_join (queue_thread, /* return = */ NULL);
2103 remove_pidfile ();
2105 RRDD_LOG(LOG_INFO, "goodbye");
2106 closelog ();
2108 return (0);
2109 } /* }}} int cleanup */
2111 static int read_options (int argc, char **argv) /* {{{ */
2112 {
2113 int option;
2114 int status = 0;
2116 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2117 {
2118 switch (option)
2119 {
2120 case 'g':
2121 stay_foreground=1;
2122 break;
2124 case 'L':
2125 case 'l':
2126 {
2127 listen_socket_t **temp;
2128 listen_socket_t *new;
2130 new = malloc(sizeof(listen_socket_t));
2131 if (new == NULL)
2132 {
2133 fprintf(stderr, "read_options: malloc failed.\n");
2134 return(2);
2135 }
2136 memset(new, 0, sizeof(listen_socket_t));
2138 temp = (listen_socket_t **) realloc (config_listen_address_list,
2139 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2140 if (temp == NULL)
2141 {
2142 fprintf (stderr, "read_options: realloc failed.\n");
2143 return (2);
2144 }
2145 config_listen_address_list = temp;
2147 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2148 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2150 temp[config_listen_address_list_len] = new;
2151 config_listen_address_list_len++;
2152 }
2153 break;
2155 case 'f':
2156 {
2157 int temp;
2159 temp = atoi (optarg);
2160 if (temp > 0)
2161 config_flush_interval = temp;
2162 else
2163 {
2164 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2165 status = 3;
2166 }
2167 }
2168 break;
2170 case 'w':
2171 {
2172 int temp;
2174 temp = atoi (optarg);
2175 if (temp > 0)
2176 config_write_interval = temp;
2177 else
2178 {
2179 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2180 status = 2;
2181 }
2182 }
2183 break;
2185 case 'z':
2186 {
2187 int temp;
2189 temp = atoi(optarg);
2190 if (temp > 0)
2191 config_write_jitter = temp;
2192 else
2193 {
2194 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2195 status = 2;
2196 }
2198 break;
2199 }
2201 case 'B':
2202 config_write_base_only = 1;
2203 break;
2205 case 'b':
2206 {
2207 size_t len;
2209 if (config_base_dir != NULL)
2210 free (config_base_dir);
2211 config_base_dir = strdup (optarg);
2212 if (config_base_dir == NULL)
2213 {
2214 fprintf (stderr, "read_options: strdup failed.\n");
2215 return (3);
2216 }
2218 len = strlen (config_base_dir);
2219 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2220 {
2221 config_base_dir[len - 1] = 0;
2222 len--;
2223 }
2225 if (len < 1)
2226 {
2227 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2228 return (4);
2229 }
2231 _config_base_dir_len = len;
2232 }
2233 break;
2235 case 'p':
2236 {
2237 if (config_pid_file != NULL)
2238 free (config_pid_file);
2239 config_pid_file = strdup (optarg);
2240 if (config_pid_file == NULL)
2241 {
2242 fprintf (stderr, "read_options: strdup failed.\n");
2243 return (3);
2244 }
2245 }
2246 break;
2248 case 'F':
2249 config_flush_at_shutdown = 1;
2250 break;
2252 case 'j':
2253 {
2254 struct stat statbuf;
2255 const char *dir = optarg;
2257 status = stat(dir, &statbuf);
2258 if (status != 0)
2259 {
2260 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2261 return 6;
2262 }
2264 if (!S_ISDIR(statbuf.st_mode)
2265 || access(dir, R_OK|W_OK|X_OK) != 0)
2266 {
2267 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2268 errno ? rrd_strerror(errno) : "");
2269 return 6;
2270 }
2272 journal_cur = malloc(PATH_MAX + 1);
2273 journal_old = malloc(PATH_MAX + 1);
2274 if (journal_cur == NULL || journal_old == NULL)
2275 {
2276 fprintf(stderr, "malloc failure for journal files\n");
2277 return 6;
2278 }
2279 else
2280 {
2281 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2282 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2283 }
2284 }
2285 break;
2287 case 'h':
2288 case '?':
2289 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2290 "\n"
2291 "Usage: rrdcached [options]\n"
2292 "\n"
2293 "Valid options are:\n"
2294 " -l <address> Socket address to listen to.\n"
2295 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2296 " -w <seconds> Interval in which to write data.\n"
2297 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2298 " -f <seconds> Interval in which to flush dead data.\n"
2299 " -p <file> Location of the PID-file.\n"
2300 " -b <dir> Base directory to change to.\n"
2301 " -B Restrict file access to paths within -b <dir>\n"
2302 " -g Do not fork and run in the foreground.\n"
2303 " -j <dir> Directory in which to create the journal files.\n"
2304 " -F Always flush all updates at shutdown\n"
2305 "\n"
2306 "For more information and a detailed description of all options "
2307 "please refer\n"
2308 "to the rrdcached(1) manual page.\n",
2309 VERSION);
2310 status = -1;
2311 break;
2312 } /* switch (option) */
2313 } /* while (getopt) */
2315 /* advise the user when values are not sane */
2316 if (config_flush_interval < 2 * config_write_interval)
2317 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2318 " 2x write interval (-w) !\n");
2319 if (config_write_jitter > config_write_interval)
2320 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2321 " write interval (-w) !\n");
2323 if (config_write_base_only && config_base_dir == NULL)
2324 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2325 " Consult the rrdcached documentation\n");
2327 if (journal_cur == NULL)
2328 config_flush_at_shutdown = 1;
2330 return (status);
2331 } /* }}} int read_options */
2333 int main (int argc, char **argv)
2334 {
2335 int status;
2337 status = read_options (argc, argv);
2338 if (status != 0)
2339 {
2340 if (status < 0)
2341 status = 0;
2342 return (status);
2343 }
2345 status = daemonize ();
2346 if (status == 1)
2347 {
2348 struct sigaction sigchld;
2350 memset (&sigchld, 0, sizeof (sigchld));
2351 sigchld.sa_handler = SIG_IGN;
2352 sigaction (SIGCHLD, &sigchld, NULL);
2354 return (0);
2355 }
2356 else if (status != 0)
2357 {
2358 fprintf (stderr, "daemonize failed, exiting.\n");
2359 return (1);
2360 }
2362 if (journal_cur != NULL)
2363 {
2364 int had_journal = 0;
2366 pthread_mutex_lock(&journal_lock);
2368 RRDD_LOG(LOG_INFO, "checking for journal files");
2370 had_journal += journal_replay(journal_old);
2371 had_journal += journal_replay(journal_cur);
2373 if (had_journal)
2374 flush_old_values(-1);
2376 pthread_mutex_unlock(&journal_lock);
2377 journal_rotate();
2379 RRDD_LOG(LOG_INFO, "journal processing complete");
2380 }
2382 /* start the queue thread */
2383 memset (&queue_thread, 0, sizeof (queue_thread));
2384 status = pthread_create (&queue_thread,
2385 NULL, /* attr */
2386 queue_thread_main,
2387 NULL); /* args */
2388 if (status != 0)
2389 {
2390 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2391 cleanup();
2392 return (1);
2393 }
2395 listen_thread_main (NULL);
2396 cleanup ();
2398 return (0);
2399 } /* int main */
2401 /*
2402 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2403 */