1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
/* All daemon log messages are routed through syslog(3). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC get an empty `__attribute__' so that the
 * annotations used below simply disappear. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
110 /*
111 * Types
112 */
/* Privilege level of a socket.  The values are ordered so that levels can
 * be compared with `>=' (see `has_privilege'). */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Outcome of a command, as handed to `send_response'. */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK  = 0
} response_code;
121 struct listen_socket_s
122 {
123 int fd;
124 char addr[PATH_MAX + 1];
125 int family;
126 socket_privilege privilege;
128 /* state for BATCH processing */
129 time_t batch_start;
130 int batch_cmd;
132 /* buffered IO */
133 char *rbuf;
134 off_t next_cmd;
135 off_t next_read;
137 char *wbuf;
138 ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
/* Bits stored in cache_item_s.flags */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)

/* One cached RRD file together with the updates still waiting to be
 * written.  `prev' and `next' link the item into the update queue. */
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;                /* name of the file; also the key in the tree */
  char **values;             /* pending update strings */
  int values_num;            /* number of entries in `values' */
  time_t last_flush_time;
  time_t last_update_stamp;
  int flags;                 /* CI_FLAGS_* */
  pthread_cond_t flushed;    /* broadcast after the item's values were written */
  cache_item_t *prev;
  cache_item_t *next;
};
/* User data handed to `tree_callback_flush' while walking the cache tree. */
typedef struct callback_flush_data_s
{
  time_t now;          /* time at which the walk was started */
  time_t abs_timeout;  /* items last flushed at or before this are queued */
  char **keys;         /* keys of idle entries that are to be removed */
  size_t keys_num;     /* number of entries in `keys' */
} callback_flush_data_t;
/* End of the update queue at which `enqueue_cache_item' inserts an item. */
typedef enum queue_side_e
{
  HEAD,
  TAIL
} queue_side_t;
/* Maximum length of a single socket command or response. */
#define CMD_MAX 4096
/* Size of the read buffer, expressed as two maximum-length commands. */
#define RBUF_SIZE (CMD_MAX*2)
179 /*
180 * Variables
181 */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree *cache_tree = NULL;
198 static cache_item_t *cache_queue_head = NULL;
199 static cache_item_t *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /*
234 * Functions
235 */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239 do_shutdown++;
240 pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
/* SIGINT: request a shutdown. */
static void sig_int_handler (int s) /* {{{ */
{
  (void) s; /* signal number is not needed */
  sig_common("INT");
} /* }}} void sig_int_handler */
/* SIGTERM: request a shutdown. */
static void sig_term_handler (int s) /* {{{ */
{
  (void) s; /* signal number is not needed */
  sig_common("TERM");
} /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255 config_flush_at_shutdown = 1;
256 sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261 config_flush_at_shutdown = 0;
262 sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
266 {
267 /* These structures are static, because `sigaction' behaves weird if the are
268 * overwritten.. */
269 static struct sigaction sa_int;
270 static struct sigaction sa_term;
271 static struct sigaction sa_pipe;
272 static struct sigaction sa_usr1;
273 static struct sigaction sa_usr2;
275 /* Install signal handlers */
276 memset (&sa_int, 0, sizeof (sa_int));
277 sa_int.sa_handler = sig_int_handler;
278 sigaction (SIGINT, &sa_int, NULL);
280 memset (&sa_term, 0, sizeof (sa_term));
281 sa_term.sa_handler = sig_term_handler;
282 sigaction (SIGTERM, &sa_term, NULL);
284 memset (&sa_pipe, 0, sizeof (sa_pipe));
285 sa_pipe.sa_handler = SIG_IGN;
286 sigaction (SIGPIPE, &sa_pipe, NULL);
288 memset (&sa_pipe, 0, sizeof (sa_usr1));
289 sa_usr1.sa_handler = sig_usr1_handler;
290 sigaction (SIGUSR1, &sa_usr1, NULL);
292 memset (&sa_usr2, 0, sizeof (sa_usr2));
293 sa_usr2.sa_handler = sig_usr2_handler;
294 sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300 int fd;
301 char *file;
303 file = (config_pid_file != NULL)
304 ? config_pid_file
305 : LOCALSTATEDIR "/run/rrdcached.pid";
307 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308 if (fd < 0)
309 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310 action, file, rrd_strerror(errno));
312 return(fd);
313 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns the open descriptor of the (now truncated) PID file when the
 * recorded process is gone, or a negative value when the file cannot be
 * opened or read, holds no valid PID, or names a process that is alive.
 * The descriptor is closed on every failure path. */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not add one */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  /* stale file: empty it so that the caller can write the new PID */
  lseek(pid_fd, 0, SEEK_SET);
  ftruncate(pid_fd, 0);

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write our PID, followed by a newline, to the descriptor `fd'.  The
 * descriptor is closed in every case.  Returns 0 on success and -1 when no
 * stream could be attached to the descriptor. */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream != NULL)
  {
    fprintf (stream, "%i\n", (int) getpid ());
    fclose (stream);
    return (0);
  }

  RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
  close(fd);
  return (-1);
} /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
375 {
376 char *file;
377 int status;
379 file = (config_pid_file != NULL)
380 ? config_pid_file
381 : LOCALSTATEDIR "/run/rrdcached.pid";
383 status = unlink (file);
384 if (status == 0)
385 return (0);
386 return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391 char *eol;
393 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394 sock->next_read - sock->next_cmd);
396 if (eol == NULL)
397 {
398 /* no commands left, move remainder back to front of rbuf */
399 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400 sock->next_read - sock->next_cmd);
401 sock->next_read -= sock->next_cmd;
402 sock->next_cmd = 0;
403 *len = 0;
404 return NULL;
405 }
406 else
407 {
408 char *cmd = sock->rbuf + sock->next_cmd;
409 *eol = '\0';
411 sock->next_cmd = eol - sock->rbuf + 1;
413 if (eol > sock->rbuf && *(eol-1) == '\r')
414 *(--eol) = '\0'; /* handle "\r\n" EOL */
416 *len = eol - cmd;
418 return cmd;
419 }
421 /* NOTREACHED */
422 assert(1==0);
423 }
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428 char *new_buf;
430 assert(sock != NULL);
432 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
433 if (new_buf == NULL)
434 {
435 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436 return -1;
437 }
439 strncpy(new_buf + sock->wbuf_len, str, len + 1);
441 sock->wbuf = new_buf;
442 sock->wbuf_len += len;
444 return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450 va_list argp;
451 char buffer[CMD_MAX];
452 int len;
454 if (sock == NULL) return 0; /* journal replay mode */
455 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457 va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461 len = vsprintf(buffer, fmt, argp);
462 #endif
463 va_end(argp);
464 if (len < 0)
465 {
466 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467 return -1;
468 }
470 return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
/* Count the newline characters in `str'; a NULL pointer counts as zero. */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *nl;

  if (str == NULL)
    return 0;

  for (nl = strchr(str, '\n'); nl != NULL; nl = strchr(nl + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
489 /* send the response back to the user.
490 * returns 0 on success, -1 on error
491 * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493 char *fmt, ...) /* {{{ */
494 {
495 va_list argp;
496 char buffer[CMD_MAX];
497 int lines;
498 ssize_t wrote;
499 int rclen, len;
501 if (sock == NULL) return rc; /* journal replay mode */
503 if (sock->batch_start)
504 {
505 if (rc == RESP_OK)
506 return rc; /* no response on success during BATCH */
507 lines = sock->batch_cmd;
508 }
509 else if (rc == RESP_OK)
510 lines = count_lines(sock->wbuf);
511 else
512 lines = -1;
514 rclen = sprintf(buffer, "%d ", lines);
515 va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519 len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521 va_end(argp);
522 if (len < 0)
523 return -1;
525 len += rclen;
527 /* append the result to the wbuf, don't write to the user */
528 if (sock->batch_start)
529 return add_to_wbuf(sock, buffer, len);
531 /* first write must be complete */
532 if (len != write(sock->fd, buffer, len))
533 {
534 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535 return -1;
536 }
538 if (sock->wbuf != NULL && rc == RESP_OK)
539 {
540 wrote = 0;
541 while (wrote < sock->wbuf_len)
542 {
543 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544 if (wb <= 0)
545 {
546 RRDD_LOG(LOG_INFO, "send_response: could not write results");
547 return -1;
548 }
549 wrote += wb;
550 }
551 }
553 free(sock->wbuf); sock->wbuf = NULL;
554 sock->wbuf_len = 0;
556 return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561 ci->values = NULL;
562 ci->values_num = 0;
564 ci->last_flush_time = when;
565 if (config_write_jitter > 0)
566 ci->last_flush_time += (random() % config_write_jitter);
567 }
569 /* remove_from_queue
570 * remove a "cache_item_t" item from the queue.
571 * must hold 'cache_lock' when calling this
572 */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575 if (ci == NULL) return;
576 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578 if (ci->prev == NULL)
579 cache_queue_head = ci->next; /* reset head */
580 else
581 ci->prev->next = ci->next;
583 if (ci->next == NULL)
584 cache_queue_tail = ci->prev; /* reset the tail */
585 else
586 ci->next->prev = ci->prev;
588 ci->next = ci->prev = NULL;
589 ci->flags &= ~CI_FLAGS_IN_QUEUE;
591 pthread_mutex_lock (&stats_lock);
592 assert (stats_queue_length > 0);
593 stats_queue_length--;
594 pthread_mutex_unlock (&stats_lock);
596 } /* }}} static void remove_from_queue */
598 /* free the resources associated with the cache_item_t
599 * must hold cache_lock when calling this function
600 */
601 static void *free_cache_item(cache_item_t *ci) /* {{{ */
602 {
603 if (ci == NULL) return NULL;
605 remove_from_queue(ci);
607 for (int i=0; i < ci->values_num; i++)
608 free(ci->values[i]);
610 free (ci->values);
611 free (ci->file);
613 /* in case anyone is waiting */
614 pthread_cond_broadcast(&ci->flushed);
616 free (ci);
618 return NULL;
619 } /* }}} static void *free_cache_item */
621 /*
622 * enqueue_cache_item:
623 * `cache_lock' must be acquired before calling this function!
624 */
625 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
626 queue_side_t side)
627 {
628 if (ci == NULL)
629 return (-1);
631 if (ci->values_num == 0)
632 return (0);
634 if (side == HEAD)
635 {
636 if (cache_queue_head == ci)
637 return 0;
639 /* remove if further down in queue */
640 remove_from_queue(ci);
642 ci->prev = NULL;
643 ci->next = cache_queue_head;
644 if (ci->next != NULL)
645 ci->next->prev = ci;
646 cache_queue_head = ci;
648 if (cache_queue_tail == NULL)
649 cache_queue_tail = cache_queue_head;
650 }
651 else /* (side == TAIL) */
652 {
653 /* We don't move values back in the list.. */
654 if (ci->flags & CI_FLAGS_IN_QUEUE)
655 return (0);
657 assert (ci->next == NULL);
658 assert (ci->prev == NULL);
660 ci->prev = cache_queue_tail;
662 if (cache_queue_tail == NULL)
663 cache_queue_head = ci;
664 else
665 cache_queue_tail->next = ci;
667 cache_queue_tail = ci;
668 }
670 ci->flags |= CI_FLAGS_IN_QUEUE;
672 pthread_cond_broadcast(&cache_cond);
673 pthread_mutex_lock (&stats_lock);
674 stats_queue_length++;
675 pthread_mutex_unlock (&stats_lock);
677 return (0);
678 } /* }}} int enqueue_cache_item */
680 /*
681 * tree_callback_flush:
682 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
683 * while this is in progress.
684 */
685 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
686 gpointer data)
687 {
688 cache_item_t *ci;
689 callback_flush_data_t *cfd;
691 ci = (cache_item_t *) value;
692 cfd = (callback_flush_data_t *) data;
694 if (ci->flags & CI_FLAGS_IN_QUEUE)
695 return FALSE;
697 if ((ci->last_flush_time <= cfd->abs_timeout)
698 && (ci->values_num > 0))
699 {
700 enqueue_cache_item (ci, TAIL);
701 }
702 else if ((do_shutdown != 0)
703 && (ci->values_num > 0))
704 {
705 enqueue_cache_item (ci, TAIL);
706 }
707 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
708 && (ci->values_num <= 0))
709 {
710 char **temp;
712 temp = (char **) rrd_realloc (cfd->keys,
713 sizeof (char *) * (cfd->keys_num + 1));
714 if (temp == NULL)
715 {
716 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
717 return (FALSE);
718 }
719 cfd->keys = temp;
720 /* Make really sure this points to the _same_ place */
721 assert ((char *) key == ci->file);
722 cfd->keys[cfd->keys_num] = (char *) key;
723 cfd->keys_num++;
724 }
726 return (FALSE);
727 } /* }}} gboolean tree_callback_flush */
729 static int flush_old_values (int max_age)
730 {
731 callback_flush_data_t cfd;
732 size_t k;
734 memset (&cfd, 0, sizeof (cfd));
735 /* Pass the current time as user data so that we don't need to call
736 * `time' for each node. */
737 cfd.now = time (NULL);
738 cfd.keys = NULL;
739 cfd.keys_num = 0;
741 if (max_age > 0)
742 cfd.abs_timeout = cfd.now - max_age;
743 else
744 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
746 /* `tree_callback_flush' will return the keys of all values that haven't
747 * been touched in the last `config_flush_interval' seconds in `cfd'.
748 * The char*'s in this array point to the same memory as ci->file, so we
749 * don't need to free them separately. */
750 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
752 for (k = 0; k < cfd.keys_num; k++)
753 {
754 /* should never fail, since we have held the cache_lock
755 * the entire time */
756 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
757 }
759 if (cfd.keys != NULL)
760 {
761 free (cfd.keys);
762 cfd.keys = NULL;
763 }
765 return (0);
766 } /* int flush_old_values */
768 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
769 {
770 struct timeval now;
771 struct timespec next_flush;
772 int final_flush = 0; /* make sure we only flush once on shutdown */
774 gettimeofday (&now, NULL);
775 next_flush.tv_sec = now.tv_sec + config_flush_interval;
776 next_flush.tv_nsec = 1000 * now.tv_usec;
778 pthread_mutex_lock (&cache_lock);
779 while ((do_shutdown == 0) || (cache_queue_head != NULL))
780 {
781 cache_item_t *ci;
782 char *file;
783 char **values;
784 int values_num;
785 int status;
786 int i;
788 /* First, check if it's time to do the cache flush. */
789 gettimeofday (&now, NULL);
790 if ((now.tv_sec > next_flush.tv_sec)
791 || ((now.tv_sec == next_flush.tv_sec)
792 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
793 {
794 /* Flush all values that haven't been written in the last
795 * `config_write_interval' seconds. */
796 flush_old_values (config_write_interval);
798 /* Determine the time of the next cache flush. */
799 next_flush.tv_sec =
800 now.tv_sec + next_flush.tv_sec % config_flush_interval;
802 /* unlock the cache while we rotate so we don't block incoming
803 * updates if the fsync() blocks on disk I/O */
804 pthread_mutex_unlock(&cache_lock);
805 journal_rotate();
806 pthread_mutex_lock(&cache_lock);
807 }
809 /* Now, check if there's something to store away. If not, wait until
810 * something comes in or it's time to do the cache flush. if we are
811 * shutting down, do not wait around. */
812 if (cache_queue_head == NULL && !do_shutdown)
813 {
814 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
815 if ((status != 0) && (status != ETIMEDOUT))
816 {
817 RRDD_LOG (LOG_ERR, "queue_thread_main: "
818 "pthread_cond_timedwait returned %i.", status);
819 }
820 }
822 /* We're about to shut down */
823 if (do_shutdown != 0 && !final_flush++)
824 {
825 if (config_flush_at_shutdown)
826 flush_old_values (-1); /* flush everything */
827 else
828 break;
829 }
831 /* Check if a value has arrived. This may be NULL if we timed out or there
832 * was an interrupt such as a signal. */
833 if (cache_queue_head == NULL)
834 continue;
836 ci = cache_queue_head;
838 /* copy the relevant parts */
839 file = strdup (ci->file);
840 if (file == NULL)
841 {
842 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
843 continue;
844 }
846 assert(ci->values != NULL);
847 assert(ci->values_num > 0);
849 values = ci->values;
850 values_num = ci->values_num;
852 wipe_ci_values(ci, time(NULL));
853 remove_from_queue(ci);
855 pthread_mutex_unlock (&cache_lock);
857 rrd_clear_error ();
858 status = rrd_update_r (file, NULL, values_num, (void *) values);
859 if (status != 0)
860 {
861 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
862 "rrd_update_r (%s) failed with status %i. (%s)",
863 file, status, rrd_get_error());
864 }
866 journal_write("wrote", file);
867 pthread_cond_broadcast(&ci->flushed);
869 for (i = 0; i < values_num; i++)
870 free (values[i]);
872 free(values);
873 free(file);
875 if (status == 0)
876 {
877 pthread_mutex_lock (&stats_lock);
878 stats_updates_written++;
879 stats_data_sets_written += values_num;
880 pthread_mutex_unlock (&stats_lock);
881 }
883 pthread_mutex_lock (&cache_lock);
885 /* We're about to shut down */
886 if (do_shutdown != 0 && !final_flush++)
887 {
888 if (config_flush_at_shutdown)
889 flush_old_values (-1); /* flush everything */
890 else
891 break;
892 }
893 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
894 pthread_mutex_unlock (&cache_lock);
896 if (config_flush_at_shutdown)
897 {
898 assert(cache_queue_head == NULL);
899 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
900 }
902 journal_done();
904 return (NULL);
905 } /* }}} void *queue_thread_main */
/*
 * buffer_get_field:
 * Split the next field off a NUL-terminated command buffer.  Fields end at
 * a space or at the terminating NUL; a backslash makes the following
 * character part of the field.  The field is unescaped in place.
 *
 * On success 0 is returned, `*field_ret' points to the field and
 * `*buffer_ret' / `*buffer_size_ret' are advanced past it.  On failure
 * (empty buffer, or no field terminator found) -1 is returned and the
 * three output values are left unchanged.
 */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const buf = *buffer_ret;
  const size_t size = *buffer_size_ret;
  size_t rd = 0;       /* next character to examine */
  size_t wr = 0;       /* next position of the unescaped field */
  int terminated = 0;

  if (size <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (buf[size - 1] == '\0');

  while (!terminated && (rd < size))
  {
    char c = buf[rd];

    if ((c == ' ') || (c == '\0'))
    {
      /* end-of-field or end-of-buffer */
      buf[wr++] = 0;
      rd++;
      terminated = 1;
    }
    else
    {
      if (c == '\\')
      {
        /* escaped character: a trailing backslash is an error */
        if (rd >= (size - 1))
          break;
        c = buf[++rd];
      }
      buf[wr++] = c;
      rd++;
    }
  }

  if (!terminated)
    return (-1);

  *buffer_ret = buf + rd;
  *buffer_size_ret = size - rd;
  *field_ret = buf;

  return (0);
} /* }}} int buffer_get_field */
970 /* if we're restricting writes to the base directory,
971 * check whether the file falls within the dir
972 * returns 1 if OK, otherwise 0
973 */
974 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
975 {
976 assert(file != NULL);
978 if (!config_write_base_only
979 || sock == NULL /* journal replay */
980 || config_base_dir == NULL)
981 return 1;
983 if (strstr(file, "../") != NULL) goto err;
985 /* relative paths without "../" are ok */
986 if (*file != '/') return 1;
988 /* file must be of the format base + "/" + <1+ char filename> */
989 if (strlen(file) < _config_base_dir_len + 2) goto err;
990 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
991 if (*(file + _config_base_dir_len) != '/') goto err;
993 return 1;
995 err:
996 if (sock != NULL && sock->fd >= 0)
997 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
999 return 0;
1000 } /* }}} static int check_file_access */
1002 /* when using a base dir, convert relative paths to absolute paths.
1003 * if necessary, modifies the "filename" pointer to point
1004 * to the new path created in "tmp". "tmp" is provided
1005 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1006 *
1007 * this allows us to optimize for the expected case (absolute path)
1008 * with a no-op.
1009 */
1010 static void get_abs_path(char **filename, char *tmp)
1011 {
1012 assert(tmp != NULL);
1013 assert(filename != NULL && *filename != NULL);
1015 if (config_base_dir == NULL || **filename == '/')
1016 return;
1018 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1019 *filename = tmp;
1020 } /* }}} static int get_abs_path */
1022 /* returns 1 if we have the required privilege level,
1023 * otherwise issue an error to the user on sock */
1024 static int has_privilege (listen_socket_t *sock, /* {{{ */
1025 socket_privilege priv)
1026 {
1027 if (sock == NULL) /* journal replay */
1028 return 1;
1030 if (sock->privilege >= priv)
1031 return 1;
1033 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1034 } /* }}} static int has_privilege */
1036 static int flush_file (const char *filename) /* {{{ */
1037 {
1038 cache_item_t *ci;
1040 pthread_mutex_lock (&cache_lock);
1042 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1043 if (ci == NULL)
1044 {
1045 pthread_mutex_unlock (&cache_lock);
1046 return (ENOENT);
1047 }
1049 if (ci->values_num > 0)
1050 {
1051 /* Enqueue at head */
1052 enqueue_cache_item (ci, HEAD);
1053 pthread_cond_wait(&ci->flushed, &cache_lock);
1054 }
1056 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1057 * may have been purged during our cond_wait() */
1059 pthread_mutex_unlock(&cache_lock);
1061 return (0);
1062 } /* }}} int flush_file */
1064 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1065 char *buffer, size_t buffer_size)
1066 {
1067 int status;
1068 char **help_text;
1069 char *command;
1071 char *help_help[2] =
1072 {
1073 "Command overview\n"
1074 ,
1075 "HELP [<command>]\n"
1076 "FLUSH <filename>\n"
1077 "FLUSHALL\n"
1078 "PENDING <filename>\n"
1079 "FORGET <filename>\n"
1080 "QUEUE\n"
1081 "UPDATE <filename> <values> [<values> ...]\n"
1082 "BATCH\n"
1083 "STATS\n"
1084 "QUIT\n"
1085 };
1087 char *help_flush[2] =
1088 {
1089 "Help for FLUSH\n"
1090 ,
1091 "Usage: FLUSH <filename>\n"
1092 "\n"
1093 "Adds the given filename to the head of the update queue and returns\n"
1094 "after is has been dequeued.\n"
1095 };
1097 char *help_flushall[2] =
1098 {
1099 "Help for FLUSHALL\n"
1100 ,
1101 "Usage: FLUSHALL\n"
1102 "\n"
1103 "Triggers writing of all pending updates. Returns immediately.\n"
1104 };
1106 char *help_pending[2] =
1107 {
1108 "Help for PENDING\n"
1109 ,
1110 "Usage: PENDING <filename>\n"
1111 "\n"
1112 "Shows any 'pending' updates for a file, in order.\n"
1113 "The updates shown have not yet been written to the underlying RRD file.\n"
1114 };
1116 char *help_forget[2] =
1117 {
1118 "Help for FORGET\n"
1119 ,
1120 "Usage: FORGET <filename>\n"
1121 "\n"
1122 "Removes the file completely from the cache.\n"
1123 "Any pending updates for the file will be lost.\n"
1124 };
1126 char *help_queue[2] =
1127 {
1128 "Help for QUEUE\n"
1129 ,
1130 "Shows all files in the output queue.\n"
1131 "The output is zero or more lines in the following format:\n"
1132 "(where <num_vals> is the number of values to be written)\n"
1133 "\n"
1134 "<num_vals> <filename>\n"
1135 "\n"
1136 };
1138 char *help_update[2] =
1139 {
1140 "Help for UPDATE\n"
1141 ,
1142 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1143 "\n"
1144 "Adds the given file to the internal cache if it is not yet known and\n"
1145 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1146 "for details.\n"
1147 "\n"
1148 "Each <values> has the following form:\n"
1149 " <values> = <time>:<value>[:<value>[...]]\n"
1150 "See the rrdupdate(1) manpage for details.\n"
1151 };
1153 char *help_stats[2] =
1154 {
1155 "Help for STATS\n"
1156 ,
1157 "Usage: STATS\n"
1158 "\n"
1159 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1160 "a description of the values.\n"
1161 };
1163 char *help_batch[2] =
1164 {
1165 "Help for BATCH\n"
1166 ,
1167 "The 'BATCH' command permits the client to initiate a bulk load\n"
1168 " of commands to rrdcached.\n"
1169 "\n"
1170 "Usage:\n"
1171 "\n"
1172 " client: BATCH\n"
1173 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1174 " client: command #1\n"
1175 " client: command #2\n"
1176 " client: ... and so on\n"
1177 " client: .\n"
1178 " server: 2 errors\n"
1179 " server: 7 message for command #7\n"
1180 " server: 9 message for command #9\n"
1181 "\n"
1182 "For more information, consult the rrdcached(1) documentation.\n"
1183 };
1185 char *help_quit[2] =
1186 {
1187 "Help for QUIT\n"
1188 ,
1189 "Disconnect from rrdcached.\n"
1190 };
1192 status = buffer_get_field (&buffer, &buffer_size, &command);
1193 if (status != 0)
1194 help_text = help_help;
1195 else
1196 {
1197 if (strcasecmp (command, "update") == 0)
1198 help_text = help_update;
1199 else if (strcasecmp (command, "flush") == 0)
1200 help_text = help_flush;
1201 else if (strcasecmp (command, "flushall") == 0)
1202 help_text = help_flushall;
1203 else if (strcasecmp (command, "pending") == 0)
1204 help_text = help_pending;
1205 else if (strcasecmp (command, "forget") == 0)
1206 help_text = help_forget;
1207 else if (strcasecmp (command, "queue") == 0)
1208 help_text = help_queue;
1209 else if (strcasecmp (command, "stats") == 0)
1210 help_text = help_stats;
1211 else if (strcasecmp (command, "batch") == 0)
1212 help_text = help_batch;
1213 else if (strcasecmp (command, "quit") == 0)
1214 help_text = help_quit;
1215 else
1216 help_text = help_help;
1217 }
1219 add_response_info(sock, help_text[1]);
1220 return send_response(sock, RESP_OK, help_text[0]);
1221 } /* }}} int handle_request_help */
1223 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1224 {
1225 uint64_t copy_queue_length;
1226 uint64_t copy_updates_received;
1227 uint64_t copy_flush_received;
1228 uint64_t copy_updates_written;
1229 uint64_t copy_data_sets_written;
1230 uint64_t copy_journal_bytes;
1231 uint64_t copy_journal_rotate;
1233 uint64_t tree_nodes_number;
1234 uint64_t tree_depth;
1236 pthread_mutex_lock (&stats_lock);
1237 copy_queue_length = stats_queue_length;
1238 copy_updates_received = stats_updates_received;
1239 copy_flush_received = stats_flush_received;
1240 copy_updates_written = stats_updates_written;
1241 copy_data_sets_written = stats_data_sets_written;
1242 copy_journal_bytes = stats_journal_bytes;
1243 copy_journal_rotate = stats_journal_rotate;
1244 pthread_mutex_unlock (&stats_lock);
1246 pthread_mutex_lock (&cache_lock);
1247 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1248 tree_depth = (uint64_t) g_tree_height (cache_tree);
1249 pthread_mutex_unlock (&cache_lock);
1251 add_response_info(sock,
1252 "QueueLength: %"PRIu64"\n", copy_queue_length);
1253 add_response_info(sock,
1254 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1255 add_response_info(sock,
1256 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1257 add_response_info(sock,
1258 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1259 add_response_info(sock,
1260 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1261 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1262 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1263 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1264 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1266 send_response(sock, RESP_OK, "Statistics follow\n");
1268 return (0);
1269 } /* }}} int handle_request_stats */
1271 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1272 char *buffer, size_t buffer_size)
1273 {
1274 char *file, file_tmp[PATH_MAX];
1275 int status;
1277 status = buffer_get_field (&buffer, &buffer_size, &file);
1278 if (status != 0)
1279 {
1280 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1281 }
1282 else
1283 {
1284 pthread_mutex_lock(&stats_lock);
1285 stats_flush_received++;
1286 pthread_mutex_unlock(&stats_lock);
1288 get_abs_path(&file, file_tmp);
1289 if (!check_file_access(file, sock)) return 0;
1291 status = flush_file (file);
1292 if (status == 0)
1293 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1294 else if (status == ENOENT)
1295 {
1296 /* no file in our tree; see whether it exists at all */
1297 struct stat statbuf;
1299 memset(&statbuf, 0, sizeof(statbuf));
1300 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1301 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1302 else
1303 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1304 }
1305 else if (status < 0)
1306 return send_response(sock, RESP_ERR, "Internal error.\n");
1307 else
1308 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1309 }
1311 /* NOTREACHED */
1312 assert(1==0);
1313 } /* }}} int handle_request_flush */
1315 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1316 {
1317 int status;
1319 status = has_privilege(sock, PRIV_HIGH);
1320 if (status <= 0)
1321 return status;
1323 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1325 pthread_mutex_lock(&cache_lock);
1326 flush_old_values(-1);
1327 pthread_mutex_unlock(&cache_lock);
1329 return send_response(sock, RESP_OK, "Started flush.\n");
1330 } /* }}} static int handle_request_flushall */
1332 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1333 char *buffer, size_t buffer_size)
1334 {
1335 int status;
1336 char *file, file_tmp[PATH_MAX];
1337 cache_item_t *ci;
1339 status = buffer_get_field(&buffer, &buffer_size, &file);
1340 if (status != 0)
1341 return send_response(sock, RESP_ERR,
1342 "Usage: PENDING <filename>\n");
1344 status = has_privilege(sock, PRIV_HIGH);
1345 if (status <= 0)
1346 return status;
1348 get_abs_path(&file, file_tmp);
1350 pthread_mutex_lock(&cache_lock);
1351 ci = g_tree_lookup(cache_tree, file);
1352 if (ci == NULL)
1353 {
1354 pthread_mutex_unlock(&cache_lock);
1355 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1356 }
1358 for (int i=0; i < ci->values_num; i++)
1359 add_response_info(sock, "%s\n", ci->values[i]);
1361 pthread_mutex_unlock(&cache_lock);
1362 return send_response(sock, RESP_OK, "updates pending\n");
1363 } /* }}} static int handle_request_pending */
1365 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1366 char *buffer, size_t buffer_size)
1367 {
1368 int status;
1369 gboolean found;
1370 char *file, file_tmp[PATH_MAX];
1372 status = buffer_get_field(&buffer, &buffer_size, &file);
1373 if (status != 0)
1374 return send_response(sock, RESP_ERR,
1375 "Usage: FORGET <filename>\n");
1377 status = has_privilege(sock, PRIV_HIGH);
1378 if (status <= 0)
1379 return status;
1381 get_abs_path(&file, file_tmp);
1382 if (!check_file_access(file, sock)) return 0;
1384 pthread_mutex_lock(&cache_lock);
1385 found = g_tree_remove(cache_tree, file);
1386 pthread_mutex_unlock(&cache_lock);
1388 if (found == TRUE)
1389 {
1390 if (sock != NULL)
1391 journal_write("forget", file);
1393 return send_response(sock, RESP_OK, "Gone!\n");
1394 }
1395 else
1396 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1398 /* NOTREACHED */
1399 assert(1==0);
1400 } /* }}} static int handle_request_forget */
1402 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1403 {
1404 cache_item_t *ci;
1406 pthread_mutex_lock(&cache_lock);
1408 ci = cache_queue_head;
1409 while (ci != NULL)
1410 {
1411 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1412 ci = ci->next;
1413 }
1415 pthread_mutex_unlock(&cache_lock);
1417 return send_response(sock, RESP_OK, "in queue.\n");
1418 } /* }}} int handle_request_queue */
1420 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1421 time_t now,
1422 char *buffer, size_t buffer_size)
1423 {
1424 char *file, file_tmp[PATH_MAX];
1425 int values_num = 0;
1426 int status;
1427 char orig_buf[CMD_MAX];
1429 cache_item_t *ci;
1431 status = has_privilege(sock, PRIV_HIGH);
1432 if (status <= 0)
1433 return status;
1435 /* save it for the journal later */
1436 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1438 status = buffer_get_field (&buffer, &buffer_size, &file);
1439 if (status != 0)
1440 return send_response(sock, RESP_ERR,
1441 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1443 pthread_mutex_lock(&stats_lock);
1444 stats_updates_received++;
1445 pthread_mutex_unlock(&stats_lock);
1447 get_abs_path(&file, file_tmp);
1448 if (!check_file_access(file, sock)) return 0;
1450 pthread_mutex_lock (&cache_lock);
1451 ci = g_tree_lookup (cache_tree, file);
1453 if (ci == NULL) /* {{{ */
1454 {
1455 struct stat statbuf;
1457 /* don't hold the lock while we setup; stat(2) might block */
1458 pthread_mutex_unlock(&cache_lock);
1460 memset (&statbuf, 0, sizeof (statbuf));
1461 status = stat (file, &statbuf);
1462 if (status != 0)
1463 {
1464 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1466 status = errno;
1467 if (status == ENOENT)
1468 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1469 else
1470 return send_response(sock, RESP_ERR,
1471 "stat failed with error %i.\n", status);
1472 }
1473 if (!S_ISREG (statbuf.st_mode))
1474 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1476 if (access(file, R_OK|W_OK) != 0)
1477 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1478 file, rrd_strerror(errno));
1480 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1481 if (ci == NULL)
1482 {
1483 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1485 return send_response(sock, RESP_ERR, "malloc failed.\n");
1486 }
1487 memset (ci, 0, sizeof (cache_item_t));
1489 ci->file = strdup (file);
1490 if (ci->file == NULL)
1491 {
1492 free (ci);
1493 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1495 return send_response(sock, RESP_ERR, "strdup failed.\n");
1496 }
1498 wipe_ci_values(ci, now);
1499 ci->flags = CI_FLAGS_IN_TREE;
1500 pthread_cond_init(&ci->flushed, NULL);
1502 pthread_mutex_lock(&cache_lock);
1503 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1504 } /* }}} */
1505 assert (ci != NULL);
1507 /* don't re-write updates in replay mode */
1508 if (sock != NULL)
1509 journal_write("update", orig_buf);
1511 while (buffer_size > 0)
1512 {
1513 char **temp;
1514 char *value;
1515 time_t stamp;
1516 char *eostamp;
1518 status = buffer_get_field (&buffer, &buffer_size, &value);
1519 if (status != 0)
1520 {
1521 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1522 break;
1523 }
1525 /* make sure update time is always moving forward */
1526 stamp = strtol(value, &eostamp, 10);
1527 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1528 {
1529 pthread_mutex_unlock(&cache_lock);
1530 return send_response(sock, RESP_ERR,
1531 "Cannot find timestamp in '%s'!\n", value);
1532 }
1533 else if (stamp <= ci->last_update_stamp)
1534 {
1535 pthread_mutex_unlock(&cache_lock);
1536 return send_response(sock, RESP_ERR,
1537 "illegal attempt to update using time %ld when last"
1538 " update time is %ld (minimum one second step)\n",
1539 stamp, ci->last_update_stamp);
1540 }
1541 else
1542 ci->last_update_stamp = stamp;
1544 temp = (char **) rrd_realloc (ci->values,
1545 sizeof (char *) * (ci->values_num + 1));
1546 if (temp == NULL)
1547 {
1548 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1549 continue;
1550 }
1551 ci->values = temp;
1553 ci->values[ci->values_num] = strdup (value);
1554 if (ci->values[ci->values_num] == NULL)
1555 {
1556 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1557 continue;
1558 }
1559 ci->values_num++;
1561 values_num++;
1562 }
1564 if (((now - ci->last_flush_time) >= config_write_interval)
1565 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1566 && (ci->values_num > 0))
1567 {
1568 enqueue_cache_item (ci, TAIL);
1569 }
1571 pthread_mutex_unlock (&cache_lock);
1573 if (values_num < 1)
1574 return send_response(sock, RESP_ERR, "No values updated.\n");
1575 else
1576 return send_response(sock, RESP_OK,
1577 "errors, enqueued %i value(s).\n", values_num);
1579 /* NOTREACHED */
1580 assert(1==0);
1582 } /* }}} int handle_request_update */
1584 /* we came across a "WROTE" entry during journal replay.
1585 * throw away any values that we have accumulated for this file
1586 */
1587 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1588 {
1589 int i;
1590 cache_item_t *ci;
1591 const char *file = buffer;
1593 pthread_mutex_lock(&cache_lock);
1595 ci = g_tree_lookup(cache_tree, file);
1596 if (ci == NULL)
1597 {
1598 pthread_mutex_unlock(&cache_lock);
1599 return (0);
1600 }
1602 if (ci->values)
1603 {
1604 for (i=0; i < ci->values_num; i++)
1605 free(ci->values[i]);
1607 free(ci->values);
1608 }
1610 wipe_ci_values(ci, now);
1611 remove_from_queue(ci);
1613 pthread_mutex_unlock(&cache_lock);
1614 return (0);
1615 } /* }}} int handle_request_wrote */
1617 /* start "BATCH" processing */
1618 static int batch_start (listen_socket_t *sock) /* {{{ */
1619 {
1620 int status;
1621 if (sock->batch_start)
1622 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1624 status = send_response(sock, RESP_OK,
1625 "Go ahead. End with dot '.' on its own line.\n");
1626 sock->batch_start = time(NULL);
1627 sock->batch_cmd = 0;
1629 return status;
1630 } /* }}} static int batch_start */
1632 /* finish "BATCH" processing and return results to the client */
1633 static int batch_done (listen_socket_t *sock) /* {{{ */
1634 {
1635 assert(sock->batch_start);
1636 sock->batch_start = 0;
1637 sock->batch_cmd = 0;
1638 return send_response(sock, RESP_OK, "errors\n");
1639 } /* }}} static int batch_done */
1641 /* if sock==NULL, we are in journal replay mode */
1642 static int handle_request (listen_socket_t *sock, /* {{{ */
1643 time_t now,
1644 char *buffer, size_t buffer_size)
1645 {
1646 char *buffer_ptr;
1647 char *command;
1648 int status;
1650 assert (buffer[buffer_size - 1] == '\0');
1652 buffer_ptr = buffer;
1653 command = NULL;
1654 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1655 if (status != 0)
1656 {
1657 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1658 return (-1);
1659 }
1661 if (sock != NULL && sock->batch_start)
1662 sock->batch_cmd++;
1664 if (strcasecmp (command, "update") == 0)
1665 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1666 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1667 {
1668 /* this is only valid in replay mode */
1669 return (handle_request_wrote (buffer_ptr, now));
1670 }
1671 else if (strcasecmp (command, "flush") == 0)
1672 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1673 else if (strcasecmp (command, "flushall") == 0)
1674 return (handle_request_flushall(sock));
1675 else if (strcasecmp (command, "pending") == 0)
1676 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1677 else if (strcasecmp (command, "forget") == 0)
1678 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1679 else if (strcasecmp (command, "queue") == 0)
1680 return (handle_request_queue(sock));
1681 else if (strcasecmp (command, "stats") == 0)
1682 return (handle_request_stats (sock));
1683 else if (strcasecmp (command, "help") == 0)
1684 return (handle_request_help (sock, buffer_ptr, buffer_size));
1685 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1686 return batch_start(sock);
1687 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1688 return batch_done(sock);
1689 else if (strcasecmp (command, "quit") == 0)
1690 return -1;
1691 else
1692 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1694 /* NOTREACHED */
1695 assert(1==0);
1696 } /* }}} int handle_request */
1698 /* MUST NOT hold journal_lock before calling this */
1699 static void journal_rotate(void) /* {{{ */
1700 {
1701 FILE *old_fh = NULL;
1702 int new_fd;
1704 if (journal_cur == NULL || journal_old == NULL)
1705 return;
1707 pthread_mutex_lock(&journal_lock);
1709 /* we rotate this way (rename before close) so that the we can release
1710 * the journal lock as fast as possible. Journal writes to the new
1711 * journal can proceed immediately after the new file is opened. The
1712 * fclose can then block without affecting new updates.
1713 */
1714 if (journal_fh != NULL)
1715 {
1716 old_fh = journal_fh;
1717 journal_fh = NULL;
1718 rename(journal_cur, journal_old);
1719 ++stats_journal_rotate;
1720 }
1722 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1723 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1724 if (new_fd >= 0)
1725 {
1726 journal_fh = fdopen(new_fd, "a");
1727 if (journal_fh == NULL)
1728 close(new_fd);
1729 }
1731 pthread_mutex_unlock(&journal_lock);
1733 if (old_fh != NULL)
1734 fclose(old_fh);
1736 if (journal_fh == NULL)
1737 {
1738 RRDD_LOG(LOG_CRIT,
1739 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1740 journal_cur, rrd_strerror(errno));
1742 RRDD_LOG(LOG_ERR,
1743 "JOURNALING DISABLED: All values will be flushed at shutdown");
1744 config_flush_at_shutdown = 1;
1745 }
1747 } /* }}} static void journal_rotate */
1749 static void journal_done(void) /* {{{ */
1750 {
1751 if (journal_cur == NULL)
1752 return;
1754 pthread_mutex_lock(&journal_lock);
1755 if (journal_fh != NULL)
1756 {
1757 fclose(journal_fh);
1758 journal_fh = NULL;
1759 }
1761 if (config_flush_at_shutdown)
1762 {
1763 RRDD_LOG(LOG_INFO, "removing journals");
1764 unlink(journal_old);
1765 unlink(journal_cur);
1766 }
1767 else
1768 {
1769 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1770 "journals will be used at next startup");
1771 }
1773 pthread_mutex_unlock(&journal_lock);
1775 } /* }}} static void journal_done */
1777 static int journal_write(char *cmd, char *args) /* {{{ */
1778 {
1779 int chars;
1781 if (journal_fh == NULL)
1782 return 0;
1784 pthread_mutex_lock(&journal_lock);
1785 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1786 pthread_mutex_unlock(&journal_lock);
1788 if (chars > 0)
1789 {
1790 pthread_mutex_lock(&stats_lock);
1791 stats_journal_bytes += chars;
1792 pthread_mutex_unlock(&stats_lock);
1793 }
1795 return chars;
1796 } /* }}} static int journal_write */
1798 static int journal_replay (const char *file) /* {{{ */
1799 {
1800 FILE *fh;
1801 int entry_cnt = 0;
1802 int fail_cnt = 0;
1803 uint64_t line = 0;
1804 char entry[CMD_MAX];
1805 time_t now;
1807 if (file == NULL) return 0;
1809 {
1810 char *reason = "unknown error";
1811 int status = 0;
1812 struct stat statbuf;
1814 memset(&statbuf, 0, sizeof(statbuf));
1815 if (stat(file, &statbuf) != 0)
1816 {
1817 if (errno == ENOENT)
1818 return 0;
1820 reason = "stat error";
1821 status = errno;
1822 }
1823 else if (!S_ISREG(statbuf.st_mode))
1824 {
1825 reason = "not a regular file";
1826 status = EPERM;
1827 }
1828 if (statbuf.st_uid != daemon_uid)
1829 {
1830 reason = "not owned by daemon user";
1831 status = EACCES;
1832 }
1833 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1834 {
1835 reason = "must not be user/group writable";
1836 status = EACCES;
1837 }
1839 if (status != 0)
1840 {
1841 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1842 file, rrd_strerror(status), reason);
1843 return 0;
1844 }
1845 }
1847 fh = fopen(file, "r");
1848 if (fh == NULL)
1849 {
1850 if (errno != ENOENT)
1851 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1852 file, rrd_strerror(errno));
1853 return 0;
1854 }
1855 else
1856 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1858 now = time(NULL);
1860 while(!feof(fh))
1861 {
1862 size_t entry_len;
1864 ++line;
1865 if (fgets(entry, sizeof(entry), fh) == NULL)
1866 break;
1867 entry_len = strlen(entry);
1869 /* check \n termination in case journal writing crashed mid-line */
1870 if (entry_len == 0)
1871 continue;
1872 else if (entry[entry_len - 1] != '\n')
1873 {
1874 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1875 ++fail_cnt;
1876 continue;
1877 }
1879 entry[entry_len - 1] = '\0';
1881 if (handle_request(NULL, now, entry, entry_len) == 0)
1882 ++entry_cnt;
1883 else
1884 ++fail_cnt;
1885 }
1887 fclose(fh);
1889 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1890 entry_cnt, fail_cnt);
1892 return entry_cnt > 0 ? 1 : 0;
1893 } /* }}} static int journal_replay */
1895 static void journal_init(void) /* {{{ */
1896 {
1897 int had_journal = 0;
1899 if (journal_cur == NULL) return;
1901 pthread_mutex_lock(&journal_lock);
1903 RRDD_LOG(LOG_INFO, "checking for journal files");
1905 had_journal += journal_replay(journal_old);
1906 had_journal += journal_replay(journal_cur);
1908 /* it must have been a crash. start a flush */
1909 if (had_journal && config_flush_at_shutdown)
1910 flush_old_values(-1);
1912 pthread_mutex_unlock(&journal_lock);
1913 journal_rotate();
1915 RRDD_LOG(LOG_INFO, "journal processing complete");
1917 } /* }}} static void journal_init */
1919 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1920 {
1921 assert(sock != NULL);
1923 free(sock->rbuf); sock->rbuf = NULL;
1924 free(sock->wbuf); sock->wbuf = NULL;
1925 free(sock);
1926 } /* }}} void free_listen_socket */
1928 static void close_connection(listen_socket_t *sock) /* {{{ */
1929 {
1930 if (sock->fd >= 0)
1931 {
1932 close(sock->fd);
1933 sock->fd = -1;
1934 }
1936 free_listen_socket(sock);
1938 } /* }}} void close_connection */
1940 static void *connection_thread_main (void *args) /* {{{ */
1941 {
1942 listen_socket_t *sock;
1943 int i;
1944 int fd;
1946 sock = (listen_socket_t *) args;
1947 fd = sock->fd;
1949 /* init read buffers */
1950 sock->next_read = sock->next_cmd = 0;
1951 sock->rbuf = malloc(RBUF_SIZE);
1952 if (sock->rbuf == NULL)
1953 {
1954 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1955 close_connection(sock);
1956 return NULL;
1957 }
1959 pthread_mutex_lock (&connection_threads_lock);
1960 {
1961 pthread_t *temp;
1963 temp = (pthread_t *) rrd_realloc (connection_threads,
1964 sizeof (pthread_t) * (connection_threads_num + 1));
1965 if (temp == NULL)
1966 {
1967 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1968 }
1969 else
1970 {
1971 connection_threads = temp;
1972 connection_threads[connection_threads_num] = pthread_self ();
1973 connection_threads_num++;
1974 }
1975 }
1976 pthread_mutex_unlock (&connection_threads_lock);
1978 while (do_shutdown == 0)
1979 {
1980 char *cmd;
1981 ssize_t cmd_len;
1982 ssize_t rbytes;
1983 time_t now;
1985 struct pollfd pollfd;
1986 int status;
1988 pollfd.fd = fd;
1989 pollfd.events = POLLIN | POLLPRI;
1990 pollfd.revents = 0;
1992 status = poll (&pollfd, 1, /* timeout = */ 500);
1993 if (do_shutdown)
1994 break;
1995 else if (status == 0) /* timeout */
1996 continue;
1997 else if (status < 0) /* error */
1998 {
1999 status = errno;
2000 if (status != EINTR)
2001 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2002 continue;
2003 }
2005 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2006 break;
2007 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2008 {
2009 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2010 "poll(2) returned something unexpected: %#04hx",
2011 pollfd.revents);
2012 break;
2013 }
2015 rbytes = read(fd, sock->rbuf + sock->next_read,
2016 RBUF_SIZE - sock->next_read);
2017 if (rbytes < 0)
2018 {
2019 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2020 break;
2021 }
2022 else if (rbytes == 0)
2023 break; /* eof */
2025 sock->next_read += rbytes;
2027 if (sock->batch_start)
2028 now = sock->batch_start;
2029 else
2030 now = time(NULL);
2032 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2033 {
2034 status = handle_request (sock, now, cmd, cmd_len+1);
2035 if (status != 0)
2036 goto out_close;
2037 }
2038 }
2040 out_close:
2041 close_connection(sock);
2043 /* Remove this thread from the connection threads list */
2044 pthread_mutex_lock (&connection_threads_lock);
2045 {
2046 pthread_t self;
2047 pthread_t *temp;
2049 /* Find out own index in the array */
2050 self = pthread_self ();
2051 for (i = 0; i < connection_threads_num; i++)
2052 if (pthread_equal (connection_threads[i], self) != 0)
2053 break;
2054 assert (i < connection_threads_num);
2056 /* Move the trailing threads forward. */
2057 if (i < (connection_threads_num - 1))
2058 {
2059 memmove (connection_threads + i,
2060 connection_threads + i + 1,
2061 sizeof (pthread_t) * (connection_threads_num - i - 1));
2062 }
2064 connection_threads_num--;
2066 temp = rrd_realloc(connection_threads,
2067 sizeof(*connection_threads) * connection_threads_num);
2068 if (connection_threads_num > 0 && temp == NULL)
2069 RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2070 else
2071 connection_threads = temp;
2072 }
2073 pthread_mutex_unlock (&connection_threads_lock);
2075 return (NULL);
2076 } /* }}} void *connection_thread_main */
2078 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2079 {
2080 int fd;
2081 struct sockaddr_un sa;
2082 listen_socket_t *temp;
2083 int status;
2084 const char *path;
2086 path = sock->addr;
2087 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2088 path += strlen("unix:");
2090 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2091 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2092 if (temp == NULL)
2093 {
2094 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2095 return (-1);
2096 }
2097 listen_fds = temp;
2098 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2100 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2101 if (fd < 0)
2102 {
2103 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2104 rrd_strerror(errno));
2105 return (-1);
2106 }
2108 memset (&sa, 0, sizeof (sa));
2109 sa.sun_family = AF_UNIX;
2110 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2112 /* if we've gotten this far, we own the pid file. any daemon started
2113 * with the same args must not be alive. therefore, ensure that we can
2114 * create the socket...
2115 */
2116 unlink(path);
2118 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2119 if (status != 0)
2120 {
2121 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2122 path, rrd_strerror(errno));
2123 close (fd);
2124 return (-1);
2125 }
2127 status = listen (fd, /* backlog = */ 10);
2128 if (status != 0)
2129 {
2130 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2131 path, rrd_strerror(errno));
2132 close (fd);
2133 unlink (path);
2134 return (-1);
2135 }
2137 listen_fds[listen_fds_num].fd = fd;
2138 listen_fds[listen_fds_num].family = PF_UNIX;
2139 strncpy(listen_fds[listen_fds_num].addr, path,
2140 sizeof (listen_fds[listen_fds_num].addr) - 1);
2141 listen_fds_num++;
2143 return (0);
2144 } /* }}} int open_listen_socket_unix */
/* Resolve a network address and open a listening TCP socket for every
 * address returned by getaddrinfo(), appending each to listen_fds.
 *
 * sock - template for the new entries; sock->addr is "host", "host:port",
 *        "[ipv6]" or "[ipv6]:port".  Without a port,
 *        RRDCACHED_DEFAULT_PORT is used.
 *
 * Returns 0 when the address list was processed (individual addresses may
 * have been skipped after a socket/bind error), -1 on a malformed address,
 * a resolver failure or a listen(2) failure. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a private copy, the parsing below writes NUL bytes into it */
  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    /* split at the last colon; strrchr is the standard C spelling of the
     * legacy rindex() */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    /* grow the array first; the slot only counts once listen_fds_num is
     * incremented at the end of this iteration */
    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2268 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2269 {
2270 assert(sock != NULL);
2271 assert(sock->addr != NULL);
2273 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2274 || sock->addr[0] == '/')
2275 return (open_listen_socket_unix(sock));
2276 else
2277 return (open_listen_socket_network(sock));
2278 } /* }}} int open_listen_socket */
2280 static int close_listen_sockets (void) /* {{{ */
2281 {
2282 size_t i;
2284 for (i = 0; i < listen_fds_num; i++)
2285 {
2286 close (listen_fds[i].fd);
2288 if (listen_fds[i].family == PF_UNIX)
2289 unlink(listen_fds[i].addr);
2290 }
2292 free (listen_fds);
2293 listen_fds = NULL;
2294 listen_fds_num = 0;
2296 return (0);
2297 } /* }}} int close_listen_sockets */
2299 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2300 {
2301 struct pollfd *pollfds;
2302 int pollfds_num;
2303 int status;
2304 int i;
2306 if (listen_fds_num < 1)
2307 {
2308 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2309 return (NULL);
2310 }
2312 pollfds_num = listen_fds_num;
2313 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2314 if (pollfds == NULL)
2315 {
2316 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2317 return (NULL);
2318 }
2319 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2321 RRDD_LOG(LOG_INFO, "listening for connections");
2323 while (do_shutdown == 0)
2324 {
2325 for (i = 0; i < pollfds_num; i++)
2326 {
2327 pollfds[i].fd = listen_fds[i].fd;
2328 pollfds[i].events = POLLIN | POLLPRI;
2329 pollfds[i].revents = 0;
2330 }
2332 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2333 if (do_shutdown)
2334 break;
2335 else if (status == 0) /* timeout */
2336 continue;
2337 else if (status < 0) /* error */
2338 {
2339 status = errno;
2340 if (status != EINTR)
2341 {
2342 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2343 }
2344 continue;
2345 }
2347 for (i = 0; i < pollfds_num; i++)
2348 {
2349 listen_socket_t *client_sock;
2350 struct sockaddr_storage client_sa;
2351 socklen_t client_sa_size;
2352 pthread_t tid;
2353 pthread_attr_t attr;
2355 if (pollfds[i].revents == 0)
2356 continue;
2358 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2359 {
2360 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2361 "poll(2) returned something unexpected for listen FD #%i.",
2362 pollfds[i].fd);
2363 continue;
2364 }
2366 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2367 if (client_sock == NULL)
2368 {
2369 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2370 continue;
2371 }
2372 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2374 client_sa_size = sizeof (client_sa);
2375 client_sock->fd = accept (pollfds[i].fd,
2376 (struct sockaddr *) &client_sa, &client_sa_size);
2377 if (client_sock->fd < 0)
2378 {
2379 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2380 free(client_sock);
2381 continue;
2382 }
2384 pthread_attr_init (&attr);
2385 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2387 status = pthread_create (&tid, &attr, connection_thread_main,
2388 client_sock);
2389 if (status != 0)
2390 {
2391 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2392 close_connection(client_sock);
2393 continue;
2394 }
2395 } /* for (pollfds_num) */
2396 } /* while (do_shutdown == 0) */
2398 RRDD_LOG(LOG_INFO, "starting shutdown");
2400 close_listen_sockets ();
2402 pthread_mutex_lock (&connection_threads_lock);
2403 while (connection_threads_num > 0)
2404 {
2405 pthread_t wait_for;
2407 wait_for = connection_threads[0];
2409 pthread_mutex_unlock (&connection_threads_lock);
2410 pthread_join (wait_for, /* retval = */ NULL);
2411 pthread_mutex_lock (&connection_threads_lock);
2412 }
2413 pthread_mutex_unlock (&connection_threads_lock);
2415 free(pollfds);
2417 return (NULL);
2418 } /* }}} void *listen_thread_main */
2420 static int daemonize (void) /* {{{ */
2421 {
2422 int pid_fd;
2423 char *base_dir;
2425 daemon_uid = geteuid();
2427 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2428 if (pid_fd < 0)
2429 pid_fd = check_pidfile();
2430 if (pid_fd < 0)
2431 return pid_fd;
2433 /* open all the listen sockets */
2434 if (config_listen_address_list_len > 0)
2435 {
2436 for (int i = 0; i < config_listen_address_list_len; i++)
2437 {
2438 open_listen_socket (config_listen_address_list[i]);
2439 free_listen_socket (config_listen_address_list[i]);
2440 }
2442 free(config_listen_address_list);
2443 }
2444 else
2445 {
2446 listen_socket_t sock;
2447 memset(&sock, 0, sizeof(sock));
2448 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2449 open_listen_socket (&sock);
2450 }
2452 if (listen_fds_num < 1)
2453 {
2454 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2455 goto error;
2456 }
2458 if (!stay_foreground)
2459 {
2460 pid_t child;
2462 child = fork ();
2463 if (child < 0)
2464 {
2465 fprintf (stderr, "daemonize: fork(2) failed.\n");
2466 goto error;
2467 }
2468 else if (child > 0)
2469 exit(0);
2471 /* Become session leader */
2472 setsid ();
2474 /* Open the first three file descriptors to /dev/null */
2475 close (2);
2476 close (1);
2477 close (0);
2479 open ("/dev/null", O_RDWR);
2480 dup (0);
2481 dup (0);
2482 } /* if (!stay_foreground) */
2484 /* Change into the /tmp directory. */
2485 base_dir = (config_base_dir != NULL)
2486 ? config_base_dir
2487 : "/tmp";
2489 if (chdir (base_dir) != 0)
2490 {
2491 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2492 goto error;
2493 }
2495 install_signal_handlers();
2497 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2498 RRDD_LOG(LOG_INFO, "starting up");
2500 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2501 (GDestroyNotify) free_cache_item);
2502 if (cache_tree == NULL)
2503 {
2504 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2505 goto error;
2506 }
2508 return write_pidfile (pid_fd);
2510 error:
2511 remove_pidfile();
2512 return -1;
2513 } /* }}} int daemonize */
2515 static int cleanup (void) /* {{{ */
2516 {
2517 do_shutdown++;
2519 pthread_cond_signal (&cache_cond);
2520 pthread_join (queue_thread, /* return = */ NULL);
2522 remove_pidfile ();
2524 free(config_base_dir);
2525 free(config_pid_file);
2526 free(journal_cur);
2527 free(journal_old);
2529 pthread_mutex_lock(&cache_lock);
2530 g_tree_destroy(cache_tree);
2532 RRDD_LOG(LOG_INFO, "goodbye");
2533 closelog ();
2535 return (0);
2536 } /* }}} int cleanup */
2538 static int read_options (int argc, char **argv) /* {{{ */
2539 {
2540 int option;
2541 int status = 0;
2543 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2544 {
2545 switch (option)
2546 {
2547 case 'g':
2548 stay_foreground=1;
2549 break;
2551 case 'L':
2552 case 'l':
2553 {
2554 listen_socket_t **temp;
2555 listen_socket_t *new;
2557 new = malloc(sizeof(listen_socket_t));
2558 if (new == NULL)
2559 {
2560 fprintf(stderr, "read_options: malloc failed.\n");
2561 return(2);
2562 }
2563 memset(new, 0, sizeof(listen_socket_t));
2565 temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2566 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2567 if (temp == NULL)
2568 {
2569 fprintf (stderr, "read_options: realloc failed.\n");
2570 return (2);
2571 }
2572 config_listen_address_list = temp;
2574 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2575 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2577 temp[config_listen_address_list_len] = new;
2578 config_listen_address_list_len++;
2579 }
2580 break;
2582 case 'f':
2583 {
2584 int temp;
2586 temp = atoi (optarg);
2587 if (temp > 0)
2588 config_flush_interval = temp;
2589 else
2590 {
2591 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2592 status = 3;
2593 }
2594 }
2595 break;
2597 case 'w':
2598 {
2599 int temp;
2601 temp = atoi (optarg);
2602 if (temp > 0)
2603 config_write_interval = temp;
2604 else
2605 {
2606 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2607 status = 2;
2608 }
2609 }
2610 break;
2612 case 'z':
2613 {
2614 int temp;
2616 temp = atoi(optarg);
2617 if (temp > 0)
2618 config_write_jitter = temp;
2619 else
2620 {
2621 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2622 status = 2;
2623 }
2625 break;
2626 }
2628 case 'B':
2629 config_write_base_only = 1;
2630 break;
2632 case 'b':
2633 {
2634 size_t len;
2635 char base_realpath[PATH_MAX];
2637 if (config_base_dir != NULL)
2638 free (config_base_dir);
2639 config_base_dir = strdup (optarg);
2640 if (config_base_dir == NULL)
2641 {
2642 fprintf (stderr, "read_options: strdup failed.\n");
2643 return (3);
2644 }
2646 /* make sure that the base directory is not resolved via
2647 * symbolic links. this makes some performance-enhancing
2648 * assumptions possible (we don't have to resolve paths
2649 * that start with a "/")
2650 */
2651 if (realpath(config_base_dir, base_realpath) == NULL)
2652 {
2653 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2654 return 5;
2655 }
2656 else if (strncmp(config_base_dir,
2657 base_realpath, sizeof(base_realpath)) != 0)
2658 {
2659 fprintf(stderr,
2660 "Base directory (-b) resolved via file system links!\n"
2661 "Please consult rrdcached '-b' documentation!\n"
2662 "Consider specifying the real directory (%s)\n",
2663 base_realpath);
2664 return 5;
2665 }
2667 len = strlen (config_base_dir);
2668 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2669 {
2670 config_base_dir[len - 1] = 0;
2671 len--;
2672 }
2674 if (len < 1)
2675 {
2676 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2677 return (4);
2678 }
2680 _config_base_dir_len = len;
2681 }
2682 break;
2684 case 'p':
2685 {
2686 if (config_pid_file != NULL)
2687 free (config_pid_file);
2688 config_pid_file = strdup (optarg);
2689 if (config_pid_file == NULL)
2690 {
2691 fprintf (stderr, "read_options: strdup failed.\n");
2692 return (3);
2693 }
2694 }
2695 break;
2697 case 'F':
2698 config_flush_at_shutdown = 1;
2699 break;
2701 case 'j':
2702 {
2703 struct stat statbuf;
2704 const char *dir = optarg;
2706 status = stat(dir, &statbuf);
2707 if (status != 0)
2708 {
2709 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2710 return 6;
2711 }
2713 if (!S_ISDIR(statbuf.st_mode)
2714 || access(dir, R_OK|W_OK|X_OK) != 0)
2715 {
2716 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2717 errno ? rrd_strerror(errno) : "");
2718 return 6;
2719 }
2721 journal_cur = malloc(PATH_MAX + 1);
2722 journal_old = malloc(PATH_MAX + 1);
2723 if (journal_cur == NULL || journal_old == NULL)
2724 {
2725 fprintf(stderr, "malloc failure for journal files\n");
2726 return 6;
2727 }
2728 else
2729 {
2730 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2731 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2732 }
2733 }
2734 break;
2736 case 'h':
2737 case '?':
2738 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2739 "\n"
2740 "Usage: rrdcached [options]\n"
2741 "\n"
2742 "Valid options are:\n"
2743 " -l <address> Socket address to listen to.\n"
2744 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2745 " -w <seconds> Interval in which to write data.\n"
2746 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2747 " -f <seconds> Interval in which to flush dead data.\n"
2748 " -p <file> Location of the PID-file.\n"
2749 " -b <dir> Base directory to change to.\n"
2750 " -B Restrict file access to paths within -b <dir>\n"
2751 " -g Do not fork and run in the foreground.\n"
2752 " -j <dir> Directory in which to create the journal files.\n"
2753 " -F Always flush all updates at shutdown\n"
2754 "\n"
2755 "For more information and a detailed description of all options "
2756 "please refer\n"
2757 "to the rrdcached(1) manual page.\n",
2758 VERSION);
2759 status = -1;
2760 break;
2761 } /* switch (option) */
2762 } /* while (getopt) */
2764 /* advise the user when values are not sane */
2765 if (config_flush_interval < 2 * config_write_interval)
2766 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2767 " 2x write interval (-w) !\n");
2768 if (config_write_jitter > config_write_interval)
2769 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2770 " write interval (-w) !\n");
2772 if (config_write_base_only && config_base_dir == NULL)
2773 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2774 " Consult the rrdcached documentation\n");
2776 if (journal_cur == NULL)
2777 config_flush_at_shutdown = 1;
2779 return (status);
2780 } /* }}} int read_options */
2782 int main (int argc, char **argv)
2783 {
2784 int status;
2786 status = read_options (argc, argv);
2787 if (status != 0)
2788 {
2789 if (status < 0)
2790 status = 0;
2791 return (status);
2792 }
2794 status = daemonize ();
2795 if (status != 0)
2796 {
2797 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2798 return (1);
2799 }
2801 journal_init();
2803 /* start the queue thread */
2804 memset (&queue_thread, 0, sizeof (queue_thread));
2805 status = pthread_create (&queue_thread,
2806 NULL, /* attr */
2807 queue_thread_main,
2808 NULL); /* args */
2809 if (status != 0)
2810 {
2811 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2812 cleanup();
2813 return (1);
2814 }
2816 listen_thread_main (NULL);
2817 cleanup ();
2819 return (0);
2820 } /* int main */
2822 /*
2823 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2824 */