1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 # include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
/* Log through syslog(3): `severity' is a syslog priority (LOG_ERR,
 * LOG_NOTICE, ...) and the remaining arguments are a printf-style format
 * string followed by its arguments. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC-compatible ones do not understand __attribute__;
 * expand it to nothing so annotations such as __attribute__((unused)) used
 * below still compile. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
/*
 * Types
 */

/* Privilege level of a listening socket. has_privilege() compares levels
 * with `>=', so the declaration order (LOW before HIGH) matters. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Outcome passed to send_response(); RESP_ERR makes the status line count
 * -1 (or the BATCH command number while a BATCH is active). */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;

/* Per-connection / per-listener socket state. */
struct listen_socket_s
{
  int fd;
  char addr[PATH_MAX + 1];   /* presumably the configured listen address -- TODO confirm */
  int family;                /* presumably an AF_* address family -- TODO confirm */
  socket_privilege privilege;

  /* state for BATCH processing */
  time_t batch_start;        /* non-zero while a BATCH is active (see send_response) */
  int batch_cmd;             /* reported as the status number for errors inside a BATCH */

  /* buffered IO */
  char *rbuf;                /* receive buffer scanned by next_cmd() */
  off_t next_cmd;            /* offset in rbuf of the next unparsed command */
  off_t next_read;           /* offset in rbuf just past the valid data */

  char *wbuf;                /* NUL-terminated response text built by add_to_wbuf() */
  ssize_t wbuf_len;          /* bytes in wbuf, excluding the terminator */
};
typedef struct listen_socket_s listen_socket_t;

struct cache_item_s;
typedef struct cache_item_s cache_item_t;

/* One cached RRD file together with its not-yet-written updates. */
struct cache_item_s
{
  char *file;                /* file name; also used as the cache_tree key */
  char **values;             /* pending update strings handed to rrd_update_r() */
  int values_num;            /* number of entries in `values' */
  time_t last_flush_time;    /* set by wipe_ci_values(), possibly plus jitter */
  time_t last_update_stamp;  /* not used in this part of the file -- TODO confirm meaning */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)   /* item is linked into the update queue */
  int flags;
  pthread_cond_t  flushed;   /* broadcast when the values were written or the item is freed */
  cache_item_t *prev;        /* update queue links, see enqueue_cache_item() */
  cache_item_t *next;
};

/* State passed through g_tree_foreach() to tree_callback_flush(). */
struct callback_flush_data_s
{
  time_t now;                /* time of the walk, sampled once */
  time_t abs_timeout;        /* items flushed at or before this time get queued */
  char **keys;               /* keys of idle items to remove after the walk */
  size_t keys_num;
};
typedef struct callback_flush_data_s callback_flush_data_t;

/* End of the update queue enqueue_cache_item() inserts at. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;

/* max length of socket command or response */
#define CMD_MAX 4096
/* presumably the size of listen_socket_t.rbuf (two commands' worth) -- TODO confirm */
#define RBUF_SIZE (CMD_MAX*2)
184 /*
185 * Variables
186 */
187 static int stay_foreground = 0;
188 static uid_t daemon_uid;
190 static listen_socket_t *listen_fds = NULL;
191 static size_t listen_fds_num = 0;
193 static int do_shutdown = 0;
195 static pthread_t *queue_threads;
196 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
197 static int config_queue_threads = 4;
199 static pthread_t flush_thread;
200 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
202 static pthread_t *connection_threads = NULL;
203 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
204 static int connection_threads_num = 0;
206 /* Cache stuff */
207 static GTree *cache_tree = NULL;
208 static cache_item_t *cache_queue_head = NULL;
209 static cache_item_t *cache_queue_tail = NULL;
210 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
212 static int config_write_interval = 300;
213 static int config_write_jitter = 0;
214 static int config_flush_interval = 3600;
215 static int config_flush_at_shutdown = 0;
216 static char *config_pid_file = NULL;
217 static char *config_base_dir = NULL;
218 static size_t _config_base_dir_len = 0;
219 static int config_write_base_only = 0;
221 static listen_socket_t **config_listen_address_list = NULL;
222 static int config_listen_address_list_len = 0;
224 static uint64_t stats_queue_length = 0;
225 static uint64_t stats_updates_received = 0;
226 static uint64_t stats_flush_received = 0;
227 static uint64_t stats_updates_written = 0;
228 static uint64_t stats_data_sets_written = 0;
229 static uint64_t stats_journal_bytes = 0;
230 static uint64_t stats_journal_rotate = 0;
231 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
233 /* Journaled updates */
234 static char *journal_cur = NULL;
235 static char *journal_old = NULL;
236 static FILE *journal_fh = NULL;
237 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
238 static int journal_write(char *cmd, char *args);
239 static void journal_done(void);
240 static void journal_rotate(void);
242 /*
243 * Functions
244 */
245 static void sig_common (const char *sig) /* {{{ */
246 {
247 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
248 do_shutdown++;
249 pthread_cond_broadcast(&flush_cond);
250 pthread_cond_broadcast(&queue_cond);
251 } /* }}} void sig_common */
253 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
254 {
255 sig_common("INT");
256 } /* }}} void sig_int_handler */
258 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
259 {
260 sig_common("TERM");
261 } /* }}} void sig_term_handler */
263 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
264 {
265 config_flush_at_shutdown = 1;
266 sig_common("USR1");
267 } /* }}} void sig_usr1_handler */
269 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
270 {
271 config_flush_at_shutdown = 0;
272 sig_common("USR2");
273 } /* }}} void sig_usr2_handler */
275 static void install_signal_handlers(void) /* {{{ */
276 {
277 /* These structures are static, because `sigaction' behaves weird if the are
278 * overwritten.. */
279 static struct sigaction sa_int;
280 static struct sigaction sa_term;
281 static struct sigaction sa_pipe;
282 static struct sigaction sa_usr1;
283 static struct sigaction sa_usr2;
285 /* Install signal handlers */
286 memset (&sa_int, 0, sizeof (sa_int));
287 sa_int.sa_handler = sig_int_handler;
288 sigaction (SIGINT, &sa_int, NULL);
290 memset (&sa_term, 0, sizeof (sa_term));
291 sa_term.sa_handler = sig_term_handler;
292 sigaction (SIGTERM, &sa_term, NULL);
294 memset (&sa_pipe, 0, sizeof (sa_pipe));
295 sa_pipe.sa_handler = SIG_IGN;
296 sigaction (SIGPIPE, &sa_pipe, NULL);
298 memset (&sa_pipe, 0, sizeof (sa_usr1));
299 sa_usr1.sa_handler = sig_usr1_handler;
300 sigaction (SIGUSR1, &sa_usr1, NULL);
302 memset (&sa_usr2, 0, sizeof (sa_usr2));
303 sa_usr2.sa_handler = sig_usr2_handler;
304 sigaction (SIGUSR2, &sa_usr2, NULL);
306 } /* }}} void install_signal_handlers */
308 static int open_pidfile(char *action, int oflag) /* {{{ */
309 {
310 int fd;
311 char *file;
313 file = (config_pid_file != NULL)
314 ? config_pid_file
315 : LOCALSTATEDIR "/run/rrdcached.pid";
317 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
318 if (fd < 0)
319 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
320 action, file, rrd_strerror(errno));
322 return(fd);
323 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running.
 *
 * Returns a descriptor for the PID file, truncated and positioned at offset
 * 0, when no other process with the recorded PID can be signalled (i.e. the
 * file is stale); the caller can pass it to write_pidfile(). Returns a
 * negative value, with the descriptor closed, when the file cannot be
 * opened, its contents are unusable, or another process with the recorded
 * PID exists. */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t got;
  char *end;
  long val;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* read() does not NUL-terminate; keep one byte for the terminator so the
   * number parsing below cannot run past the end of pid_str. */
  got = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (got <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[got] = '\0';

  /* strtol() instead of atoi(): out-of-range input is reported rather than
   * being undefined behaviour. */
  errno = 0;
  val = strtol(pid_str, &end, 10);
  if (end == pid_str || errno == ERANGE || val <= 0
      || (long) (pid_t) val != val)
  {
    close(pid_fd);
    return -1;
  }
  pid = (pid_t) val;

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) != 0)
    fprintf(stderr, "rrdcached: can't truncate pid file (%s)\n",
            rrd_strerror(errno));

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write this process's PID followed by a newline to `fd' (normally the
 * descriptor returned by check_pidfile()) and close it. The descriptor is
 * consumed on every path. Returns 0 on success, -1 if the stream could not
 * be created or the PID could not be written out. */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *fh;
  int failed;

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  /* The PID sits in the stdio buffer until fclose() flushes it, so check
   * both calls; otherwise a full disk silently leaves an empty PID file. */
  failed = (fprintf (fh, "%i\n", (int) getpid ()) < 0);
  if (fclose (fh) != 0)
    failed = 1;

  if (failed)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: writing the PID file failed.");
    return (-1);
  }

  return (0);
} /* }}} int write_pidfile */
384 static int remove_pidfile (void) /* {{{ */
385 {
386 char *file;
387 int status;
389 file = (config_pid_file != NULL)
390 ? config_pid_file
391 : LOCALSTATEDIR "/run/rrdcached.pid";
393 status = unlink (file);
394 if (status == 0)
395 return (0);
396 return (errno);
397 } /* }}} int remove_pidfile */
399 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
400 {
401 char *eol;
403 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
404 sock->next_read - sock->next_cmd);
406 if (eol == NULL)
407 {
408 /* no commands left, move remainder back to front of rbuf */
409 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
410 sock->next_read - sock->next_cmd);
411 sock->next_read -= sock->next_cmd;
412 sock->next_cmd = 0;
413 *len = 0;
414 return NULL;
415 }
416 else
417 {
418 char *cmd = sock->rbuf + sock->next_cmd;
419 *eol = '\0';
421 sock->next_cmd = eol - sock->rbuf + 1;
423 if (eol > sock->rbuf && *(eol-1) == '\r')
424 *(--eol) = '\0'; /* handle "\r\n" EOL */
426 *len = eol - cmd;
428 return cmd;
429 }
431 /* NOTREACHED */
432 assert(1==0);
433 }
435 /* add the characters directly to the write buffer */
436 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
437 {
438 char *new_buf;
440 assert(sock != NULL);
442 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
443 if (new_buf == NULL)
444 {
445 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
446 return -1;
447 }
449 strncpy(new_buf + sock->wbuf_len, str, len + 1);
451 sock->wbuf = new_buf;
452 sock->wbuf_len += len;
454 return 0;
455 } /* }}} static int add_to_wbuf */
457 /* add the text to the "extra" info that's sent after the status line */
458 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
459 {
460 va_list argp;
461 char buffer[CMD_MAX];
462 int len;
464 if (sock == NULL) return 0; /* journal replay mode */
465 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
467 va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
470 #else
471 len = vsprintf(buffer, fmt, argp);
472 #endif
473 va_end(argp);
474 if (len < 0)
475 {
476 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
477 return -1;
478 }
480 return add_to_wbuf(sock, buffer, len);
481 } /* }}} static int add_response_info */
/* Count the '\n' characters in the NUL-terminated string `str'.
 * A NULL string counts as zero lines. */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  const char *p;

  if (str == NULL)
    return 0;

  for (p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      newlines++;
  }

  return newlines;
} /* }}} static int count_lines */
499 /* send the response back to the user.
500 * returns 0 on success, -1 on error
501 * write buffer is always zeroed after this call */
502 static int send_response (listen_socket_t *sock, response_code rc,
503 char *fmt, ...) /* {{{ */
504 {
505 va_list argp;
506 char buffer[CMD_MAX];
507 int lines;
508 ssize_t wrote;
509 int rclen, len;
511 if (sock == NULL) return rc; /* journal replay mode */
513 if (sock->batch_start)
514 {
515 if (rc == RESP_OK)
516 return rc; /* no response on success during BATCH */
517 lines = sock->batch_cmd;
518 }
519 else if (rc == RESP_OK)
520 lines = count_lines(sock->wbuf);
521 else
522 lines = -1;
524 rclen = sprintf(buffer, "%d ", lines);
525 va_start(argp, fmt);
526 #ifdef HAVE_VSNPRINTF
527 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
528 #else
529 len = vsprintf(buffer+rclen, fmt, argp);
530 #endif
531 va_end(argp);
532 if (len < 0)
533 return -1;
535 len += rclen;
537 /* append the result to the wbuf, don't write to the user */
538 if (sock->batch_start)
539 return add_to_wbuf(sock, buffer, len);
541 /* first write must be complete */
542 if (len != write(sock->fd, buffer, len))
543 {
544 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
545 return -1;
546 }
548 if (sock->wbuf != NULL && rc == RESP_OK)
549 {
550 wrote = 0;
551 while (wrote < sock->wbuf_len)
552 {
553 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
554 if (wb <= 0)
555 {
556 RRDD_LOG(LOG_INFO, "send_response: could not write results");
557 return -1;
558 }
559 wrote += wb;
560 }
561 }
563 free(sock->wbuf); sock->wbuf = NULL;
564 sock->wbuf_len = 0;
566 return 0;
567 } /* }}} */
569 static void wipe_ci_values(cache_item_t *ci, time_t when)
570 {
571 ci->values = NULL;
572 ci->values_num = 0;
574 ci->last_flush_time = when;
575 if (config_write_jitter > 0)
576 ci->last_flush_time += (random() % config_write_jitter);
577 }
579 /* remove_from_queue
580 * remove a "cache_item_t" item from the queue.
581 * must hold 'cache_lock' when calling this
582 */
583 static void remove_from_queue(cache_item_t *ci) /* {{{ */
584 {
585 if (ci == NULL) return;
586 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
588 if (ci->prev == NULL)
589 cache_queue_head = ci->next; /* reset head */
590 else
591 ci->prev->next = ci->next;
593 if (ci->next == NULL)
594 cache_queue_tail = ci->prev; /* reset the tail */
595 else
596 ci->next->prev = ci->prev;
598 ci->next = ci->prev = NULL;
599 ci->flags &= ~CI_FLAGS_IN_QUEUE;
601 pthread_mutex_lock (&stats_lock);
602 assert (stats_queue_length > 0);
603 stats_queue_length--;
604 pthread_mutex_unlock (&stats_lock);
606 } /* }}} static void remove_from_queue */
608 /* free the resources associated with the cache_item_t
609 * must hold cache_lock when calling this function
610 */
611 static void *free_cache_item(cache_item_t *ci) /* {{{ */
612 {
613 if (ci == NULL) return NULL;
615 remove_from_queue(ci);
617 for (int i=0; i < ci->values_num; i++)
618 free(ci->values[i]);
620 free (ci->values);
621 free (ci->file);
623 /* in case anyone is waiting */
624 pthread_cond_broadcast(&ci->flushed);
626 free (ci);
628 return NULL;
629 } /* }}} static void *free_cache_item */
631 /*
632 * enqueue_cache_item:
633 * `cache_lock' must be acquired before calling this function!
634 */
635 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
636 queue_side_t side)
637 {
638 if (ci == NULL)
639 return (-1);
641 if (ci->values_num == 0)
642 return (0);
644 if (side == HEAD)
645 {
646 if (cache_queue_head == ci)
647 return 0;
649 /* remove if further down in queue */
650 remove_from_queue(ci);
652 ci->prev = NULL;
653 ci->next = cache_queue_head;
654 if (ci->next != NULL)
655 ci->next->prev = ci;
656 cache_queue_head = ci;
658 if (cache_queue_tail == NULL)
659 cache_queue_tail = cache_queue_head;
660 }
661 else /* (side == TAIL) */
662 {
663 /* We don't move values back in the list.. */
664 if (ci->flags & CI_FLAGS_IN_QUEUE)
665 return (0);
667 assert (ci->next == NULL);
668 assert (ci->prev == NULL);
670 ci->prev = cache_queue_tail;
672 if (cache_queue_tail == NULL)
673 cache_queue_head = ci;
674 else
675 cache_queue_tail->next = ci;
677 cache_queue_tail = ci;
678 }
680 ci->flags |= CI_FLAGS_IN_QUEUE;
682 pthread_cond_signal(&queue_cond);
683 pthread_mutex_lock (&stats_lock);
684 stats_queue_length++;
685 pthread_mutex_unlock (&stats_lock);
687 return (0);
688 } /* }}} int enqueue_cache_item */
690 /*
691 * tree_callback_flush:
692 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
693 * while this is in progress.
694 */
695 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
696 gpointer data)
697 {
698 cache_item_t *ci;
699 callback_flush_data_t *cfd;
701 ci = (cache_item_t *) value;
702 cfd = (callback_flush_data_t *) data;
704 if (ci->flags & CI_FLAGS_IN_QUEUE)
705 return FALSE;
707 if ((ci->last_flush_time <= cfd->abs_timeout)
708 && (ci->values_num > 0))
709 {
710 enqueue_cache_item (ci, TAIL);
711 }
712 else if ((do_shutdown != 0)
713 && (ci->values_num > 0))
714 {
715 enqueue_cache_item (ci, TAIL);
716 }
717 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
718 && (ci->values_num <= 0))
719 {
720 char **temp;
722 temp = (char **) rrd_realloc (cfd->keys,
723 sizeof (char *) * (cfd->keys_num + 1));
724 if (temp == NULL)
725 {
726 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
727 return (FALSE);
728 }
729 cfd->keys = temp;
730 /* Make really sure this points to the _same_ place */
731 assert ((char *) key == ci->file);
732 cfd->keys[cfd->keys_num] = (char *) key;
733 cfd->keys_num++;
734 }
736 return (FALSE);
737 } /* }}} gboolean tree_callback_flush */
739 static int flush_old_values (int max_age)
740 {
741 callback_flush_data_t cfd;
742 size_t k;
744 memset (&cfd, 0, sizeof (cfd));
745 /* Pass the current time as user data so that we don't need to call
746 * `time' for each node. */
747 cfd.now = time (NULL);
748 cfd.keys = NULL;
749 cfd.keys_num = 0;
751 if (max_age > 0)
752 cfd.abs_timeout = cfd.now - max_age;
753 else
754 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
756 /* `tree_callback_flush' will return the keys of all values that haven't
757 * been touched in the last `config_flush_interval' seconds in `cfd'.
758 * The char*'s in this array point to the same memory as ci->file, so we
759 * don't need to free them separately. */
760 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
762 for (k = 0; k < cfd.keys_num; k++)
763 {
764 /* should never fail, since we have held the cache_lock
765 * the entire time */
766 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
767 }
769 if (cfd.keys != NULL)
770 {
771 free (cfd.keys);
772 cfd.keys = NULL;
773 }
775 return (0);
776 } /* int flush_old_values */
778 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
779 {
780 struct timeval now;
781 struct timespec next_flush;
782 int status;
784 gettimeofday (&now, NULL);
785 next_flush.tv_sec = now.tv_sec + config_flush_interval;
786 next_flush.tv_nsec = 1000 * now.tv_usec;
788 pthread_mutex_lock(&cache_lock);
790 while (!do_shutdown)
791 {
792 gettimeofday (&now, NULL);
793 if ((now.tv_sec > next_flush.tv_sec)
794 || ((now.tv_sec == next_flush.tv_sec)
795 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
796 {
797 /* Flush all values that haven't been written in the last
798 * `config_write_interval' seconds. */
799 flush_old_values (config_write_interval);
801 /* Determine the time of the next cache flush. */
802 next_flush.tv_sec =
803 now.tv_sec + next_flush.tv_sec % config_flush_interval;
805 /* unlock the cache while we rotate so we don't block incoming
806 * updates if the fsync() blocks on disk I/O */
807 pthread_mutex_unlock(&cache_lock);
808 journal_rotate();
809 pthread_mutex_lock(&cache_lock);
810 }
812 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
813 if (status != 0 && status != ETIMEDOUT)
814 {
815 RRDD_LOG (LOG_ERR, "flush_thread_main: "
816 "pthread_cond_timedwait returned %i.", status);
817 }
818 }
820 if (config_flush_at_shutdown)
821 flush_old_values (-1); /* flush everything */
823 pthread_mutex_unlock(&cache_lock);
825 return NULL;
826 } /* void *flush_thread_main */
828 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
829 {
830 pthread_mutex_lock (&cache_lock);
832 while (!do_shutdown
833 || (cache_queue_head != NULL && config_flush_at_shutdown))
834 {
835 cache_item_t *ci;
836 char *file;
837 char **values;
838 int values_num;
839 int status;
840 int i;
842 /* Now, check if there's something to store away. If not, wait until
843 * something comes in. if we are shutting down, do not wait around. */
844 if (cache_queue_head == NULL && !do_shutdown)
845 {
846 status = pthread_cond_wait (&queue_cond, &cache_lock);
847 if ((status != 0) && (status != ETIMEDOUT))
848 {
849 RRDD_LOG (LOG_ERR, "queue_thread_main: "
850 "pthread_cond_wait returned %i.", status);
851 }
852 }
854 /* Check if a value has arrived. This may be NULL if we timed out or there
855 * was an interrupt such as a signal. */
856 if (cache_queue_head == NULL)
857 continue;
859 ci = cache_queue_head;
861 /* copy the relevant parts */
862 file = strdup (ci->file);
863 if (file == NULL)
864 {
865 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
866 continue;
867 }
869 assert(ci->values != NULL);
870 assert(ci->values_num > 0);
872 values = ci->values;
873 values_num = ci->values_num;
875 wipe_ci_values(ci, time(NULL));
876 remove_from_queue(ci);
878 pthread_mutex_unlock (&cache_lock);
880 rrd_clear_error ();
881 status = rrd_update_r (file, NULL, values_num, (void *) values);
882 if (status != 0)
883 {
884 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
885 "rrd_update_r (%s) failed with status %i. (%s)",
886 file, status, rrd_get_error());
887 }
889 journal_write("wrote", file);
890 pthread_cond_broadcast(&ci->flushed);
892 for (i = 0; i < values_num; i++)
893 free (values[i]);
895 free(values);
896 free(file);
898 if (status == 0)
899 {
900 pthread_mutex_lock (&stats_lock);
901 stats_updates_written++;
902 stats_data_sets_written += values_num;
903 pthread_mutex_unlock (&stats_lock);
904 }
906 pthread_mutex_lock (&cache_lock);
907 }
908 pthread_mutex_unlock (&cache_lock);
910 return (NULL);
911 } /* }}} void *queue_thread_main */
/* Split the next space-delimited field off the front of *buffer_ret.
 *
 * The field is unescaped in place: "\x" yields a literal x (so "\ " embeds a
 * space) and the field ends at the first unescaped ' ' or '\0', which is
 * overwritten with '\0'. On success *field_ret points at the field,
 * *buffer_ret / *buffer_size_ret describe the rest of the buffer, and 0 is
 * returned. Returns -1 for an empty buffer or when a trailing backslash
 * consumes the final terminator. *buffer_size_ret includes the terminating
 * NUL, which the buffer must end with. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *const src = *buffer_ret;
  const size_t size = *buffer_size_ret;
  char *const dst = src;  /* unescaping never outruns the read position */
  size_t in = 0;
  size_t out = 0;

  if (size == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[size - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= size)
      return (-1);

    c = src[in];
    if (c == ' ' || c == '\0')
    {
      dst[out] = '\0';
      in++;
      break;
    }

    if (c == '\\')
    {
      if (in >= size - 1)
        return (-1);
      in++;  /* skip the backslash, copy the escaped character */
    }

    dst[out++] = src[in++];
  }

  *buffer_ret = src + in;
  *buffer_size_ret = size - in;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
976 /* if we're restricting writes to the base directory,
977 * check whether the file falls within the dir
978 * returns 1 if OK, otherwise 0
979 */
980 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
981 {
982 assert(file != NULL);
984 if (!config_write_base_only
985 || sock == NULL /* journal replay */
986 || config_base_dir == NULL)
987 return 1;
989 if (strstr(file, "../") != NULL) goto err;
991 /* relative paths without "../" are ok */
992 if (*file != '/') return 1;
994 /* file must be of the format base + "/" + <1+ char filename> */
995 if (strlen(file) < _config_base_dir_len + 2) goto err;
996 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
997 if (*(file + _config_base_dir_len) != '/') goto err;
999 return 1;
1001 err:
1002 if (sock != NULL && sock->fd >= 0)
1003 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1005 return 0;
1006 } /* }}} static int check_file_access */
1008 /* when using a base dir, convert relative paths to absolute paths.
1009 * if necessary, modifies the "filename" pointer to point
1010 * to the new path created in "tmp". "tmp" is provided
1011 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1012 *
1013 * this allows us to optimize for the expected case (absolute path)
1014 * with a no-op.
1015 */
1016 static void get_abs_path(char **filename, char *tmp)
1017 {
1018 assert(tmp != NULL);
1019 assert(filename != NULL && *filename != NULL);
1021 if (config_base_dir == NULL || **filename == '/')
1022 return;
1024 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1025 *filename = tmp;
1026 } /* }}} static int get_abs_path */
1028 /* returns 1 if we have the required privilege level,
1029 * otherwise issue an error to the user on sock */
1030 static int has_privilege (listen_socket_t *sock, /* {{{ */
1031 socket_privilege priv)
1032 {
1033 if (sock == NULL) /* journal replay */
1034 return 1;
1036 if (sock->privilege >= priv)
1037 return 1;
1039 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1040 } /* }}} static int has_privilege */
1042 static int flush_file (const char *filename) /* {{{ */
1043 {
1044 cache_item_t *ci;
1046 pthread_mutex_lock (&cache_lock);
1048 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1049 if (ci == NULL)
1050 {
1051 pthread_mutex_unlock (&cache_lock);
1052 return (ENOENT);
1053 }
1055 if (ci->values_num > 0)
1056 {
1057 /* Enqueue at head */
1058 enqueue_cache_item (ci, HEAD);
1059 pthread_cond_wait(&ci->flushed, &cache_lock);
1060 }
1062 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1063 * may have been purged during our cond_wait() */
1065 pthread_mutex_unlock(&cache_lock);
1067 return (0);
1068 } /* }}} int flush_file */
1070 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1071 char *buffer, size_t buffer_size)
1072 {
1073 int status;
1074 char **help_text;
1075 char *command;
1077 char *help_help[2] =
1078 {
1079 "Command overview\n"
1080 ,
1081 "HELP [<command>]\n"
1082 "FLUSH <filename>\n"
1083 "FLUSHALL\n"
1084 "PENDING <filename>\n"
1085 "FORGET <filename>\n"
1086 "QUEUE\n"
1087 "UPDATE <filename> <values> [<values> ...]\n"
1088 "BATCH\n"
1089 "STATS\n"
1090 "QUIT\n"
1091 };
1093 char *help_flush[2] =
1094 {
1095 "Help for FLUSH\n"
1096 ,
1097 "Usage: FLUSH <filename>\n"
1098 "\n"
1099 "Adds the given filename to the head of the update queue and returns\n"
1100 "after it has been dequeued.\n"
1101 };
1103 char *help_flushall[2] =
1104 {
1105 "Help for FLUSHALL\n"
1106 ,
1107 "Usage: FLUSHALL\n"
1108 "\n"
1109 "Triggers writing of all pending updates. Returns immediately.\n"
1110 };
1112 char *help_pending[2] =
1113 {
1114 "Help for PENDING\n"
1115 ,
1116 "Usage: PENDING <filename>\n"
1117 "\n"
1118 "Shows any 'pending' updates for a file, in order.\n"
1119 "The updates shown have not yet been written to the underlying RRD file.\n"
1120 };
1122 char *help_forget[2] =
1123 {
1124 "Help for FORGET\n"
1125 ,
1126 "Usage: FORGET <filename>\n"
1127 "\n"
1128 "Removes the file completely from the cache.\n"
1129 "Any pending updates for the file will be lost.\n"
1130 };
1132 char *help_queue[2] =
1133 {
1134 "Help for QUEUE\n"
1135 ,
1136 "Shows all files in the output queue.\n"
1137 "The output is zero or more lines in the following format:\n"
1138 "(where <num_vals> is the number of values to be written)\n"
1139 "\n"
1140 "<num_vals> <filename>\n"
1141 "\n"
1142 };
1144 char *help_update[2] =
1145 {
1146 "Help for UPDATE\n"
1147 ,
1148 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1149 "\n"
1150 "Adds the given file to the internal cache if it is not yet known and\n"
1151 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1152 "for details.\n"
1153 "\n"
1154 "Each <values> has the following form:\n"
1155 " <values> = <time>:<value>[:<value>[...]]\n"
1156 "See the rrdupdate(1) manpage for details.\n"
1157 };
1159 char *help_stats[2] =
1160 {
1161 "Help for STATS\n"
1162 ,
1163 "Usage: STATS\n"
1164 "\n"
1165 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1166 "a description of the values.\n"
1167 };
1169 char *help_batch[2] =
1170 {
1171 "Help for BATCH\n"
1172 ,
1173 "The 'BATCH' command permits the client to initiate a bulk load\n"
1174 " of commands to rrdcached.\n"
1175 "\n"
1176 "Usage:\n"
1177 "\n"
1178 " client: BATCH\n"
1179 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1180 " client: command #1\n"
1181 " client: command #2\n"
1182 " client: ... and so on\n"
1183 " client: .\n"
1184 " server: 2 errors\n"
1185 " server: 7 message for command #7\n"
1186 " server: 9 message for command #9\n"
1187 "\n"
1188 "For more information, consult the rrdcached(1) documentation.\n"
1189 };
1191 char *help_quit[2] =
1192 {
1193 "Help for QUIT\n"
1194 ,
1195 "Disconnect from rrdcached.\n"
1196 };
1198 status = buffer_get_field (&buffer, &buffer_size, &command);
1199 if (status != 0)
1200 help_text = help_help;
1201 else
1202 {
1203 if (strcasecmp (command, "update") == 0)
1204 help_text = help_update;
1205 else if (strcasecmp (command, "flush") == 0)
1206 help_text = help_flush;
1207 else if (strcasecmp (command, "flushall") == 0)
1208 help_text = help_flushall;
1209 else if (strcasecmp (command, "pending") == 0)
1210 help_text = help_pending;
1211 else if (strcasecmp (command, "forget") == 0)
1212 help_text = help_forget;
1213 else if (strcasecmp (command, "queue") == 0)
1214 help_text = help_queue;
1215 else if (strcasecmp (command, "stats") == 0)
1216 help_text = help_stats;
1217 else if (strcasecmp (command, "batch") == 0)
1218 help_text = help_batch;
1219 else if (strcasecmp (command, "quit") == 0)
1220 help_text = help_quit;
1221 else
1222 help_text = help_help;
1223 }
1225 add_response_info(sock, help_text[1]);
1226 return send_response(sock, RESP_OK, help_text[0]);
1227 } /* }}} int handle_request_help */
1229 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1230 {
1231 uint64_t copy_queue_length;
1232 uint64_t copy_updates_received;
1233 uint64_t copy_flush_received;
1234 uint64_t copy_updates_written;
1235 uint64_t copy_data_sets_written;
1236 uint64_t copy_journal_bytes;
1237 uint64_t copy_journal_rotate;
1239 uint64_t tree_nodes_number;
1240 uint64_t tree_depth;
1242 pthread_mutex_lock (&stats_lock);
1243 copy_queue_length = stats_queue_length;
1244 copy_updates_received = stats_updates_received;
1245 copy_flush_received = stats_flush_received;
1246 copy_updates_written = stats_updates_written;
1247 copy_data_sets_written = stats_data_sets_written;
1248 copy_journal_bytes = stats_journal_bytes;
1249 copy_journal_rotate = stats_journal_rotate;
1250 pthread_mutex_unlock (&stats_lock);
1252 pthread_mutex_lock (&cache_lock);
1253 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1254 tree_depth = (uint64_t) g_tree_height (cache_tree);
1255 pthread_mutex_unlock (&cache_lock);
1257 add_response_info(sock,
1258 "QueueLength: %"PRIu64"\n", copy_queue_length);
1259 add_response_info(sock,
1260 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1261 add_response_info(sock,
1262 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1263 add_response_info(sock,
1264 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1265 add_response_info(sock,
1266 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1267 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1268 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1269 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1270 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1272 send_response(sock, RESP_OK, "Statistics follow\n");
1274 return (0);
1275 } /* }}} int handle_request_stats */
1277 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1278 char *buffer, size_t buffer_size)
1279 {
1280 char *file, file_tmp[PATH_MAX];
1281 int status;
1283 status = buffer_get_field (&buffer, &buffer_size, &file);
1284 if (status != 0)
1285 {
1286 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1287 }
1288 else
1289 {
1290 pthread_mutex_lock(&stats_lock);
1291 stats_flush_received++;
1292 pthread_mutex_unlock(&stats_lock);
1294 get_abs_path(&file, file_tmp);
1295 if (!check_file_access(file, sock)) return 0;
1297 status = flush_file (file);
1298 if (status == 0)
1299 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1300 else if (status == ENOENT)
1301 {
1302 /* no file in our tree; see whether it exists at all */
1303 struct stat statbuf;
1305 memset(&statbuf, 0, sizeof(statbuf));
1306 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1307 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1308 else
1309 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1310 }
1311 else if (status < 0)
1312 return send_response(sock, RESP_ERR, "Internal error.\n");
1313 else
1314 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1315 }
1317 /* NOTREACHED */
1318 assert(1==0);
1319 } /* }}} int handle_request_flush */
1321 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1322 {
1323 int status;
1325 status = has_privilege(sock, PRIV_HIGH);
1326 if (status <= 0)
1327 return status;
1329 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1331 pthread_mutex_lock(&cache_lock);
1332 flush_old_values(-1);
1333 pthread_mutex_unlock(&cache_lock);
1335 return send_response(sock, RESP_OK, "Started flush.\n");
1336 } /* }}} static int handle_request_flushall */
1338 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1339 char *buffer, size_t buffer_size)
1340 {
1341 int status;
1342 char *file, file_tmp[PATH_MAX];
1343 cache_item_t *ci;
1345 status = buffer_get_field(&buffer, &buffer_size, &file);
1346 if (status != 0)
1347 return send_response(sock, RESP_ERR,
1348 "Usage: PENDING <filename>\n");
1350 status = has_privilege(sock, PRIV_HIGH);
1351 if (status <= 0)
1352 return status;
1354 get_abs_path(&file, file_tmp);
1356 pthread_mutex_lock(&cache_lock);
1357 ci = g_tree_lookup(cache_tree, file);
1358 if (ci == NULL)
1359 {
1360 pthread_mutex_unlock(&cache_lock);
1361 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1362 }
1364 for (int i=0; i < ci->values_num; i++)
1365 add_response_info(sock, "%s\n", ci->values[i]);
1367 pthread_mutex_unlock(&cache_lock);
1368 return send_response(sock, RESP_OK, "updates pending\n");
1369 } /* }}} static int handle_request_pending */
1371 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1372 char *buffer, size_t buffer_size)
1373 {
1374 int status;
1375 gboolean found;
1376 char *file, file_tmp[PATH_MAX];
1378 status = buffer_get_field(&buffer, &buffer_size, &file);
1379 if (status != 0)
1380 return send_response(sock, RESP_ERR,
1381 "Usage: FORGET <filename>\n");
1383 status = has_privilege(sock, PRIV_HIGH);
1384 if (status <= 0)
1385 return status;
1387 get_abs_path(&file, file_tmp);
1388 if (!check_file_access(file, sock)) return 0;
1390 pthread_mutex_lock(&cache_lock);
1391 found = g_tree_remove(cache_tree, file);
1392 pthread_mutex_unlock(&cache_lock);
1394 if (found == TRUE)
1395 {
1396 if (sock != NULL)
1397 journal_write("forget", file);
1399 return send_response(sock, RESP_OK, "Gone!\n");
1400 }
1401 else
1402 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1404 /* NOTREACHED */
1405 assert(1==0);
1406 } /* }}} static int handle_request_forget */
1408 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1409 {
1410 cache_item_t *ci;
1412 pthread_mutex_lock(&cache_lock);
1414 ci = cache_queue_head;
1415 while (ci != NULL)
1416 {
1417 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1418 ci = ci->next;
1419 }
1421 pthread_mutex_unlock(&cache_lock);
1423 return send_response(sock, RESP_OK, "in queue.\n");
1424 } /* }}} int handle_request_queue */
1426 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1427 time_t now,
1428 char *buffer, size_t buffer_size)
1429 {
1430 char *file, file_tmp[PATH_MAX];
1431 int values_num = 0;
1432 int status;
1433 char orig_buf[CMD_MAX];
1435 cache_item_t *ci;
1437 status = has_privilege(sock, PRIV_HIGH);
1438 if (status <= 0)
1439 return status;
1441 /* save it for the journal later */
1442 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1444 status = buffer_get_field (&buffer, &buffer_size, &file);
1445 if (status != 0)
1446 return send_response(sock, RESP_ERR,
1447 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1449 pthread_mutex_lock(&stats_lock);
1450 stats_updates_received++;
1451 pthread_mutex_unlock(&stats_lock);
1453 get_abs_path(&file, file_tmp);
1454 if (!check_file_access(file, sock)) return 0;
1456 pthread_mutex_lock (&cache_lock);
1457 ci = g_tree_lookup (cache_tree, file);
1459 if (ci == NULL) /* {{{ */
1460 {
1461 struct stat statbuf;
1463 /* don't hold the lock while we setup; stat(2) might block */
1464 pthread_mutex_unlock(&cache_lock);
1466 memset (&statbuf, 0, sizeof (statbuf));
1467 status = stat (file, &statbuf);
1468 if (status != 0)
1469 {
1470 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1472 status = errno;
1473 if (status == ENOENT)
1474 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1475 else
1476 return send_response(sock, RESP_ERR,
1477 "stat failed with error %i.\n", status);
1478 }
1479 if (!S_ISREG (statbuf.st_mode))
1480 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1482 if (access(file, R_OK|W_OK) != 0)
1483 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1484 file, rrd_strerror(errno));
1486 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1487 if (ci == NULL)
1488 {
1489 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1491 return send_response(sock, RESP_ERR, "malloc failed.\n");
1492 }
1493 memset (ci, 0, sizeof (cache_item_t));
1495 ci->file = strdup (file);
1496 if (ci->file == NULL)
1497 {
1498 free (ci);
1499 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1501 return send_response(sock, RESP_ERR, "strdup failed.\n");
1502 }
1504 wipe_ci_values(ci, now);
1505 ci->flags = CI_FLAGS_IN_TREE;
1506 pthread_cond_init(&ci->flushed, NULL);
1508 pthread_mutex_lock(&cache_lock);
1509 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1510 } /* }}} */
1511 assert (ci != NULL);
1513 /* don't re-write updates in replay mode */
1514 if (sock != NULL)
1515 journal_write("update", orig_buf);
1517 while (buffer_size > 0)
1518 {
1519 char **temp;
1520 char *value;
1521 time_t stamp;
1522 char *eostamp;
1524 status = buffer_get_field (&buffer, &buffer_size, &value);
1525 if (status != 0)
1526 {
1527 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1528 break;
1529 }
1531 /* make sure update time is always moving forward */
1532 stamp = strtol(value, &eostamp, 10);
1533 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1534 {
1535 pthread_mutex_unlock(&cache_lock);
1536 return send_response(sock, RESP_ERR,
1537 "Cannot find timestamp in '%s'!\n", value);
1538 }
1539 else if (stamp <= ci->last_update_stamp)
1540 {
1541 pthread_mutex_unlock(&cache_lock);
1542 return send_response(sock, RESP_ERR,
1543 "illegal attempt to update using time %ld when last"
1544 " update time is %ld (minimum one second step)\n",
1545 stamp, ci->last_update_stamp);
1546 }
1547 else
1548 ci->last_update_stamp = stamp;
1550 temp = (char **) rrd_realloc (ci->values,
1551 sizeof (char *) * (ci->values_num + 1));
1552 if (temp == NULL)
1553 {
1554 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1555 continue;
1556 }
1557 ci->values = temp;
1559 ci->values[ci->values_num] = strdup (value);
1560 if (ci->values[ci->values_num] == NULL)
1561 {
1562 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1563 continue;
1564 }
1565 ci->values_num++;
1567 values_num++;
1568 }
1570 if (((now - ci->last_flush_time) >= config_write_interval)
1571 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1572 && (ci->values_num > 0))
1573 {
1574 enqueue_cache_item (ci, TAIL);
1575 }
1577 pthread_mutex_unlock (&cache_lock);
1579 if (values_num < 1)
1580 return send_response(sock, RESP_ERR, "No values updated.\n");
1581 else
1582 return send_response(sock, RESP_OK,
1583 "errors, enqueued %i value(s).\n", values_num);
1585 /* NOTREACHED */
1586 assert(1==0);
1588 } /* }}} int handle_request_update */
1590 /* we came across a "WROTE" entry during journal replay.
1591 * throw away any values that we have accumulated for this file
1592 */
1593 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1594 {
1595 int i;
1596 cache_item_t *ci;
1597 const char *file = buffer;
1599 pthread_mutex_lock(&cache_lock);
1601 ci = g_tree_lookup(cache_tree, file);
1602 if (ci == NULL)
1603 {
1604 pthread_mutex_unlock(&cache_lock);
1605 return (0);
1606 }
1608 if (ci->values)
1609 {
1610 for (i=0; i < ci->values_num; i++)
1611 free(ci->values[i]);
1613 free(ci->values);
1614 }
1616 wipe_ci_values(ci, now);
1617 remove_from_queue(ci);
1619 pthread_mutex_unlock(&cache_lock);
1620 return (0);
1621 } /* }}} int handle_request_wrote */
1623 /* start "BATCH" processing */
1624 static int batch_start (listen_socket_t *sock) /* {{{ */
1625 {
1626 int status;
1627 if (sock->batch_start)
1628 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1630 status = send_response(sock, RESP_OK,
1631 "Go ahead. End with dot '.' on its own line.\n");
1632 sock->batch_start = time(NULL);
1633 sock->batch_cmd = 0;
1635 return status;
1636 } /* }}} static int batch_start */
1638 /* finish "BATCH" processing and return results to the client */
1639 static int batch_done (listen_socket_t *sock) /* {{{ */
1640 {
1641 assert(sock->batch_start);
1642 sock->batch_start = 0;
1643 sock->batch_cmd = 0;
1644 return send_response(sock, RESP_OK, "errors\n");
1645 } /* }}} static int batch_done */
1647 /* if sock==NULL, we are in journal replay mode */
1648 static int handle_request (listen_socket_t *sock, /* {{{ */
1649 time_t now,
1650 char *buffer, size_t buffer_size)
1651 {
1652 char *buffer_ptr;
1653 char *command;
1654 int status;
1656 assert (buffer[buffer_size - 1] == '\0');
1658 buffer_ptr = buffer;
1659 command = NULL;
1660 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1661 if (status != 0)
1662 {
1663 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1664 return (-1);
1665 }
1667 if (sock != NULL && sock->batch_start)
1668 sock->batch_cmd++;
1670 if (strcasecmp (command, "update") == 0)
1671 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1672 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1673 {
1674 /* this is only valid in replay mode */
1675 return (handle_request_wrote (buffer_ptr, now));
1676 }
1677 else if (strcasecmp (command, "flush") == 0)
1678 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1679 else if (strcasecmp (command, "flushall") == 0)
1680 return (handle_request_flushall(sock));
1681 else if (strcasecmp (command, "pending") == 0)
1682 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1683 else if (strcasecmp (command, "forget") == 0)
1684 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1685 else if (strcasecmp (command, "queue") == 0)
1686 return (handle_request_queue(sock));
1687 else if (strcasecmp (command, "stats") == 0)
1688 return (handle_request_stats (sock));
1689 else if (strcasecmp (command, "help") == 0)
1690 return (handle_request_help (sock, buffer_ptr, buffer_size));
1691 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1692 return batch_start(sock);
1693 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1694 return batch_done(sock);
1695 else if (strcasecmp (command, "quit") == 0)
1696 return -1;
1697 else
1698 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1700 /* NOTREACHED */
1701 assert(1==0);
1702 } /* }}} int handle_request */
1704 /* MUST NOT hold journal_lock before calling this */
1705 static void journal_rotate(void) /* {{{ */
1706 {
1707 FILE *old_fh = NULL;
1708 int new_fd;
1710 if (journal_cur == NULL || journal_old == NULL)
1711 return;
1713 pthread_mutex_lock(&journal_lock);
1715 /* we rotate this way (rename before close) so that the we can release
1716 * the journal lock as fast as possible. Journal writes to the new
1717 * journal can proceed immediately after the new file is opened. The
1718 * fclose can then block without affecting new updates.
1719 */
1720 if (journal_fh != NULL)
1721 {
1722 old_fh = journal_fh;
1723 journal_fh = NULL;
1724 rename(journal_cur, journal_old);
1725 ++stats_journal_rotate;
1726 }
1728 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1729 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1730 if (new_fd >= 0)
1731 {
1732 journal_fh = fdopen(new_fd, "a");
1733 if (journal_fh == NULL)
1734 close(new_fd);
1735 }
1737 pthread_mutex_unlock(&journal_lock);
1739 if (old_fh != NULL)
1740 fclose(old_fh);
1742 if (journal_fh == NULL)
1743 {
1744 RRDD_LOG(LOG_CRIT,
1745 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1746 journal_cur, rrd_strerror(errno));
1748 RRDD_LOG(LOG_ERR,
1749 "JOURNALING DISABLED: All values will be flushed at shutdown");
1750 config_flush_at_shutdown = 1;
1751 }
1753 } /* }}} static void journal_rotate */
1755 static void journal_done(void) /* {{{ */
1756 {
1757 if (journal_cur == NULL)
1758 return;
1760 pthread_mutex_lock(&journal_lock);
1761 if (journal_fh != NULL)
1762 {
1763 fclose(journal_fh);
1764 journal_fh = NULL;
1765 }
1767 if (config_flush_at_shutdown)
1768 {
1769 RRDD_LOG(LOG_INFO, "removing journals");
1770 unlink(journal_old);
1771 unlink(journal_cur);
1772 }
1773 else
1774 {
1775 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1776 "journals will be used at next startup");
1777 }
1779 pthread_mutex_unlock(&journal_lock);
1781 } /* }}} static void journal_done */
1783 static int journal_write(char *cmd, char *args) /* {{{ */
1784 {
1785 int chars;
1787 if (journal_fh == NULL)
1788 return 0;
1790 pthread_mutex_lock(&journal_lock);
1791 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1792 pthread_mutex_unlock(&journal_lock);
1794 if (chars > 0)
1795 {
1796 pthread_mutex_lock(&stats_lock);
1797 stats_journal_bytes += chars;
1798 pthread_mutex_unlock(&stats_lock);
1799 }
1801 return chars;
1802 } /* }}} static int journal_write */
1804 static int journal_replay (const char *file) /* {{{ */
1805 {
1806 FILE *fh;
1807 int entry_cnt = 0;
1808 int fail_cnt = 0;
1809 uint64_t line = 0;
1810 char entry[CMD_MAX];
1811 time_t now;
1813 if (file == NULL) return 0;
1815 {
1816 char *reason = "unknown error";
1817 int status = 0;
1818 struct stat statbuf;
1820 memset(&statbuf, 0, sizeof(statbuf));
1821 if (stat(file, &statbuf) != 0)
1822 {
1823 if (errno == ENOENT)
1824 return 0;
1826 reason = "stat error";
1827 status = errno;
1828 }
1829 else if (!S_ISREG(statbuf.st_mode))
1830 {
1831 reason = "not a regular file";
1832 status = EPERM;
1833 }
1834 if (statbuf.st_uid != daemon_uid)
1835 {
1836 reason = "not owned by daemon user";
1837 status = EACCES;
1838 }
1839 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1840 {
1841 reason = "must not be user/group writable";
1842 status = EACCES;
1843 }
1845 if (status != 0)
1846 {
1847 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1848 file, rrd_strerror(status), reason);
1849 return 0;
1850 }
1851 }
1853 fh = fopen(file, "r");
1854 if (fh == NULL)
1855 {
1856 if (errno != ENOENT)
1857 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1858 file, rrd_strerror(errno));
1859 return 0;
1860 }
1861 else
1862 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1864 now = time(NULL);
1866 while(!feof(fh))
1867 {
1868 size_t entry_len;
1870 ++line;
1871 if (fgets(entry, sizeof(entry), fh) == NULL)
1872 break;
1873 entry_len = strlen(entry);
1875 /* check \n termination in case journal writing crashed mid-line */
1876 if (entry_len == 0)
1877 continue;
1878 else if (entry[entry_len - 1] != '\n')
1879 {
1880 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1881 ++fail_cnt;
1882 continue;
1883 }
1885 entry[entry_len - 1] = '\0';
1887 if (handle_request(NULL, now, entry, entry_len) == 0)
1888 ++entry_cnt;
1889 else
1890 ++fail_cnt;
1891 }
1893 fclose(fh);
1895 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1896 entry_cnt, fail_cnt);
1898 return entry_cnt > 0 ? 1 : 0;
1899 } /* }}} static int journal_replay */
1901 static void journal_init(void) /* {{{ */
1902 {
1903 int had_journal = 0;
1905 if (journal_cur == NULL) return;
1907 pthread_mutex_lock(&journal_lock);
1909 RRDD_LOG(LOG_INFO, "checking for journal files");
1911 had_journal += journal_replay(journal_old);
1912 had_journal += journal_replay(journal_cur);
1914 /* it must have been a crash. start a flush */
1915 if (had_journal && config_flush_at_shutdown)
1916 flush_old_values(-1);
1918 pthread_mutex_unlock(&journal_lock);
1919 journal_rotate();
1921 RRDD_LOG(LOG_INFO, "journal processing complete");
1923 } /* }}} static void journal_init */
1925 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1926 {
1927 assert(sock != NULL);
1929 free(sock->rbuf); sock->rbuf = NULL;
1930 free(sock->wbuf); sock->wbuf = NULL;
1931 free(sock);
1932 } /* }}} void free_listen_socket */
1934 static void close_connection(listen_socket_t *sock) /* {{{ */
1935 {
1936 if (sock->fd >= 0)
1937 {
1938 close(sock->fd);
1939 sock->fd = -1;
1940 }
1942 free_listen_socket(sock);
1944 } /* }}} void close_connection */
1946 static void *connection_thread_main (void *args) /* {{{ */
1947 {
1948 listen_socket_t *sock;
1949 int i;
1950 int fd;
1952 sock = (listen_socket_t *) args;
1953 fd = sock->fd;
1955 /* init read buffers */
1956 sock->next_read = sock->next_cmd = 0;
1957 sock->rbuf = malloc(RBUF_SIZE);
1958 if (sock->rbuf == NULL)
1959 {
1960 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1961 close_connection(sock);
1962 return NULL;
1963 }
1965 pthread_mutex_lock (&connection_threads_lock);
1966 {
1967 pthread_t *temp;
1969 temp = (pthread_t *) rrd_realloc (connection_threads,
1970 sizeof (pthread_t) * (connection_threads_num + 1));
1971 if (temp == NULL)
1972 {
1973 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1974 }
1975 else
1976 {
1977 connection_threads = temp;
1978 connection_threads[connection_threads_num] = pthread_self ();
1979 connection_threads_num++;
1980 }
1981 }
1982 pthread_mutex_unlock (&connection_threads_lock);
1984 while (do_shutdown == 0)
1985 {
1986 char *cmd;
1987 ssize_t cmd_len;
1988 ssize_t rbytes;
1989 time_t now;
1991 struct pollfd pollfd;
1992 int status;
1994 pollfd.fd = fd;
1995 pollfd.events = POLLIN | POLLPRI;
1996 pollfd.revents = 0;
1998 status = poll (&pollfd, 1, /* timeout = */ 500);
1999 if (do_shutdown)
2000 break;
2001 else if (status == 0) /* timeout */
2002 continue;
2003 else if (status < 0) /* error */
2004 {
2005 status = errno;
2006 if (status != EINTR)
2007 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2008 continue;
2009 }
2011 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2012 break;
2013 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2014 {
2015 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2016 "poll(2) returned something unexpected: %#04hx",
2017 pollfd.revents);
2018 break;
2019 }
2021 rbytes = read(fd, sock->rbuf + sock->next_read,
2022 RBUF_SIZE - sock->next_read);
2023 if (rbytes < 0)
2024 {
2025 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2026 break;
2027 }
2028 else if (rbytes == 0)
2029 break; /* eof */
2031 sock->next_read += rbytes;
2033 if (sock->batch_start)
2034 now = sock->batch_start;
2035 else
2036 now = time(NULL);
2038 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2039 {
2040 status = handle_request (sock, now, cmd, cmd_len+1);
2041 if (status != 0)
2042 goto out_close;
2043 }
2044 }
2046 out_close:
2047 close_connection(sock);
2049 /* Remove this thread from the connection threads list */
2050 pthread_mutex_lock (&connection_threads_lock);
2051 {
2052 pthread_t self;
2053 pthread_t *temp;
2055 /* Find out own index in the array */
2056 self = pthread_self ();
2057 for (i = 0; i < connection_threads_num; i++)
2058 if (pthread_equal (connection_threads[i], self) != 0)
2059 break;
2060 assert (i < connection_threads_num);
2062 /* Move the trailing threads forward. */
2063 if (i < (connection_threads_num - 1))
2064 {
2065 memmove (connection_threads + i,
2066 connection_threads + i + 1,
2067 sizeof (pthread_t) * (connection_threads_num - i - 1));
2068 }
2070 connection_threads_num--;
2072 temp = rrd_realloc(connection_threads,
2073 sizeof(*connection_threads) * connection_threads_num);
2074 if (connection_threads_num > 0 && temp == NULL)
2075 RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2076 else
2077 connection_threads = temp;
2078 }
2079 pthread_mutex_unlock (&connection_threads_lock);
2081 return (NULL);
2082 } /* }}} void *connection_thread_main */
2084 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2085 {
2086 int fd;
2087 struct sockaddr_un sa;
2088 listen_socket_t *temp;
2089 int status;
2090 const char *path;
2092 path = sock->addr;
2093 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2094 path += strlen("unix:");
2096 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2097 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2098 if (temp == NULL)
2099 {
2100 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2101 return (-1);
2102 }
2103 listen_fds = temp;
2104 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2106 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2107 if (fd < 0)
2108 {
2109 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2110 rrd_strerror(errno));
2111 return (-1);
2112 }
2114 memset (&sa, 0, sizeof (sa));
2115 sa.sun_family = AF_UNIX;
2116 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2118 /* if we've gotten this far, we own the pid file. any daemon started
2119 * with the same args must not be alive. therefore, ensure that we can
2120 * create the socket...
2121 */
2122 unlink(path);
2124 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2125 if (status != 0)
2126 {
2127 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2128 path, rrd_strerror(errno));
2129 close (fd);
2130 return (-1);
2131 }
2133 status = listen (fd, /* backlog = */ 10);
2134 if (status != 0)
2135 {
2136 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2137 path, rrd_strerror(errno));
2138 close (fd);
2139 unlink (path);
2140 return (-1);
2141 }
2143 listen_fds[listen_fds_num].fd = fd;
2144 listen_fds[listen_fds_num].family = PF_UNIX;
2145 strncpy(listen_fds[listen_fds_num].addr, path,
2146 sizeof (listen_fds[listen_fds_num].addr) - 1);
2147 listen_fds_num++;
2149 return (0);
2150 } /* }}} int open_listen_socket_unix */
/* Open TCP listening sockets for sock->addr and append them to listen_fds.
 *
 * Accepted address forms:
 *   "[<IPv6>]" or "[<IPv6>]:<port>"
 *   "<host-or-IPv4>" or "<host-or-IPv4>:<port>"   (exactly one ':')
 *   "<bare IPv6>"                                   (two or more ':')
 * RRDCACHED_DEFAULT_PORT is used when no port is given.  One socket is
 * opened per address returned by getaddrinfo(); addresses whose socket(2)
 * or bind(2) fails are skipped.
 *
 * Returns 0 on success, -1 on a malformed address, a resolver error or a
 * listen(2) failure. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Hostname or IPv4 address with an optional ":port".  Only split when
     * there is exactly one colon: a bare IPv6 address (including the
     * IPv4-mapped form "::ffff:1.2.3.4") has several and must be passed
     * through unchanged.  strrchr() is the standard replacement for the
     * legacy rindex(). */
    char *colon = strrchr (addr, ':');

    if (colon != NULL && strchr (addr, ':') == colon)
    {
      *colon = 0;
      port = colon + 1;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2274 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2275 {
2276 assert(sock != NULL);
2277 assert(sock->addr != NULL);
2279 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2280 || sock->addr[0] == '/')
2281 return (open_listen_socket_unix(sock));
2282 else
2283 return (open_listen_socket_network(sock));
2284 } /* }}} int open_listen_socket */
2286 static int close_listen_sockets (void) /* {{{ */
2287 {
2288 size_t i;
2290 for (i = 0; i < listen_fds_num; i++)
2291 {
2292 close (listen_fds[i].fd);
2294 if (listen_fds[i].family == PF_UNIX)
2295 unlink(listen_fds[i].addr);
2296 }
2298 free (listen_fds);
2299 listen_fds = NULL;
2300 listen_fds_num = 0;
2302 return (0);
2303 } /* }}} int close_listen_sockets */
2305 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2306 {
2307 struct pollfd *pollfds;
2308 int pollfds_num;
2309 int status;
2310 int i;
2312 if (listen_fds_num < 1)
2313 {
2314 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2315 return (NULL);
2316 }
2318 pollfds_num = listen_fds_num;
2319 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2320 if (pollfds == NULL)
2321 {
2322 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2323 return (NULL);
2324 }
2325 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2327 RRDD_LOG(LOG_INFO, "listening for connections");
2329 while (do_shutdown == 0)
2330 {
2331 for (i = 0; i < pollfds_num; i++)
2332 {
2333 pollfds[i].fd = listen_fds[i].fd;
2334 pollfds[i].events = POLLIN | POLLPRI;
2335 pollfds[i].revents = 0;
2336 }
2338 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2339 if (do_shutdown)
2340 break;
2341 else if (status == 0) /* timeout */
2342 continue;
2343 else if (status < 0) /* error */
2344 {
2345 status = errno;
2346 if (status != EINTR)
2347 {
2348 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2349 }
2350 continue;
2351 }
2353 for (i = 0; i < pollfds_num; i++)
2354 {
2355 listen_socket_t *client_sock;
2356 struct sockaddr_storage client_sa;
2357 socklen_t client_sa_size;
2358 pthread_t tid;
2359 pthread_attr_t attr;
2361 if (pollfds[i].revents == 0)
2362 continue;
2364 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2365 {
2366 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2367 "poll(2) returned something unexpected for listen FD #%i.",
2368 pollfds[i].fd);
2369 continue;
2370 }
2372 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2373 if (client_sock == NULL)
2374 {
2375 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2376 continue;
2377 }
2378 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2380 client_sa_size = sizeof (client_sa);
2381 client_sock->fd = accept (pollfds[i].fd,
2382 (struct sockaddr *) &client_sa, &client_sa_size);
2383 if (client_sock->fd < 0)
2384 {
2385 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2386 free(client_sock);
2387 continue;
2388 }
2390 pthread_attr_init (&attr);
2391 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2393 status = pthread_create (&tid, &attr, connection_thread_main,
2394 client_sock);
2395 if (status != 0)
2396 {
2397 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2398 close_connection(client_sock);
2399 continue;
2400 }
2401 } /* for (pollfds_num) */
2402 } /* while (do_shutdown == 0) */
2404 RRDD_LOG(LOG_INFO, "starting shutdown");
2406 close_listen_sockets ();
2408 pthread_mutex_lock (&connection_threads_lock);
2409 while (connection_threads_num > 0)
2410 {
2411 pthread_t wait_for;
2413 wait_for = connection_threads[0];
2415 pthread_mutex_unlock (&connection_threads_lock);
2416 pthread_join (wait_for, /* retval = */ NULL);
2417 pthread_mutex_lock (&connection_threads_lock);
2418 }
2419 pthread_mutex_unlock (&connection_threads_lock);
2421 free(pollfds);
2423 return (NULL);
2424 } /* }}} void *listen_thread_main */
2426 static int daemonize (void) /* {{{ */
2427 {
2428 int pid_fd;
2429 char *base_dir;
2431 daemon_uid = geteuid();
2433 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2434 if (pid_fd < 0)
2435 pid_fd = check_pidfile();
2436 if (pid_fd < 0)
2437 return pid_fd;
2439 /* open all the listen sockets */
2440 if (config_listen_address_list_len > 0)
2441 {
2442 for (int i = 0; i < config_listen_address_list_len; i++)
2443 {
2444 open_listen_socket (config_listen_address_list[i]);
2445 free_listen_socket (config_listen_address_list[i]);
2446 }
2448 free(config_listen_address_list);
2449 }
2450 else
2451 {
2452 listen_socket_t sock;
2453 memset(&sock, 0, sizeof(sock));
2454 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2455 open_listen_socket (&sock);
2456 }
2458 if (listen_fds_num < 1)
2459 {
2460 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2461 goto error;
2462 }
2464 if (!stay_foreground)
2465 {
2466 pid_t child;
2468 child = fork ();
2469 if (child < 0)
2470 {
2471 fprintf (stderr, "daemonize: fork(2) failed.\n");
2472 goto error;
2473 }
2474 else if (child > 0)
2475 exit(0);
2477 /* Become session leader */
2478 setsid ();
2480 /* Open the first three file descriptors to /dev/null */
2481 close (2);
2482 close (1);
2483 close (0);
2485 open ("/dev/null", O_RDWR);
2486 dup (0);
2487 dup (0);
2488 } /* if (!stay_foreground) */
2490 /* Change into the /tmp directory. */
2491 base_dir = (config_base_dir != NULL)
2492 ? config_base_dir
2493 : "/tmp";
2495 if (chdir (base_dir) != 0)
2496 {
2497 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2498 goto error;
2499 }
2501 install_signal_handlers();
2503 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2504 RRDD_LOG(LOG_INFO, "starting up");
2506 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2507 (GDestroyNotify) free_cache_item);
2508 if (cache_tree == NULL)
2509 {
2510 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2511 goto error;
2512 }
2514 return write_pidfile (pid_fd);
2516 error:
2517 remove_pidfile();
2518 return -1;
2519 } /* }}} int daemonize */
2521 static int cleanup (void) /* {{{ */
2522 {
2523 do_shutdown++;
2525 pthread_cond_broadcast (&flush_cond);
2526 pthread_join (flush_thread, NULL);
2528 pthread_cond_broadcast (&queue_cond);
2529 for (int i = 0; i < config_queue_threads; i++)
2530 pthread_join (queue_threads[i], NULL);
2532 if (config_flush_at_shutdown)
2533 {
2534 assert(cache_queue_head == NULL);
2535 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2536 }
2538 journal_done();
2539 remove_pidfile ();
2541 free(queue_threads);
2542 free(config_base_dir);
2543 free(config_pid_file);
2544 free(journal_cur);
2545 free(journal_old);
2547 pthread_mutex_lock(&cache_lock);
2548 g_tree_destroy(cache_tree);
2550 RRDD_LOG(LOG_INFO, "goodbye");
2551 closelog ();
2553 return (0);
2554 } /* }}} int cleanup */
2556 static int read_options (int argc, char **argv) /* {{{ */
2557 {
2558 int option;
2559 int status = 0;
2561 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2562 {
2563 switch (option)
2564 {
2565 case 'g':
2566 stay_foreground=1;
2567 break;
2569 case 'L':
2570 case 'l':
2571 {
2572 listen_socket_t **temp;
2573 listen_socket_t *new;
2575 new = malloc(sizeof(listen_socket_t));
2576 if (new == NULL)
2577 {
2578 fprintf(stderr, "read_options: malloc failed.\n");
2579 return(2);
2580 }
2581 memset(new, 0, sizeof(listen_socket_t));
2583 temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2584 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2585 if (temp == NULL)
2586 {
2587 fprintf (stderr, "read_options: realloc failed.\n");
2588 return (2);
2589 }
2590 config_listen_address_list = temp;
2592 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2593 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2595 temp[config_listen_address_list_len] = new;
2596 config_listen_address_list_len++;
2597 }
2598 break;
2600 case 'f':
2601 {
2602 int temp;
2604 temp = atoi (optarg);
2605 if (temp > 0)
2606 config_flush_interval = temp;
2607 else
2608 {
2609 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2610 status = 3;
2611 }
2612 }
2613 break;
2615 case 'w':
2616 {
2617 int temp;
2619 temp = atoi (optarg);
2620 if (temp > 0)
2621 config_write_interval = temp;
2622 else
2623 {
2624 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2625 status = 2;
2626 }
2627 }
2628 break;
2630 case 'z':
2631 {
2632 int temp;
2634 temp = atoi(optarg);
2635 if (temp > 0)
2636 config_write_jitter = temp;
2637 else
2638 {
2639 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2640 status = 2;
2641 }
2643 break;
2644 }
2646 case 't':
2647 {
2648 int threads;
2649 threads = atoi(optarg);
2650 if (threads >= 1)
2651 config_queue_threads = threads;
2652 else
2653 {
2654 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2655 return 1;
2656 }
2657 }
2658 break;
2660 case 'B':
2661 config_write_base_only = 1;
2662 break;
2664 case 'b':
2665 {
2666 size_t len;
2667 char base_realpath[PATH_MAX];
2669 if (config_base_dir != NULL)
2670 free (config_base_dir);
2671 config_base_dir = strdup (optarg);
2672 if (config_base_dir == NULL)
2673 {
2674 fprintf (stderr, "read_options: strdup failed.\n");
2675 return (3);
2676 }
2678 /* make sure that the base directory is not resolved via
2679 * symbolic links. this makes some performance-enhancing
2680 * assumptions possible (we don't have to resolve paths
2681 * that start with a "/")
2682 */
2683 if (realpath(config_base_dir, base_realpath) == NULL)
2684 {
2685 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2686 return 5;
2687 }
2688 else if (strncmp(config_base_dir,
2689 base_realpath, sizeof(base_realpath)) != 0)
2690 {
2691 fprintf(stderr,
2692 "Base directory (-b) resolved via file system links!\n"
2693 "Please consult rrdcached '-b' documentation!\n"
2694 "Consider specifying the real directory (%s)\n",
2695 base_realpath);
2696 return 5;
2697 }
2699 len = strlen (config_base_dir);
2700 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2701 {
2702 config_base_dir[len - 1] = 0;
2703 len--;
2704 }
2706 if (len < 1)
2707 {
2708 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2709 return (4);
2710 }
2712 _config_base_dir_len = len;
2713 }
2714 break;
2716 case 'p':
2717 {
2718 if (config_pid_file != NULL)
2719 free (config_pid_file);
2720 config_pid_file = strdup (optarg);
2721 if (config_pid_file == NULL)
2722 {
2723 fprintf (stderr, "read_options: strdup failed.\n");
2724 return (3);
2725 }
2726 }
2727 break;
2729 case 'F':
2730 config_flush_at_shutdown = 1;
2731 break;
2733 case 'j':
2734 {
2735 struct stat statbuf;
2736 const char *dir = optarg;
2738 status = stat(dir, &statbuf);
2739 if (status != 0)
2740 {
2741 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2742 return 6;
2743 }
2745 if (!S_ISDIR(statbuf.st_mode)
2746 || access(dir, R_OK|W_OK|X_OK) != 0)
2747 {
2748 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2749 errno ? rrd_strerror(errno) : "");
2750 return 6;
2751 }
2753 journal_cur = malloc(PATH_MAX + 1);
2754 journal_old = malloc(PATH_MAX + 1);
2755 if (journal_cur == NULL || journal_old == NULL)
2756 {
2757 fprintf(stderr, "malloc failure for journal files\n");
2758 return 6;
2759 }
2760 else
2761 {
2762 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2763 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2764 }
2765 }
2766 break;
2768 case 'h':
2769 case '?':
2770 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2771 "\n"
2772 "Usage: rrdcached [options]\n"
2773 "\n"
2774 "Valid options are:\n"
2775 " -l <address> Socket address to listen to.\n"
2776 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2777 " -w <seconds> Interval in which to write data.\n"
2778 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2779 " -t <threads> Number of write threads.\n"
2780 " -f <seconds> Interval in which to flush dead data.\n"
2781 " -p <file> Location of the PID-file.\n"
2782 " -b <dir> Base directory to change to.\n"
2783 " -B Restrict file access to paths within -b <dir>\n"
2784 " -g Do not fork and run in the foreground.\n"
2785 " -j <dir> Directory in which to create the journal files.\n"
2786 " -F Always flush all updates at shutdown\n"
2787 "\n"
2788 "For more information and a detailed description of all options "
2789 "please refer\n"
2790 "to the rrdcached(1) manual page.\n",
2791 VERSION);
2792 status = -1;
2793 break;
2794 } /* switch (option) */
2795 } /* while (getopt) */
2797 /* advise the user when values are not sane */
2798 if (config_flush_interval < 2 * config_write_interval)
2799 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2800 " 2x write interval (-w) !\n");
2801 if (config_write_jitter > config_write_interval)
2802 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2803 " write interval (-w) !\n");
2805 if (config_write_base_only && config_base_dir == NULL)
2806 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2807 " Consult the rrdcached documentation\n");
2809 if (journal_cur == NULL)
2810 config_flush_at_shutdown = 1;
2812 return (status);
2813 } /* }}} int read_options */
2815 int main (int argc, char **argv)
2816 {
2817 int status;
2819 status = read_options (argc, argv);
2820 if (status != 0)
2821 {
2822 if (status < 0)
2823 status = 0;
2824 return (status);
2825 }
2827 status = daemonize ();
2828 if (status != 0)
2829 {
2830 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2831 return (1);
2832 }
2834 journal_init();
2836 /* start the queue threads */
2837 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2838 if (queue_threads == NULL)
2839 {
2840 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2841 cleanup();
2842 return (1);
2843 }
2844 for (int i = 0; i < config_queue_threads; i++)
2845 {
2846 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2847 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2848 if (status != 0)
2849 {
2850 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2851 cleanup();
2852 return (1);
2853 }
2854 }
2856 /* start the flush thread */
2857 memset(&flush_thread, 0, sizeof(flush_thread));
2858 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2859 if (status != 0)
2860 {
2861 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2862 cleanup();
2863 return (1);
2864 }
2866 listen_thread_main (NULL);
2867 cleanup ();
2869 return (0);
2870 } /* int main */
2872 /*
2873 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2874 */