52e9f12ba06301157db4a80ea4dce0164d19d490
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
/* All daemon diagnostics go straight to syslog(3); `severity' is a LOG_*
 * level. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC: let __attribute__((...)) annotations vanish. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif

/*
 * Types
 */

/* Privilege level attached to a listening socket.  The values are ordered:
 * has_privilege() compares them with `>='. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Result code reported to clients by send_response(). */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK = 0
} response_code;

/* One client-facing socket together with its per-connection state. */
typedef struct listen_socket_s
{
  int fd;
  char addr[PATH_MAX + 1];
  int family;
  socket_privilege privilege;

  /* state for BATCH processing */
  time_t batch_start;   /* non-zero while a BATCH is in progress */
  int batch_cmd;        /* number reported in error lines during a BATCH */

  /* buffered IO */
  char *rbuf;           /* raw bytes read from the client */
  off_t next_cmd;       /* offset in rbuf of the next unparsed command */
  off_t next_read;      /* offset in rbuf just past the valid data */

  char *wbuf;           /* pending response text, NUL-terminated */
  ssize_t wbuf_len;     /* length of wbuf, excluding the NUL */
} listen_socket_t;

/* A cached RRD file with its not-yet-written updates.  The forward typedef
 * lets the struct refer to itself through cache_item_t. */
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;               /* file name; also used as the cache_tree key */
  char **values;            /* pending update strings ... */
  int values_num;           /* ... and how many there are */
  time_t last_flush_time;
  time_t last_update_stamp;
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;                /* CI_FLAGS_* bits */
  pthread_cond_t flushed;   /* broadcast when the entry is written or forgotten */
  cache_item_t *prev;       /* neighbours in the write queue */
  cache_item_t *next;
};

/* Scratch state threaded through g_tree_foreach() by flush_old_values(). */
typedef struct callback_flush_data_s
{
  time_t now;
  time_t abs_timeout;
  char **keys;
  size_t keys_num;
} callback_flush_data_t;

/* Which end of the write queue enqueue_cache_item() appends to. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
179 /*
180 * Variables
181 */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree *cache_tree = NULL;
198 static cache_item_t *cache_queue_head = NULL;
199 static cache_item_t *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /*
234 * Functions
235 */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239 do_shutdown++;
240 pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245 sig_common("INT");
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250 sig_common("TERM");
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255 config_flush_at_shutdown = 1;
256 sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261 config_flush_at_shutdown = 0;
262 sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
266 {
267 /* These structures are static, because `sigaction' behaves weird if the are
268 * overwritten.. */
269 static struct sigaction sa_int;
270 static struct sigaction sa_term;
271 static struct sigaction sa_pipe;
272 static struct sigaction sa_usr1;
273 static struct sigaction sa_usr2;
275 /* Install signal handlers */
276 memset (&sa_int, 0, sizeof (sa_int));
277 sa_int.sa_handler = sig_int_handler;
278 sigaction (SIGINT, &sa_int, NULL);
280 memset (&sa_term, 0, sizeof (sa_term));
281 sa_term.sa_handler = sig_term_handler;
282 sigaction (SIGTERM, &sa_term, NULL);
284 memset (&sa_pipe, 0, sizeof (sa_pipe));
285 sa_pipe.sa_handler = SIG_IGN;
286 sigaction (SIGPIPE, &sa_pipe, NULL);
288 memset (&sa_pipe, 0, sizeof (sa_usr1));
289 sa_usr1.sa_handler = sig_usr1_handler;
290 sigaction (SIGUSR1, &sa_usr1, NULL);
292 memset (&sa_usr2, 0, sizeof (sa_usr2));
293 sa_usr2.sa_handler = sig_usr2_handler;
294 sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300 int fd;
301 char *file;
303 file = (config_pid_file != NULL)
304 ? config_pid_file
305 : LOCALSTATEDIR "/run/rrdcached.pid";
307 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308 if (fd < 0)
309 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310 action, file, rrd_strerror(errno));
312 return(fd);
313 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns:
 *  - the negative result of open_pidfile() if the file cannot be opened;
 *  - -1 if the file cannot be read, does not hold a positive PID, or names
 *    another process that can be signalled (a FATAL message is printed in
 *    that last case);
 *  - otherwise the still-open descriptor, with the file truncated to zero
 *    length and the offset rewound to 0.
 * The descriptor is closed on every -1 path. */
static int check_pidfile(void)
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;
  long val;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* read() does not NUL-terminate; reserve a byte for the terminator so
   * the number parsing below cannot run past the buffer */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  /* strtol() instead of atoi(): out-of-range input is detected, not UB */
  errno = 0;
  val = strtol(pid_str, NULL, 10);
  pid = (pid_t) val;
  if (errno != 0 || val <= 0 || (long) pid != val)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  ftruncate(pid_fd, 0);

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write this process's PID followed by a newline to `fd'.  The descriptor
 * is always consumed: it is wrapped in a stdio stream and closed, or
 * closed directly if fdopen() fails.
 * Returns 0 on success, -1 if the PID could not be written. */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *fh;
  int status = 0;

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) getpid ()) < 0)
    status = -1;

  /* fclose() flushes the buffered PID; a failure here means it may never
   * have reached the file. */
  if (fclose (fh) != 0)
    status = -1;

  if (status != 0)
    RRDD_LOG (LOG_ERR, "write_pidfile: writing the PID file failed.");

  return (status);
} /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
375 {
376 char *file;
377 int status;
379 file = (config_pid_file != NULL)
380 ? config_pid_file
381 : LOCALSTATEDIR "/run/rrdcached.pid";
383 status = unlink (file);
384 if (status == 0)
385 return (0);
386 return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391 char *eol;
393 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394 sock->next_read - sock->next_cmd);
396 if (eol == NULL)
397 {
398 /* no commands left, move remainder back to front of rbuf */
399 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400 sock->next_read - sock->next_cmd);
401 sock->next_read -= sock->next_cmd;
402 sock->next_cmd = 0;
403 *len = 0;
404 return NULL;
405 }
406 else
407 {
408 char *cmd = sock->rbuf + sock->next_cmd;
409 *eol = '\0';
411 sock->next_cmd = eol - sock->rbuf + 1;
413 if (eol > sock->rbuf && *(eol-1) == '\r')
414 *(--eol) = '\0'; /* handle "\r\n" EOL */
416 *len = eol - cmd;
418 return cmd;
419 }
421 /* NOTREACHED */
422 assert(1==0);
423 }
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428 char *new_buf;
430 assert(sock != NULL);
432 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433 if (new_buf == NULL)
434 {
435 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436 return -1;
437 }
439 strncpy(new_buf + sock->wbuf_len, str, len + 1);
441 sock->wbuf = new_buf;
442 sock->wbuf_len += len;
444 return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450 va_list argp;
451 char buffer[CMD_MAX];
452 int len;
454 if (sock == NULL) return 0; /* journal replay mode */
455 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457 va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461 len = vsprintf(buffer, fmt, argp);
462 #endif
463 va_end(argp);
464 if (len < 0)
465 {
466 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467 return -1;
468 }
470 return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
/* Return how many '\n' characters `str' contains.  NULL counts as an empty
 * string. */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;

  if (str == NULL)
    return 0;

  for (; *str != '\0'; str++)
  {
    if (*str == '\n')
      newlines++;
  }

  return newlines;
} /* }}} static int count_lines */
489 /* send the response back to the user.
490 * returns 0 on success, -1 on error
491 * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493 char *fmt, ...) /* {{{ */
494 {
495 va_list argp;
496 char buffer[CMD_MAX];
497 int lines;
498 ssize_t wrote;
499 int rclen, len;
501 if (sock == NULL) return rc; /* journal replay mode */
503 if (sock->batch_start)
504 {
505 if (rc == RESP_OK)
506 return rc; /* no response on success during BATCH */
507 lines = sock->batch_cmd;
508 }
509 else if (rc == RESP_OK)
510 lines = count_lines(sock->wbuf);
511 else
512 lines = -1;
514 rclen = sprintf(buffer, "%d ", lines);
515 va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519 len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521 va_end(argp);
522 if (len < 0)
523 return -1;
525 len += rclen;
527 /* append the result to the wbuf, don't write to the user */
528 if (sock->batch_start)
529 return add_to_wbuf(sock, buffer, len);
531 /* first write must be complete */
532 if (len != write(sock->fd, buffer, len))
533 {
534 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535 return -1;
536 }
538 if (sock->wbuf != NULL && rc == RESP_OK)
539 {
540 wrote = 0;
541 while (wrote < sock->wbuf_len)
542 {
543 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544 if (wb <= 0)
545 {
546 RRDD_LOG(LOG_INFO, "send_response: could not write results");
547 return -1;
548 }
549 wrote += wb;
550 }
551 }
553 free(sock->wbuf); sock->wbuf = NULL;
554 sock->wbuf_len = 0;
556 return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561 ci->values = NULL;
562 ci->values_num = 0;
564 ci->last_flush_time = when;
565 if (config_write_jitter > 0)
566 ci->last_flush_time += (random() % config_write_jitter);
567 }
569 /* remove_from_queue
570 * remove a "cache_item_t" item from the queue.
571 * must hold 'cache_lock' when calling this
572 */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575 if (ci == NULL) return;
576 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578 if (ci->prev == NULL)
579 cache_queue_head = ci->next; /* reset head */
580 else
581 ci->prev->next = ci->next;
583 if (ci->next == NULL)
584 cache_queue_tail = ci->prev; /* reset the tail */
585 else
586 ci->next->prev = ci->prev;
588 ci->next = ci->prev = NULL;
589 ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* remove an entry from the tree and free all its resources.
593 * must hold 'cache lock' while calling this.
594 * returns 0 on success, otherwise errno */
595 static int forget_file(const char *file)
596 {
597 cache_item_t *ci;
599 ci = g_tree_lookup(cache_tree, file);
600 if (ci == NULL)
601 return ENOENT;
603 g_tree_remove (cache_tree, file);
604 remove_from_queue(ci);
606 for (int i=0; i < ci->values_num; i++)
607 free(ci->values[i]);
609 free (ci->values);
610 free (ci->file);
612 /* in case anyone is waiting */
613 pthread_cond_broadcast(&ci->flushed);
615 free (ci);
617 return 0;
618 } /* }}} static int forget_file */
620 /*
621 * enqueue_cache_item:
622 * `cache_lock' must be acquired before calling this function!
623 */
624 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
625 queue_side_t side)
626 {
627 if (ci == NULL)
628 return (-1);
630 if (ci->values_num == 0)
631 return (0);
633 if (side == HEAD)
634 {
635 if (cache_queue_head == ci)
636 return 0;
638 /* remove if further down in queue */
639 remove_from_queue(ci);
641 ci->prev = NULL;
642 ci->next = cache_queue_head;
643 if (ci->next != NULL)
644 ci->next->prev = ci;
645 cache_queue_head = ci;
647 if (cache_queue_tail == NULL)
648 cache_queue_tail = cache_queue_head;
649 }
650 else /* (side == TAIL) */
651 {
652 /* We don't move values back in the list.. */
653 if (ci->flags & CI_FLAGS_IN_QUEUE)
654 return (0);
656 assert (ci->next == NULL);
657 assert (ci->prev == NULL);
659 ci->prev = cache_queue_tail;
661 if (cache_queue_tail == NULL)
662 cache_queue_head = ci;
663 else
664 cache_queue_tail->next = ci;
666 cache_queue_tail = ci;
667 }
669 ci->flags |= CI_FLAGS_IN_QUEUE;
671 pthread_cond_broadcast(&cache_cond);
672 pthread_mutex_lock (&stats_lock);
673 stats_queue_length++;
674 pthread_mutex_unlock (&stats_lock);
676 return (0);
677 } /* }}} int enqueue_cache_item */
679 /*
680 * tree_callback_flush:
681 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
682 * while this is in progress.
683 */
684 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
685 gpointer data)
686 {
687 cache_item_t *ci;
688 callback_flush_data_t *cfd;
690 ci = (cache_item_t *) value;
691 cfd = (callback_flush_data_t *) data;
693 if (ci->flags & CI_FLAGS_IN_QUEUE)
694 return FALSE;
696 if ((ci->last_flush_time <= cfd->abs_timeout)
697 && (ci->values_num > 0))
698 {
699 enqueue_cache_item (ci, TAIL);
700 }
701 else if ((do_shutdown != 0)
702 && (ci->values_num > 0))
703 {
704 enqueue_cache_item (ci, TAIL);
705 }
706 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
707 && (ci->values_num <= 0))
708 {
709 char **temp;
711 temp = (char **) realloc (cfd->keys,
712 sizeof (char *) * (cfd->keys_num + 1));
713 if (temp == NULL)
714 {
715 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
716 return (FALSE);
717 }
718 cfd->keys = temp;
719 /* Make really sure this points to the _same_ place */
720 assert ((char *) key == ci->file);
721 cfd->keys[cfd->keys_num] = (char *) key;
722 cfd->keys_num++;
723 }
725 return (FALSE);
726 } /* }}} gboolean tree_callback_flush */
728 static int flush_old_values (int max_age)
729 {
730 callback_flush_data_t cfd;
731 size_t k;
733 memset (&cfd, 0, sizeof (cfd));
734 /* Pass the current time as user data so that we don't need to call
735 * `time' for each node. */
736 cfd.now = time (NULL);
737 cfd.keys = NULL;
738 cfd.keys_num = 0;
740 if (max_age > 0)
741 cfd.abs_timeout = cfd.now - max_age;
742 else
743 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
745 /* `tree_callback_flush' will return the keys of all values that haven't
746 * been touched in the last `config_flush_interval' seconds in `cfd'.
747 * The char*'s in this array point to the same memory as ci->file, so we
748 * don't need to free them separately. */
749 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
751 for (k = 0; k < cfd.keys_num; k++)
752 {
753 /* should never fail, since we have held the cache_lock
754 * the entire time */
755 assert( forget_file(cfd.keys[k]) == 0 );
756 }
758 if (cfd.keys != NULL)
759 {
760 free (cfd.keys);
761 cfd.keys = NULL;
762 }
764 return (0);
765 } /* int flush_old_values */
767 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
768 {
769 struct timeval now;
770 struct timespec next_flush;
771 int final_flush = 0; /* make sure we only flush once on shutdown */
773 gettimeofday (&now, NULL);
774 next_flush.tv_sec = now.tv_sec + config_flush_interval;
775 next_flush.tv_nsec = 1000 * now.tv_usec;
777 pthread_mutex_lock (&cache_lock);
778 while ((do_shutdown == 0) || (cache_queue_head != NULL))
779 {
780 cache_item_t *ci;
781 char *file;
782 char **values;
783 int values_num;
784 int status;
785 int i;
787 /* First, check if it's time to do the cache flush. */
788 gettimeofday (&now, NULL);
789 if ((now.tv_sec > next_flush.tv_sec)
790 || ((now.tv_sec == next_flush.tv_sec)
791 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
792 {
793 /* Flush all values that haven't been written in the last
794 * `config_write_interval' seconds. */
795 flush_old_values (config_write_interval);
797 /* Determine the time of the next cache flush. */
798 next_flush.tv_sec =
799 now.tv_sec + next_flush.tv_sec % config_flush_interval;
801 /* unlock the cache while we rotate so we don't block incoming
802 * updates if the fsync() blocks on disk I/O */
803 pthread_mutex_unlock(&cache_lock);
804 journal_rotate();
805 pthread_mutex_lock(&cache_lock);
806 }
808 /* Now, check if there's something to store away. If not, wait until
809 * something comes in or it's time to do the cache flush. if we are
810 * shutting down, do not wait around. */
811 if (cache_queue_head == NULL && !do_shutdown)
812 {
813 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
814 if ((status != 0) && (status != ETIMEDOUT))
815 {
816 RRDD_LOG (LOG_ERR, "queue_thread_main: "
817 "pthread_cond_timedwait returned %i.", status);
818 }
819 }
821 /* We're about to shut down */
822 if (do_shutdown != 0 && !final_flush++)
823 {
824 if (config_flush_at_shutdown)
825 flush_old_values (-1); /* flush everything */
826 else
827 break;
828 }
830 /* Check if a value has arrived. This may be NULL if we timed out or there
831 * was an interrupt such as a signal. */
832 if (cache_queue_head == NULL)
833 continue;
835 ci = cache_queue_head;
837 /* copy the relevant parts */
838 file = strdup (ci->file);
839 if (file == NULL)
840 {
841 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
842 continue;
843 }
845 assert(ci->values != NULL);
846 assert(ci->values_num > 0);
848 values = ci->values;
849 values_num = ci->values_num;
851 wipe_ci_values(ci, time(NULL));
852 remove_from_queue(ci);
854 pthread_mutex_lock (&stats_lock);
855 assert (stats_queue_length > 0);
856 stats_queue_length--;
857 pthread_mutex_unlock (&stats_lock);
859 pthread_mutex_unlock (&cache_lock);
861 rrd_clear_error ();
862 status = rrd_update_r (file, NULL, values_num, (void *) values);
863 if (status != 0)
864 {
865 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
866 "rrd_update_r (%s) failed with status %i. (%s)",
867 file, status, rrd_get_error());
868 }
870 journal_write("wrote", file);
871 pthread_cond_broadcast(&ci->flushed);
873 for (i = 0; i < values_num; i++)
874 free (values[i]);
876 free(values);
877 free(file);
879 if (status == 0)
880 {
881 pthread_mutex_lock (&stats_lock);
882 stats_updates_written++;
883 stats_data_sets_written += values_num;
884 pthread_mutex_unlock (&stats_lock);
885 }
887 pthread_mutex_lock (&cache_lock);
889 /* We're about to shut down */
890 if (do_shutdown != 0 && !final_flush++)
891 {
892 if (config_flush_at_shutdown)
893 flush_old_values (-1); /* flush everything */
894 else
895 break;
896 }
897 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
898 pthread_mutex_unlock (&cache_lock);
900 if (config_flush_at_shutdown)
901 {
902 assert(cache_queue_head == NULL);
903 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
904 }
906 journal_done();
908 return (NULL);
909 } /* }}} void *queue_thread_main */
/* Split the next space-delimited field off the front of a NUL-terminated
 * buffer, in place.  A backslash makes the following character literal, so
 * "a\ b" yields the single field "a b".  Consecutive spaces produce empty
 * fields.
 *
 * On success the field is NUL-terminated where it started, *field_ret
 * points to it, and *buffer_ret / *buffer_size_ret are advanced past the
 * delimiter; 0 is returned.  When the buffer is empty, or ends inside an
 * escape or without a delimiter, -1 is returned and the output pointers
 * are left unchanged.  The buffer may still have been rewritten by then. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
                             size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t size = *buffer_size_ret;
  size_t in = 0;   /* next byte to read */
  size_t out = 0;  /* next byte of the field to write; never ahead of `in' */

  if (size == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[size - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= size)
      return (-1); /* ran off the end without a delimiter */

    c = src[in];
    if (c == ' ' || c == '\0')
    {
      src[out] = '\0';
      in++;
      break;
    }

    if (c == '\\')
    {
      if (in >= size - 1)
        return (-1); /* nothing left to escape */
      in++;
      c = src[in];
    }

    src[out++] = c;
    in++;
  }

  *buffer_ret = src + in;
  *buffer_size_ret = size - in;
  *field_ret = src;

  return (0);
} /* }}} int buffer_get_field */
974 /* if we're restricting writes to the base directory,
975 * check whether the file falls within the dir
976 * returns 1 if OK, otherwise 0
977 */
978 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
979 {
980 assert(file != NULL);
982 if (!config_write_base_only
983 || sock == NULL /* journal replay */
984 || config_base_dir == NULL)
985 return 1;
987 if (strstr(file, "../") != NULL) goto err;
989 /* relative paths without "../" are ok */
990 if (*file != '/') return 1;
992 /* file must be of the format base + "/" + <1+ char filename> */
993 if (strlen(file) < _config_base_dir_len + 2) goto err;
994 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
995 if (*(file + _config_base_dir_len) != '/') goto err;
997 return 1;
999 err:
1000 if (sock != NULL && sock->fd >= 0)
1001 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1003 return 0;
1004 } /* }}} static int check_file_access */
1006 /* when using a base dir, convert relative paths to absolute paths.
1007 * if necessary, modifies the "filename" pointer to point
1008 * to the new path created in "tmp". "tmp" is provided
1009 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1010 *
1011 * this allows us to optimize for the expected case (absolute path)
1012 * with a no-op.
1013 */
1014 static void get_abs_path(char **filename, char *tmp)
1015 {
1016 assert(tmp != NULL);
1017 assert(filename != NULL && *filename != NULL);
1019 if (config_base_dir == NULL || **filename == '/')
1020 return;
1022 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1023 *filename = tmp;
1024 } /* }}} static int get_abs_path */
1026 /* returns 1 if we have the required privilege level,
1027 * otherwise issue an error to the user on sock */
1028 static int has_privilege (listen_socket_t *sock, /* {{{ */
1029 socket_privilege priv)
1030 {
1031 if (sock == NULL) /* journal replay */
1032 return 1;
1034 if (sock->privilege >= priv)
1035 return 1;
1037 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1038 } /* }}} static int has_privilege */
1040 static int flush_file (const char *filename) /* {{{ */
1041 {
1042 cache_item_t *ci;
1044 pthread_mutex_lock (&cache_lock);
1046 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1047 if (ci == NULL)
1048 {
1049 pthread_mutex_unlock (&cache_lock);
1050 return (ENOENT);
1051 }
1053 if (ci->values_num > 0)
1054 {
1055 /* Enqueue at head */
1056 enqueue_cache_item (ci, HEAD);
1057 pthread_cond_wait(&ci->flushed, &cache_lock);
1058 }
1060 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1061 * may have been purged during our cond_wait() */
1063 pthread_mutex_unlock(&cache_lock);
1065 return (0);
1066 } /* }}} int flush_file */
1068 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1069 char *buffer, size_t buffer_size)
1070 {
1071 int status;
1072 char **help_text;
1073 char *command;
1075 char *help_help[2] =
1076 {
1077 "Command overview\n"
1078 ,
1079 "HELP [<command>]\n"
1080 "FLUSH <filename>\n"
1081 "FLUSHALL\n"
1082 "PENDING <filename>\n"
1083 "FORGET <filename>\n"
1084 "UPDATE <filename> <values> [<values> ...]\n"
1085 "BATCH\n"
1086 "STATS\n"
1087 };
1089 char *help_flush[2] =
1090 {
1091 "Help for FLUSH\n"
1092 ,
1093 "Usage: FLUSH <filename>\n"
1094 "\n"
1095 "Adds the given filename to the head of the update queue and returns\n"
1096 "after is has been dequeued.\n"
1097 };
1099 char *help_flushall[2] =
1100 {
1101 "Help for FLUSHALL\n"
1102 ,
1103 "Usage: FLUSHALL\n"
1104 "\n"
1105 "Triggers writing of all pending updates. Returns immediately.\n"
1106 };
1108 char *help_pending[2] =
1109 {
1110 "Help for PENDING\n"
1111 ,
1112 "Usage: PENDING <filename>\n"
1113 "\n"
1114 "Shows any 'pending' updates for a file, in order.\n"
1115 "The updates shown have not yet been written to the underlying RRD file.\n"
1116 };
1118 char *help_forget[2] =
1119 {
1120 "Help for FORGET\n"
1121 ,
1122 "Usage: FORGET <filename>\n"
1123 "\n"
1124 "Removes the file completely from the cache.\n"
1125 "Any pending updates for the file will be lost.\n"
1126 };
1128 char *help_update[2] =
1129 {
1130 "Help for UPDATE\n"
1131 ,
1132 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1133 "\n"
1134 "Adds the given file to the internal cache if it is not yet known and\n"
1135 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1136 "for details.\n"
1137 "\n"
1138 "Each <values> has the following form:\n"
1139 " <values> = <time>:<value>[:<value>[...]]\n"
1140 "See the rrdupdate(1) manpage for details.\n"
1141 };
1143 char *help_stats[2] =
1144 {
1145 "Help for STATS\n"
1146 ,
1147 "Usage: STATS\n"
1148 "\n"
1149 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1150 "a description of the values.\n"
1151 };
1153 char *help_batch[2] =
1154 {
1155 "Help for BATCH\n"
1156 ,
1157 "The 'BATCH' command permits the client to initiate a bulk load\n"
1158 " of commands to rrdcached.\n"
1159 "\n"
1160 "Usage:\n"
1161 "\n"
1162 " client: BATCH\n"
1163 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1164 " client: command #1\n"
1165 " client: command #2\n"
1166 " client: ... and so on\n"
1167 " client: .\n"
1168 " server: 2 errors\n"
1169 " server: 7 message for command #7\n"
1170 " server: 9 message for command #9\n"
1171 "\n"
1172 "For more information, consult the rrdcached(1) documentation.\n"
1173 };
1175 status = buffer_get_field (&buffer, &buffer_size, &command);
1176 if (status != 0)
1177 help_text = help_help;
1178 else
1179 {
1180 if (strcasecmp (command, "update") == 0)
1181 help_text = help_update;
1182 else if (strcasecmp (command, "flush") == 0)
1183 help_text = help_flush;
1184 else if (strcasecmp (command, "flushall") == 0)
1185 help_text = help_flushall;
1186 else if (strcasecmp (command, "pending") == 0)
1187 help_text = help_pending;
1188 else if (strcasecmp (command, "forget") == 0)
1189 help_text = help_forget;
1190 else if (strcasecmp (command, "stats") == 0)
1191 help_text = help_stats;
1192 else if (strcasecmp (command, "batch") == 0)
1193 help_text = help_batch;
1194 else
1195 help_text = help_help;
1196 }
1198 add_response_info(sock, help_text[1]);
1199 return send_response(sock, RESP_OK, help_text[0]);
1200 } /* }}} int handle_request_help */
1202 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1203 {
1204 uint64_t copy_queue_length;
1205 uint64_t copy_updates_received;
1206 uint64_t copy_flush_received;
1207 uint64_t copy_updates_written;
1208 uint64_t copy_data_sets_written;
1209 uint64_t copy_journal_bytes;
1210 uint64_t copy_journal_rotate;
1212 uint64_t tree_nodes_number;
1213 uint64_t tree_depth;
1215 pthread_mutex_lock (&stats_lock);
1216 copy_queue_length = stats_queue_length;
1217 copy_updates_received = stats_updates_received;
1218 copy_flush_received = stats_flush_received;
1219 copy_updates_written = stats_updates_written;
1220 copy_data_sets_written = stats_data_sets_written;
1221 copy_journal_bytes = stats_journal_bytes;
1222 copy_journal_rotate = stats_journal_rotate;
1223 pthread_mutex_unlock (&stats_lock);
1225 pthread_mutex_lock (&cache_lock);
1226 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1227 tree_depth = (uint64_t) g_tree_height (cache_tree);
1228 pthread_mutex_unlock (&cache_lock);
1230 add_response_info(sock,
1231 "QueueLength: %"PRIu64"\n", copy_queue_length);
1232 add_response_info(sock,
1233 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1234 add_response_info(sock,
1235 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1236 add_response_info(sock,
1237 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1238 add_response_info(sock,
1239 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1240 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1241 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1242 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1243 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1245 send_response(sock, RESP_OK, "Statistics follow\n");
1247 return (0);
1248 } /* }}} int handle_request_stats */
1250 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1251 char *buffer, size_t buffer_size)
1252 {
1253 char *file, file_tmp[PATH_MAX];
1254 int status;
1256 status = buffer_get_field (&buffer, &buffer_size, &file);
1257 if (status != 0)
1258 {
1259 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1260 }
1261 else
1262 {
1263 pthread_mutex_lock(&stats_lock);
1264 stats_flush_received++;
1265 pthread_mutex_unlock(&stats_lock);
1267 get_abs_path(&file, file_tmp);
1268 if (!check_file_access(file, sock)) return 0;
1270 status = flush_file (file);
1271 if (status == 0)
1272 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1273 else if (status == ENOENT)
1274 {
1275 /* no file in our tree; see whether it exists at all */
1276 struct stat statbuf;
1278 memset(&statbuf, 0, sizeof(statbuf));
1279 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1280 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1281 else
1282 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1283 }
1284 else if (status < 0)
1285 return send_response(sock, RESP_ERR, "Internal error.\n");
1286 else
1287 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1288 }
1290 /* NOTREACHED */
1291 assert(1==0);
1292 } /* }}} int handle_request_flush */
1294 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1295 {
1296 int status;
1298 status = has_privilege(sock, PRIV_HIGH);
1299 if (status <= 0)
1300 return status;
1302 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1304 pthread_mutex_lock(&cache_lock);
1305 flush_old_values(-1);
1306 pthread_mutex_unlock(&cache_lock);
1308 return send_response(sock, RESP_OK, "Started flush.\n");
1309 } /* }}} static int handle_request_flushall */
1311 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1312 char *buffer, size_t buffer_size)
1313 {
1314 int status;
1315 char *file, file_tmp[PATH_MAX];
1316 cache_item_t *ci;
1318 status = buffer_get_field(&buffer, &buffer_size, &file);
1319 if (status != 0)
1320 return send_response(sock, RESP_ERR,
1321 "Usage: PENDING <filename>\n");
1323 status = has_privilege(sock, PRIV_HIGH);
1324 if (status <= 0)
1325 return status;
1327 get_abs_path(&file, file_tmp);
1329 pthread_mutex_lock(&cache_lock);
1330 ci = g_tree_lookup(cache_tree, file);
1331 if (ci == NULL)
1332 {
1333 pthread_mutex_unlock(&cache_lock);
1334 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1335 }
1337 for (int i=0; i < ci->values_num; i++)
1338 add_response_info(sock, "%s\n", ci->values[i]);
1340 pthread_mutex_unlock(&cache_lock);
1341 return send_response(sock, RESP_OK, "updates pending\n");
1342 } /* }}} static int handle_request_pending */
1344 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1345 char *buffer, size_t buffer_size)
1346 {
1347 int status;
1348 char *file, file_tmp[PATH_MAX];
1350 status = buffer_get_field(&buffer, &buffer_size, &file);
1351 if (status != 0)
1352 return send_response(sock, RESP_ERR,
1353 "Usage: FORGET <filename>\n");
1355 status = has_privilege(sock, PRIV_HIGH);
1356 if (status <= 0)
1357 return status;
1359 get_abs_path(&file, file_tmp);
1360 if (!check_file_access(file, sock)) return 0;
1362 pthread_mutex_lock(&cache_lock);
1363 status = forget_file(file);
1364 pthread_mutex_unlock(&cache_lock);
1366 if (status == 0)
1367 {
1368 if (sock != NULL)
1369 journal_write("forget", file);
1371 return send_response(sock, RESP_OK, "Gone!\n");
1372 }
1373 else
1374 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1375 status < 0 ? "Internal error" : rrd_strerror(status));
1377 /* NOTREACHED */
1378 assert(1==0);
1379 } /* }}} static int handle_request_forget */
1381 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1382 time_t now,
1383 char *buffer, size_t buffer_size)
1384 {
1385 char *file, file_tmp[PATH_MAX];
1386 int values_num = 0;
1387 int status;
1388 char orig_buf[CMD_MAX];
1390 cache_item_t *ci;
1392 status = has_privilege(sock, PRIV_HIGH);
1393 if (status <= 0)
1394 return status;
1396 /* save it for the journal later */
1397 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1399 status = buffer_get_field (&buffer, &buffer_size, &file);
1400 if (status != 0)
1401 return send_response(sock, RESP_ERR,
1402 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1404 pthread_mutex_lock(&stats_lock);
1405 stats_updates_received++;
1406 pthread_mutex_unlock(&stats_lock);
1408 get_abs_path(&file, file_tmp);
1409 if (!check_file_access(file, sock)) return 0;
1411 pthread_mutex_lock (&cache_lock);
1412 ci = g_tree_lookup (cache_tree, file);
1414 if (ci == NULL) /* {{{ */
1415 {
1416 struct stat statbuf;
1418 /* don't hold the lock while we setup; stat(2) might block */
1419 pthread_mutex_unlock(&cache_lock);
1421 memset (&statbuf, 0, sizeof (statbuf));
1422 status = stat (file, &statbuf);
1423 if (status != 0)
1424 {
1425 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1427 status = errno;
1428 if (status == ENOENT)
1429 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1430 else
1431 return send_response(sock, RESP_ERR,
1432 "stat failed with error %i.\n", status);
1433 }
1434 if (!S_ISREG (statbuf.st_mode))
1435 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1437 if (access(file, R_OK|W_OK) != 0)
1438 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1439 file, rrd_strerror(errno));
1441 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1442 if (ci == NULL)
1443 {
1444 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1446 return send_response(sock, RESP_ERR, "malloc failed.\n");
1447 }
1448 memset (ci, 0, sizeof (cache_item_t));
1450 ci->file = strdup (file);
1451 if (ci->file == NULL)
1452 {
1453 free (ci);
1454 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1456 return send_response(sock, RESP_ERR, "strdup failed.\n");
1457 }
1459 wipe_ci_values(ci, now);
1460 ci->flags = CI_FLAGS_IN_TREE;
1461 pthread_cond_init(&ci->flushed, NULL);
1463 pthread_mutex_lock(&cache_lock);
1464 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1465 } /* }}} */
1466 assert (ci != NULL);
1468 /* don't re-write updates in replay mode */
1469 if (sock != NULL)
1470 journal_write("update", orig_buf);
1472 while (buffer_size > 0)
1473 {
1474 char **temp;
1475 char *value;
1476 time_t stamp;
1477 char *eostamp;
1479 status = buffer_get_field (&buffer, &buffer_size, &value);
1480 if (status != 0)
1481 {
1482 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1483 break;
1484 }
1486 /* make sure update time is always moving forward */
1487 stamp = strtol(value, &eostamp, 10);
1488 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1489 {
1490 pthread_mutex_unlock(&cache_lock);
1491 return send_response(sock, RESP_ERR,
1492 "Cannot find timestamp in '%s'!\n", value);
1493 }
1494 else if (stamp <= ci->last_update_stamp)
1495 {
1496 pthread_mutex_unlock(&cache_lock);
1497 return send_response(sock, RESP_ERR,
1498 "illegal attempt to update using time %ld when last"
1499 " update time is %ld (minimum one second step)\n",
1500 stamp, ci->last_update_stamp);
1501 }
1502 else
1503 ci->last_update_stamp = stamp;
1505 temp = (char **) realloc (ci->values,
1506 sizeof (char *) * (ci->values_num + 1));
1507 if (temp == NULL)
1508 {
1509 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1510 continue;
1511 }
1512 ci->values = temp;
1514 ci->values[ci->values_num] = strdup (value);
1515 if (ci->values[ci->values_num] == NULL)
1516 {
1517 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1518 continue;
1519 }
1520 ci->values_num++;
1522 values_num++;
1523 }
1525 if (((now - ci->last_flush_time) >= config_write_interval)
1526 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1527 && (ci->values_num > 0))
1528 {
1529 enqueue_cache_item (ci, TAIL);
1530 }
1532 pthread_mutex_unlock (&cache_lock);
1534 if (values_num < 1)
1535 return send_response(sock, RESP_ERR, "No values updated.\n");
1536 else
1537 return send_response(sock, RESP_OK,
1538 "errors, enqueued %i value(s).\n", values_num);
1540 /* NOTREACHED */
1541 assert(1==0);
1543 } /* }}} int handle_request_update */
1545 /* we came across a "WROTE" entry during journal replay.
1546 * throw away any values that we have accumulated for this file
1547 */
1548 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1549 {
1550 int i;
1551 cache_item_t *ci;
1552 const char *file = buffer;
1554 pthread_mutex_lock(&cache_lock);
1556 ci = g_tree_lookup(cache_tree, file);
1557 if (ci == NULL)
1558 {
1559 pthread_mutex_unlock(&cache_lock);
1560 return (0);
1561 }
1563 if (ci->values)
1564 {
1565 for (i=0; i < ci->values_num; i++)
1566 free(ci->values[i]);
1568 free(ci->values);
1569 }
1571 wipe_ci_values(ci, now);
1572 remove_from_queue(ci);
1574 pthread_mutex_unlock(&cache_lock);
1575 return (0);
1576 } /* }}} int handle_request_wrote */
1578 /* start "BATCH" processing */
1579 static int batch_start (listen_socket_t *sock) /* {{{ */
1580 {
1581 int status;
1582 if (sock->batch_start)
1583 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1585 status = send_response(sock, RESP_OK,
1586 "Go ahead. End with dot '.' on its own line.\n");
1587 sock->batch_start = time(NULL);
1588 sock->batch_cmd = 0;
1590 return status;
1591 } /* }}} static int batch_start */
1593 /* finish "BATCH" processing and return results to the client */
1594 static int batch_done (listen_socket_t *sock) /* {{{ */
1595 {
1596 assert(sock->batch_start);
1597 sock->batch_start = 0;
1598 sock->batch_cmd = 0;
1599 return send_response(sock, RESP_OK, "errors\n");
1600 } /* }}} static int batch_done */
1602 /* if sock==NULL, we are in journal replay mode */
1603 static int handle_request (listen_socket_t *sock, /* {{{ */
1604 time_t now,
1605 char *buffer, size_t buffer_size)
1606 {
1607 char *buffer_ptr;
1608 char *command;
1609 int status;
1611 assert (buffer[buffer_size - 1] == '\0');
1613 buffer_ptr = buffer;
1614 command = NULL;
1615 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1616 if (status != 0)
1617 {
1618 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1619 return (-1);
1620 }
1622 if (sock != NULL && sock->batch_start)
1623 sock->batch_cmd++;
1625 if (strcasecmp (command, "update") == 0)
1626 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1627 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1628 {
1629 /* this is only valid in replay mode */
1630 return (handle_request_wrote (buffer_ptr, now));
1631 }
1632 else if (strcasecmp (command, "flush") == 0)
1633 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1634 else if (strcasecmp (command, "flushall") == 0)
1635 return (handle_request_flushall(sock));
1636 else if (strcasecmp (command, "pending") == 0)
1637 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1638 else if (strcasecmp (command, "forget") == 0)
1639 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1640 else if (strcasecmp (command, "stats") == 0)
1641 return (handle_request_stats (sock));
1642 else if (strcasecmp (command, "help") == 0)
1643 return (handle_request_help (sock, buffer_ptr, buffer_size));
1644 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1645 return batch_start(sock);
1646 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1647 return batch_done(sock);
1648 else
1649 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1651 /* NOTREACHED */
1652 assert(1==0);
1653 } /* }}} int handle_request */
1655 /* MUST NOT hold journal_lock before calling this */
1656 static void journal_rotate(void) /* {{{ */
1657 {
1658 FILE *old_fh = NULL;
1659 int new_fd;
1661 if (journal_cur == NULL || journal_old == NULL)
1662 return;
1664 pthread_mutex_lock(&journal_lock);
1666 /* we rotate this way (rename before close) so that the we can release
1667 * the journal lock as fast as possible. Journal writes to the new
1668 * journal can proceed immediately after the new file is opened. The
1669 * fclose can then block without affecting new updates.
1670 */
1671 if (journal_fh != NULL)
1672 {
1673 old_fh = journal_fh;
1674 journal_fh = NULL;
1675 rename(journal_cur, journal_old);
1676 ++stats_journal_rotate;
1677 }
1679 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1680 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1681 if (new_fd >= 0)
1682 {
1683 journal_fh = fdopen(new_fd, "a");
1684 if (journal_fh == NULL)
1685 close(new_fd);
1686 }
1688 pthread_mutex_unlock(&journal_lock);
1690 if (old_fh != NULL)
1691 fclose(old_fh);
1693 if (journal_fh == NULL)
1694 {
1695 RRDD_LOG(LOG_CRIT,
1696 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1697 journal_cur, rrd_strerror(errno));
1699 RRDD_LOG(LOG_ERR,
1700 "JOURNALING DISABLED: All values will be flushed at shutdown");
1701 config_flush_at_shutdown = 1;
1702 }
1704 } /* }}} static void journal_rotate */
1706 static void journal_done(void) /* {{{ */
1707 {
1708 if (journal_cur == NULL)
1709 return;
1711 pthread_mutex_lock(&journal_lock);
1712 if (journal_fh != NULL)
1713 {
1714 fclose(journal_fh);
1715 journal_fh = NULL;
1716 }
1718 if (config_flush_at_shutdown)
1719 {
1720 RRDD_LOG(LOG_INFO, "removing journals");
1721 unlink(journal_old);
1722 unlink(journal_cur);
1723 }
1724 else
1725 {
1726 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1727 "journals will be used at next startup");
1728 }
1730 pthread_mutex_unlock(&journal_lock);
1732 } /* }}} static void journal_done */
1734 static int journal_write(char *cmd, char *args) /* {{{ */
1735 {
1736 int chars;
1738 if (journal_fh == NULL)
1739 return 0;
1741 pthread_mutex_lock(&journal_lock);
1742 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1743 pthread_mutex_unlock(&journal_lock);
1745 if (chars > 0)
1746 {
1747 pthread_mutex_lock(&stats_lock);
1748 stats_journal_bytes += chars;
1749 pthread_mutex_unlock(&stats_lock);
1750 }
1752 return chars;
1753 } /* }}} static int journal_write */
1755 static int journal_replay (const char *file) /* {{{ */
1756 {
1757 FILE *fh;
1758 int entry_cnt = 0;
1759 int fail_cnt = 0;
1760 uint64_t line = 0;
1761 char entry[CMD_MAX];
1762 time_t now;
1764 if (file == NULL) return 0;
1766 {
1767 char *reason;
1768 int status = 0;
1769 struct stat statbuf;
1771 memset(&statbuf, 0, sizeof(statbuf));
1772 if (stat(file, &statbuf) != 0)
1773 {
1774 if (errno == ENOENT)
1775 return 0;
1777 reason = "stat error";
1778 status = errno;
1779 }
1780 else if (!S_ISREG(statbuf.st_mode))
1781 {
1782 reason = "not a regular file";
1783 status = EPERM;
1784 }
1785 if (statbuf.st_uid != daemon_uid)
1786 {
1787 reason = "not owned by daemon user";
1788 status = EACCES;
1789 }
1790 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1791 {
1792 reason = "must not be user/group writable";
1793 status = EACCES;
1794 }
1796 if (status != 0)
1797 {
1798 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1799 file, rrd_strerror(status), reason);
1800 return 0;
1801 }
1802 }
1804 fh = fopen(file, "r");
1805 if (fh == NULL)
1806 {
1807 if (errno != ENOENT)
1808 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1809 file, rrd_strerror(errno));
1810 return 0;
1811 }
1812 else
1813 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1815 now = time(NULL);
1817 while(!feof(fh))
1818 {
1819 size_t entry_len;
1821 ++line;
1822 if (fgets(entry, sizeof(entry), fh) == NULL)
1823 break;
1824 entry_len = strlen(entry);
1826 /* check \n termination in case journal writing crashed mid-line */
1827 if (entry_len == 0)
1828 continue;
1829 else if (entry[entry_len - 1] != '\n')
1830 {
1831 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1832 ++fail_cnt;
1833 continue;
1834 }
1836 entry[entry_len - 1] = '\0';
1838 if (handle_request(NULL, now, entry, entry_len) == 0)
1839 ++entry_cnt;
1840 else
1841 ++fail_cnt;
1842 }
1844 fclose(fh);
1846 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1847 entry_cnt, fail_cnt);
1849 return entry_cnt > 0 ? 1 : 0;
1850 } /* }}} static int journal_replay */
1852 static void journal_init(void) /* {{{ */
1853 {
1854 int had_journal = 0;
1856 if (journal_cur == NULL) return;
1858 pthread_mutex_lock(&journal_lock);
1860 RRDD_LOG(LOG_INFO, "checking for journal files");
1862 had_journal += journal_replay(journal_old);
1863 had_journal += journal_replay(journal_cur);
1865 /* it must have been a crash. start a flush */
1866 if (had_journal && config_flush_at_shutdown)
1867 flush_old_values(-1);
1869 pthread_mutex_unlock(&journal_lock);
1870 journal_rotate();
1872 RRDD_LOG(LOG_INFO, "journal processing complete");
1874 } /* }}} static void journal_init */
1876 static void close_connection(listen_socket_t *sock)
1877 {
1878 close(sock->fd) ; sock->fd = -1;
1879 free(sock->rbuf); sock->rbuf = NULL;
1880 free(sock->wbuf); sock->wbuf = NULL;
1882 free(sock);
1883 }
1885 static void *connection_thread_main (void *args) /* {{{ */
1886 {
1887 pthread_t self;
1888 listen_socket_t *sock;
1889 int i;
1890 int fd;
1892 sock = (listen_socket_t *) args;
1893 fd = sock->fd;
1895 /* init read buffers */
1896 sock->next_read = sock->next_cmd = 0;
1897 sock->rbuf = malloc(RBUF_SIZE);
1898 if (sock->rbuf == NULL)
1899 {
1900 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1901 close_connection(sock);
1902 return NULL;
1903 }
1905 pthread_mutex_lock (&connection_threads_lock);
1906 {
1907 pthread_t *temp;
1909 temp = (pthread_t *) realloc (connection_threads,
1910 sizeof (pthread_t) * (connection_threads_num + 1));
1911 if (temp == NULL)
1912 {
1913 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1914 }
1915 else
1916 {
1917 connection_threads = temp;
1918 connection_threads[connection_threads_num] = pthread_self ();
1919 connection_threads_num++;
1920 }
1921 }
1922 pthread_mutex_unlock (&connection_threads_lock);
1924 while (do_shutdown == 0)
1925 {
1926 char *cmd;
1927 ssize_t cmd_len;
1928 ssize_t rbytes;
1929 time_t now;
1931 struct pollfd pollfd;
1932 int status;
1934 pollfd.fd = fd;
1935 pollfd.events = POLLIN | POLLPRI;
1936 pollfd.revents = 0;
1938 status = poll (&pollfd, 1, /* timeout = */ 500);
1939 if (do_shutdown)
1940 break;
1941 else if (status == 0) /* timeout */
1942 continue;
1943 else if (status < 0) /* error */
1944 {
1945 status = errno;
1946 if (status != EINTR)
1947 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1948 continue;
1949 }
1951 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1952 break;
1953 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1954 {
1955 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1956 "poll(2) returned something unexpected: %#04hx",
1957 pollfd.revents);
1958 break;
1959 }
1961 rbytes = read(fd, sock->rbuf + sock->next_read,
1962 RBUF_SIZE - sock->next_read);
1963 if (rbytes < 0)
1964 {
1965 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1966 break;
1967 }
1968 else if (rbytes == 0)
1969 break; /* eof */
1971 sock->next_read += rbytes;
1973 if (sock->batch_start)
1974 now = sock->batch_start;
1975 else
1976 now = time(NULL);
1978 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1979 {
1980 status = handle_request (sock, now, cmd, cmd_len+1);
1981 if (status != 0)
1982 goto out_close;
1983 }
1984 }
1986 out_close:
1987 close_connection(sock);
1989 self = pthread_self ();
1990 /* Remove this thread from the connection threads list */
1991 pthread_mutex_lock (&connection_threads_lock);
1992 /* Find out own index in the array */
1993 for (i = 0; i < connection_threads_num; i++)
1994 if (pthread_equal (connection_threads[i], self) != 0)
1995 break;
1996 assert (i < connection_threads_num);
1998 /* Move the trailing threads forward. */
1999 if (i < (connection_threads_num - 1))
2000 {
2001 memmove (connection_threads + i,
2002 connection_threads + i + 1,
2003 sizeof (pthread_t) * (connection_threads_num - i - 1));
2004 }
2006 connection_threads_num--;
2007 pthread_mutex_unlock (&connection_threads_lock);
2009 return (NULL);
2010 } /* }}} void *connection_thread_main */
2012 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2013 {
2014 int fd;
2015 struct sockaddr_un sa;
2016 listen_socket_t *temp;
2017 int status;
2018 const char *path;
2020 path = sock->addr;
2021 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2022 path += strlen("unix:");
2024 temp = (listen_socket_t *) realloc (listen_fds,
2025 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2026 if (temp == NULL)
2027 {
2028 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2029 return (-1);
2030 }
2031 listen_fds = temp;
2032 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2034 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2035 if (fd < 0)
2036 {
2037 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2038 rrd_strerror(errno));
2039 return (-1);
2040 }
2042 memset (&sa, 0, sizeof (sa));
2043 sa.sun_family = AF_UNIX;
2044 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2046 /* if we've gotten this far, we own the pid file. any daemon started
2047 * with the same args must not be alive. therefore, ensure that we can
2048 * create the socket...
2049 */
2050 unlink(path);
2052 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2053 if (status != 0)
2054 {
2055 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2056 path, rrd_strerror(errno));
2057 close (fd);
2058 return (-1);
2059 }
2061 status = listen (fd, /* backlog = */ 10);
2062 if (status != 0)
2063 {
2064 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2065 path, rrd_strerror(errno));
2066 close (fd);
2067 unlink (path);
2068 return (-1);
2069 }
2071 listen_fds[listen_fds_num].fd = fd;
2072 listen_fds[listen_fds_num].family = PF_UNIX;
2073 strncpy(listen_fds[listen_fds_num].addr, path,
2074 sizeof (listen_fds[listen_fds_num].addr) - 1);
2075 listen_fds_num++;
2077 return (0);
2078 } /* }}} int open_listen_socket_unix */
/*
 * open_listen_socket_network: listen on every address `sock->addr' resolves to.
 *
 * Accepted address forms:
 *   "[v6addr]" or "[v6addr]:port"   bracketed IPv6,
 *   "host.domain:port", "a.b.c.d:port"  split at the last colon,
 *   "host:port"                     undotted name with exactly one colon,
 *   anything else                   passed to getaddrinfo() as is.
 * Without a port, RRDCACHED_DEFAULT_PORT is used.
 *
 * One listening socket is added to listen_fds per usable getaddrinfo()
 * result; addresses that fail socket()/bind() are skipped.  Returns -1 on
 * malformed input, resolution failure or a listen() failure, otherwise 0.
 * The getaddrinfo() result is freed on every path after it was obtained.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* A dotted name (hostname or IPv4) is split at its last colon.  An
     * undotted name is split only if it has exactly one colon: a bare
     * IPv6 address always contains at least two. */
    char *colon = strrchr (addr, ':');

    if ((colon != NULL)
        && ((strchr (addr, '.') != NULL) || (strchr (addr, ':') == colon)))
    {
      *colon = 0;
      port = colon + 1;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo (ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo (ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2200 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2201 {
2202 assert(sock != NULL);
2203 assert(sock->addr != NULL);
2205 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2206 || sock->addr[0] == '/')
2207 return (open_listen_socket_unix(sock));
2208 else
2209 return (open_listen_socket_network(sock));
2210 } /* }}} int open_listen_socket */
2212 static int close_listen_sockets (void) /* {{{ */
2213 {
2214 size_t i;
2216 for (i = 0; i < listen_fds_num; i++)
2217 {
2218 close (listen_fds[i].fd);
2220 if (listen_fds[i].family == PF_UNIX)
2221 unlink(listen_fds[i].addr);
2222 }
2224 free (listen_fds);
2225 listen_fds = NULL;
2226 listen_fds_num = 0;
2228 return (0);
2229 } /* }}} int close_listen_sockets */
2231 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2232 {
2233 struct pollfd *pollfds;
2234 int pollfds_num;
2235 int status;
2236 int i;
2238 if (listen_fds_num < 1)
2239 {
2240 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2241 return (NULL);
2242 }
2244 pollfds_num = listen_fds_num;
2245 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2246 if (pollfds == NULL)
2247 {
2248 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2249 return (NULL);
2250 }
2251 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2253 RRDD_LOG(LOG_INFO, "listening for connections");
2255 while (do_shutdown == 0)
2256 {
2257 assert (pollfds_num == ((int) listen_fds_num));
2258 for (i = 0; i < pollfds_num; i++)
2259 {
2260 pollfds[i].fd = listen_fds[i].fd;
2261 pollfds[i].events = POLLIN | POLLPRI;
2262 pollfds[i].revents = 0;
2263 }
2265 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2266 if (do_shutdown)
2267 break;
2268 else if (status == 0) /* timeout */
2269 continue;
2270 else if (status < 0) /* error */
2271 {
2272 status = errno;
2273 if (status != EINTR)
2274 {
2275 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2276 }
2277 continue;
2278 }
2280 for (i = 0; i < pollfds_num; i++)
2281 {
2282 listen_socket_t *client_sock;
2283 struct sockaddr_storage client_sa;
2284 socklen_t client_sa_size;
2285 pthread_t tid;
2286 pthread_attr_t attr;
2288 if (pollfds[i].revents == 0)
2289 continue;
2291 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2292 {
2293 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2294 "poll(2) returned something unexpected for listen FD #%i.",
2295 pollfds[i].fd);
2296 continue;
2297 }
2299 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2300 if (client_sock == NULL)
2301 {
2302 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2303 continue;
2304 }
2305 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2307 client_sa_size = sizeof (client_sa);
2308 client_sock->fd = accept (pollfds[i].fd,
2309 (struct sockaddr *) &client_sa, &client_sa_size);
2310 if (client_sock->fd < 0)
2311 {
2312 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2313 free(client_sock);
2314 continue;
2315 }
2317 pthread_attr_init (&attr);
2318 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2320 status = pthread_create (&tid, &attr, connection_thread_main,
2321 client_sock);
2322 if (status != 0)
2323 {
2324 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2325 close_connection(client_sock);
2326 continue;
2327 }
2328 } /* for (pollfds_num) */
2329 } /* while (do_shutdown == 0) */
2331 RRDD_LOG(LOG_INFO, "starting shutdown");
2333 close_listen_sockets ();
2335 pthread_mutex_lock (&connection_threads_lock);
2336 while (connection_threads_num > 0)
2337 {
2338 pthread_t wait_for;
2340 wait_for = connection_threads[0];
2342 pthread_mutex_unlock (&connection_threads_lock);
2343 pthread_join (wait_for, /* retval = */ NULL);
2344 pthread_mutex_lock (&connection_threads_lock);
2345 }
2346 pthread_mutex_unlock (&connection_threads_lock);
2348 return (NULL);
2349 } /* }}} void *listen_thread_main */
2351 static int daemonize (void) /* {{{ */
2352 {
2353 int pid_fd;
2354 char *base_dir;
2356 daemon_uid = geteuid();
2358 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2359 if (pid_fd < 0)
2360 pid_fd = check_pidfile();
2361 if (pid_fd < 0)
2362 return pid_fd;
2364 /* open all the listen sockets */
2365 if (config_listen_address_list_len > 0)
2366 {
2367 for (int i = 0; i < config_listen_address_list_len; i++)
2368 open_listen_socket (config_listen_address_list[i]);
2369 }
2370 else
2371 {
2372 listen_socket_t sock;
2373 memset(&sock, 0, sizeof(sock));
2374 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2375 open_listen_socket (&sock);
2376 }
2378 if (listen_fds_num < 1)
2379 {
2380 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2381 goto error;
2382 }
2384 if (!stay_foreground)
2385 {
2386 pid_t child;
2388 child = fork ();
2389 if (child < 0)
2390 {
2391 fprintf (stderr, "daemonize: fork(2) failed.\n");
2392 goto error;
2393 }
2394 else if (child > 0)
2395 exit(0);
2397 /* Become session leader */
2398 setsid ();
2400 /* Open the first three file descriptors to /dev/null */
2401 close (2);
2402 close (1);
2403 close (0);
2405 open ("/dev/null", O_RDWR);
2406 dup (0);
2407 dup (0);
2408 } /* if (!stay_foreground) */
2410 /* Change into the /tmp directory. */
2411 base_dir = (config_base_dir != NULL)
2412 ? config_base_dir
2413 : "/tmp";
2415 if (chdir (base_dir) != 0)
2416 {
2417 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2418 goto error;
2419 }
2421 install_signal_handlers();
2423 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2424 RRDD_LOG(LOG_INFO, "starting up");
2426 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2427 if (cache_tree == NULL)
2428 {
2429 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2430 goto error;
2431 }
2433 return write_pidfile (pid_fd);
2435 error:
2436 remove_pidfile();
2437 return -1;
2438 } /* }}} int daemonize */
2440 static int cleanup (void) /* {{{ */
2441 {
2442 do_shutdown++;
2444 pthread_cond_signal (&cache_cond);
2445 pthread_join (queue_thread, /* return = */ NULL);
2447 remove_pidfile ();
2449 RRDD_LOG(LOG_INFO, "goodbye");
2450 closelog ();
2452 return (0);
2453 } /* }}} int cleanup */
2455 static int read_options (int argc, char **argv) /* {{{ */
2456 {
2457 int option;
2458 int status = 0;
2460 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2461 {
2462 switch (option)
2463 {
2464 case 'g':
2465 stay_foreground=1;
2466 break;
2468 case 'L':
2469 case 'l':
2470 {
2471 listen_socket_t **temp;
2472 listen_socket_t *new;
2474 new = malloc(sizeof(listen_socket_t));
2475 if (new == NULL)
2476 {
2477 fprintf(stderr, "read_options: malloc failed.\n");
2478 return(2);
2479 }
2480 memset(new, 0, sizeof(listen_socket_t));
2482 temp = (listen_socket_t **) realloc (config_listen_address_list,
2483 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2484 if (temp == NULL)
2485 {
2486 fprintf (stderr, "read_options: realloc failed.\n");
2487 return (2);
2488 }
2489 config_listen_address_list = temp;
2491 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2492 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2494 temp[config_listen_address_list_len] = new;
2495 config_listen_address_list_len++;
2496 }
2497 break;
2499 case 'f':
2500 {
2501 int temp;
2503 temp = atoi (optarg);
2504 if (temp > 0)
2505 config_flush_interval = temp;
2506 else
2507 {
2508 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2509 status = 3;
2510 }
2511 }
2512 break;
2514 case 'w':
2515 {
2516 int temp;
2518 temp = atoi (optarg);
2519 if (temp > 0)
2520 config_write_interval = temp;
2521 else
2522 {
2523 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2524 status = 2;
2525 }
2526 }
2527 break;
2529 case 'z':
2530 {
2531 int temp;
2533 temp = atoi(optarg);
2534 if (temp > 0)
2535 config_write_jitter = temp;
2536 else
2537 {
2538 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2539 status = 2;
2540 }
2542 break;
2543 }
2545 case 'B':
2546 config_write_base_only = 1;
2547 break;
2549 case 'b':
2550 {
2551 size_t len;
2552 char base_realpath[PATH_MAX];
2554 if (config_base_dir != NULL)
2555 free (config_base_dir);
2556 config_base_dir = strdup (optarg);
2557 if (config_base_dir == NULL)
2558 {
2559 fprintf (stderr, "read_options: strdup failed.\n");
2560 return (3);
2561 }
2563 /* make sure that the base directory is not resolved via
2564 * symbolic links. this makes some performance-enhancing
2565 * assumptions possible (we don't have to resolve paths
2566 * that start with a "/")
2567 */
2568 if (realpath(config_base_dir, base_realpath) == NULL)
2569 {
2570 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2571 return 5;
2572 }
2573 else if (strncmp(config_base_dir,
2574 base_realpath, sizeof(base_realpath)) != 0)
2575 {
2576 fprintf(stderr,
2577 "Base directory (-b) resolved via file system links!\n"
2578 "Please consult rrdcached '-b' documentation!\n"
2579 "Consider specifying the real directory (%s)\n",
2580 base_realpath);
2581 return 5;
2582 }
2584 len = strlen (config_base_dir);
2585 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2586 {
2587 config_base_dir[len - 1] = 0;
2588 len--;
2589 }
2591 if (len < 1)
2592 {
2593 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2594 return (4);
2595 }
2597 _config_base_dir_len = len;
2598 }
2599 break;
2601 case 'p':
2602 {
2603 if (config_pid_file != NULL)
2604 free (config_pid_file);
2605 config_pid_file = strdup (optarg);
2606 if (config_pid_file == NULL)
2607 {
2608 fprintf (stderr, "read_options: strdup failed.\n");
2609 return (3);
2610 }
2611 }
2612 break;
2614 case 'F':
2615 config_flush_at_shutdown = 1;
2616 break;
2618 case 'j':
2619 {
2620 struct stat statbuf;
2621 const char *dir = optarg;
2623 status = stat(dir, &statbuf);
2624 if (status != 0)
2625 {
2626 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2627 return 6;
2628 }
2630 if (!S_ISDIR(statbuf.st_mode)
2631 || access(dir, R_OK|W_OK|X_OK) != 0)
2632 {
2633 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2634 errno ? rrd_strerror(errno) : "");
2635 return 6;
2636 }
2638 journal_cur = malloc(PATH_MAX + 1);
2639 journal_old = malloc(PATH_MAX + 1);
2640 if (journal_cur == NULL || journal_old == NULL)
2641 {
2642 fprintf(stderr, "malloc failure for journal files\n");
2643 return 6;
2644 }
2645 else
2646 {
2647 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2648 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2649 }
2650 }
2651 break;
2653 case 'h':
2654 case '?':
2655 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2656 "\n"
2657 "Usage: rrdcached [options]\n"
2658 "\n"
2659 "Valid options are:\n"
2660 " -l <address> Socket address to listen to.\n"
2661 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2662 " -w <seconds> Interval in which to write data.\n"
2663 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2664 " -f <seconds> Interval in which to flush dead data.\n"
2665 " -p <file> Location of the PID-file.\n"
2666 " -b <dir> Base directory to change to.\n"
2667 " -B Restrict file access to paths within -b <dir>\n"
2668 " -g Do not fork and run in the foreground.\n"
2669 " -j <dir> Directory in which to create the journal files.\n"
2670 " -F Always flush all updates at shutdown\n"
2671 "\n"
2672 "For more information and a detailed description of all options "
2673 "please refer\n"
2674 "to the rrdcached(1) manual page.\n",
2675 VERSION);
2676 status = -1;
2677 break;
2678 } /* switch (option) */
2679 } /* while (getopt) */
2681 /* advise the user when values are not sane */
2682 if (config_flush_interval < 2 * config_write_interval)
2683 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2684 " 2x write interval (-w) !\n");
2685 if (config_write_jitter > config_write_interval)
2686 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2687 " write interval (-w) !\n");
2689 if (config_write_base_only && config_base_dir == NULL)
2690 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2691 " Consult the rrdcached documentation\n");
2693 if (journal_cur == NULL)
2694 config_flush_at_shutdown = 1;
2696 return (status);
2697 } /* }}} int read_options */
2699 int main (int argc, char **argv)
2700 {
2701 int status;
2703 status = read_options (argc, argv);
2704 if (status != 0)
2705 {
2706 if (status < 0)
2707 status = 0;
2708 return (status);
2709 }
2711 status = daemonize ();
2712 if (status != 0)
2713 {
2714 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2715 return (1);
2716 }
2718 journal_init();
2720 /* start the queue thread */
2721 memset (&queue_thread, 0, sizeof (queue_thread));
2722 status = pthread_create (&queue_thread,
2723 NULL, /* attr */
2724 queue_thread_main,
2725 NULL); /* args */
2726 if (status != 0)
2727 {
2728 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2729 cleanup();
2730 return (1);
2731 }
2733 listen_thread_main (NULL);
2734 cleanup ();
2736 return (0);
2737 } /* int main */
2739 /*
2740 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2741 */