1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
/* Log a printf-style message via syslog(3); `severity' is a LOG_* level. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC do not know __attribute__; make it expand to
 * nothing there. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif

/*
 * Types
 */

/* Privilege level of a socket. has_privilege() compares levels with `>=',
 * so PRIV_HIGH must stay ordered after PRIV_LOW. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Status passed to send_response(); RESP_ERR produces a "-1" status line
 * outside of BATCH mode. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;

/* State of one socket, including per-connection buffering. */
struct listen_socket_s
{
  int fd;
  char addr[PATH_MAX + 1];
  int family;
  socket_privilege privilege;

  /* state for BATCH processing */
  time_t batch_start;  /* non-zero while a BATCH is in progress */
  int batch_cmd;       /* used as the status-line number of BATCH errors;
                        * presumably the index of the current command */

  /* buffered IO */
  char *rbuf;          /* read buffer, consumed line by line by next_cmd() */
  off_t next_cmd;      /* offset in rbuf of the next unparsed command */
  off_t next_read;     /* offset in rbuf one past the last byte read */

  char *wbuf;          /* pending response text, grown by add_to_wbuf() */
  ssize_t wbuf_len;    /* bytes in wbuf, excluding its terminating NUL */
};
typedef struct listen_socket_s listen_socket_t;

struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/* One cached RRD file: its pending update strings plus write-queue links.
 * The functions operating on it require `cache_lock' to be held. */
struct cache_item_s
{
  char *file;                /* RRD file name; also the key in cache_tree */
  char **values;             /* pending update strings for rrd_update_r() */
  int values_num;            /* number of entries in `values' */
  time_t last_flush_time;    /* when values were last handed to the writer,
                              * plus optional random jitter */
  time_t last_update_stamp;  /* not used in the code visible here */
#define CI_FLAGS_IN_TREE (1<<0)   /* presumably: item is in cache_tree */
#define CI_FLAGS_IN_QUEUE (1<<1)  /* item is linked into the write queue */
  int flags;
  pthread_cond_t flushed;    /* broadcast once the item's values are written */
  cache_item_t *prev;        /* write-queue links */
  cache_item_t *next;
};

/* Accumulator passed by flush_old_values() to tree_callback_flush(). */
struct callback_flush_data_s
{
  time_t now;          /* time at the start of the tree walk */
  time_t abs_timeout;  /* items last flushed at or before this get queued */
  char **keys;         /* keys of idle, empty items to remove afterwards */
  size_t keys_num;     /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;

/* End of the write queue at which enqueue_cache_item() inserts. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;

/* max length of socket command or response */
#define CMD_MAX 4096
/* read buffer size: room for two maximum-length commands */
#define RBUF_SIZE (CMD_MAX*2)
/*
 * Variables
 */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Incremented by the signal handlers; non-zero means "shut down".
 * NOTE(review): shared between signal handlers and threads as a plain int;
 * `volatile sig_atomic_t' (or an atomic) would be the conforming type. */
static int do_shutdown = 0;

/* presumably runs queue_thread_main() — confirm at the creation site */
static pthread_t queue_thread;

static pthread_t *connection_threads = NULL;
static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
static GTree *cache_tree = NULL;               /* file name -> cache_item_t */
static cache_item_t *cache_queue_head = NULL;  /* write queue (doubly linked) */
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; /* guards the above */
static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;   /* wakes the queue thread */

/* Configuration; intervals and jitter are in seconds. */
static int config_write_interval = 300;
static int config_write_jitter = 0;
static int config_flush_interval = 3600;
static int config_flush_at_shutdown = 0;   /* flush all cached values before exiting */
static char *config_pid_file = NULL;       /* NULL: LOCALSTATEDIR "/run/rrdcached.pid" */
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;    /* strlen(config_base_dir), for prefix checks */
static int config_write_base_only = 0;     /* confine client paths to config_base_dir */

static listen_socket_t **config_listen_address_list = NULL;
static int config_listen_address_list_len = 0;

/* Counters reported by STATS; guarded by stats_lock. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);
/*
 * Functions
 */

/* Shared work for every shutdown signal: log the signal name, bump
 * `do_shutdown' and wake the queue thread waiting on `cache_cond'.
 * NOTE(review): syslog() and pthread_cond_broadcast() are not in POSIX's
 * list of async-signal-safe functions, so calling them from a handler is
 * formally undefined; a dedicated sigwait() thread would avoid that. */
static void sig_common (const char *sig) /* {{{ */
{
  RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
  do_shutdown++;
  pthread_cond_broadcast(&cache_cond);
} /* }}} void sig_common */

/* SIGINT: shut down, keeping the configured flush-at-shutdown behaviour. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("INT");
} /* }}} void sig_int_handler */

/* SIGTERM: shut down, keeping the configured flush-at-shutdown behaviour. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  sig_common("TERM");
} /* }}} void sig_term_handler */

/* SIGUSR1: shut down after flushing all cached values. The flag is set
 * before sig_common() wakes the queue thread so the thread sees it. */
static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 1;
  sig_common("USR1");
} /* }}} void sig_usr1_handler */

/* SIGUSR2: shut down without flushing cached values. */
static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
{
  config_flush_at_shutdown = 0;
  sig_common("USR2");
} /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
266 {
267 /* These structures are static, because `sigaction' behaves weird if the are
268 * overwritten.. */
269 static struct sigaction sa_int;
270 static struct sigaction sa_term;
271 static struct sigaction sa_pipe;
272 static struct sigaction sa_usr1;
273 static struct sigaction sa_usr2;
275 /* Install signal handlers */
276 memset (&sa_int, 0, sizeof (sa_int));
277 sa_int.sa_handler = sig_int_handler;
278 sigaction (SIGINT, &sa_int, NULL);
280 memset (&sa_term, 0, sizeof (sa_term));
281 sa_term.sa_handler = sig_term_handler;
282 sigaction (SIGTERM, &sa_term, NULL);
284 memset (&sa_pipe, 0, sizeof (sa_pipe));
285 sa_pipe.sa_handler = SIG_IGN;
286 sigaction (SIGPIPE, &sa_pipe, NULL);
288 memset (&sa_pipe, 0, sizeof (sa_usr1));
289 sa_usr1.sa_handler = sig_usr1_handler;
290 sigaction (SIGUSR1, &sa_usr1, NULL);
292 memset (&sa_usr2, 0, sizeof (sa_usr2));
293 sa_usr2.sa_handler = sig_usr2_handler;
294 sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300 int fd;
301 char *file;
303 file = (config_pid_file != NULL)
304 ? config_pid_file
305 : LOCALSTATEDIR "/run/rrdcached.pid";
307 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308 if (fd < 0)
309 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310 action, file, rrd_strerror(errno));
312 return(fd);
313 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns:
 *   - the negative value from open_pidfile() if the file cannot be opened
 *     (the reason has already been printed);
 *   - -1 if the file is empty, does not start with a positive PID, or names
 *     another process we are allowed to signal (possibly a running
 *     rrdcached);
 *   - otherwise a read/write descriptor to the now-truncated stale file,
 *     positioned at offset 0, ready for write_pidfile().
 * The descriptor is closed on every -1 path. */
static int check_pidfile(void)
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* Reserve one byte so the data can be NUL-terminated for atoi(). */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  ftruncate(pid_fd, 0);

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
353 static int write_pidfile (int fd) /* {{{ */
354 {
355 pid_t pid;
356 FILE *fh;
358 pid = getpid ();
360 fh = fdopen (fd, "w");
361 if (fh == NULL)
362 {
363 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364 close(fd);
365 return (-1);
366 }
368 fprintf (fh, "%i\n", (int) pid);
369 fclose (fh);
371 return (0);
372 } /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
375 {
376 char *file;
377 int status;
379 file = (config_pid_file != NULL)
380 ? config_pid_file
381 : LOCALSTATEDIR "/run/rrdcached.pid";
383 status = unlink (file);
384 if (status == 0)
385 return (0);
386 return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391 char *eol;
393 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394 sock->next_read - sock->next_cmd);
396 if (eol == NULL)
397 {
398 /* no commands left, move remainder back to front of rbuf */
399 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400 sock->next_read - sock->next_cmd);
401 sock->next_read -= sock->next_cmd;
402 sock->next_cmd = 0;
403 *len = 0;
404 return NULL;
405 }
406 else
407 {
408 char *cmd = sock->rbuf + sock->next_cmd;
409 *eol = '\0';
411 sock->next_cmd = eol - sock->rbuf + 1;
413 if (eol > sock->rbuf && *(eol-1) == '\r')
414 *(--eol) = '\0'; /* handle "\r\n" EOL */
416 *len = eol - cmd;
418 return cmd;
419 }
421 /* NOTREACHED */
422 assert(1==0);
423 }
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428 char *new_buf;
430 assert(sock != NULL);
432 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433 if (new_buf == NULL)
434 {
435 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436 return -1;
437 }
439 strncpy(new_buf + sock->wbuf_len, str, len + 1);
441 sock->wbuf = new_buf;
442 sock->wbuf_len += len;
444 return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450 va_list argp;
451 char buffer[CMD_MAX];
452 int len;
454 if (sock == NULL) return 0; /* journal replay mode */
455 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457 va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461 len = vsprintf(buffer, fmt, argp);
462 #endif
463 va_end(argp);
464 if (len < 0)
465 {
466 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467 return -1;
468 }
470 return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
/* Count the '\n' characters in `str'. A NULL string has zero lines. */
static int count_lines(char *str) /* {{{ */
{
  int n = 0;
  char *p;

  if (str == NULL)
    return 0;

  for (p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      n++;
  }

  return n;
} /* }}} static int count_lines */
489 /* send the response back to the user.
490 * returns 0 on success, -1 on error
491 * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493 char *fmt, ...) /* {{{ */
494 {
495 va_list argp;
496 char buffer[CMD_MAX];
497 int lines;
498 ssize_t wrote;
499 int rclen, len;
501 if (sock == NULL) return rc; /* journal replay mode */
503 if (sock->batch_start)
504 {
505 if (rc == RESP_OK)
506 return rc; /* no response on success during BATCH */
507 lines = sock->batch_cmd;
508 }
509 else if (rc == RESP_OK)
510 lines = count_lines(sock->wbuf);
511 else
512 lines = -1;
514 rclen = sprintf(buffer, "%d ", lines);
515 va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519 len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521 va_end(argp);
522 if (len < 0)
523 return -1;
525 len += rclen;
527 /* append the result to the wbuf, don't write to the user */
528 if (sock->batch_start)
529 return add_to_wbuf(sock, buffer, len);
531 /* first write must be complete */
532 if (len != write(sock->fd, buffer, len))
533 {
534 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535 return -1;
536 }
538 if (sock->wbuf != NULL && rc == RESP_OK)
539 {
540 wrote = 0;
541 while (wrote < sock->wbuf_len)
542 {
543 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544 if (wb <= 0)
545 {
546 RRDD_LOG(LOG_INFO, "send_response: could not write results");
547 return -1;
548 }
549 wrote += wb;
550 }
551 }
553 free(sock->wbuf); sock->wbuf = NULL;
554 sock->wbuf_len = 0;
556 return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561 ci->values = NULL;
562 ci->values_num = 0;
564 ci->last_flush_time = when;
565 if (config_write_jitter > 0)
566 ci->last_flush_time += (random() % config_write_jitter);
567 }
569 /* remove_from_queue
570 * remove a "cache_item_t" item from the queue.
571 * must hold 'cache_lock' when calling this
572 */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575 if (ci == NULL) return;
576 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578 if (ci->prev == NULL)
579 cache_queue_head = ci->next; /* reset head */
580 else
581 ci->prev->next = ci->next;
583 if (ci->next == NULL)
584 cache_queue_tail = ci->prev; /* reset the tail */
585 else
586 ci->next->prev = ci->prev;
588 ci->next = ci->prev = NULL;
589 ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* free the resources associated with the cache_item_t
593 * must hold cache_lock when calling this function
594 */
595 static void *free_cache_item(cache_item_t *ci) /* {{{ */
596 {
597 if (ci == NULL) return NULL;
599 remove_from_queue(ci);
601 for (int i=0; i < ci->values_num; i++)
602 free(ci->values[i]);
604 free (ci->values);
605 free (ci->file);
607 /* in case anyone is waiting */
608 pthread_cond_broadcast(&ci->flushed);
610 free (ci);
612 return NULL;
613 } /* }}} static void *free_cache_item */
615 /*
616 * enqueue_cache_item:
617 * `cache_lock' must be acquired before calling this function!
618 */
619 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
620 queue_side_t side)
621 {
622 if (ci == NULL)
623 return (-1);
625 if (ci->values_num == 0)
626 return (0);
628 if (side == HEAD)
629 {
630 if (cache_queue_head == ci)
631 return 0;
633 /* remove if further down in queue */
634 remove_from_queue(ci);
636 ci->prev = NULL;
637 ci->next = cache_queue_head;
638 if (ci->next != NULL)
639 ci->next->prev = ci;
640 cache_queue_head = ci;
642 if (cache_queue_tail == NULL)
643 cache_queue_tail = cache_queue_head;
644 }
645 else /* (side == TAIL) */
646 {
647 /* We don't move values back in the list.. */
648 if (ci->flags & CI_FLAGS_IN_QUEUE)
649 return (0);
651 assert (ci->next == NULL);
652 assert (ci->prev == NULL);
654 ci->prev = cache_queue_tail;
656 if (cache_queue_tail == NULL)
657 cache_queue_head = ci;
658 else
659 cache_queue_tail->next = ci;
661 cache_queue_tail = ci;
662 }
664 ci->flags |= CI_FLAGS_IN_QUEUE;
666 pthread_cond_broadcast(&cache_cond);
667 pthread_mutex_lock (&stats_lock);
668 stats_queue_length++;
669 pthread_mutex_unlock (&stats_lock);
671 return (0);
672 } /* }}} int enqueue_cache_item */
674 /*
675 * tree_callback_flush:
676 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
677 * while this is in progress.
678 */
679 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
680 gpointer data)
681 {
682 cache_item_t *ci;
683 callback_flush_data_t *cfd;
685 ci = (cache_item_t *) value;
686 cfd = (callback_flush_data_t *) data;
688 if (ci->flags & CI_FLAGS_IN_QUEUE)
689 return FALSE;
691 if ((ci->last_flush_time <= cfd->abs_timeout)
692 && (ci->values_num > 0))
693 {
694 enqueue_cache_item (ci, TAIL);
695 }
696 else if ((do_shutdown != 0)
697 && (ci->values_num > 0))
698 {
699 enqueue_cache_item (ci, TAIL);
700 }
701 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
702 && (ci->values_num <= 0))
703 {
704 char **temp;
706 temp = (char **) realloc (cfd->keys,
707 sizeof (char *) * (cfd->keys_num + 1));
708 if (temp == NULL)
709 {
710 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
711 return (FALSE);
712 }
713 cfd->keys = temp;
714 /* Make really sure this points to the _same_ place */
715 assert ((char *) key == ci->file);
716 cfd->keys[cfd->keys_num] = (char *) key;
717 cfd->keys_num++;
718 }
720 return (FALSE);
721 } /* }}} gboolean tree_callback_flush */
723 static int flush_old_values (int max_age)
724 {
725 callback_flush_data_t cfd;
726 size_t k;
728 memset (&cfd, 0, sizeof (cfd));
729 /* Pass the current time as user data so that we don't need to call
730 * `time' for each node. */
731 cfd.now = time (NULL);
732 cfd.keys = NULL;
733 cfd.keys_num = 0;
735 if (max_age > 0)
736 cfd.abs_timeout = cfd.now - max_age;
737 else
738 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
740 /* `tree_callback_flush' will return the keys of all values that haven't
741 * been touched in the last `config_flush_interval' seconds in `cfd'.
742 * The char*'s in this array point to the same memory as ci->file, so we
743 * don't need to free them separately. */
744 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
746 for (k = 0; k < cfd.keys_num; k++)
747 {
748 /* should never fail, since we have held the cache_lock
749 * the entire time */
750 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
751 }
753 if (cfd.keys != NULL)
754 {
755 free (cfd.keys);
756 cfd.keys = NULL;
757 }
759 return (0);
760 } /* int flush_old_values */
762 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
763 {
764 struct timeval now;
765 struct timespec next_flush;
766 int final_flush = 0; /* make sure we only flush once on shutdown */
768 gettimeofday (&now, NULL);
769 next_flush.tv_sec = now.tv_sec + config_flush_interval;
770 next_flush.tv_nsec = 1000 * now.tv_usec;
772 pthread_mutex_lock (&cache_lock);
773 while ((do_shutdown == 0) || (cache_queue_head != NULL))
774 {
775 cache_item_t *ci;
776 char *file;
777 char **values;
778 int values_num;
779 int status;
780 int i;
782 /* First, check if it's time to do the cache flush. */
783 gettimeofday (&now, NULL);
784 if ((now.tv_sec > next_flush.tv_sec)
785 || ((now.tv_sec == next_flush.tv_sec)
786 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
787 {
788 /* Flush all values that haven't been written in the last
789 * `config_write_interval' seconds. */
790 flush_old_values (config_write_interval);
792 /* Determine the time of the next cache flush. */
793 next_flush.tv_sec =
794 now.tv_sec + next_flush.tv_sec % config_flush_interval;
796 /* unlock the cache while we rotate so we don't block incoming
797 * updates if the fsync() blocks on disk I/O */
798 pthread_mutex_unlock(&cache_lock);
799 journal_rotate();
800 pthread_mutex_lock(&cache_lock);
801 }
803 /* Now, check if there's something to store away. If not, wait until
804 * something comes in or it's time to do the cache flush. if we are
805 * shutting down, do not wait around. */
806 if (cache_queue_head == NULL && !do_shutdown)
807 {
808 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
809 if ((status != 0) && (status != ETIMEDOUT))
810 {
811 RRDD_LOG (LOG_ERR, "queue_thread_main: "
812 "pthread_cond_timedwait returned %i.", status);
813 }
814 }
816 /* We're about to shut down */
817 if (do_shutdown != 0 && !final_flush++)
818 {
819 if (config_flush_at_shutdown)
820 flush_old_values (-1); /* flush everything */
821 else
822 break;
823 }
825 /* Check if a value has arrived. This may be NULL if we timed out or there
826 * was an interrupt such as a signal. */
827 if (cache_queue_head == NULL)
828 continue;
830 ci = cache_queue_head;
832 /* copy the relevant parts */
833 file = strdup (ci->file);
834 if (file == NULL)
835 {
836 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
837 continue;
838 }
840 assert(ci->values != NULL);
841 assert(ci->values_num > 0);
843 values = ci->values;
844 values_num = ci->values_num;
846 wipe_ci_values(ci, time(NULL));
847 remove_from_queue(ci);
849 pthread_mutex_lock (&stats_lock);
850 assert (stats_queue_length > 0);
851 stats_queue_length--;
852 pthread_mutex_unlock (&stats_lock);
854 pthread_mutex_unlock (&cache_lock);
856 rrd_clear_error ();
857 status = rrd_update_r (file, NULL, values_num, (void *) values);
858 if (status != 0)
859 {
860 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
861 "rrd_update_r (%s) failed with status %i. (%s)",
862 file, status, rrd_get_error());
863 }
865 journal_write("wrote", file);
866 pthread_cond_broadcast(&ci->flushed);
868 for (i = 0; i < values_num; i++)
869 free (values[i]);
871 free(values);
872 free(file);
874 if (status == 0)
875 {
876 pthread_mutex_lock (&stats_lock);
877 stats_updates_written++;
878 stats_data_sets_written += values_num;
879 pthread_mutex_unlock (&stats_lock);
880 }
882 pthread_mutex_lock (&cache_lock);
884 /* We're about to shut down */
885 if (do_shutdown != 0 && !final_flush++)
886 {
887 if (config_flush_at_shutdown)
888 flush_old_values (-1); /* flush everything */
889 else
890 break;
891 }
892 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
893 pthread_mutex_unlock (&cache_lock);
895 if (config_flush_at_shutdown)
896 {
897 assert(cache_queue_head == NULL);
898 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
899 }
901 journal_done();
903 return (NULL);
904 } /* }}} void *queue_thread_main */
/* Split the next space-separated field off `*buffer_ret', in place.
 *
 * A backslash makes the following character literal (e.g. "\ " for a
 * space). The unescaped field is written over the start of the buffer and
 * NUL-terminated.
 *
 * Returns 0 on success. `*field_ret' then points at the field, and
 * `*buffer_ret' / `*buffer_size_ret' are advanced past the field and its
 * separator.
 *
 * Returns -1 if the buffer is empty or the field ends in a dangling
 * backslash. The output pointers are then unchanged, although the buffer
 * may already have been partly rewritten.
 *
 * The buffer must be NUL-terminated within `*buffer_size_ret' bytes. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
                             size_t *buffer_size_ret, char **field_ret)
{
  char *start = *buffer_ret;
  size_t total = *buffer_size_ret;
  size_t in = 0;   /* read cursor */
  size_t out = 0;  /* write cursor, never ahead of `in' */

  if (total == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (start[total - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= total)
      return (-1); /* ran out of input before a separator */

    c = start[in++];

    if (c == ' ' || c == '\0')
    {
      start[out] = '\0';
      break;
    }

    if (c == '\\')
    {
      if (in >= total)
        return (-1); /* backslash was the very last byte */
      c = start[in++];
    }

    start[out++] = c;
  }

  *buffer_ret = start + in;
  *buffer_size_ret = total - in;
  *field_ret = start;

  return (0);
} /* }}} int buffer_get_field */
969 /* if we're restricting writes to the base directory,
970 * check whether the file falls within the dir
971 * returns 1 if OK, otherwise 0
972 */
973 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
974 {
975 assert(file != NULL);
977 if (!config_write_base_only
978 || sock == NULL /* journal replay */
979 || config_base_dir == NULL)
980 return 1;
982 if (strstr(file, "../") != NULL) goto err;
984 /* relative paths without "../" are ok */
985 if (*file != '/') return 1;
987 /* file must be of the format base + "/" + <1+ char filename> */
988 if (strlen(file) < _config_base_dir_len + 2) goto err;
989 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
990 if (*(file + _config_base_dir_len) != '/') goto err;
992 return 1;
994 err:
995 if (sock != NULL && sock->fd >= 0)
996 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
998 return 0;
999 } /* }}} static int check_file_access */
1001 /* when using a base dir, convert relative paths to absolute paths.
1002 * if necessary, modifies the "filename" pointer to point
1003 * to the new path created in "tmp". "tmp" is provided
1004 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1005 *
1006 * this allows us to optimize for the expected case (absolute path)
1007 * with a no-op.
1008 */
1009 static void get_abs_path(char **filename, char *tmp)
1010 {
1011 assert(tmp != NULL);
1012 assert(filename != NULL && *filename != NULL);
1014 if (config_base_dir == NULL || **filename == '/')
1015 return;
1017 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1018 *filename = tmp;
1019 } /* }}} static int get_abs_path */
1021 /* returns 1 if we have the required privilege level,
1022 * otherwise issue an error to the user on sock */
1023 static int has_privilege (listen_socket_t *sock, /* {{{ */
1024 socket_privilege priv)
1025 {
1026 if (sock == NULL) /* journal replay */
1027 return 1;
1029 if (sock->privilege >= priv)
1030 return 1;
1032 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 } /* }}} static int has_privilege */
1035 static int flush_file (const char *filename) /* {{{ */
1036 {
1037 cache_item_t *ci;
1039 pthread_mutex_lock (&cache_lock);
1041 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1042 if (ci == NULL)
1043 {
1044 pthread_mutex_unlock (&cache_lock);
1045 return (ENOENT);
1046 }
1048 if (ci->values_num > 0)
1049 {
1050 /* Enqueue at head */
1051 enqueue_cache_item (ci, HEAD);
1052 pthread_cond_wait(&ci->flushed, &cache_lock);
1053 }
1055 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1056 * may have been purged during our cond_wait() */
1058 pthread_mutex_unlock(&cache_lock);
1060 return (0);
1061 } /* }}} int flush_file */
1063 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1064 char *buffer, size_t buffer_size)
1065 {
1066 int status;
1067 char **help_text;
1068 char *command;
1070 char *help_help[2] =
1071 {
1072 "Command overview\n"
1073 ,
1074 "HELP [<command>]\n"
1075 "FLUSH <filename>\n"
1076 "FLUSHALL\n"
1077 "PENDING <filename>\n"
1078 "FORGET <filename>\n"
1079 "UPDATE <filename> <values> [<values> ...]\n"
1080 "BATCH\n"
1081 "STATS\n"
1082 };
1084 char *help_flush[2] =
1085 {
1086 "Help for FLUSH\n"
1087 ,
1088 "Usage: FLUSH <filename>\n"
1089 "\n"
1090 "Adds the given filename to the head of the update queue and returns\n"
1091 "after is has been dequeued.\n"
1092 };
1094 char *help_flushall[2] =
1095 {
1096 "Help for FLUSHALL\n"
1097 ,
1098 "Usage: FLUSHALL\n"
1099 "\n"
1100 "Triggers writing of all pending updates. Returns immediately.\n"
1101 };
1103 char *help_pending[2] =
1104 {
1105 "Help for PENDING\n"
1106 ,
1107 "Usage: PENDING <filename>\n"
1108 "\n"
1109 "Shows any 'pending' updates for a file, in order.\n"
1110 "The updates shown have not yet been written to the underlying RRD file.\n"
1111 };
1113 char *help_forget[2] =
1114 {
1115 "Help for FORGET\n"
1116 ,
1117 "Usage: FORGET <filename>\n"
1118 "\n"
1119 "Removes the file completely from the cache.\n"
1120 "Any pending updates for the file will be lost.\n"
1121 };
1123 char *help_update[2] =
1124 {
1125 "Help for UPDATE\n"
1126 ,
1127 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1128 "\n"
1129 "Adds the given file to the internal cache if it is not yet known and\n"
1130 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1131 "for details.\n"
1132 "\n"
1133 "Each <values> has the following form:\n"
1134 " <values> = <time>:<value>[:<value>[...]]\n"
1135 "See the rrdupdate(1) manpage for details.\n"
1136 };
1138 char *help_stats[2] =
1139 {
1140 "Help for STATS\n"
1141 ,
1142 "Usage: STATS\n"
1143 "\n"
1144 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1145 "a description of the values.\n"
1146 };
1148 char *help_batch[2] =
1149 {
1150 "Help for BATCH\n"
1151 ,
1152 "The 'BATCH' command permits the client to initiate a bulk load\n"
1153 " of commands to rrdcached.\n"
1154 "\n"
1155 "Usage:\n"
1156 "\n"
1157 " client: BATCH\n"
1158 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1159 " client: command #1\n"
1160 " client: command #2\n"
1161 " client: ... and so on\n"
1162 " client: .\n"
1163 " server: 2 errors\n"
1164 " server: 7 message for command #7\n"
1165 " server: 9 message for command #9\n"
1166 "\n"
1167 "For more information, consult the rrdcached(1) documentation.\n"
1168 };
1170 status = buffer_get_field (&buffer, &buffer_size, &command);
1171 if (status != 0)
1172 help_text = help_help;
1173 else
1174 {
1175 if (strcasecmp (command, "update") == 0)
1176 help_text = help_update;
1177 else if (strcasecmp (command, "flush") == 0)
1178 help_text = help_flush;
1179 else if (strcasecmp (command, "flushall") == 0)
1180 help_text = help_flushall;
1181 else if (strcasecmp (command, "pending") == 0)
1182 help_text = help_pending;
1183 else if (strcasecmp (command, "forget") == 0)
1184 help_text = help_forget;
1185 else if (strcasecmp (command, "stats") == 0)
1186 help_text = help_stats;
1187 else if (strcasecmp (command, "batch") == 0)
1188 help_text = help_batch;
1189 else
1190 help_text = help_help;
1191 }
1193 add_response_info(sock, help_text[1]);
1194 return send_response(sock, RESP_OK, help_text[0]);
1195 } /* }}} int handle_request_help */
1197 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1198 {
1199 uint64_t copy_queue_length;
1200 uint64_t copy_updates_received;
1201 uint64_t copy_flush_received;
1202 uint64_t copy_updates_written;
1203 uint64_t copy_data_sets_written;
1204 uint64_t copy_journal_bytes;
1205 uint64_t copy_journal_rotate;
1207 uint64_t tree_nodes_number;
1208 uint64_t tree_depth;
1210 pthread_mutex_lock (&stats_lock);
1211 copy_queue_length = stats_queue_length;
1212 copy_updates_received = stats_updates_received;
1213 copy_flush_received = stats_flush_received;
1214 copy_updates_written = stats_updates_written;
1215 copy_data_sets_written = stats_data_sets_written;
1216 copy_journal_bytes = stats_journal_bytes;
1217 copy_journal_rotate = stats_journal_rotate;
1218 pthread_mutex_unlock (&stats_lock);
1220 pthread_mutex_lock (&cache_lock);
1221 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1222 tree_depth = (uint64_t) g_tree_height (cache_tree);
1223 pthread_mutex_unlock (&cache_lock);
1225 add_response_info(sock,
1226 "QueueLength: %"PRIu64"\n", copy_queue_length);
1227 add_response_info(sock,
1228 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1229 add_response_info(sock,
1230 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1231 add_response_info(sock,
1232 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1233 add_response_info(sock,
1234 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1235 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1236 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1237 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1238 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1240 send_response(sock, RESP_OK, "Statistics follow\n");
1242 return (0);
1243 } /* }}} int handle_request_stats */
1245 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1246 char *buffer, size_t buffer_size)
1247 {
1248 char *file, file_tmp[PATH_MAX];
1249 int status;
1251 status = buffer_get_field (&buffer, &buffer_size, &file);
1252 if (status != 0)
1253 {
1254 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1255 }
1256 else
1257 {
1258 pthread_mutex_lock(&stats_lock);
1259 stats_flush_received++;
1260 pthread_mutex_unlock(&stats_lock);
1262 get_abs_path(&file, file_tmp);
1263 if (!check_file_access(file, sock)) return 0;
1265 status = flush_file (file);
1266 if (status == 0)
1267 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1268 else if (status == ENOENT)
1269 {
1270 /* no file in our tree; see whether it exists at all */
1271 struct stat statbuf;
1273 memset(&statbuf, 0, sizeof(statbuf));
1274 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1275 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1276 else
1277 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1278 }
1279 else if (status < 0)
1280 return send_response(sock, RESP_ERR, "Internal error.\n");
1281 else
1282 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1283 }
1285 /* NOTREACHED */
1286 assert(1==0);
1287 } /* }}} int handle_request_flush */
1289 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1290 {
1291 int status;
1293 status = has_privilege(sock, PRIV_HIGH);
1294 if (status <= 0)
1295 return status;
1297 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1299 pthread_mutex_lock(&cache_lock);
1300 flush_old_values(-1);
1301 pthread_mutex_unlock(&cache_lock);
1303 return send_response(sock, RESP_OK, "Started flush.\n");
1304 } /* }}} static int handle_request_flushall */
1306 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1307 char *buffer, size_t buffer_size)
1308 {
1309 int status;
1310 char *file, file_tmp[PATH_MAX];
1311 cache_item_t *ci;
1313 status = buffer_get_field(&buffer, &buffer_size, &file);
1314 if (status != 0)
1315 return send_response(sock, RESP_ERR,
1316 "Usage: PENDING <filename>\n");
1318 status = has_privilege(sock, PRIV_HIGH);
1319 if (status <= 0)
1320 return status;
1322 get_abs_path(&file, file_tmp);
1324 pthread_mutex_lock(&cache_lock);
1325 ci = g_tree_lookup(cache_tree, file);
1326 if (ci == NULL)
1327 {
1328 pthread_mutex_unlock(&cache_lock);
1329 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1330 }
1332 for (int i=0; i < ci->values_num; i++)
1333 add_response_info(sock, "%s\n", ci->values[i]);
1335 pthread_mutex_unlock(&cache_lock);
1336 return send_response(sock, RESP_OK, "updates pending\n");
1337 } /* }}} static int handle_request_pending */
1339 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1340 char *buffer, size_t buffer_size)
1341 {
1342 int status;
1343 gboolean found;
1344 char *file, file_tmp[PATH_MAX];
1346 status = buffer_get_field(&buffer, &buffer_size, &file);
1347 if (status != 0)
1348 return send_response(sock, RESP_ERR,
1349 "Usage: FORGET <filename>\n");
1351 status = has_privilege(sock, PRIV_HIGH);
1352 if (status <= 0)
1353 return status;
1355 get_abs_path(&file, file_tmp);
1356 if (!check_file_access(file, sock)) return 0;
1358 pthread_mutex_lock(&cache_lock);
1359 found = g_tree_remove(cache_tree, file);
1360 pthread_mutex_unlock(&cache_lock);
1362 if (found == TRUE)
1363 {
1364 if (sock != NULL)
1365 journal_write("forget", file);
1367 return send_response(sock, RESP_OK, "Gone!\n");
1368 }
1369 else
1370 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1372 /* NOTREACHED */
1373 assert(1==0);
1374 } /* }}} static int handle_request_forget */
1376 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1377 time_t now,
1378 char *buffer, size_t buffer_size)
1379 {
1380 char *file, file_tmp[PATH_MAX];
1381 int values_num = 0;
1382 int status;
1383 char orig_buf[CMD_MAX];
1385 cache_item_t *ci;
1387 status = has_privilege(sock, PRIV_HIGH);
1388 if (status <= 0)
1389 return status;
1391 /* save it for the journal later */
1392 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1394 status = buffer_get_field (&buffer, &buffer_size, &file);
1395 if (status != 0)
1396 return send_response(sock, RESP_ERR,
1397 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1399 pthread_mutex_lock(&stats_lock);
1400 stats_updates_received++;
1401 pthread_mutex_unlock(&stats_lock);
1403 get_abs_path(&file, file_tmp);
1404 if (!check_file_access(file, sock)) return 0;
1406 pthread_mutex_lock (&cache_lock);
1407 ci = g_tree_lookup (cache_tree, file);
1409 if (ci == NULL) /* {{{ */
1410 {
1411 struct stat statbuf;
1413 /* don't hold the lock while we setup; stat(2) might block */
1414 pthread_mutex_unlock(&cache_lock);
1416 memset (&statbuf, 0, sizeof (statbuf));
1417 status = stat (file, &statbuf);
1418 if (status != 0)
1419 {
1420 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1422 status = errno;
1423 if (status == ENOENT)
1424 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1425 else
1426 return send_response(sock, RESP_ERR,
1427 "stat failed with error %i.\n", status);
1428 }
1429 if (!S_ISREG (statbuf.st_mode))
1430 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1432 if (access(file, R_OK|W_OK) != 0)
1433 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1434 file, rrd_strerror(errno));
1436 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1437 if (ci == NULL)
1438 {
1439 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1441 return send_response(sock, RESP_ERR, "malloc failed.\n");
1442 }
1443 memset (ci, 0, sizeof (cache_item_t));
1445 ci->file = strdup (file);
1446 if (ci->file == NULL)
1447 {
1448 free (ci);
1449 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1451 return send_response(sock, RESP_ERR, "strdup failed.\n");
1452 }
1454 wipe_ci_values(ci, now);
1455 ci->flags = CI_FLAGS_IN_TREE;
1456 pthread_cond_init(&ci->flushed, NULL);
1458 pthread_mutex_lock(&cache_lock);
1459 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1460 } /* }}} */
1461 assert (ci != NULL);
1463 /* don't re-write updates in replay mode */
1464 if (sock != NULL)
1465 journal_write("update", orig_buf);
1467 while (buffer_size > 0)
1468 {
1469 char **temp;
1470 char *value;
1471 time_t stamp;
1472 char *eostamp;
1474 status = buffer_get_field (&buffer, &buffer_size, &value);
1475 if (status != 0)
1476 {
1477 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1478 break;
1479 }
1481 /* make sure update time is always moving forward */
1482 stamp = strtol(value, &eostamp, 10);
1483 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1484 {
1485 pthread_mutex_unlock(&cache_lock);
1486 return send_response(sock, RESP_ERR,
1487 "Cannot find timestamp in '%s'!\n", value);
1488 }
1489 else if (stamp <= ci->last_update_stamp)
1490 {
1491 pthread_mutex_unlock(&cache_lock);
1492 return send_response(sock, RESP_ERR,
1493 "illegal attempt to update using time %ld when last"
1494 " update time is %ld (minimum one second step)\n",
1495 stamp, ci->last_update_stamp);
1496 }
1497 else
1498 ci->last_update_stamp = stamp;
1500 temp = (char **) realloc (ci->values,
1501 sizeof (char *) * (ci->values_num + 1));
1502 if (temp == NULL)
1503 {
1504 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1505 continue;
1506 }
1507 ci->values = temp;
1509 ci->values[ci->values_num] = strdup (value);
1510 if (ci->values[ci->values_num] == NULL)
1511 {
1512 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1513 continue;
1514 }
1515 ci->values_num++;
1517 values_num++;
1518 }
1520 if (((now - ci->last_flush_time) >= config_write_interval)
1521 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1522 && (ci->values_num > 0))
1523 {
1524 enqueue_cache_item (ci, TAIL);
1525 }
1527 pthread_mutex_unlock (&cache_lock);
1529 if (values_num < 1)
1530 return send_response(sock, RESP_ERR, "No values updated.\n");
1531 else
1532 return send_response(sock, RESP_OK,
1533 "errors, enqueued %i value(s).\n", values_num);
1535 /* NOTREACHED */
1536 assert(1==0);
1538 } /* }}} int handle_request_update */
1540 /* we came across a "WROTE" entry during journal replay.
1541 * throw away any values that we have accumulated for this file
1542 */
1543 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1544 {
1545 int i;
1546 cache_item_t *ci;
1547 const char *file = buffer;
1549 pthread_mutex_lock(&cache_lock);
1551 ci = g_tree_lookup(cache_tree, file);
1552 if (ci == NULL)
1553 {
1554 pthread_mutex_unlock(&cache_lock);
1555 return (0);
1556 }
1558 if (ci->values)
1559 {
1560 for (i=0; i < ci->values_num; i++)
1561 free(ci->values[i]);
1563 free(ci->values);
1564 }
1566 wipe_ci_values(ci, now);
1567 remove_from_queue(ci);
1569 pthread_mutex_unlock(&cache_lock);
1570 return (0);
1571 } /* }}} int handle_request_wrote */
1573 /* start "BATCH" processing */
1574 static int batch_start (listen_socket_t *sock) /* {{{ */
1575 {
1576 int status;
1577 if (sock->batch_start)
1578 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1580 status = send_response(sock, RESP_OK,
1581 "Go ahead. End with dot '.' on its own line.\n");
1582 sock->batch_start = time(NULL);
1583 sock->batch_cmd = 0;
1585 return status;
1586 } /* }}} static int batch_start */
1588 /* finish "BATCH" processing and return results to the client */
1589 static int batch_done (listen_socket_t *sock) /* {{{ */
1590 {
1591 assert(sock->batch_start);
1592 sock->batch_start = 0;
1593 sock->batch_cmd = 0;
1594 return send_response(sock, RESP_OK, "errors\n");
1595 } /* }}} static int batch_done */
1597 /* if sock==NULL, we are in journal replay mode */
1598 static int handle_request (listen_socket_t *sock, /* {{{ */
1599 time_t now,
1600 char *buffer, size_t buffer_size)
1601 {
1602 char *buffer_ptr;
1603 char *command;
1604 int status;
1606 assert (buffer[buffer_size - 1] == '\0');
1608 buffer_ptr = buffer;
1609 command = NULL;
1610 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1611 if (status != 0)
1612 {
1613 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1614 return (-1);
1615 }
1617 if (sock != NULL && sock->batch_start)
1618 sock->batch_cmd++;
1620 if (strcasecmp (command, "update") == 0)
1621 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1622 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1623 {
1624 /* this is only valid in replay mode */
1625 return (handle_request_wrote (buffer_ptr, now));
1626 }
1627 else if (strcasecmp (command, "flush") == 0)
1628 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1629 else if (strcasecmp (command, "flushall") == 0)
1630 return (handle_request_flushall(sock));
1631 else if (strcasecmp (command, "pending") == 0)
1632 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1633 else if (strcasecmp (command, "forget") == 0)
1634 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1635 else if (strcasecmp (command, "stats") == 0)
1636 return (handle_request_stats (sock));
1637 else if (strcasecmp (command, "help") == 0)
1638 return (handle_request_help (sock, buffer_ptr, buffer_size));
1639 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1640 return batch_start(sock);
1641 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1642 return batch_done(sock);
1643 else
1644 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1646 /* NOTREACHED */
1647 assert(1==0);
1648 } /* }}} int handle_request */
1650 /* MUST NOT hold journal_lock before calling this */
1651 static void journal_rotate(void) /* {{{ */
1652 {
1653 FILE *old_fh = NULL;
1654 int new_fd;
1656 if (journal_cur == NULL || journal_old == NULL)
1657 return;
1659 pthread_mutex_lock(&journal_lock);
1661 /* we rotate this way (rename before close) so that the we can release
1662 * the journal lock as fast as possible. Journal writes to the new
1663 * journal can proceed immediately after the new file is opened. The
1664 * fclose can then block without affecting new updates.
1665 */
1666 if (journal_fh != NULL)
1667 {
1668 old_fh = journal_fh;
1669 journal_fh = NULL;
1670 rename(journal_cur, journal_old);
1671 ++stats_journal_rotate;
1672 }
1674 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1675 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1676 if (new_fd >= 0)
1677 {
1678 journal_fh = fdopen(new_fd, "a");
1679 if (journal_fh == NULL)
1680 close(new_fd);
1681 }
1683 pthread_mutex_unlock(&journal_lock);
1685 if (old_fh != NULL)
1686 fclose(old_fh);
1688 if (journal_fh == NULL)
1689 {
1690 RRDD_LOG(LOG_CRIT,
1691 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1692 journal_cur, rrd_strerror(errno));
1694 RRDD_LOG(LOG_ERR,
1695 "JOURNALING DISABLED: All values will be flushed at shutdown");
1696 config_flush_at_shutdown = 1;
1697 }
1699 } /* }}} static void journal_rotate */
1701 static void journal_done(void) /* {{{ */
1702 {
1703 if (journal_cur == NULL)
1704 return;
1706 pthread_mutex_lock(&journal_lock);
1707 if (journal_fh != NULL)
1708 {
1709 fclose(journal_fh);
1710 journal_fh = NULL;
1711 }
1713 if (config_flush_at_shutdown)
1714 {
1715 RRDD_LOG(LOG_INFO, "removing journals");
1716 unlink(journal_old);
1717 unlink(journal_cur);
1718 }
1719 else
1720 {
1721 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1722 "journals will be used at next startup");
1723 }
1725 pthread_mutex_unlock(&journal_lock);
1727 } /* }}} static void journal_done */
1729 static int journal_write(char *cmd, char *args) /* {{{ */
1730 {
1731 int chars;
1733 if (journal_fh == NULL)
1734 return 0;
1736 pthread_mutex_lock(&journal_lock);
1737 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1738 pthread_mutex_unlock(&journal_lock);
1740 if (chars > 0)
1741 {
1742 pthread_mutex_lock(&stats_lock);
1743 stats_journal_bytes += chars;
1744 pthread_mutex_unlock(&stats_lock);
1745 }
1747 return chars;
1748 } /* }}} static int journal_write */
1750 static int journal_replay (const char *file) /* {{{ */
1751 {
1752 FILE *fh;
1753 int entry_cnt = 0;
1754 int fail_cnt = 0;
1755 uint64_t line = 0;
1756 char entry[CMD_MAX];
1757 time_t now;
1759 if (file == NULL) return 0;
1761 {
1762 char *reason;
1763 int status = 0;
1764 struct stat statbuf;
1766 memset(&statbuf, 0, sizeof(statbuf));
1767 if (stat(file, &statbuf) != 0)
1768 {
1769 if (errno == ENOENT)
1770 return 0;
1772 reason = "stat error";
1773 status = errno;
1774 }
1775 else if (!S_ISREG(statbuf.st_mode))
1776 {
1777 reason = "not a regular file";
1778 status = EPERM;
1779 }
1780 if (statbuf.st_uid != daemon_uid)
1781 {
1782 reason = "not owned by daemon user";
1783 status = EACCES;
1784 }
1785 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1786 {
1787 reason = "must not be user/group writable";
1788 status = EACCES;
1789 }
1791 if (status != 0)
1792 {
1793 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1794 file, rrd_strerror(status), reason);
1795 return 0;
1796 }
1797 }
1799 fh = fopen(file, "r");
1800 if (fh == NULL)
1801 {
1802 if (errno != ENOENT)
1803 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1804 file, rrd_strerror(errno));
1805 return 0;
1806 }
1807 else
1808 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1810 now = time(NULL);
1812 while(!feof(fh))
1813 {
1814 size_t entry_len;
1816 ++line;
1817 if (fgets(entry, sizeof(entry), fh) == NULL)
1818 break;
1819 entry_len = strlen(entry);
1821 /* check \n termination in case journal writing crashed mid-line */
1822 if (entry_len == 0)
1823 continue;
1824 else if (entry[entry_len - 1] != '\n')
1825 {
1826 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1827 ++fail_cnt;
1828 continue;
1829 }
1831 entry[entry_len - 1] = '\0';
1833 if (handle_request(NULL, now, entry, entry_len) == 0)
1834 ++entry_cnt;
1835 else
1836 ++fail_cnt;
1837 }
1839 fclose(fh);
1841 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1842 entry_cnt, fail_cnt);
1844 return entry_cnt > 0 ? 1 : 0;
1845 } /* }}} static int journal_replay */
1847 static void journal_init(void) /* {{{ */
1848 {
1849 int had_journal = 0;
1851 if (journal_cur == NULL) return;
1853 pthread_mutex_lock(&journal_lock);
1855 RRDD_LOG(LOG_INFO, "checking for journal files");
1857 had_journal += journal_replay(journal_old);
1858 had_journal += journal_replay(journal_cur);
1860 /* it must have been a crash. start a flush */
1861 if (had_journal && config_flush_at_shutdown)
1862 flush_old_values(-1);
1864 pthread_mutex_unlock(&journal_lock);
1865 journal_rotate();
1867 RRDD_LOG(LOG_INFO, "journal processing complete");
1869 } /* }}} static void journal_init */
1871 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1872 {
1873 assert(sock != NULL);
1875 free(sock->rbuf); sock->rbuf = NULL;
1876 free(sock->wbuf); sock->wbuf = NULL;
1877 free(sock);
1878 } /* }}} void free_listen_socket */
1880 static void close_connection(listen_socket_t *sock) /* {{{ */
1881 {
1882 if (sock->fd >= 0)
1883 {
1884 close(sock->fd);
1885 sock->fd = -1;
1886 }
1888 free_listen_socket(sock);
1890 } /* }}} void close_connection */
1892 static void *connection_thread_main (void *args) /* {{{ */
1893 {
1894 listen_socket_t *sock;
1895 int i;
1896 int fd;
1898 sock = (listen_socket_t *) args;
1899 fd = sock->fd;
1901 /* init read buffers */
1902 sock->next_read = sock->next_cmd = 0;
1903 sock->rbuf = malloc(RBUF_SIZE);
1904 if (sock->rbuf == NULL)
1905 {
1906 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1907 close_connection(sock);
1908 return NULL;
1909 }
1911 pthread_mutex_lock (&connection_threads_lock);
1912 {
1913 pthread_t *temp;
1915 temp = (pthread_t *) realloc (connection_threads,
1916 sizeof (pthread_t) * (connection_threads_num + 1));
1917 if (temp == NULL)
1918 {
1919 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1920 }
1921 else
1922 {
1923 connection_threads = temp;
1924 connection_threads[connection_threads_num] = pthread_self ();
1925 connection_threads_num++;
1926 }
1927 }
1928 pthread_mutex_unlock (&connection_threads_lock);
1930 while (do_shutdown == 0)
1931 {
1932 char *cmd;
1933 ssize_t cmd_len;
1934 ssize_t rbytes;
1935 time_t now;
1937 struct pollfd pollfd;
1938 int status;
1940 pollfd.fd = fd;
1941 pollfd.events = POLLIN | POLLPRI;
1942 pollfd.revents = 0;
1944 status = poll (&pollfd, 1, /* timeout = */ 500);
1945 if (do_shutdown)
1946 break;
1947 else if (status == 0) /* timeout */
1948 continue;
1949 else if (status < 0) /* error */
1950 {
1951 status = errno;
1952 if (status != EINTR)
1953 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1954 continue;
1955 }
1957 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1958 break;
1959 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1960 {
1961 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1962 "poll(2) returned something unexpected: %#04hx",
1963 pollfd.revents);
1964 break;
1965 }
1967 rbytes = read(fd, sock->rbuf + sock->next_read,
1968 RBUF_SIZE - sock->next_read);
1969 if (rbytes < 0)
1970 {
1971 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1972 break;
1973 }
1974 else if (rbytes == 0)
1975 break; /* eof */
1977 sock->next_read += rbytes;
1979 if (sock->batch_start)
1980 now = sock->batch_start;
1981 else
1982 now = time(NULL);
1984 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1985 {
1986 status = handle_request (sock, now, cmd, cmd_len+1);
1987 if (status != 0)
1988 goto out_close;
1989 }
1990 }
1992 out_close:
1993 close_connection(sock);
1995 /* Remove this thread from the connection threads list */
1996 pthread_mutex_lock (&connection_threads_lock);
1997 {
1998 pthread_t self;
1999 pthread_t *temp;
2001 /* Find out own index in the array */
2002 self = pthread_self ();
2003 for (i = 0; i < connection_threads_num; i++)
2004 if (pthread_equal (connection_threads[i], self) != 0)
2005 break;
2006 assert (i < connection_threads_num);
2008 /* Move the trailing threads forward. */
2009 if (i < (connection_threads_num - 1))
2010 {
2011 memmove (connection_threads + i,
2012 connection_threads + i + 1,
2013 sizeof (pthread_t) * (connection_threads_num - i - 1));
2014 }
2016 connection_threads_num--;
2018 temp = realloc(connection_threads,
2019 sizeof(*connection_threads) * connection_threads_num);
2020 if (connection_threads_num > 0 && temp == NULL)
2021 RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2022 else
2023 connection_threads = temp;
2024 }
2025 pthread_mutex_unlock (&connection_threads_lock);
2027 return (NULL);
2028 } /* }}} void *connection_thread_main */
2030 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2031 {
2032 int fd;
2033 struct sockaddr_un sa;
2034 listen_socket_t *temp;
2035 int status;
2036 const char *path;
2038 path = sock->addr;
2039 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2040 path += strlen("unix:");
2042 temp = (listen_socket_t *) realloc (listen_fds,
2043 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2044 if (temp == NULL)
2045 {
2046 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2047 return (-1);
2048 }
2049 listen_fds = temp;
2050 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2052 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2053 if (fd < 0)
2054 {
2055 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2056 rrd_strerror(errno));
2057 return (-1);
2058 }
2060 memset (&sa, 0, sizeof (sa));
2061 sa.sun_family = AF_UNIX;
2062 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2064 /* if we've gotten this far, we own the pid file. any daemon started
2065 * with the same args must not be alive. therefore, ensure that we can
2066 * create the socket...
2067 */
2068 unlink(path);
2070 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2071 if (status != 0)
2072 {
2073 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2074 path, rrd_strerror(errno));
2075 close (fd);
2076 return (-1);
2077 }
2079 status = listen (fd, /* backlog = */ 10);
2080 if (status != 0)
2081 {
2082 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2083 path, rrd_strerror(errno));
2084 close (fd);
2085 unlink (path);
2086 return (-1);
2087 }
2089 listen_fds[listen_fds_num].fd = fd;
2090 listen_fds[listen_fds_num].family = PF_UNIX;
2091 strncpy(listen_fds[listen_fds_num].addr, path,
2092 sizeof (listen_fds[listen_fds_num].addr) - 1);
2093 listen_fds_num++;
2095 return (0);
2096 } /* }}} int open_listen_socket_unix */
/* Resolve `sock->addr' and listen on every address it resolves to,
 * appending each listening socket to `listen_fds'.
 *
 * Accepted address forms:
 *   "[ipv6]" or "[ipv6]:port"
 *   "host.name" / "a.b.c.d", optionally followed by ":port"
 *   anything else is passed to getaddrinfo() unchanged
 * RRDCACHED_DEFAULT_PORT is used when no port is given.
 *
 * Addresses that cannot be bound are skipped. Returns -1 on malformed
 * input, resolver failure, or a listen() failure; otherwise 0. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* strrchr is the standard C spelling of the legacy BSD rindex() */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2220 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2221 {
2222 assert(sock != NULL);
2223 assert(sock->addr != NULL);
2225 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2226 || sock->addr[0] == '/')
2227 return (open_listen_socket_unix(sock));
2228 else
2229 return (open_listen_socket_network(sock));
2230 } /* }}} int open_listen_socket */
2232 static int close_listen_sockets (void) /* {{{ */
2233 {
2234 size_t i;
2236 for (i = 0; i < listen_fds_num; i++)
2237 {
2238 close (listen_fds[i].fd);
2240 if (listen_fds[i].family == PF_UNIX)
2241 unlink(listen_fds[i].addr);
2242 }
2244 free (listen_fds);
2245 listen_fds = NULL;
2246 listen_fds_num = 0;
2248 return (0);
2249 } /* }}} int close_listen_sockets */
2251 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2252 {
2253 struct pollfd *pollfds;
2254 int pollfds_num;
2255 int status;
2256 int i;
2258 if (listen_fds_num < 1)
2259 {
2260 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2261 return (NULL);
2262 }
2264 pollfds_num = listen_fds_num;
2265 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2266 if (pollfds == NULL)
2267 {
2268 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2269 return (NULL);
2270 }
2271 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2273 RRDD_LOG(LOG_INFO, "listening for connections");
2275 while (do_shutdown == 0)
2276 {
2277 for (i = 0; i < pollfds_num; i++)
2278 {
2279 pollfds[i].fd = listen_fds[i].fd;
2280 pollfds[i].events = POLLIN | POLLPRI;
2281 pollfds[i].revents = 0;
2282 }
2284 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2285 if (do_shutdown)
2286 break;
2287 else if (status == 0) /* timeout */
2288 continue;
2289 else if (status < 0) /* error */
2290 {
2291 status = errno;
2292 if (status != EINTR)
2293 {
2294 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2295 }
2296 continue;
2297 }
2299 for (i = 0; i < pollfds_num; i++)
2300 {
2301 listen_socket_t *client_sock;
2302 struct sockaddr_storage client_sa;
2303 socklen_t client_sa_size;
2304 pthread_t tid;
2305 pthread_attr_t attr;
2307 if (pollfds[i].revents == 0)
2308 continue;
2310 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2311 {
2312 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2313 "poll(2) returned something unexpected for listen FD #%i.",
2314 pollfds[i].fd);
2315 continue;
2316 }
2318 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2319 if (client_sock == NULL)
2320 {
2321 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2322 continue;
2323 }
2324 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2326 client_sa_size = sizeof (client_sa);
2327 client_sock->fd = accept (pollfds[i].fd,
2328 (struct sockaddr *) &client_sa, &client_sa_size);
2329 if (client_sock->fd < 0)
2330 {
2331 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2332 free(client_sock);
2333 continue;
2334 }
2336 pthread_attr_init (&attr);
2337 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2339 status = pthread_create (&tid, &attr, connection_thread_main,
2340 client_sock);
2341 if (status != 0)
2342 {
2343 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2344 close_connection(client_sock);
2345 continue;
2346 }
2347 } /* for (pollfds_num) */
2348 } /* while (do_shutdown == 0) */
2350 RRDD_LOG(LOG_INFO, "starting shutdown");
2352 close_listen_sockets ();
2354 pthread_mutex_lock (&connection_threads_lock);
2355 while (connection_threads_num > 0)
2356 {
2357 pthread_t wait_for;
2359 wait_for = connection_threads[0];
2361 pthread_mutex_unlock (&connection_threads_lock);
2362 pthread_join (wait_for, /* retval = */ NULL);
2363 pthread_mutex_lock (&connection_threads_lock);
2364 }
2365 pthread_mutex_unlock (&connection_threads_lock);
2367 free(pollfds);
2369 return (NULL);
2370 } /* }}} void *listen_thread_main */
2372 static int daemonize (void) /* {{{ */
2373 {
2374 int pid_fd;
2375 char *base_dir;
2377 daemon_uid = geteuid();
2379 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2380 if (pid_fd < 0)
2381 pid_fd = check_pidfile();
2382 if (pid_fd < 0)
2383 return pid_fd;
2385 /* open all the listen sockets */
2386 if (config_listen_address_list_len > 0)
2387 {
2388 for (int i = 0; i < config_listen_address_list_len; i++)
2389 {
2390 open_listen_socket (config_listen_address_list[i]);
2391 free_listen_socket (config_listen_address_list[i]);
2392 }
2394 free(config_listen_address_list);
2395 }
2396 else
2397 {
2398 listen_socket_t sock;
2399 memset(&sock, 0, sizeof(sock));
2400 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2401 open_listen_socket (&sock);
2402 }
2404 if (listen_fds_num < 1)
2405 {
2406 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2407 goto error;
2408 }
2410 if (!stay_foreground)
2411 {
2412 pid_t child;
2414 child = fork ();
2415 if (child < 0)
2416 {
2417 fprintf (stderr, "daemonize: fork(2) failed.\n");
2418 goto error;
2419 }
2420 else if (child > 0)
2421 exit(0);
2423 /* Become session leader */
2424 setsid ();
2426 /* Open the first three file descriptors to /dev/null */
2427 close (2);
2428 close (1);
2429 close (0);
2431 open ("/dev/null", O_RDWR);
2432 dup (0);
2433 dup (0);
2434 } /* if (!stay_foreground) */
2436 /* Change into the /tmp directory. */
2437 base_dir = (config_base_dir != NULL)
2438 ? config_base_dir
2439 : "/tmp";
2441 if (chdir (base_dir) != 0)
2442 {
2443 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2444 goto error;
2445 }
2447 install_signal_handlers();
2449 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2450 RRDD_LOG(LOG_INFO, "starting up");
2452 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2453 (GDestroyNotify) free_cache_item);
2454 if (cache_tree == NULL)
2455 {
2456 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2457 goto error;
2458 }
2460 return write_pidfile (pid_fd);
2462 error:
2463 remove_pidfile();
2464 return -1;
2465 } /* }}} int daemonize */
2467 static int cleanup (void) /* {{{ */
2468 {
2469 do_shutdown++;
2471 pthread_cond_signal (&cache_cond);
2472 pthread_join (queue_thread, /* return = */ NULL);
2474 remove_pidfile ();
2476 free(config_base_dir);
2477 free(config_pid_file);
2478 free(journal_cur);
2479 free(journal_old);
2481 pthread_mutex_lock(&cache_lock);
2482 g_tree_destroy(cache_tree);
2484 RRDD_LOG(LOG_INFO, "goodbye");
2485 closelog ();
2487 return (0);
2488 } /* }}} int cleanup */
2490 static int read_options (int argc, char **argv) /* {{{ */
2491 {
2492 int option;
2493 int status = 0;
2495 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2496 {
2497 switch (option)
2498 {
2499 case 'g':
2500 stay_foreground=1;
2501 break;
2503 case 'L':
2504 case 'l':
2505 {
2506 listen_socket_t **temp;
2507 listen_socket_t *new;
2509 new = malloc(sizeof(listen_socket_t));
2510 if (new == NULL)
2511 {
2512 fprintf(stderr, "read_options: malloc failed.\n");
2513 return(2);
2514 }
2515 memset(new, 0, sizeof(listen_socket_t));
2517 temp = (listen_socket_t **) realloc (config_listen_address_list,
2518 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2519 if (temp == NULL)
2520 {
2521 fprintf (stderr, "read_options: realloc failed.\n");
2522 return (2);
2523 }
2524 config_listen_address_list = temp;
2526 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2527 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2529 temp[config_listen_address_list_len] = new;
2530 config_listen_address_list_len++;
2531 }
2532 break;
2534 case 'f':
2535 {
2536 int temp;
2538 temp = atoi (optarg);
2539 if (temp > 0)
2540 config_flush_interval = temp;
2541 else
2542 {
2543 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2544 status = 3;
2545 }
2546 }
2547 break;
2549 case 'w':
2550 {
2551 int temp;
2553 temp = atoi (optarg);
2554 if (temp > 0)
2555 config_write_interval = temp;
2556 else
2557 {
2558 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2559 status = 2;
2560 }
2561 }
2562 break;
2564 case 'z':
2565 {
2566 int temp;
2568 temp = atoi(optarg);
2569 if (temp > 0)
2570 config_write_jitter = temp;
2571 else
2572 {
2573 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2574 status = 2;
2575 }
2577 break;
2578 }
2580 case 'B':
2581 config_write_base_only = 1;
2582 break;
2584 case 'b':
2585 {
2586 size_t len;
2587 char base_realpath[PATH_MAX];
2589 if (config_base_dir != NULL)
2590 free (config_base_dir);
2591 config_base_dir = strdup (optarg);
2592 if (config_base_dir == NULL)
2593 {
2594 fprintf (stderr, "read_options: strdup failed.\n");
2595 return (3);
2596 }
2598 /* make sure that the base directory is not resolved via
2599 * symbolic links. this makes some performance-enhancing
2600 * assumptions possible (we don't have to resolve paths
2601 * that start with a "/")
2602 */
2603 if (realpath(config_base_dir, base_realpath) == NULL)
2604 {
2605 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2606 return 5;
2607 }
2608 else if (strncmp(config_base_dir,
2609 base_realpath, sizeof(base_realpath)) != 0)
2610 {
2611 fprintf(stderr,
2612 "Base directory (-b) resolved via file system links!\n"
2613 "Please consult rrdcached '-b' documentation!\n"
2614 "Consider specifying the real directory (%s)\n",
2615 base_realpath);
2616 return 5;
2617 }
2619 len = strlen (config_base_dir);
2620 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2621 {
2622 config_base_dir[len - 1] = 0;
2623 len--;
2624 }
2626 if (len < 1)
2627 {
2628 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2629 return (4);
2630 }
2632 _config_base_dir_len = len;
2633 }
2634 break;
2636 case 'p':
2637 {
2638 if (config_pid_file != NULL)
2639 free (config_pid_file);
2640 config_pid_file = strdup (optarg);
2641 if (config_pid_file == NULL)
2642 {
2643 fprintf (stderr, "read_options: strdup failed.\n");
2644 return (3);
2645 }
2646 }
2647 break;
2649 case 'F':
2650 config_flush_at_shutdown = 1;
2651 break;
2653 case 'j':
2654 {
2655 struct stat statbuf;
2656 const char *dir = optarg;
2658 status = stat(dir, &statbuf);
2659 if (status != 0)
2660 {
2661 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2662 return 6;
2663 }
2665 if (!S_ISDIR(statbuf.st_mode)
2666 || access(dir, R_OK|W_OK|X_OK) != 0)
2667 {
2668 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2669 errno ? rrd_strerror(errno) : "");
2670 return 6;
2671 }
2673 journal_cur = malloc(PATH_MAX + 1);
2674 journal_old = malloc(PATH_MAX + 1);
2675 if (journal_cur == NULL || journal_old == NULL)
2676 {
2677 fprintf(stderr, "malloc failure for journal files\n");
2678 return 6;
2679 }
2680 else
2681 {
2682 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2683 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2684 }
2685 }
2686 break;
2688 case 'h':
2689 case '?':
2690 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2691 "\n"
2692 "Usage: rrdcached [options]\n"
2693 "\n"
2694 "Valid options are:\n"
2695 " -l <address> Socket address to listen to.\n"
2696 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2697 " -w <seconds> Interval in which to write data.\n"
2698 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2699 " -f <seconds> Interval in which to flush dead data.\n"
2700 " -p <file> Location of the PID-file.\n"
2701 " -b <dir> Base directory to change to.\n"
2702 " -B Restrict file access to paths within -b <dir>\n"
2703 " -g Do not fork and run in the foreground.\n"
2704 " -j <dir> Directory in which to create the journal files.\n"
2705 " -F Always flush all updates at shutdown\n"
2706 "\n"
2707 "For more information and a detailed description of all options "
2708 "please refer\n"
2709 "to the rrdcached(1) manual page.\n",
2710 VERSION);
2711 status = -1;
2712 break;
2713 } /* switch (option) */
2714 } /* while (getopt) */
2716 /* advise the user when values are not sane */
2717 if (config_flush_interval < 2 * config_write_interval)
2718 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2719 " 2x write interval (-w) !\n");
2720 if (config_write_jitter > config_write_interval)
2721 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2722 " write interval (-w) !\n");
2724 if (config_write_base_only && config_base_dir == NULL)
2725 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2726 " Consult the rrdcached documentation\n");
2728 if (journal_cur == NULL)
2729 config_flush_at_shutdown = 1;
2731 return (status);
2732 } /* }}} int read_options */
2734 int main (int argc, char **argv)
2735 {
2736 int status;
2738 status = read_options (argc, argv);
2739 if (status != 0)
2740 {
2741 if (status < 0)
2742 status = 0;
2743 return (status);
2744 }
2746 status = daemonize ();
2747 if (status != 0)
2748 {
2749 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2750 return (1);
2751 }
2753 journal_init();
2755 /* start the queue thread */
2756 memset (&queue_thread, 0, sizeof (queue_thread));
2757 status = pthread_create (&queue_thread,
2758 NULL, /* attr */
2759 queue_thread_main,
2760 NULL); /* args */
2761 if (status != 0)
2762 {
2763 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2764 cleanup();
2765 return (1);
2766 }
2768 listen_thread_main (NULL);
2769 cleanup ();
2771 return (0);
2772 } /* int main */
2774 /*
2775 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2776 */