1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
/* All daemon log messages go through syslog(3); `severity' is one of the
 * LOG_* priorities from <syslog.h>. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Non-GCC-compatible compilers do not understand __attribute__, so expand it
 * to nothing there (in the visible code it is only used for `unused'). */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
/*
 * Types
 */

/* Privilege level attached to a socket.  has_privilege() compares levels
 * with `>=', so the numeric order matters: PRIV_LOW < PRIV_HIGH. */
typedef enum
{
  PRIV_LOW = 0,
  PRIV_HIGH = 1
} socket_privilege;

/* Outcome reported to a client by send_response(). */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK = 0
} response_code;
121 struct listen_socket_s
122 {
123 int fd;
124 char addr[PATH_MAX + 1];
125 int family;
126 socket_privilege privilege;
128 /* state for BATCH processing */
129 time_t batch_start;
130 int batch_cmd;
132 /* buffered IO */
133 char *rbuf;
134 off_t next_cmd;
135 off_t next_read;
137 char *wbuf;
138 ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
/* Bits stored in cache_item_t.flags. */
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/* One RRD file known to the cache together with its not-yet-written
 * updates.  Items are also linked into the doubly-linked update queue
 * through `prev'/`next'. */
typedef struct cache_item_s
{
  char *file;                   /* file name; also the key in cache_tree */
  char **values;                /* pending update strings */
  int values_num;               /* number of entries in `values' */
  time_t last_flush_time;       /* when values were last handed off (+jitter) */
  time_t last_update_stamp;
  int flags;                    /* CI_FLAGS_* bits */
  pthread_cond_t flushed;       /* broadcast once the values were written */
  struct cache_item_s *prev;    /* update-queue neighbours */
  struct cache_item_s *next;
} cache_item_t;
/* Scratch state handed to tree_callback_flush() through g_tree_foreach(). */
typedef struct callback_flush_data_s
{
  time_t now;          /* time the traversal started */
  time_t abs_timeout;  /* items last flushed at or before this get queued */
  char **keys;         /* keys (file names) of idle items to forget */
  size_t keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;

/* Which end of the update queue enqueue_cache_item() inserts at. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* max length of socket command or response */
#define CMD_MAX 4096
/* size of a connection's read buffer: room for two maximum-length commands
 * (presumably so a partial command can be kept while reading more --
 * confirm against the reader code) */
#define RBUF_SIZE (CMD_MAX*2)
179 /*
180 * Variables
181 */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree *cache_tree = NULL;
198 static cache_item_t *cache_queue_head = NULL;
199 static cache_item_t *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /*
234 * Functions
235 */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239 do_shutdown++;
240 pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245 sig_common("INT");
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250 sig_common("TERM");
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255 config_flush_at_shutdown = 1;
256 sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261 config_flush_at_shutdown = 0;
262 sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
266 {
267 /* These structures are static, because `sigaction' behaves weird if the are
268 * overwritten.. */
269 static struct sigaction sa_int;
270 static struct sigaction sa_term;
271 static struct sigaction sa_pipe;
272 static struct sigaction sa_usr1;
273 static struct sigaction sa_usr2;
275 /* Install signal handlers */
276 memset (&sa_int, 0, sizeof (sa_int));
277 sa_int.sa_handler = sig_int_handler;
278 sigaction (SIGINT, &sa_int, NULL);
280 memset (&sa_term, 0, sizeof (sa_term));
281 sa_term.sa_handler = sig_term_handler;
282 sigaction (SIGTERM, &sa_term, NULL);
284 memset (&sa_pipe, 0, sizeof (sa_pipe));
285 sa_pipe.sa_handler = SIG_IGN;
286 sigaction (SIGPIPE, &sa_pipe, NULL);
288 memset (&sa_pipe, 0, sizeof (sa_usr1));
289 sa_usr1.sa_handler = sig_usr1_handler;
290 sigaction (SIGUSR1, &sa_usr1, NULL);
292 memset (&sa_usr2, 0, sizeof (sa_usr2));
293 sa_usr2.sa_handler = sig_usr2_handler;
294 sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300 int fd;
301 char *file;
303 file = (config_pid_file != NULL)
304 ? config_pid_file
305 : LOCALSTATEDIR "/run/rrdcached.pid";
307 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308 if (fd < 0)
309 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310 action, file, rrd_strerror(errno));
312 return(fd);
313 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running.
 * Return value:
 *  - the still-open descriptor of the PID file, truncated and rewound to
 *    offset 0, when no other process we can signal owns it;
 *  - a negative value if the file cannot be opened, cannot be read or
 *    parsed, or names another live process.
 * Every failure after a successful open closes the descriptor. */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t bytes;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* keep one byte free: atoi() needs a NUL-terminated string */
  bytes = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (bytes <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[bytes] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  ftruncate(pid_fd, 0);

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
353 static int write_pidfile (int fd) /* {{{ */
354 {
355 pid_t pid;
356 FILE *fh;
358 pid = getpid ();
360 fh = fdopen (fd, "w");
361 if (fh == NULL)
362 {
363 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364 close(fd);
365 return (-1);
366 }
368 fprintf (fh, "%i\n", (int) pid);
369 fclose (fh);
371 return (0);
372 } /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
375 {
376 char *file;
377 int status;
379 file = (config_pid_file != NULL)
380 ? config_pid_file
381 : LOCALSTATEDIR "/run/rrdcached.pid";
383 status = unlink (file);
384 if (status == 0)
385 return (0);
386 return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391 char *eol;
393 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394 sock->next_read - sock->next_cmd);
396 if (eol == NULL)
397 {
398 /* no commands left, move remainder back to front of rbuf */
399 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400 sock->next_read - sock->next_cmd);
401 sock->next_read -= sock->next_cmd;
402 sock->next_cmd = 0;
403 *len = 0;
404 return NULL;
405 }
406 else
407 {
408 char *cmd = sock->rbuf + sock->next_cmd;
409 *eol = '\0';
411 sock->next_cmd = eol - sock->rbuf + 1;
413 if (eol > sock->rbuf && *(eol-1) == '\r')
414 *(--eol) = '\0'; /* handle "\r\n" EOL */
416 *len = eol - cmd;
418 return cmd;
419 }
421 /* NOTREACHED */
422 assert(1==0);
423 }
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428 char *new_buf;
430 assert(sock != NULL);
432 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433 if (new_buf == NULL)
434 {
435 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436 return -1;
437 }
439 strncpy(new_buf + sock->wbuf_len, str, len + 1);
441 sock->wbuf = new_buf;
442 sock->wbuf_len += len;
444 return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450 va_list argp;
451 char buffer[CMD_MAX];
452 int len;
454 if (sock == NULL) return 0; /* journal replay mode */
455 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457 va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461 len = vsprintf(buffer, fmt, argp);
462 #endif
463 va_end(argp);
464 if (len < 0)
465 {
466 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467 return -1;
468 }
470 return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
/* Returns the number of '\n' characters in `str'; a NULL string has none. */
static int count_lines(char *str) /* {{{ */
{
  int n = 0;
  const char *p;

  if (str == NULL)
    return 0;

  for (p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      n++;
  }

  return n;
} /* }}} static int count_lines */
489 /* send the response back to the user.
490 * returns 0 on success, -1 on error
491 * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493 char *fmt, ...) /* {{{ */
494 {
495 va_list argp;
496 char buffer[CMD_MAX];
497 int lines;
498 ssize_t wrote;
499 int rclen, len;
501 if (sock == NULL) return rc; /* journal replay mode */
503 if (sock->batch_start)
504 {
505 if (rc == RESP_OK)
506 return rc; /* no response on success during BATCH */
507 lines = sock->batch_cmd;
508 }
509 else if (rc == RESP_OK)
510 lines = count_lines(sock->wbuf);
511 else
512 lines = -1;
514 rclen = sprintf(buffer, "%d ", lines);
515 va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519 len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521 va_end(argp);
522 if (len < 0)
523 return -1;
525 len += rclen;
527 /* append the result to the wbuf, don't write to the user */
528 if (sock->batch_start)
529 return add_to_wbuf(sock, buffer, len);
531 /* first write must be complete */
532 if (len != write(sock->fd, buffer, len))
533 {
534 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535 return -1;
536 }
538 if (sock->wbuf != NULL && rc == RESP_OK)
539 {
540 wrote = 0;
541 while (wrote < sock->wbuf_len)
542 {
543 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544 if (wb <= 0)
545 {
546 RRDD_LOG(LOG_INFO, "send_response: could not write results");
547 return -1;
548 }
549 wrote += wb;
550 }
551 }
553 free(sock->wbuf); sock->wbuf = NULL;
554 sock->wbuf_len = 0;
556 return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561 ci->values = NULL;
562 ci->values_num = 0;
564 ci->last_flush_time = when;
565 if (config_write_jitter > 0)
566 ci->last_flush_time += (random() % config_write_jitter);
567 }
569 /* remove_from_queue
570 * remove a "cache_item_t" item from the queue.
571 * must hold 'cache_lock' when calling this
572 */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575 if (ci == NULL) return;
576 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578 if (ci->prev == NULL)
579 cache_queue_head = ci->next; /* reset head */
580 else
581 ci->prev->next = ci->next;
583 if (ci->next == NULL)
584 cache_queue_tail = ci->prev; /* reset the tail */
585 else
586 ci->next->prev = ci->prev;
588 ci->next = ci->prev = NULL;
589 ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* remove an entry from the tree and free all its resources.
593 * must hold 'cache lock' while calling this.
594 * returns 0 on success, otherwise errno */
595 static int forget_file(const char *file)
596 {
597 cache_item_t *ci;
599 ci = g_tree_lookup(cache_tree, file);
600 if (ci == NULL)
601 return ENOENT;
603 g_tree_remove (cache_tree, file);
604 remove_from_queue(ci);
606 for (int i=0; i < ci->values_num; i++)
607 free(ci->values[i]);
609 free (ci->values);
610 free (ci->file);
612 /* in case anyone is waiting */
613 pthread_cond_broadcast(&ci->flushed);
615 free (ci);
617 return 0;
618 } /* }}} static int forget_file */
620 /*
621 * enqueue_cache_item:
622 * `cache_lock' must be acquired before calling this function!
623 */
624 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
625 queue_side_t side)
626 {
627 if (ci == NULL)
628 return (-1);
630 if (ci->values_num == 0)
631 return (0);
633 if (side == HEAD)
634 {
635 if (cache_queue_head == ci)
636 return 0;
638 /* remove if further down in queue */
639 remove_from_queue(ci);
641 ci->prev = NULL;
642 ci->next = cache_queue_head;
643 if (ci->next != NULL)
644 ci->next->prev = ci;
645 cache_queue_head = ci;
647 if (cache_queue_tail == NULL)
648 cache_queue_tail = cache_queue_head;
649 }
650 else /* (side == TAIL) */
651 {
652 /* We don't move values back in the list.. */
653 if (ci->flags & CI_FLAGS_IN_QUEUE)
654 return (0);
656 assert (ci->next == NULL);
657 assert (ci->prev == NULL);
659 ci->prev = cache_queue_tail;
661 if (cache_queue_tail == NULL)
662 cache_queue_head = ci;
663 else
664 cache_queue_tail->next = ci;
666 cache_queue_tail = ci;
667 }
669 ci->flags |= CI_FLAGS_IN_QUEUE;
671 pthread_cond_broadcast(&cache_cond);
672 pthread_mutex_lock (&stats_lock);
673 stats_queue_length++;
674 pthread_mutex_unlock (&stats_lock);
676 return (0);
677 } /* }}} int enqueue_cache_item */
679 /*
680 * tree_callback_flush:
681 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
682 * while this is in progress.
683 */
684 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
685 gpointer data)
686 {
687 cache_item_t *ci;
688 callback_flush_data_t *cfd;
690 ci = (cache_item_t *) value;
691 cfd = (callback_flush_data_t *) data;
693 if (ci->flags & CI_FLAGS_IN_QUEUE)
694 return FALSE;
696 if ((ci->last_flush_time <= cfd->abs_timeout)
697 && (ci->values_num > 0))
698 {
699 enqueue_cache_item (ci, TAIL);
700 }
701 else if ((do_shutdown != 0)
702 && (ci->values_num > 0))
703 {
704 enqueue_cache_item (ci, TAIL);
705 }
706 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
707 && (ci->values_num <= 0))
708 {
709 char **temp;
711 temp = (char **) realloc (cfd->keys,
712 sizeof (char *) * (cfd->keys_num + 1));
713 if (temp == NULL)
714 {
715 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
716 return (FALSE);
717 }
718 cfd->keys = temp;
719 /* Make really sure this points to the _same_ place */
720 assert ((char *) key == ci->file);
721 cfd->keys[cfd->keys_num] = (char *) key;
722 cfd->keys_num++;
723 }
725 return (FALSE);
726 } /* }}} gboolean tree_callback_flush */
728 static int flush_old_values (int max_age)
729 {
730 callback_flush_data_t cfd;
731 size_t k;
733 memset (&cfd, 0, sizeof (cfd));
734 /* Pass the current time as user data so that we don't need to call
735 * `time' for each node. */
736 cfd.now = time (NULL);
737 cfd.keys = NULL;
738 cfd.keys_num = 0;
740 if (max_age > 0)
741 cfd.abs_timeout = cfd.now - max_age;
742 else
743 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
745 /* `tree_callback_flush' will return the keys of all values that haven't
746 * been touched in the last `config_flush_interval' seconds in `cfd'.
747 * The char*'s in this array point to the same memory as ci->file, so we
748 * don't need to free them separately. */
749 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
751 for (k = 0; k < cfd.keys_num; k++)
752 {
753 /* should never fail, since we have held the cache_lock
754 * the entire time */
755 assert( forget_file(cfd.keys[k]) == 0 );
756 }
758 if (cfd.keys != NULL)
759 {
760 free (cfd.keys);
761 cfd.keys = NULL;
762 }
764 return (0);
765 } /* int flush_old_values */
767 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
768 {
769 struct timeval now;
770 struct timespec next_flush;
771 int final_flush = 0; /* make sure we only flush once on shutdown */
773 gettimeofday (&now, NULL);
774 next_flush.tv_sec = now.tv_sec + config_flush_interval;
775 next_flush.tv_nsec = 1000 * now.tv_usec;
777 pthread_mutex_lock (&cache_lock);
778 while ((do_shutdown == 0) || (cache_queue_head != NULL))
779 {
780 cache_item_t *ci;
781 char *file;
782 char **values;
783 int values_num;
784 int status;
785 int i;
787 /* First, check if it's time to do the cache flush. */
788 gettimeofday (&now, NULL);
789 if ((now.tv_sec > next_flush.tv_sec)
790 || ((now.tv_sec == next_flush.tv_sec)
791 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
792 {
793 /* Flush all values that haven't been written in the last
794 * `config_write_interval' seconds. */
795 flush_old_values (config_write_interval);
797 /* Determine the time of the next cache flush. */
798 next_flush.tv_sec =
799 now.tv_sec + next_flush.tv_sec % config_flush_interval;
801 /* unlock the cache while we rotate so we don't block incoming
802 * updates if the fsync() blocks on disk I/O */
803 pthread_mutex_unlock(&cache_lock);
804 journal_rotate();
805 pthread_mutex_lock(&cache_lock);
806 }
808 /* Now, check if there's something to store away. If not, wait until
809 * something comes in or it's time to do the cache flush. if we are
810 * shutting down, do not wait around. */
811 if (cache_queue_head == NULL && !do_shutdown)
812 {
813 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
814 if ((status != 0) && (status != ETIMEDOUT))
815 {
816 RRDD_LOG (LOG_ERR, "queue_thread_main: "
817 "pthread_cond_timedwait returned %i.", status);
818 }
819 }
821 /* We're about to shut down */
822 if (do_shutdown != 0 && !final_flush++)
823 {
824 if (config_flush_at_shutdown)
825 flush_old_values (-1); /* flush everything */
826 else
827 break;
828 }
830 /* Check if a value has arrived. This may be NULL if we timed out or there
831 * was an interrupt such as a signal. */
832 if (cache_queue_head == NULL)
833 continue;
835 ci = cache_queue_head;
837 /* copy the relevant parts */
838 file = strdup (ci->file);
839 if (file == NULL)
840 {
841 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
842 continue;
843 }
845 assert(ci->values != NULL);
846 assert(ci->values_num > 0);
848 values = ci->values;
849 values_num = ci->values_num;
851 wipe_ci_values(ci, time(NULL));
852 remove_from_queue(ci);
854 pthread_mutex_lock (&stats_lock);
855 assert (stats_queue_length > 0);
856 stats_queue_length--;
857 pthread_mutex_unlock (&stats_lock);
859 pthread_mutex_unlock (&cache_lock);
861 rrd_clear_error ();
862 status = rrd_update_r (file, NULL, values_num, (void *) values);
863 if (status != 0)
864 {
865 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
866 "rrd_update_r (%s) failed with status %i. (%s)",
867 file, status, rrd_get_error());
868 }
870 journal_write("wrote", file);
871 pthread_cond_broadcast(&ci->flushed);
873 for (i = 0; i < values_num; i++)
874 free (values[i]);
876 free(values);
877 free(file);
879 if (status == 0)
880 {
881 pthread_mutex_lock (&stats_lock);
882 stats_updates_written++;
883 stats_data_sets_written += values_num;
884 pthread_mutex_unlock (&stats_lock);
885 }
887 pthread_mutex_lock (&cache_lock);
889 /* We're about to shut down */
890 if (do_shutdown != 0 && !final_flush++)
891 {
892 if (config_flush_at_shutdown)
893 flush_old_values (-1); /* flush everything */
894 else
895 break;
896 }
897 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
898 pthread_mutex_unlock (&cache_lock);
900 if (config_flush_at_shutdown)
901 {
902 assert(cache_queue_head == NULL);
903 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
904 }
906 journal_done();
908 return (NULL);
909 } /* }}} void *queue_thread_main */
/* Splits the next space-separated field off the front of a NUL-terminated
 * buffer.
 *  - A backslash escapes the following character.
 *  - Unescaping happens in place, so the returned field points into the
 *    original buffer.
 *  - On success *buffer_ret / *buffer_size_ret are advanced past the field
 *    and its separator, *field_ret receives the field, and 0 is returned.
 *  - Returns -1 when the buffer is empty or ends in a dangling escape. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t avail = *buffer_size_ret;
  char *dst = src;
  size_t in = 0;
  size_t out = 0;

  if (avail == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[avail - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= avail)
      return (-1); /* ran off the end without finding a terminator */

    c = src[in];
    if (c == ' ' || c == '\0')
    {
      dst[out] = '\0';
      in++;
      break;
    }

    if (c == '\\')
    {
      if (in + 1 >= avail)
        return (-1); /* nothing left to escape */
      in++;
      c = src[in];
    }

    dst[out++] = c;
    in++;
  }

  *buffer_ret = src + in;
  *buffer_size_ret = avail - in;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
974 /* if we're restricting writes to the base directory,
975 * check whether the file falls within the dir
976 * returns 1 if OK, otherwise 0
977 */
978 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
979 {
980 assert(file != NULL);
982 if (!config_write_base_only
983 || sock == NULL /* journal replay */
984 || config_base_dir == NULL)
985 return 1;
987 if (strstr(file, "../") != NULL) goto err;
989 /* relative paths without "../" are ok */
990 if (*file != '/') return 1;
992 /* file must be of the format base + "/" + <1+ char filename> */
993 if (strlen(file) < _config_base_dir_len + 2) goto err;
994 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
995 if (*(file + _config_base_dir_len) != '/') goto err;
997 return 1;
999 err:
1000 if (sock != NULL && sock->fd >= 0)
1001 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1003 return 0;
1004 } /* }}} static int check_file_access */
1006 /* when using a base dir, convert relative paths to absolute paths.
1007 * if necessary, modifies the "filename" pointer to point
1008 * to the new path created in "tmp". "tmp" is provided
1009 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1010 *
1011 * this allows us to optimize for the expected case (absolute path)
1012 * with a no-op.
1013 */
1014 static void get_abs_path(char **filename, char *tmp)
1015 {
1016 assert(tmp != NULL);
1017 assert(filename != NULL && *filename != NULL);
1019 if (config_base_dir == NULL || **filename == '/')
1020 return;
1022 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1023 *filename = tmp;
1024 } /* }}} static int get_abs_path */
1026 /* returns 1 if we have the required privilege level,
1027 * otherwise issue an error to the user on sock */
1028 static int has_privilege (listen_socket_t *sock, /* {{{ */
1029 socket_privilege priv)
1030 {
1031 if (sock == NULL) /* journal replay */
1032 return 1;
1034 if (sock->privilege >= priv)
1035 return 1;
1037 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1038 } /* }}} static int has_privilege */
1040 static int flush_file (const char *filename) /* {{{ */
1041 {
1042 cache_item_t *ci;
1044 pthread_mutex_lock (&cache_lock);
1046 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1047 if (ci == NULL)
1048 {
1049 pthread_mutex_unlock (&cache_lock);
1050 return (ENOENT);
1051 }
1053 if (ci->values_num > 0)
1054 {
1055 /* Enqueue at head */
1056 enqueue_cache_item (ci, HEAD);
1057 pthread_cond_wait(&ci->flushed, &cache_lock);
1058 }
1060 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1061 * may have been purged during our cond_wait() */
1063 pthread_mutex_unlock(&cache_lock);
1065 return (0);
1066 } /* }}} int flush_file */
1068 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1069 char *buffer, size_t buffer_size)
1070 {
1071 int status;
1072 char **help_text;
1073 char *command;
1075 char *help_help[2] =
1076 {
1077 "Command overview\n"
1078 ,
1079 "HELP [<command>]\n"
1080 "FLUSH <filename>\n"
1081 "FLUSHALL\n"
1082 "PENDING <filename>\n"
1083 "FORGET <filename>\n"
1084 "UPDATE <filename> <values> [<values> ...]\n"
1085 "BATCH\n"
1086 "STATS\n"
1087 };
1089 char *help_flush[2] =
1090 {
1091 "Help for FLUSH\n"
1092 ,
1093 "Usage: FLUSH <filename>\n"
1094 "\n"
1095 "Adds the given filename to the head of the update queue and returns\n"
1096 "after is has been dequeued.\n"
1097 };
1099 char *help_flushall[2] =
1100 {
1101 "Help for FLUSHALL\n"
1102 ,
1103 "Usage: FLUSHALL\n"
1104 "\n"
1105 "Triggers writing of all pending updates. Returns immediately.\n"
1106 };
1108 char *help_pending[2] =
1109 {
1110 "Help for PENDING\n"
1111 ,
1112 "Usage: PENDING <filename>\n"
1113 "\n"
1114 "Shows any 'pending' updates for a file, in order.\n"
1115 "The updates shown have not yet been written to the underlying RRD file.\n"
1116 };
1118 char *help_forget[2] =
1119 {
1120 "Help for FORGET\n"
1121 ,
1122 "Usage: FORGET <filename>\n"
1123 "\n"
1124 "Removes the file completely from the cache.\n"
1125 "Any pending updates for the file will be lost.\n"
1126 };
1128 char *help_update[2] =
1129 {
1130 "Help for UPDATE\n"
1131 ,
1132 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1133 "\n"
1134 "Adds the given file to the internal cache if it is not yet known and\n"
1135 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1136 "for details.\n"
1137 "\n"
1138 "Each <values> has the following form:\n"
1139 " <values> = <time>:<value>[:<value>[...]]\n"
1140 "See the rrdupdate(1) manpage for details.\n"
1141 };
1143 char *help_stats[2] =
1144 {
1145 "Help for STATS\n"
1146 ,
1147 "Usage: STATS\n"
1148 "\n"
1149 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1150 "a description of the values.\n"
1151 };
1153 char *help_batch[2] =
1154 {
1155 "Help for BATCH\n"
1156 ,
1157 "The 'BATCH' command permits the client to initiate a bulk load\n"
1158 " of commands to rrdcached.\n"
1159 "\n"
1160 "Usage:\n"
1161 "\n"
1162 " client: BATCH\n"
1163 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1164 " client: command #1\n"
1165 " client: command #2\n"
1166 " client: ... and so on\n"
1167 " client: .\n"
1168 " server: 2 errors\n"
1169 " server: 7 message for command #7\n"
1170 " server: 9 message for command #9\n"
1171 "\n"
1172 "For more information, consult the rrdcached(1) documentation.\n"
1173 };
1175 status = buffer_get_field (&buffer, &buffer_size, &command);
1176 if (status != 0)
1177 help_text = help_help;
1178 else
1179 {
1180 if (strcasecmp (command, "update") == 0)
1181 help_text = help_update;
1182 else if (strcasecmp (command, "flush") == 0)
1183 help_text = help_flush;
1184 else if (strcasecmp (command, "flushall") == 0)
1185 help_text = help_flushall;
1186 else if (strcasecmp (command, "pending") == 0)
1187 help_text = help_pending;
1188 else if (strcasecmp (command, "forget") == 0)
1189 help_text = help_forget;
1190 else if (strcasecmp (command, "stats") == 0)
1191 help_text = help_stats;
1192 else if (strcasecmp (command, "batch") == 0)
1193 help_text = help_batch;
1194 else
1195 help_text = help_help;
1196 }
1198 add_response_info(sock, help_text[1]);
1199 return send_response(sock, RESP_OK, help_text[0]);
1200 } /* }}} int handle_request_help */
1202 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1203 {
1204 uint64_t copy_queue_length;
1205 uint64_t copy_updates_received;
1206 uint64_t copy_flush_received;
1207 uint64_t copy_updates_written;
1208 uint64_t copy_data_sets_written;
1209 uint64_t copy_journal_bytes;
1210 uint64_t copy_journal_rotate;
1212 uint64_t tree_nodes_number;
1213 uint64_t tree_depth;
1215 pthread_mutex_lock (&stats_lock);
1216 copy_queue_length = stats_queue_length;
1217 copy_updates_received = stats_updates_received;
1218 copy_flush_received = stats_flush_received;
1219 copy_updates_written = stats_updates_written;
1220 copy_data_sets_written = stats_data_sets_written;
1221 copy_journal_bytes = stats_journal_bytes;
1222 copy_journal_rotate = stats_journal_rotate;
1223 pthread_mutex_unlock (&stats_lock);
1225 pthread_mutex_lock (&cache_lock);
1226 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1227 tree_depth = (uint64_t) g_tree_height (cache_tree);
1228 pthread_mutex_unlock (&cache_lock);
1230 add_response_info(sock,
1231 "QueueLength: %"PRIu64"\n", copy_queue_length);
1232 add_response_info(sock,
1233 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1234 add_response_info(sock,
1235 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1236 add_response_info(sock,
1237 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1238 add_response_info(sock,
1239 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1240 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1241 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1242 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1243 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1245 send_response(sock, RESP_OK, "Statistics follow\n");
1247 return (0);
1248 } /* }}} int handle_request_stats */
1250 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1251 char *buffer, size_t buffer_size)
1252 {
1253 char *file, file_tmp[PATH_MAX];
1254 int status;
1256 status = buffer_get_field (&buffer, &buffer_size, &file);
1257 if (status != 0)
1258 {
1259 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1260 }
1261 else
1262 {
1263 pthread_mutex_lock(&stats_lock);
1264 stats_flush_received++;
1265 pthread_mutex_unlock(&stats_lock);
1267 get_abs_path(&file, file_tmp);
1268 if (!check_file_access(file, sock)) return 0;
1270 status = flush_file (file);
1271 if (status == 0)
1272 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1273 else if (status == ENOENT)
1274 {
1275 /* no file in our tree; see whether it exists at all */
1276 struct stat statbuf;
1278 memset(&statbuf, 0, sizeof(statbuf));
1279 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1280 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1281 else
1282 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1283 }
1284 else if (status < 0)
1285 return send_response(sock, RESP_ERR, "Internal error.\n");
1286 else
1287 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1288 }
1290 /* NOTREACHED */
1291 assert(1==0);
1292 } /* }}} int handle_request_flush */
1294 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1295 {
1296 int status;
1298 status = has_privilege(sock, PRIV_HIGH);
1299 if (status <= 0)
1300 return status;
1302 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1304 pthread_mutex_lock(&cache_lock);
1305 flush_old_values(-1);
1306 pthread_mutex_unlock(&cache_lock);
1308 return send_response(sock, RESP_OK, "Started flush.\n");
1309 } /* }}} static int handle_request_flushall */
1311 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1312 char *buffer, size_t buffer_size)
1313 {
1314 int status;
1315 char *file, file_tmp[PATH_MAX];
1316 cache_item_t *ci;
1318 status = buffer_get_field(&buffer, &buffer_size, &file);
1319 if (status != 0)
1320 return send_response(sock, RESP_ERR,
1321 "Usage: PENDING <filename>\n");
1323 status = has_privilege(sock, PRIV_HIGH);
1324 if (status <= 0)
1325 return status;
1327 get_abs_path(&file, file_tmp);
1329 pthread_mutex_lock(&cache_lock);
1330 ci = g_tree_lookup(cache_tree, file);
1331 if (ci == NULL)
1332 {
1333 pthread_mutex_unlock(&cache_lock);
1334 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1335 }
1337 for (int i=0; i < ci->values_num; i++)
1338 add_response_info(sock, "%s\n", ci->values[i]);
1340 pthread_mutex_unlock(&cache_lock);
1341 return send_response(sock, RESP_OK, "updates pending\n");
1342 } /* }}} static int handle_request_pending */
1344 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1345 char *buffer, size_t buffer_size)
1346 {
1347 int status;
1348 char *file, file_tmp[PATH_MAX];
1350 status = buffer_get_field(&buffer, &buffer_size, &file);
1351 if (status != 0)
1352 return send_response(sock, RESP_ERR,
1353 "Usage: FORGET <filename>\n");
1355 status = has_privilege(sock, PRIV_HIGH);
1356 if (status <= 0)
1357 return status;
1359 get_abs_path(&file, file_tmp);
1360 if (!check_file_access(file, sock)) return 0;
1362 pthread_mutex_lock(&cache_lock);
1363 status = forget_file(file);
1364 pthread_mutex_unlock(&cache_lock);
1366 if (status == 0)
1367 {
1368 if (sock != NULL)
1369 journal_write("forget", file);
1371 return send_response(sock, RESP_OK, "Gone!\n");
1372 }
1373 else
1374 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1375 status < 0 ? "Internal error" : rrd_strerror(status));
1377 /* NOTREACHED */
1378 assert(1==0);
1379 } /* }}} static int handle_request_forget */
1381 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1382 time_t now,
1383 char *buffer, size_t buffer_size)
1384 {
1385 char *file, file_tmp[PATH_MAX];
1386 int values_num = 0;
1387 int bad_timestamps = 0;
1388 int status;
1389 char orig_buf[CMD_MAX];
1391 cache_item_t *ci;
1393 status = has_privilege(sock, PRIV_HIGH);
1394 if (status <= 0)
1395 return status;
1397 /* save it for the journal later */
1398 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1400 status = buffer_get_field (&buffer, &buffer_size, &file);
1401 if (status != 0)
1402 return send_response(sock, RESP_ERR,
1403 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1405 pthread_mutex_lock(&stats_lock);
1406 stats_updates_received++;
1407 pthread_mutex_unlock(&stats_lock);
1409 get_abs_path(&file, file_tmp);
1410 if (!check_file_access(file, sock)) return 0;
1412 pthread_mutex_lock (&cache_lock);
1413 ci = g_tree_lookup (cache_tree, file);
1415 if (ci == NULL) /* {{{ */
1416 {
1417 struct stat statbuf;
1419 /* don't hold the lock while we setup; stat(2) might block */
1420 pthread_mutex_unlock(&cache_lock);
1422 memset (&statbuf, 0, sizeof (statbuf));
1423 status = stat (file, &statbuf);
1424 if (status != 0)
1425 {
1426 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1428 status = errno;
1429 if (status == ENOENT)
1430 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1431 else
1432 return send_response(sock, RESP_ERR,
1433 "stat failed with error %i.\n", status);
1434 }
1435 if (!S_ISREG (statbuf.st_mode))
1436 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1438 if (access(file, R_OK|W_OK) != 0)
1439 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1440 file, rrd_strerror(errno));
1442 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1443 if (ci == NULL)
1444 {
1445 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1447 return send_response(sock, RESP_ERR, "malloc failed.\n");
1448 }
1449 memset (ci, 0, sizeof (cache_item_t));
1451 ci->file = strdup (file);
1452 if (ci->file == NULL)
1453 {
1454 free (ci);
1455 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1457 return send_response(sock, RESP_ERR, "strdup failed.\n");
1458 }
1460 wipe_ci_values(ci, now);
1461 ci->flags = CI_FLAGS_IN_TREE;
1462 pthread_cond_init(&ci->flushed, NULL);
1464 pthread_mutex_lock(&cache_lock);
1465 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1466 } /* }}} */
1467 assert (ci != NULL);
1469 /* don't re-write updates in replay mode */
1470 if (sock != NULL)
1471 journal_write("update", orig_buf);
1473 while (buffer_size > 0)
1474 {
1475 char **temp;
1476 char *value;
1477 time_t stamp;
1478 char *eostamp;
1480 status = buffer_get_field (&buffer, &buffer_size, &value);
1481 if (status != 0)
1482 {
1483 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1484 break;
1485 }
1487 /* make sure update time is always moving forward */
1488 stamp = strtol(value, &eostamp, 10);
1489 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1490 {
1491 ++bad_timestamps;
1492 add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1493 continue;
1494 }
1495 else if (stamp <= ci->last_update_stamp)
1496 {
1497 ++bad_timestamps;
1498 add_response_info(sock,
1499 "illegal attempt to update using time %ld when"
1500 " last update time is %ld (minimum one second step)\n",
1501 stamp, ci->last_update_stamp);
1502 continue;
1503 }
1504 else
1505 ci->last_update_stamp = stamp;
1507 temp = (char **) realloc (ci->values,
1508 sizeof (char *) * (ci->values_num + 1));
1509 if (temp == NULL)
1510 {
1511 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1512 continue;
1513 }
1514 ci->values = temp;
1516 ci->values[ci->values_num] = strdup (value);
1517 if (ci->values[ci->values_num] == NULL)
1518 {
1519 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1520 continue;
1521 }
1522 ci->values_num++;
1524 values_num++;
1525 }
1527 if (((now - ci->last_flush_time) >= config_write_interval)
1528 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1529 && (ci->values_num > 0))
1530 {
1531 enqueue_cache_item (ci, TAIL);
1532 }
1534 pthread_mutex_unlock (&cache_lock);
1536 if (values_num < 1)
1537 {
1538 /* journal replay mode */
1539 if (sock == NULL) return RESP_ERR;
1541 /* if we had only one update attempt, then return the full
1542 error message... try to get the most information out
1543 of the limited error space allowed by the protocol
1544 */
1545 if (bad_timestamps == 1)
1546 return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1547 else
1548 return send_response(sock, RESP_ERR,
1549 "No values updated (%d bad timestamps).\n",
1550 bad_timestamps);
1551 }
1552 else
1553 return send_response(sock, RESP_OK,
1554 "errors, enqueued %i value(s).\n", values_num);
1556 /* NOTREACHED */
1557 assert(1==0);
1559 } /* }}} int handle_request_update */
1561 /* we came across a "WROTE" entry during journal replay.
1562 * throw away any values that we have accumulated for this file
1563 */
1564 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1565 {
1566 int i;
1567 cache_item_t *ci;
1568 const char *file = buffer;
1570 pthread_mutex_lock(&cache_lock);
1572 ci = g_tree_lookup(cache_tree, file);
1573 if (ci == NULL)
1574 {
1575 pthread_mutex_unlock(&cache_lock);
1576 return (0);
1577 }
1579 if (ci->values)
1580 {
1581 for (i=0; i < ci->values_num; i++)
1582 free(ci->values[i]);
1584 free(ci->values);
1585 }
1587 wipe_ci_values(ci, now);
1588 remove_from_queue(ci);
1590 pthread_mutex_unlock(&cache_lock);
1591 return (0);
1592 } /* }}} int handle_request_wrote */
1594 /* start "BATCH" processing */
1595 static int batch_start (listen_socket_t *sock) /* {{{ */
1596 {
1597 int status;
1598 if (sock->batch_start)
1599 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1601 status = send_response(sock, RESP_OK,
1602 "Go ahead. End with dot '.' on its own line.\n");
1603 sock->batch_start = time(NULL);
1604 sock->batch_cmd = 0;
1606 return status;
1607 } /* }}} static int batch_start */
1609 /* finish "BATCH" processing and return results to the client */
1610 static int batch_done (listen_socket_t *sock) /* {{{ */
1611 {
1612 assert(sock->batch_start);
1613 sock->batch_start = 0;
1614 sock->batch_cmd = 0;
1615 return send_response(sock, RESP_OK, "errors\n");
1616 } /* }}} static int batch_done */
1618 /* if sock==NULL, we are in journal replay mode */
1619 static int handle_request (listen_socket_t *sock, /* {{{ */
1620 time_t now,
1621 char *buffer, size_t buffer_size)
1622 {
1623 char *buffer_ptr;
1624 char *command;
1625 int status;
1627 assert (buffer[buffer_size - 1] == '\0');
1629 buffer_ptr = buffer;
1630 command = NULL;
1631 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1632 if (status != 0)
1633 {
1634 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1635 return (-1);
1636 }
1638 if (sock != NULL && sock->batch_start)
1639 sock->batch_cmd++;
1641 if (strcasecmp (command, "update") == 0)
1642 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1643 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1644 {
1645 /* this is only valid in replay mode */
1646 return (handle_request_wrote (buffer_ptr, now));
1647 }
1648 else if (strcasecmp (command, "flush") == 0)
1649 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1650 else if (strcasecmp (command, "flushall") == 0)
1651 return (handle_request_flushall(sock));
1652 else if (strcasecmp (command, "pending") == 0)
1653 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1654 else if (strcasecmp (command, "forget") == 0)
1655 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1656 else if (strcasecmp (command, "stats") == 0)
1657 return (handle_request_stats (sock));
1658 else if (strcasecmp (command, "help") == 0)
1659 return (handle_request_help (sock, buffer_ptr, buffer_size));
1660 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1661 return batch_start(sock);
1662 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1663 return batch_done(sock);
1664 else
1665 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1667 /* NOTREACHED */
1668 assert(1==0);
1669 } /* }}} int handle_request */
1671 /* MUST NOT hold journal_lock before calling this */
1672 static void journal_rotate(void) /* {{{ */
1673 {
1674 FILE *old_fh = NULL;
1675 int new_fd;
1677 if (journal_cur == NULL || journal_old == NULL)
1678 return;
1680 pthread_mutex_lock(&journal_lock);
1682 /* we rotate this way (rename before close) so that the we can release
1683 * the journal lock as fast as possible. Journal writes to the new
1684 * journal can proceed immediately after the new file is opened. The
1685 * fclose can then block without affecting new updates.
1686 */
1687 if (journal_fh != NULL)
1688 {
1689 old_fh = journal_fh;
1690 journal_fh = NULL;
1691 rename(journal_cur, journal_old);
1692 ++stats_journal_rotate;
1693 }
1695 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1696 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1697 if (new_fd >= 0)
1698 {
1699 journal_fh = fdopen(new_fd, "a");
1700 if (journal_fh == NULL)
1701 close(new_fd);
1702 }
1704 pthread_mutex_unlock(&journal_lock);
1706 if (old_fh != NULL)
1707 fclose(old_fh);
1709 if (journal_fh == NULL)
1710 {
1711 RRDD_LOG(LOG_CRIT,
1712 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1713 journal_cur, rrd_strerror(errno));
1715 RRDD_LOG(LOG_ERR,
1716 "JOURNALING DISABLED: All values will be flushed at shutdown");
1717 config_flush_at_shutdown = 1;
1718 }
1720 } /* }}} static void journal_rotate */
1722 static void journal_done(void) /* {{{ */
1723 {
1724 if (journal_cur == NULL)
1725 return;
1727 pthread_mutex_lock(&journal_lock);
1728 if (journal_fh != NULL)
1729 {
1730 fclose(journal_fh);
1731 journal_fh = NULL;
1732 }
1734 if (config_flush_at_shutdown)
1735 {
1736 RRDD_LOG(LOG_INFO, "removing journals");
1737 unlink(journal_old);
1738 unlink(journal_cur);
1739 }
1740 else
1741 {
1742 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1743 "journals will be used at next startup");
1744 }
1746 pthread_mutex_unlock(&journal_lock);
1748 } /* }}} static void journal_done */
1750 static int journal_write(char *cmd, char *args) /* {{{ */
1751 {
1752 int chars;
1754 if (journal_fh == NULL)
1755 return 0;
1757 pthread_mutex_lock(&journal_lock);
1758 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1759 pthread_mutex_unlock(&journal_lock);
1761 if (chars > 0)
1762 {
1763 pthread_mutex_lock(&stats_lock);
1764 stats_journal_bytes += chars;
1765 pthread_mutex_unlock(&stats_lock);
1766 }
1768 return chars;
1769 } /* }}} static int journal_write */
1771 static int journal_replay (const char *file) /* {{{ */
1772 {
1773 FILE *fh;
1774 int entry_cnt = 0;
1775 int fail_cnt = 0;
1776 uint64_t line = 0;
1777 char entry[CMD_MAX];
1778 time_t now;
1780 if (file == NULL) return 0;
1782 {
1783 char *reason;
1784 int status = 0;
1785 struct stat statbuf;
1787 memset(&statbuf, 0, sizeof(statbuf));
1788 if (stat(file, &statbuf) != 0)
1789 {
1790 if (errno == ENOENT)
1791 return 0;
1793 reason = "stat error";
1794 status = errno;
1795 }
1796 else if (!S_ISREG(statbuf.st_mode))
1797 {
1798 reason = "not a regular file";
1799 status = EPERM;
1800 }
1801 if (statbuf.st_uid != daemon_uid)
1802 {
1803 reason = "not owned by daemon user";
1804 status = EACCES;
1805 }
1806 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1807 {
1808 reason = "must not be user/group writable";
1809 status = EACCES;
1810 }
1812 if (status != 0)
1813 {
1814 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1815 file, rrd_strerror(status), reason);
1816 return 0;
1817 }
1818 }
1820 fh = fopen(file, "r");
1821 if (fh == NULL)
1822 {
1823 if (errno != ENOENT)
1824 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1825 file, rrd_strerror(errno));
1826 return 0;
1827 }
1828 else
1829 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1831 now = time(NULL);
1833 while(!feof(fh))
1834 {
1835 size_t entry_len;
1837 ++line;
1838 if (fgets(entry, sizeof(entry), fh) == NULL)
1839 break;
1840 entry_len = strlen(entry);
1842 /* check \n termination in case journal writing crashed mid-line */
1843 if (entry_len == 0)
1844 continue;
1845 else if (entry[entry_len - 1] != '\n')
1846 {
1847 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1848 ++fail_cnt;
1849 continue;
1850 }
1852 entry[entry_len - 1] = '\0';
1854 if (handle_request(NULL, now, entry, entry_len) == 0)
1855 ++entry_cnt;
1856 else
1857 ++fail_cnt;
1858 }
1860 fclose(fh);
1862 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1863 entry_cnt, fail_cnt);
1865 return entry_cnt > 0 ? 1 : 0;
1866 } /* }}} static int journal_replay */
1868 static void journal_init(void) /* {{{ */
1869 {
1870 int had_journal = 0;
1872 if (journal_cur == NULL) return;
1874 pthread_mutex_lock(&journal_lock);
1876 RRDD_LOG(LOG_INFO, "checking for journal files");
1878 had_journal += journal_replay(journal_old);
1879 had_journal += journal_replay(journal_cur);
1881 /* it must have been a crash. start a flush */
1882 if (had_journal && config_flush_at_shutdown)
1883 flush_old_values(-1);
1885 pthread_mutex_unlock(&journal_lock);
1886 journal_rotate();
1888 RRDD_LOG(LOG_INFO, "journal processing complete");
1890 } /* }}} static void journal_init */
1892 static void close_connection(listen_socket_t *sock)
1893 {
1894 close(sock->fd) ; sock->fd = -1;
1895 free(sock->rbuf); sock->rbuf = NULL;
1896 free(sock->wbuf); sock->wbuf = NULL;
1898 free(sock);
1899 }
1901 static void *connection_thread_main (void *args) /* {{{ */
1902 {
1903 pthread_t self;
1904 listen_socket_t *sock;
1905 int i;
1906 int fd;
1908 sock = (listen_socket_t *) args;
1909 fd = sock->fd;
1911 /* init read buffers */
1912 sock->next_read = sock->next_cmd = 0;
1913 sock->rbuf = malloc(RBUF_SIZE);
1914 if (sock->rbuf == NULL)
1915 {
1916 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1917 close_connection(sock);
1918 return NULL;
1919 }
1921 pthread_mutex_lock (&connection_threads_lock);
1922 {
1923 pthread_t *temp;
1925 temp = (pthread_t *) realloc (connection_threads,
1926 sizeof (pthread_t) * (connection_threads_num + 1));
1927 if (temp == NULL)
1928 {
1929 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1930 }
1931 else
1932 {
1933 connection_threads = temp;
1934 connection_threads[connection_threads_num] = pthread_self ();
1935 connection_threads_num++;
1936 }
1937 }
1938 pthread_mutex_unlock (&connection_threads_lock);
1940 while (do_shutdown == 0)
1941 {
1942 char *cmd;
1943 ssize_t cmd_len;
1944 ssize_t rbytes;
1945 time_t now;
1947 struct pollfd pollfd;
1948 int status;
1950 pollfd.fd = fd;
1951 pollfd.events = POLLIN | POLLPRI;
1952 pollfd.revents = 0;
1954 status = poll (&pollfd, 1, /* timeout = */ 500);
1955 if (do_shutdown)
1956 break;
1957 else if (status == 0) /* timeout */
1958 continue;
1959 else if (status < 0) /* error */
1960 {
1961 status = errno;
1962 if (status != EINTR)
1963 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1964 continue;
1965 }
1967 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1968 break;
1969 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1970 {
1971 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1972 "poll(2) returned something unexpected: %#04hx",
1973 pollfd.revents);
1974 break;
1975 }
1977 rbytes = read(fd, sock->rbuf + sock->next_read,
1978 RBUF_SIZE - sock->next_read);
1979 if (rbytes < 0)
1980 {
1981 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1982 break;
1983 }
1984 else if (rbytes == 0)
1985 break; /* eof */
1987 sock->next_read += rbytes;
1989 if (sock->batch_start)
1990 now = sock->batch_start;
1991 else
1992 now = time(NULL);
1994 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1995 {
1996 status = handle_request (sock, now, cmd, cmd_len+1);
1997 if (status != 0)
1998 goto out_close;
1999 }
2000 }
2002 out_close:
2003 close_connection(sock);
2005 self = pthread_self ();
2006 /* Remove this thread from the connection threads list */
2007 pthread_mutex_lock (&connection_threads_lock);
2008 /* Find out own index in the array */
2009 for (i = 0; i < connection_threads_num; i++)
2010 if (pthread_equal (connection_threads[i], self) != 0)
2011 break;
2012 assert (i < connection_threads_num);
2014 /* Move the trailing threads forward. */
2015 if (i < (connection_threads_num - 1))
2016 {
2017 memmove (connection_threads + i,
2018 connection_threads + i + 1,
2019 sizeof (pthread_t) * (connection_threads_num - i - 1));
2020 }
2022 connection_threads_num--;
2023 pthread_mutex_unlock (&connection_threads_lock);
2025 return (NULL);
2026 } /* }}} void *connection_thread_main */
2028 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2029 {
2030 int fd;
2031 struct sockaddr_un sa;
2032 listen_socket_t *temp;
2033 int status;
2034 const char *path;
2036 path = sock->addr;
2037 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2038 path += strlen("unix:");
2040 temp = (listen_socket_t *) realloc (listen_fds,
2041 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2042 if (temp == NULL)
2043 {
2044 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2045 return (-1);
2046 }
2047 listen_fds = temp;
2048 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2050 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2051 if (fd < 0)
2052 {
2053 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2054 rrd_strerror(errno));
2055 return (-1);
2056 }
2058 memset (&sa, 0, sizeof (sa));
2059 sa.sun_family = AF_UNIX;
2060 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2062 /* if we've gotten this far, we own the pid file. any daemon started
2063 * with the same args must not be alive. therefore, ensure that we can
2064 * create the socket...
2065 */
2066 unlink(path);
2068 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2069 if (status != 0)
2070 {
2071 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2072 path, rrd_strerror(errno));
2073 close (fd);
2074 return (-1);
2075 }
2077 status = listen (fd, /* backlog = */ 10);
2078 if (status != 0)
2079 {
2080 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2081 path, rrd_strerror(errno));
2082 close (fd);
2083 unlink (path);
2084 return (-1);
2085 }
2087 listen_fds[listen_fds_num].fd = fd;
2088 listen_fds[listen_fds_num].family = PF_UNIX;
2089 strncpy(listen_fds[listen_fds_num].addr, path,
2090 sizeof (listen_fds[listen_fds_num].addr) - 1);
2091 listen_fds_num++;
2093 return (0);
2094 } /* }}} int open_listen_socket_unix */
/*
 * open_listen_socket_network: bind and listen on a TCP address.
 *
 * Accepted forms of sock->addr:
 *   - "[ipv6]" or "[ipv6]:port";
 *   - "host" or "host:port" when it contains a '.' (hostname or IPv4);
 *   - anything else is passed to getaddrinfo() unchanged.
 * RRDCACHED_DEFAULT_PORT is used when no port is given.
 *
 * One listening socket is opened per getaddrinfo() result and appended to
 * listen_fds.  Results that fail in socket() or bind() are skipped.
 *
 * Returns -1 on a malformed address, on resolver failure, or if listen()
 * fails; otherwise returns 0.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* strrchr() is the standard replacement for the legacy BSD rindex() */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo (ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* the getaddrinfo() result list is heap-allocated; release it */
  freeaddrinfo (ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2216 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2217 {
2218 assert(sock != NULL);
2219 assert(sock->addr != NULL);
2221 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2222 || sock->addr[0] == '/')
2223 return (open_listen_socket_unix(sock));
2224 else
2225 return (open_listen_socket_network(sock));
2226 } /* }}} int open_listen_socket */
2228 static int close_listen_sockets (void) /* {{{ */
2229 {
2230 size_t i;
2232 for (i = 0; i < listen_fds_num; i++)
2233 {
2234 close (listen_fds[i].fd);
2236 if (listen_fds[i].family == PF_UNIX)
2237 unlink(listen_fds[i].addr);
2238 }
2240 free (listen_fds);
2241 listen_fds = NULL;
2242 listen_fds_num = 0;
2244 return (0);
2245 } /* }}} int close_listen_sockets */
2247 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2248 {
2249 struct pollfd *pollfds;
2250 int pollfds_num;
2251 int status;
2252 int i;
2254 if (listen_fds_num < 1)
2255 {
2256 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2257 return (NULL);
2258 }
2260 pollfds_num = listen_fds_num;
2261 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2262 if (pollfds == NULL)
2263 {
2264 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2265 return (NULL);
2266 }
2267 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2269 RRDD_LOG(LOG_INFO, "listening for connections");
2271 while (do_shutdown == 0)
2272 {
2273 assert (pollfds_num == ((int) listen_fds_num));
2274 for (i = 0; i < pollfds_num; i++)
2275 {
2276 pollfds[i].fd = listen_fds[i].fd;
2277 pollfds[i].events = POLLIN | POLLPRI;
2278 pollfds[i].revents = 0;
2279 }
2281 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2282 if (do_shutdown)
2283 break;
2284 else if (status == 0) /* timeout */
2285 continue;
2286 else if (status < 0) /* error */
2287 {
2288 status = errno;
2289 if (status != EINTR)
2290 {
2291 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2292 }
2293 continue;
2294 }
2296 for (i = 0; i < pollfds_num; i++)
2297 {
2298 listen_socket_t *client_sock;
2299 struct sockaddr_storage client_sa;
2300 socklen_t client_sa_size;
2301 pthread_t tid;
2302 pthread_attr_t attr;
2304 if (pollfds[i].revents == 0)
2305 continue;
2307 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2308 {
2309 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2310 "poll(2) returned something unexpected for listen FD #%i.",
2311 pollfds[i].fd);
2312 continue;
2313 }
2315 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2316 if (client_sock == NULL)
2317 {
2318 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2319 continue;
2320 }
2321 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2323 client_sa_size = sizeof (client_sa);
2324 client_sock->fd = accept (pollfds[i].fd,
2325 (struct sockaddr *) &client_sa, &client_sa_size);
2326 if (client_sock->fd < 0)
2327 {
2328 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2329 free(client_sock);
2330 continue;
2331 }
2333 pthread_attr_init (&attr);
2334 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2336 status = pthread_create (&tid, &attr, connection_thread_main,
2337 client_sock);
2338 if (status != 0)
2339 {
2340 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2341 close_connection(client_sock);
2342 continue;
2343 }
2344 } /* for (pollfds_num) */
2345 } /* while (do_shutdown == 0) */
2347 RRDD_LOG(LOG_INFO, "starting shutdown");
2349 close_listen_sockets ();
2351 pthread_mutex_lock (&connection_threads_lock);
2352 while (connection_threads_num > 0)
2353 {
2354 pthread_t wait_for;
2356 wait_for = connection_threads[0];
2358 pthread_mutex_unlock (&connection_threads_lock);
2359 pthread_join (wait_for, /* retval = */ NULL);
2360 pthread_mutex_lock (&connection_threads_lock);
2361 }
2362 pthread_mutex_unlock (&connection_threads_lock);
2364 return (NULL);
2365 } /* }}} void *listen_thread_main */
/*
 * daemonize: prepare the process to run as rrdcached.
 *
 * Steps, in order:
 *   1. Record the effective uid in daemon_uid.
 *   2. Obtain the pid file: open_pidfile() with O_EXCL, falling back to
 *      check_pidfile().
 *   3. Open every configured listen socket, or RRDCACHED_DEFAULT_ADDRESS
 *      when none was configured.
 *   4. Fork into the background unless stay_foreground is set.
 *   5. Change into config_base_dir (default "/tmp").
 *   6. Install signal handlers, open syslog and create the cache tree.
 *
 * Returns:
 *   - write_pidfile()'s result on success;
 *   - the negative pid file result if the pid file could not be obtained;
 *   - -1 on any other failure, in which case the pid file is removed.
 */
static int daemonize (void) /* {{{ */
{
  int pid_fd;
  char *base_dir;

  /* journal_replay() only trusts journals owned by this uid */
  daemon_uid = geteuid();

  /* O_EXCL only succeeds when no pid file exists yet; otherwise
   * check_pidfile() presumably decides whether the existing one is stale */
  pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
  if (pid_fd < 0)
    pid_fd = check_pidfile();
  if (pid_fd < 0)
    return pid_fd;

  /* open all the listen sockets */
  if (config_listen_address_list_len > 0)
  {
    for (int i = 0; i < config_listen_address_list_len; i++)
      open_listen_socket (config_listen_address_list[i]);
  }
  else
  {
    listen_socket_t sock;
    memset(&sock, 0, sizeof(sock));
    strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
    open_listen_socket (&sock);
  }

  /* individual open failures are tolerated; only "no socket at all" is fatal */
  if (listen_fds_num < 1)
  {
    fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
    goto error;
  }

  if (!stay_foreground)
  {
    pid_t child;

    child = fork ();
    if (child < 0)
    {
      fprintf (stderr, "daemonize: fork(2) failed.\n");
      goto error;
    }
    else if (child > 0)
      exit(0);

    /* Become session leader */
    setsid ();

    /* Open the first three file descriptors to /dev/null */
    close (2);
    close (1);
    close (0);

    /* open() returns the lowest free descriptor (0 here); the two dup()s
     * then fill descriptors 1 and 2.  Return values are not checked. */
    open ("/dev/null", O_RDWR);
    dup (0);
    dup (0);
  } /* if (!stay_foreground) */

  /* Change into config_base_dir, or /tmp if none was configured. */
  base_dir = (config_base_dir != NULL)
    ? config_base_dir
    : "/tmp";

  if (chdir (base_dir) != 0)
  {
    /* NOTE(review): once daemonized, stderr is /dev/null, and syslog is
     * not opened until below, so this message is lost in that case */
    fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
    goto error;
  }

  install_signal_handlers();

  openlog ("rrdcached", LOG_PID, LOG_DAEMON);
  RRDD_LOG(LOG_INFO, "starting up");

  /* keyed by file name strings (see g_tree_insert in UPDATE handling) */
  cache_tree = g_tree_new ((GCompareFunc) strcmp);
  if (cache_tree == NULL)
  {
    RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
    goto error;
  }

  return write_pidfile (pid_fd);

error:
  /* NOTE(review): listen sockets opened above are not closed on this path */
  remove_pidfile();
  return -1;
} /* }}} int daemonize */
2456 static int cleanup (void) /* {{{ */
2457 {
2458 do_shutdown++;
2460 pthread_cond_signal (&cache_cond);
2461 pthread_join (queue_thread, /* return = */ NULL);
2463 remove_pidfile ();
2465 RRDD_LOG(LOG_INFO, "goodbye");
2466 closelog ();
2468 return (0);
2469 } /* }}} int cleanup */
2471 static int read_options (int argc, char **argv) /* {{{ */
2472 {
2473 int option;
2474 int status = 0;
2476 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2477 {
2478 switch (option)
2479 {
2480 case 'g':
2481 stay_foreground=1;
2482 break;
2484 case 'L':
2485 case 'l':
2486 {
2487 listen_socket_t **temp;
2488 listen_socket_t *new;
2490 new = malloc(sizeof(listen_socket_t));
2491 if (new == NULL)
2492 {
2493 fprintf(stderr, "read_options: malloc failed.\n");
2494 return(2);
2495 }
2496 memset(new, 0, sizeof(listen_socket_t));
2498 temp = (listen_socket_t **) realloc (config_listen_address_list,
2499 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2500 if (temp == NULL)
2501 {
2502 fprintf (stderr, "read_options: realloc failed.\n");
2503 return (2);
2504 }
2505 config_listen_address_list = temp;
2507 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2508 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2510 temp[config_listen_address_list_len] = new;
2511 config_listen_address_list_len++;
2512 }
2513 break;
2515 case 'f':
2516 {
2517 int temp;
2519 temp = atoi (optarg);
2520 if (temp > 0)
2521 config_flush_interval = temp;
2522 else
2523 {
2524 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2525 status = 3;
2526 }
2527 }
2528 break;
2530 case 'w':
2531 {
2532 int temp;
2534 temp = atoi (optarg);
2535 if (temp > 0)
2536 config_write_interval = temp;
2537 else
2538 {
2539 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2540 status = 2;
2541 }
2542 }
2543 break;
2545 case 'z':
2546 {
2547 int temp;
2549 temp = atoi(optarg);
2550 if (temp > 0)
2551 config_write_jitter = temp;
2552 else
2553 {
2554 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2555 status = 2;
2556 }
2558 break;
2559 }
2561 case 'B':
2562 config_write_base_only = 1;
2563 break;
2565 case 'b':
2566 {
2567 size_t len;
2568 char base_realpath[PATH_MAX];
2570 if (config_base_dir != NULL)
2571 free (config_base_dir);
2572 config_base_dir = strdup (optarg);
2573 if (config_base_dir == NULL)
2574 {
2575 fprintf (stderr, "read_options: strdup failed.\n");
2576 return (3);
2577 }
2579 /* make sure that the base directory is not resolved via
2580 * symbolic links. this makes some performance-enhancing
2581 * assumptions possible (we don't have to resolve paths
2582 * that start with a "/")
2583 */
2584 if (realpath(config_base_dir, base_realpath) == NULL)
2585 {
2586 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2587 return 5;
2588 }
2589 else if (strncmp(config_base_dir,
2590 base_realpath, sizeof(base_realpath)) != 0)
2591 {
2592 fprintf(stderr,
2593 "Base directory (-b) resolved via file system links!\n"
2594 "Please consult rrdcached '-b' documentation!\n"
2595 "Consider specifying the real directory (%s)\n",
2596 base_realpath);
2597 return 5;
2598 }
2600 len = strlen (config_base_dir);
2601 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2602 {
2603 config_base_dir[len - 1] = 0;
2604 len--;
2605 }
2607 if (len < 1)
2608 {
2609 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2610 return (4);
2611 }
2613 _config_base_dir_len = len;
2614 }
2615 break;
2617 case 'p':
2618 {
2619 if (config_pid_file != NULL)
2620 free (config_pid_file);
2621 config_pid_file = strdup (optarg);
2622 if (config_pid_file == NULL)
2623 {
2624 fprintf (stderr, "read_options: strdup failed.\n");
2625 return (3);
2626 }
2627 }
2628 break;
2630 case 'F':
2631 config_flush_at_shutdown = 1;
2632 break;
2634 case 'j':
2635 {
2636 struct stat statbuf;
2637 const char *dir = optarg;
2639 status = stat(dir, &statbuf);
2640 if (status != 0)
2641 {
2642 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2643 return 6;
2644 }
2646 if (!S_ISDIR(statbuf.st_mode)
2647 || access(dir, R_OK|W_OK|X_OK) != 0)
2648 {
2649 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2650 errno ? rrd_strerror(errno) : "");
2651 return 6;
2652 }
2654 journal_cur = malloc(PATH_MAX + 1);
2655 journal_old = malloc(PATH_MAX + 1);
2656 if (journal_cur == NULL || journal_old == NULL)
2657 {
2658 fprintf(stderr, "malloc failure for journal files\n");
2659 return 6;
2660 }
2661 else
2662 {
2663 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2664 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2665 }
2666 }
2667 break;
2669 case 'h':
2670 case '?':
2671 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2672 "\n"
2673 "Usage: rrdcached [options]\n"
2674 "\n"
2675 "Valid options are:\n"
2676 " -l <address> Socket address to listen to.\n"
2677 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2678 " -w <seconds> Interval in which to write data.\n"
2679 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2680 " -f <seconds> Interval in which to flush dead data.\n"
2681 " -p <file> Location of the PID-file.\n"
2682 " -b <dir> Base directory to change to.\n"
2683 " -B Restrict file access to paths within -b <dir>\n"
2684 " -g Do not fork and run in the foreground.\n"
2685 " -j <dir> Directory in which to create the journal files.\n"
2686 " -F Always flush all updates at shutdown\n"
2687 "\n"
2688 "For more information and a detailed description of all options "
2689 "please refer\n"
2690 "to the rrdcached(1) manual page.\n",
2691 VERSION);
2692 status = -1;
2693 break;
2694 } /* switch (option) */
2695 } /* while (getopt) */
2697 /* advise the user when values are not sane */
2698 if (config_flush_interval < 2 * config_write_interval)
2699 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2700 " 2x write interval (-w) !\n");
2701 if (config_write_jitter > config_write_interval)
2702 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2703 " write interval (-w) !\n");
2705 if (config_write_base_only && config_base_dir == NULL)
2706 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2707 " Consult the rrdcached documentation\n");
2709 if (journal_cur == NULL)
2710 config_flush_at_shutdown = 1;
2712 return (status);
2713 } /* }}} int read_options */
2715 int main (int argc, char **argv)
2716 {
2717 int status;
2719 status = read_options (argc, argv);
2720 if (status != 0)
2721 {
2722 if (status < 0)
2723 status = 0;
2724 return (status);
2725 }
2727 status = daemonize ();
2728 if (status != 0)
2729 {
2730 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2731 return (1);
2732 }
2734 journal_init();
2736 /* start the queue thread */
2737 memset (&queue_thread, 0, sizeof (queue_thread));
2738 status = pthread_create (&queue_thread,
2739 NULL, /* attr */
2740 queue_thread_main,
2741 NULL); /* args */
2742 if (status != 0)
2743 {
2744 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2745 cleanup();
2746 return (1);
2747 }
2749 listen_thread_main (NULL);
2750 cleanup ();
2752 return (0);
2753 } /* int main */
2755 /*
2756 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2757 */