5231942647362921afce80bb01e62b03e75bbdbb
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
/* Every diagnostic the daemon emits is routed through syslog(3). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers that do not define __GNUC__ do not understand `__attribute__';
 * make it expand to nothing so annotations such as
 * `__attribute__((unused))' still compile there. */
#if !defined(__GNUC__)
# define __attribute__(x)
#endif
/*
 * Types
 */

/* Privilege level of a listening socket.  The numeric order matters:
 * has_privilege() grants access when the socket's level is >= the level
 * a command requires. */
typedef enum
{
  PRIV_LOW  = 0,
  PRIV_HIGH = 1
} socket_privilege;

/* Result class handed to send_response(); outside BATCH mode a RESP_ERR
 * response is reported to the client with a "-1" status count. */
typedef enum
{
  RESP_ERR = -1,
  RESP_OK  = 0
} response_code;
121 struct listen_socket_s
122 {
123 int fd;
124 char addr[PATH_MAX + 1];
125 int family;
126 socket_privilege privilege;
128 /* state for BATCH processing */
129 time_t batch_start;
130 int batch_cmd;
132 /* buffered IO */
133 char *rbuf;
134 off_t next_cmd;
135 off_t next_read;
137 char *wbuf;
138 ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
/* One cached RRD file: its pending update strings plus the links that place
 * it in the write queue.  Items live in cache_tree, keyed by `file'. */
typedef struct cache_item_s cache_item_t;

/* Bits for cache_item_t.flags */
#define CI_FLAGS_IN_TREE  (1<<0)   /* item is stored in cache_tree */
#define CI_FLAGS_IN_QUEUE (1<<1)   /* item is linked into the write queue */

struct cache_item_s
{
  char *file;               /* RRD file name; the same string is the tree key */
  char **values;            /* pending update strings */
  int values_num;           /* number of entries in `values' */
  time_t last_flush_time;   /* last hand-off to the writer (plus jitter) */
  time_t last_update_stamp;
  int flags;                /* CI_FLAGS_* bits */
  pthread_cond_t flushed;   /* broadcast once the item has been written */
  cache_item_t *prev;       /* write-queue links */
  cache_item_t *next;
};
/* Scratch state shared by flush_old_values() and tree_callback_flush(). */
typedef struct callback_flush_data_s
{
  time_t now;           /* time the flush pass started */
  time_t abs_timeout;   /* items last flushed at or before this get queued */
  char **keys;          /* keys of idle, empty items to drop from the tree */
  size_t keys_num;      /* number of entries in `keys' */
} callback_flush_data_t;

/* End of the write queue that enqueue_cache_item() inserts at. */
typedef enum queue_side_e
{
  HEAD,   /* front: written next (FLUSH uses this) */
  TAIL    /* back: regular ordering */
} queue_side_t;

/* Maximum length of a socket command or response line. */
#define CMD_MAX 4096
/* Per-connection read buffer size: twice the maximum command length. */
#define RBUF_SIZE (CMD_MAX*2)
179 /*
180 * Variables
181 */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree *cache_tree = NULL;
198 static cache_item_t *cache_queue_head = NULL;
199 static cache_item_t *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /*
234 * Functions
235 */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239 do_shutdown++;
240 pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245 sig_common("INT");
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250 sig_common("TERM");
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255 config_flush_at_shutdown = 1;
256 sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261 config_flush_at_shutdown = 0;
262 sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
266 {
267 /* These structures are static, because `sigaction' behaves weird if the are
268 * overwritten.. */
269 static struct sigaction sa_int;
270 static struct sigaction sa_term;
271 static struct sigaction sa_pipe;
272 static struct sigaction sa_usr1;
273 static struct sigaction sa_usr2;
275 /* Install signal handlers */
276 memset (&sa_int, 0, sizeof (sa_int));
277 sa_int.sa_handler = sig_int_handler;
278 sigaction (SIGINT, &sa_int, NULL);
280 memset (&sa_term, 0, sizeof (sa_term));
281 sa_term.sa_handler = sig_term_handler;
282 sigaction (SIGTERM, &sa_term, NULL);
284 memset (&sa_pipe, 0, sizeof (sa_pipe));
285 sa_pipe.sa_handler = SIG_IGN;
286 sigaction (SIGPIPE, &sa_pipe, NULL);
288 memset (&sa_pipe, 0, sizeof (sa_usr1));
289 sa_usr1.sa_handler = sig_usr1_handler;
290 sigaction (SIGUSR1, &sa_usr1, NULL);
292 memset (&sa_usr2, 0, sizeof (sa_usr2));
293 sa_usr2.sa_handler = sig_usr2_handler;
294 sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300 int fd;
301 char *file;
303 file = (config_pid_file != NULL)
304 ? config_pid_file
305 : LOCALSTATEDIR "/run/rrdcached.pid";
307 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308 if (fd < 0)
309 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310 action, file, rrd_strerror(errno));
312 return(fd);
313 } /* }}} static int open_pidfile */
/* Check an existing PID file to see whether a daemon is already running.
 *
 * Returns a negative value if the file cannot be opened (open_pidfile has
 * printed why), cannot be read, does not hold a positive PID, names another
 * live process we are allowed to signal, or cannot be truncated.
 * Otherwise the stale file has been rewound and truncated, and its
 * read/write descriptor is returned for the caller to write the new PID. */
static int check_pidfile(void)
{
  int pid_fd;
  pid_t pid;
  long parsed;
  char *end;
  ssize_t got;
  char pid_str[16];

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* read() does not NUL-terminate; keep one byte for the terminator */
  got = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (got <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[got] = '\0';

  errno = 0;
  parsed = strtol(pid_str, &end, 10);
  if (errno != 0 || end == pid_str || parsed <= 0
      || (long) (pid_t) parsed != parsed)
  {
    close(pid_fd);
    return -1;
  }
  pid = (pid_t) parsed;

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", (int) pid);
    close(pid_fd);
    return -1;
  }

  /* empty the stale file so the new PID is not mixed with old digits */
  if (lseek(pid_fd, 0, SEEK_SET) == (off_t) -1 || ftruncate(pid_fd, 0) != 0)
  {
    fprintf(stderr, "rrdcached: can't truncate stale pid file (%s)\n",
        rrd_strerror(errno));
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write our PID followed by a newline into the open PID file `fd'.
 * The descriptor is consumed: it is closed on every path.
 * Returns 0 on success, -1 if no stream could be created on `fd' or the PID
 * could not be written out (fclose is where buffered output is flushed, so
 * its result is checked too). */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *fh;
  int status = 0;

  fh = fdopen (fd, "w");
  if (fh == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  if (fprintf (fh, "%i\n", (int) getpid ()) < 0)
    status = -1;

  if (fclose (fh) != 0)
    status = -1;

  if (status != 0)
    RRDD_LOG (LOG_ERR, "write_pidfile: writing the pid file failed.");

  return (status);
} /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
375 {
376 char *file;
377 int status;
379 file = (config_pid_file != NULL)
380 ? config_pid_file
381 : LOCALSTATEDIR "/run/rrdcached.pid";
383 status = unlink (file);
384 if (status == 0)
385 return (0);
386 return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391 char *eol;
393 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394 sock->next_read - sock->next_cmd);
396 if (eol == NULL)
397 {
398 /* no commands left, move remainder back to front of rbuf */
399 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400 sock->next_read - sock->next_cmd);
401 sock->next_read -= sock->next_cmd;
402 sock->next_cmd = 0;
403 *len = 0;
404 return NULL;
405 }
406 else
407 {
408 char *cmd = sock->rbuf + sock->next_cmd;
409 *eol = '\0';
411 sock->next_cmd = eol - sock->rbuf + 1;
413 if (eol > sock->rbuf && *(eol-1) == '\r')
414 *(--eol) = '\0'; /* handle "\r\n" EOL */
416 *len = eol - cmd;
418 return cmd;
419 }
421 /* NOTREACHED */
422 assert(1==0);
423 }
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428 char *new_buf;
430 assert(sock != NULL);
432 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433 if (new_buf == NULL)
434 {
435 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436 return -1;
437 }
439 strncpy(new_buf + sock->wbuf_len, str, len + 1);
441 sock->wbuf = new_buf;
442 sock->wbuf_len += len;
444 return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450 va_list argp;
451 char buffer[CMD_MAX];
452 int len;
454 if (sock == NULL) return 0; /* journal replay mode */
455 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457 va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461 len = vsprintf(buffer, fmt, argp);
462 #endif
463 va_end(argp);
464 if (len < 0)
465 {
466 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467 return -1;
468 }
470 return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
/* Count the '\n' characters in the NUL-terminated string `str'.
 * A NULL string has zero lines. */
static int count_lines(char *str) /* {{{ */
{
  int newlines = 0;
  const char *p;

  if (str == NULL)
    return 0;

  for (p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      newlines++;
  }

  return newlines;
} /* }}} static int count_lines */
489 /* send the response back to the user.
490 * returns 0 on success, -1 on error
491 * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493 char *fmt, ...) /* {{{ */
494 {
495 va_list argp;
496 char buffer[CMD_MAX];
497 int lines;
498 ssize_t wrote;
499 int rclen, len;
501 if (sock == NULL) return rc; /* journal replay mode */
503 if (sock->batch_start)
504 {
505 if (rc == RESP_OK)
506 return rc; /* no response on success during BATCH */
507 lines = sock->batch_cmd;
508 }
509 else if (rc == RESP_OK)
510 lines = count_lines(sock->wbuf);
511 else
512 lines = -1;
514 rclen = sprintf(buffer, "%d ", lines);
515 va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519 len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521 va_end(argp);
522 if (len < 0)
523 return -1;
525 len += rclen;
527 /* append the result to the wbuf, don't write to the user */
528 if (sock->batch_start)
529 return add_to_wbuf(sock, buffer, len);
531 /* first write must be complete */
532 if (len != write(sock->fd, buffer, len))
533 {
534 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535 return -1;
536 }
538 if (sock->wbuf != NULL && rc == RESP_OK)
539 {
540 wrote = 0;
541 while (wrote < sock->wbuf_len)
542 {
543 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544 if (wb <= 0)
545 {
546 RRDD_LOG(LOG_INFO, "send_response: could not write results");
547 return -1;
548 }
549 wrote += wb;
550 }
551 }
553 free(sock->wbuf); sock->wbuf = NULL;
554 sock->wbuf_len = 0;
556 return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561 ci->values = NULL;
562 ci->values_num = 0;
564 ci->last_flush_time = when;
565 if (config_write_jitter > 0)
566 ci->last_flush_time += (random() % config_write_jitter);
567 }
569 /* remove_from_queue
570 * remove a "cache_item_t" item from the queue.
571 * must hold 'cache_lock' when calling this
572 */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575 if (ci == NULL) return;
576 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578 if (ci->prev == NULL)
579 cache_queue_head = ci->next; /* reset head */
580 else
581 ci->prev->next = ci->next;
583 if (ci->next == NULL)
584 cache_queue_tail = ci->prev; /* reset the tail */
585 else
586 ci->next->prev = ci->prev;
588 ci->next = ci->prev = NULL;
589 ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* free the resources associated with the cache_item_t
593 * must hold cache_lock when calling this function
594 */
595 static void *free_cache_item(cache_item_t *ci) /* {{{ */
596 {
597 if (ci == NULL) return NULL;
599 remove_from_queue(ci);
601 for (int i=0; i < ci->values_num; i++)
602 free(ci->values[i]);
604 free (ci->values);
605 free (ci->file);
607 /* in case anyone is waiting */
608 pthread_cond_broadcast(&ci->flushed);
610 free (ci);
612 return NULL;
613 } /* }}} static void *free_cache_item */
615 /*
616 * enqueue_cache_item:
617 * `cache_lock' must be acquired before calling this function!
618 */
619 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
620 queue_side_t side)
621 {
622 if (ci == NULL)
623 return (-1);
625 if (ci->values_num == 0)
626 return (0);
628 if (side == HEAD)
629 {
630 if (cache_queue_head == ci)
631 return 0;
633 /* remove if further down in queue */
634 remove_from_queue(ci);
636 ci->prev = NULL;
637 ci->next = cache_queue_head;
638 if (ci->next != NULL)
639 ci->next->prev = ci;
640 cache_queue_head = ci;
642 if (cache_queue_tail == NULL)
643 cache_queue_tail = cache_queue_head;
644 }
645 else /* (side == TAIL) */
646 {
647 /* We don't move values back in the list.. */
648 if (ci->flags & CI_FLAGS_IN_QUEUE)
649 return (0);
651 assert (ci->next == NULL);
652 assert (ci->prev == NULL);
654 ci->prev = cache_queue_tail;
656 if (cache_queue_tail == NULL)
657 cache_queue_head = ci;
658 else
659 cache_queue_tail->next = ci;
661 cache_queue_tail = ci;
662 }
664 ci->flags |= CI_FLAGS_IN_QUEUE;
666 pthread_cond_broadcast(&cache_cond);
667 pthread_mutex_lock (&stats_lock);
668 stats_queue_length++;
669 pthread_mutex_unlock (&stats_lock);
671 return (0);
672 } /* }}} int enqueue_cache_item */
674 /*
675 * tree_callback_flush:
676 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
677 * while this is in progress.
678 */
679 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
680 gpointer data)
681 {
682 cache_item_t *ci;
683 callback_flush_data_t *cfd;
685 ci = (cache_item_t *) value;
686 cfd = (callback_flush_data_t *) data;
688 if (ci->flags & CI_FLAGS_IN_QUEUE)
689 return FALSE;
691 if ((ci->last_flush_time <= cfd->abs_timeout)
692 && (ci->values_num > 0))
693 {
694 enqueue_cache_item (ci, TAIL);
695 }
696 else if ((do_shutdown != 0)
697 && (ci->values_num > 0))
698 {
699 enqueue_cache_item (ci, TAIL);
700 }
701 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
702 && (ci->values_num <= 0))
703 {
704 char **temp;
706 temp = (char **) realloc (cfd->keys,
707 sizeof (char *) * (cfd->keys_num + 1));
708 if (temp == NULL)
709 {
710 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
711 return (FALSE);
712 }
713 cfd->keys = temp;
714 /* Make really sure this points to the _same_ place */
715 assert ((char *) key == ci->file);
716 cfd->keys[cfd->keys_num] = (char *) key;
717 cfd->keys_num++;
718 }
720 return (FALSE);
721 } /* }}} gboolean tree_callback_flush */
723 static int flush_old_values (int max_age)
724 {
725 callback_flush_data_t cfd;
726 size_t k;
728 memset (&cfd, 0, sizeof (cfd));
729 /* Pass the current time as user data so that we don't need to call
730 * `time' for each node. */
731 cfd.now = time (NULL);
732 cfd.keys = NULL;
733 cfd.keys_num = 0;
735 if (max_age > 0)
736 cfd.abs_timeout = cfd.now - max_age;
737 else
738 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
740 /* `tree_callback_flush' will return the keys of all values that haven't
741 * been touched in the last `config_flush_interval' seconds in `cfd'.
742 * The char*'s in this array point to the same memory as ci->file, so we
743 * don't need to free them separately. */
744 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
746 for (k = 0; k < cfd.keys_num; k++)
747 {
748 /* should never fail, since we have held the cache_lock
749 * the entire time */
750 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
751 }
753 if (cfd.keys != NULL)
754 {
755 free (cfd.keys);
756 cfd.keys = NULL;
757 }
759 return (0);
760 } /* int flush_old_values */
762 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
763 {
764 struct timeval now;
765 struct timespec next_flush;
766 int final_flush = 0; /* make sure we only flush once on shutdown */
768 gettimeofday (&now, NULL);
769 next_flush.tv_sec = now.tv_sec + config_flush_interval;
770 next_flush.tv_nsec = 1000 * now.tv_usec;
772 pthread_mutex_lock (&cache_lock);
773 while ((do_shutdown == 0) || (cache_queue_head != NULL))
774 {
775 cache_item_t *ci;
776 char *file;
777 char **values;
778 int values_num;
779 int status;
780 int i;
782 /* First, check if it's time to do the cache flush. */
783 gettimeofday (&now, NULL);
784 if ((now.tv_sec > next_flush.tv_sec)
785 || ((now.tv_sec == next_flush.tv_sec)
786 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
787 {
788 /* Flush all values that haven't been written in the last
789 * `config_write_interval' seconds. */
790 flush_old_values (config_write_interval);
792 /* Determine the time of the next cache flush. */
793 next_flush.tv_sec =
794 now.tv_sec + next_flush.tv_sec % config_flush_interval;
796 /* unlock the cache while we rotate so we don't block incoming
797 * updates if the fsync() blocks on disk I/O */
798 pthread_mutex_unlock(&cache_lock);
799 journal_rotate();
800 pthread_mutex_lock(&cache_lock);
801 }
803 /* Now, check if there's something to store away. If not, wait until
804 * something comes in or it's time to do the cache flush. if we are
805 * shutting down, do not wait around. */
806 if (cache_queue_head == NULL && !do_shutdown)
807 {
808 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
809 if ((status != 0) && (status != ETIMEDOUT))
810 {
811 RRDD_LOG (LOG_ERR, "queue_thread_main: "
812 "pthread_cond_timedwait returned %i.", status);
813 }
814 }
816 /* We're about to shut down */
817 if (do_shutdown != 0 && !final_flush++)
818 {
819 if (config_flush_at_shutdown)
820 flush_old_values (-1); /* flush everything */
821 else
822 break;
823 }
825 /* Check if a value has arrived. This may be NULL if we timed out or there
826 * was an interrupt such as a signal. */
827 if (cache_queue_head == NULL)
828 continue;
830 ci = cache_queue_head;
832 /* copy the relevant parts */
833 file = strdup (ci->file);
834 if (file == NULL)
835 {
836 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
837 continue;
838 }
840 assert(ci->values != NULL);
841 assert(ci->values_num > 0);
843 values = ci->values;
844 values_num = ci->values_num;
846 wipe_ci_values(ci, time(NULL));
847 remove_from_queue(ci);
849 pthread_mutex_lock (&stats_lock);
850 assert (stats_queue_length > 0);
851 stats_queue_length--;
852 pthread_mutex_unlock (&stats_lock);
854 pthread_mutex_unlock (&cache_lock);
856 rrd_clear_error ();
857 status = rrd_update_r (file, NULL, values_num, (void *) values);
858 if (status != 0)
859 {
860 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
861 "rrd_update_r (%s) failed with status %i. (%s)",
862 file, status, rrd_get_error());
863 }
865 journal_write("wrote", file);
866 pthread_cond_broadcast(&ci->flushed);
868 for (i = 0; i < values_num; i++)
869 free (values[i]);
871 free(values);
872 free(file);
874 if (status == 0)
875 {
876 pthread_mutex_lock (&stats_lock);
877 stats_updates_written++;
878 stats_data_sets_written += values_num;
879 pthread_mutex_unlock (&stats_lock);
880 }
882 pthread_mutex_lock (&cache_lock);
884 /* We're about to shut down */
885 if (do_shutdown != 0 && !final_flush++)
886 {
887 if (config_flush_at_shutdown)
888 flush_old_values (-1); /* flush everything */
889 else
890 break;
891 }
892 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
893 pthread_mutex_unlock (&cache_lock);
895 if (config_flush_at_shutdown)
896 {
897 assert(cache_queue_head == NULL);
898 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
899 }
901 journal_done();
903 return (NULL);
904 } /* }}} void *queue_thread_main */
/* Split the next field off the front of `*buffer_ret'.
 *
 * Every ' ' (and the terminating NUL) ends a field, so two consecutive
 * spaces yield an empty field.  A backslash makes the following character
 * literal, so "a\ b" is the single field "a b".  The field is unescaped in
 * place and NUL-terminated; *field_ret points at it and *buffer_ret /
 * *buffer_size_ret are advanced past the separator.
 *
 * The buffer must contain a '\0' as its last byte (ensured by
 * `handle_request').  Returns 0 on success, or -1 if the buffer is empty or
 * ends inside the field; the three output pointers are then unchanged,
 * although the buffer contents may already have been rewritten. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *text = *buffer_ret;
  size_t size = *buffer_size_ret;
  size_t in = 0;    /* read position */
  size_t out = 0;   /* write position; never ahead of `in' */

  if (size == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (text[size - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= size)
      return (-1);

    c = text[in];
    if (c == ' ' || c == '\0')
    {
      text[out] = '\0';
      in++;
      break;
    }

    if (c == '\\')
    {
      /* an escape needs a following character */
      if (in >= size - 1)
        return (-1);
      in++;
    }

    text[out] = text[in];
    out++;
    in++;
  }

  *buffer_ret = text + in;
  *buffer_size_ret = size - in;
  *field_ret = text;

  return (0);
} /* }}} int buffer_get_field */
969 /* if we're restricting writes to the base directory,
970 * check whether the file falls within the dir
971 * returns 1 if OK, otherwise 0
972 */
973 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
974 {
975 assert(file != NULL);
977 if (!config_write_base_only
978 || sock == NULL /* journal replay */
979 || config_base_dir == NULL)
980 return 1;
982 if (strstr(file, "../") != NULL) goto err;
984 /* relative paths without "../" are ok */
985 if (*file != '/') return 1;
987 /* file must be of the format base + "/" + <1+ char filename> */
988 if (strlen(file) < _config_base_dir_len + 2) goto err;
989 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
990 if (*(file + _config_base_dir_len) != '/') goto err;
992 return 1;
994 err:
995 if (sock != NULL && sock->fd >= 0)
996 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
998 return 0;
999 } /* }}} static int check_file_access */
1001 /* when using a base dir, convert relative paths to absolute paths.
1002 * if necessary, modifies the "filename" pointer to point
1003 * to the new path created in "tmp". "tmp" is provided
1004 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1005 *
1006 * this allows us to optimize for the expected case (absolute path)
1007 * with a no-op.
1008 */
1009 static void get_abs_path(char **filename, char *tmp)
1010 {
1011 assert(tmp != NULL);
1012 assert(filename != NULL && *filename != NULL);
1014 if (config_base_dir == NULL || **filename == '/')
1015 return;
1017 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1018 *filename = tmp;
1019 } /* }}} static int get_abs_path */
1021 /* returns 1 if we have the required privilege level,
1022 * otherwise issue an error to the user on sock */
1023 static int has_privilege (listen_socket_t *sock, /* {{{ */
1024 socket_privilege priv)
1025 {
1026 if (sock == NULL) /* journal replay */
1027 return 1;
1029 if (sock->privilege >= priv)
1030 return 1;
1032 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 } /* }}} static int has_privilege */
1035 static int flush_file (const char *filename) /* {{{ */
1036 {
1037 cache_item_t *ci;
1039 pthread_mutex_lock (&cache_lock);
1041 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1042 if (ci == NULL)
1043 {
1044 pthread_mutex_unlock (&cache_lock);
1045 return (ENOENT);
1046 }
1048 if (ci->values_num > 0)
1049 {
1050 /* Enqueue at head */
1051 enqueue_cache_item (ci, HEAD);
1052 pthread_cond_wait(&ci->flushed, &cache_lock);
1053 }
1055 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1056 * may have been purged during our cond_wait() */
1058 pthread_mutex_unlock(&cache_lock);
1060 return (0);
1061 } /* }}} int flush_file */
1063 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1064 char *buffer, size_t buffer_size)
1065 {
1066 int status;
1067 char **help_text;
1068 char *command;
1070 char *help_help[2] =
1071 {
1072 "Command overview\n"
1073 ,
1074 "HELP [<command>]\n"
1075 "FLUSH <filename>\n"
1076 "FLUSHALL\n"
1077 "PENDING <filename>\n"
1078 "FORGET <filename>\n"
1079 "UPDATE <filename> <values> [<values> ...]\n"
1080 "BATCH\n"
1081 "STATS\n"
1082 };
1084 char *help_flush[2] =
1085 {
1086 "Help for FLUSH\n"
1087 ,
1088 "Usage: FLUSH <filename>\n"
1089 "\n"
1090 "Adds the given filename to the head of the update queue and returns\n"
1091 "after is has been dequeued.\n"
1092 };
1094 char *help_flushall[2] =
1095 {
1096 "Help for FLUSHALL\n"
1097 ,
1098 "Usage: FLUSHALL\n"
1099 "\n"
1100 "Triggers writing of all pending updates. Returns immediately.\n"
1101 };
1103 char *help_pending[2] =
1104 {
1105 "Help for PENDING\n"
1106 ,
1107 "Usage: PENDING <filename>\n"
1108 "\n"
1109 "Shows any 'pending' updates for a file, in order.\n"
1110 "The updates shown have not yet been written to the underlying RRD file.\n"
1111 };
1113 char *help_forget[2] =
1114 {
1115 "Help for FORGET\n"
1116 ,
1117 "Usage: FORGET <filename>\n"
1118 "\n"
1119 "Removes the file completely from the cache.\n"
1120 "Any pending updates for the file will be lost.\n"
1121 };
1123 char *help_update[2] =
1124 {
1125 "Help for UPDATE\n"
1126 ,
1127 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1128 "\n"
1129 "Adds the given file to the internal cache if it is not yet known and\n"
1130 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1131 "for details.\n"
1132 "\n"
1133 "Each <values> has the following form:\n"
1134 " <values> = <time>:<value>[:<value>[...]]\n"
1135 "See the rrdupdate(1) manpage for details.\n"
1136 };
1138 char *help_stats[2] =
1139 {
1140 "Help for STATS\n"
1141 ,
1142 "Usage: STATS\n"
1143 "\n"
1144 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1145 "a description of the values.\n"
1146 };
1148 char *help_batch[2] =
1149 {
1150 "Help for BATCH\n"
1151 ,
1152 "The 'BATCH' command permits the client to initiate a bulk load\n"
1153 " of commands to rrdcached.\n"
1154 "\n"
1155 "Usage:\n"
1156 "\n"
1157 " client: BATCH\n"
1158 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1159 " client: command #1\n"
1160 " client: command #2\n"
1161 " client: ... and so on\n"
1162 " client: .\n"
1163 " server: 2 errors\n"
1164 " server: 7 message for command #7\n"
1165 " server: 9 message for command #9\n"
1166 "\n"
1167 "For more information, consult the rrdcached(1) documentation.\n"
1168 };
1170 status = buffer_get_field (&buffer, &buffer_size, &command);
1171 if (status != 0)
1172 help_text = help_help;
1173 else
1174 {
1175 if (strcasecmp (command, "update") == 0)
1176 help_text = help_update;
1177 else if (strcasecmp (command, "flush") == 0)
1178 help_text = help_flush;
1179 else if (strcasecmp (command, "flushall") == 0)
1180 help_text = help_flushall;
1181 else if (strcasecmp (command, "pending") == 0)
1182 help_text = help_pending;
1183 else if (strcasecmp (command, "forget") == 0)
1184 help_text = help_forget;
1185 else if (strcasecmp (command, "stats") == 0)
1186 help_text = help_stats;
1187 else if (strcasecmp (command, "batch") == 0)
1188 help_text = help_batch;
1189 else
1190 help_text = help_help;
1191 }
1193 add_response_info(sock, help_text[1]);
1194 return send_response(sock, RESP_OK, help_text[0]);
1195 } /* }}} int handle_request_help */
1197 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1198 {
1199 uint64_t copy_queue_length;
1200 uint64_t copy_updates_received;
1201 uint64_t copy_flush_received;
1202 uint64_t copy_updates_written;
1203 uint64_t copy_data_sets_written;
1204 uint64_t copy_journal_bytes;
1205 uint64_t copy_journal_rotate;
1207 uint64_t tree_nodes_number;
1208 uint64_t tree_depth;
1210 pthread_mutex_lock (&stats_lock);
1211 copy_queue_length = stats_queue_length;
1212 copy_updates_received = stats_updates_received;
1213 copy_flush_received = stats_flush_received;
1214 copy_updates_written = stats_updates_written;
1215 copy_data_sets_written = stats_data_sets_written;
1216 copy_journal_bytes = stats_journal_bytes;
1217 copy_journal_rotate = stats_journal_rotate;
1218 pthread_mutex_unlock (&stats_lock);
1220 pthread_mutex_lock (&cache_lock);
1221 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1222 tree_depth = (uint64_t) g_tree_height (cache_tree);
1223 pthread_mutex_unlock (&cache_lock);
1225 add_response_info(sock,
1226 "QueueLength: %"PRIu64"\n", copy_queue_length);
1227 add_response_info(sock,
1228 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1229 add_response_info(sock,
1230 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1231 add_response_info(sock,
1232 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1233 add_response_info(sock,
1234 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1235 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1236 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1237 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1238 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1240 send_response(sock, RESP_OK, "Statistics follow\n");
1242 return (0);
1243 } /* }}} int handle_request_stats */
1245 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1246 char *buffer, size_t buffer_size)
1247 {
1248 char *file, file_tmp[PATH_MAX];
1249 int status;
1251 status = buffer_get_field (&buffer, &buffer_size, &file);
1252 if (status != 0)
1253 {
1254 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1255 }
1256 else
1257 {
1258 pthread_mutex_lock(&stats_lock);
1259 stats_flush_received++;
1260 pthread_mutex_unlock(&stats_lock);
1262 get_abs_path(&file, file_tmp);
1263 if (!check_file_access(file, sock)) return 0;
1265 status = flush_file (file);
1266 if (status == 0)
1267 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1268 else if (status == ENOENT)
1269 {
1270 /* no file in our tree; see whether it exists at all */
1271 struct stat statbuf;
1273 memset(&statbuf, 0, sizeof(statbuf));
1274 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1275 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1276 else
1277 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1278 }
1279 else if (status < 0)
1280 return send_response(sock, RESP_ERR, "Internal error.\n");
1281 else
1282 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1283 }
1285 /* NOTREACHED */
1286 assert(1==0);
1287 } /* }}} int handle_request_flush */
1289 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1290 {
1291 int status;
1293 status = has_privilege(sock, PRIV_HIGH);
1294 if (status <= 0)
1295 return status;
1297 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1299 pthread_mutex_lock(&cache_lock);
1300 flush_old_values(-1);
1301 pthread_mutex_unlock(&cache_lock);
1303 return send_response(sock, RESP_OK, "Started flush.\n");
1304 } /* }}} static int handle_request_flushall */
1306 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1307 char *buffer, size_t buffer_size)
1308 {
1309 int status;
1310 char *file, file_tmp[PATH_MAX];
1311 cache_item_t *ci;
1313 status = buffer_get_field(&buffer, &buffer_size, &file);
1314 if (status != 0)
1315 return send_response(sock, RESP_ERR,
1316 "Usage: PENDING <filename>\n");
1318 status = has_privilege(sock, PRIV_HIGH);
1319 if (status <= 0)
1320 return status;
1322 get_abs_path(&file, file_tmp);
1324 pthread_mutex_lock(&cache_lock);
1325 ci = g_tree_lookup(cache_tree, file);
1326 if (ci == NULL)
1327 {
1328 pthread_mutex_unlock(&cache_lock);
1329 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1330 }
1332 for (int i=0; i < ci->values_num; i++)
1333 add_response_info(sock, "%s\n", ci->values[i]);
1335 pthread_mutex_unlock(&cache_lock);
1336 return send_response(sock, RESP_OK, "updates pending\n");
1337 } /* }}} static int handle_request_pending */
1339 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1340 char *buffer, size_t buffer_size)
1341 {
1342 int status;
1343 gboolean found;
1344 char *file, file_tmp[PATH_MAX];
1346 status = buffer_get_field(&buffer, &buffer_size, &file);
1347 if (status != 0)
1348 return send_response(sock, RESP_ERR,
1349 "Usage: FORGET <filename>\n");
1351 status = has_privilege(sock, PRIV_HIGH);
1352 if (status <= 0)
1353 return status;
1355 get_abs_path(&file, file_tmp);
1356 if (!check_file_access(file, sock)) return 0;
1358 pthread_mutex_lock(&cache_lock);
1359 found = g_tree_remove(cache_tree, file);
1360 pthread_mutex_unlock(&cache_lock);
1362 if (found == TRUE)
1363 {
1364 if (sock != NULL)
1365 journal_write("forget", file);
1367 return send_response(sock, RESP_OK, "Gone!\n");
1368 }
1369 else
1370 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1372 /* NOTREACHED */
1373 assert(1==0);
1374 } /* }}} static int handle_request_forget */
1376 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1377 time_t now,
1378 char *buffer, size_t buffer_size)
1379 {
1380 char *file, file_tmp[PATH_MAX];
1381 int values_num = 0;
1382 int status;
1383 char orig_buf[CMD_MAX];
1385 cache_item_t *ci;
1387 status = has_privilege(sock, PRIV_HIGH);
1388 if (status <= 0)
1389 return status;
1391 /* save it for the journal later */
1392 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1394 status = buffer_get_field (&buffer, &buffer_size, &file);
1395 if (status != 0)
1396 return send_response(sock, RESP_ERR,
1397 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1399 pthread_mutex_lock(&stats_lock);
1400 stats_updates_received++;
1401 pthread_mutex_unlock(&stats_lock);
1403 get_abs_path(&file, file_tmp);
1404 if (!check_file_access(file, sock)) return 0;
1406 pthread_mutex_lock (&cache_lock);
1407 ci = g_tree_lookup (cache_tree, file);
1409 if (ci == NULL) /* {{{ */
1410 {
1411 struct stat statbuf;
1413 /* don't hold the lock while we setup; stat(2) might block */
1414 pthread_mutex_unlock(&cache_lock);
1416 memset (&statbuf, 0, sizeof (statbuf));
1417 status = stat (file, &statbuf);
1418 if (status != 0)
1419 {
1420 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1422 status = errno;
1423 if (status == ENOENT)
1424 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1425 else
1426 return send_response(sock, RESP_ERR,
1427 "stat failed with error %i.\n", status);
1428 }
1429 if (!S_ISREG (statbuf.st_mode))
1430 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1432 if (access(file, R_OK|W_OK) != 0)
1433 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1434 file, rrd_strerror(errno));
1436 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1437 if (ci == NULL)
1438 {
1439 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1441 return send_response(sock, RESP_ERR, "malloc failed.\n");
1442 }
1443 memset (ci, 0, sizeof (cache_item_t));
1445 ci->file = strdup (file);
1446 if (ci->file == NULL)
1447 {
1448 free (ci);
1449 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1451 return send_response(sock, RESP_ERR, "strdup failed.\n");
1452 }
1454 wipe_ci_values(ci, now);
1455 ci->flags = CI_FLAGS_IN_TREE;
1456 pthread_cond_init(&ci->flushed, NULL);
1458 pthread_mutex_lock(&cache_lock);
1459 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1460 } /* }}} */
1461 assert (ci != NULL);
1463 /* don't re-write updates in replay mode */
1464 if (sock != NULL)
1465 journal_write("update", orig_buf);
1467 while (buffer_size > 0)
1468 {
1469 char **temp;
1470 char *value;
1471 time_t stamp;
1472 char *eostamp;
1474 status = buffer_get_field (&buffer, &buffer_size, &value);
1475 if (status != 0)
1476 {
1477 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1478 break;
1479 }
1481 /* make sure update time is always moving forward */
1482 stamp = strtol(value, &eostamp, 10);
1483 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1484 {
1485 pthread_mutex_unlock(&cache_lock);
1486 return send_response(sock, RESP_ERR,
1487 "Cannot find timestamp in '%s'!\n", value);
1488 }
1489 else if (stamp <= ci->last_update_stamp)
1490 {
1491 pthread_mutex_unlock(&cache_lock);
1492 return send_response(sock, RESP_ERR,
1493 "illegal attempt to update using time %ld when last"
1494 " update time is %ld (minimum one second step)\n",
1495 stamp, ci->last_update_stamp);
1496 }
1497 else
1498 ci->last_update_stamp = stamp;
1500 temp = (char **) realloc (ci->values,
1501 sizeof (char *) * (ci->values_num + 1));
1502 if (temp == NULL)
1503 {
1504 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1505 continue;
1506 }
1507 ci->values = temp;
1509 ci->values[ci->values_num] = strdup (value);
1510 if (ci->values[ci->values_num] == NULL)
1511 {
1512 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1513 continue;
1514 }
1515 ci->values_num++;
1517 values_num++;
1518 }
1520 if (((now - ci->last_flush_time) >= config_write_interval)
1521 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1522 && (ci->values_num > 0))
1523 {
1524 enqueue_cache_item (ci, TAIL);
1525 }
1527 pthread_mutex_unlock (&cache_lock);
1529 if (values_num < 1)
1530 return send_response(sock, RESP_ERR, "No values updated.\n");
1531 else
1532 return send_response(sock, RESP_OK,
1533 "errors, enqueued %i value(s).\n", values_num);
1535 /* NOTREACHED */
1536 assert(1==0);
1538 } /* }}} int handle_request_update */
1540 /* we came across a "WROTE" entry during journal replay.
1541 * throw away any values that we have accumulated for this file
1542 */
1543 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1544 {
1545 int i;
1546 cache_item_t *ci;
1547 const char *file = buffer;
1549 pthread_mutex_lock(&cache_lock);
1551 ci = g_tree_lookup(cache_tree, file);
1552 if (ci == NULL)
1553 {
1554 pthread_mutex_unlock(&cache_lock);
1555 return (0);
1556 }
1558 if (ci->values)
1559 {
1560 for (i=0; i < ci->values_num; i++)
1561 free(ci->values[i]);
1563 free(ci->values);
1564 }
1566 wipe_ci_values(ci, now);
1567 remove_from_queue(ci);
1569 pthread_mutex_unlock(&cache_lock);
1570 return (0);
1571 } /* }}} int handle_request_wrote */
1573 /* start "BATCH" processing */
1574 static int batch_start (listen_socket_t *sock) /* {{{ */
1575 {
1576 int status;
1577 if (sock->batch_start)
1578 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1580 status = send_response(sock, RESP_OK,
1581 "Go ahead. End with dot '.' on its own line.\n");
1582 sock->batch_start = time(NULL);
1583 sock->batch_cmd = 0;
1585 return status;
1586 } /* }}} static int batch_start */
1588 /* finish "BATCH" processing and return results to the client */
1589 static int batch_done (listen_socket_t *sock) /* {{{ */
1590 {
1591 assert(sock->batch_start);
1592 sock->batch_start = 0;
1593 sock->batch_cmd = 0;
1594 return send_response(sock, RESP_OK, "errors\n");
1595 } /* }}} static int batch_done */
1597 /* if sock==NULL, we are in journal replay mode */
1598 static int handle_request (listen_socket_t *sock, /* {{{ */
1599 time_t now,
1600 char *buffer, size_t buffer_size)
1601 {
1602 char *buffer_ptr;
1603 char *command;
1604 int status;
1606 assert (buffer[buffer_size - 1] == '\0');
1608 buffer_ptr = buffer;
1609 command = NULL;
1610 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1611 if (status != 0)
1612 {
1613 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1614 return (-1);
1615 }
1617 if (sock != NULL && sock->batch_start)
1618 sock->batch_cmd++;
1620 if (strcasecmp (command, "update") == 0)
1621 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1622 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1623 {
1624 /* this is only valid in replay mode */
1625 return (handle_request_wrote (buffer_ptr, now));
1626 }
1627 else if (strcasecmp (command, "flush") == 0)
1628 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1629 else if (strcasecmp (command, "flushall") == 0)
1630 return (handle_request_flushall(sock));
1631 else if (strcasecmp (command, "pending") == 0)
1632 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1633 else if (strcasecmp (command, "forget") == 0)
1634 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1635 else if (strcasecmp (command, "stats") == 0)
1636 return (handle_request_stats (sock));
1637 else if (strcasecmp (command, "help") == 0)
1638 return (handle_request_help (sock, buffer_ptr, buffer_size));
1639 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1640 return batch_start(sock);
1641 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1642 return batch_done(sock);
1643 else if (strcasecmp (command, "quit") == 0)
1644 return -1;
1645 else
1646 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1648 /* NOTREACHED */
1649 assert(1==0);
1650 } /* }}} int handle_request */
1652 /* MUST NOT hold journal_lock before calling this */
1653 static void journal_rotate(void) /* {{{ */
1654 {
1655 FILE *old_fh = NULL;
1656 int new_fd;
1658 if (journal_cur == NULL || journal_old == NULL)
1659 return;
1661 pthread_mutex_lock(&journal_lock);
1663 /* we rotate this way (rename before close) so that the we can release
1664 * the journal lock as fast as possible. Journal writes to the new
1665 * journal can proceed immediately after the new file is opened. The
1666 * fclose can then block without affecting new updates.
1667 */
1668 if (journal_fh != NULL)
1669 {
1670 old_fh = journal_fh;
1671 journal_fh = NULL;
1672 rename(journal_cur, journal_old);
1673 ++stats_journal_rotate;
1674 }
1676 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1677 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1678 if (new_fd >= 0)
1679 {
1680 journal_fh = fdopen(new_fd, "a");
1681 if (journal_fh == NULL)
1682 close(new_fd);
1683 }
1685 pthread_mutex_unlock(&journal_lock);
1687 if (old_fh != NULL)
1688 fclose(old_fh);
1690 if (journal_fh == NULL)
1691 {
1692 RRDD_LOG(LOG_CRIT,
1693 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1694 journal_cur, rrd_strerror(errno));
1696 RRDD_LOG(LOG_ERR,
1697 "JOURNALING DISABLED: All values will be flushed at shutdown");
1698 config_flush_at_shutdown = 1;
1699 }
1701 } /* }}} static void journal_rotate */
1703 static void journal_done(void) /* {{{ */
1704 {
1705 if (journal_cur == NULL)
1706 return;
1708 pthread_mutex_lock(&journal_lock);
1709 if (journal_fh != NULL)
1710 {
1711 fclose(journal_fh);
1712 journal_fh = NULL;
1713 }
1715 if (config_flush_at_shutdown)
1716 {
1717 RRDD_LOG(LOG_INFO, "removing journals");
1718 unlink(journal_old);
1719 unlink(journal_cur);
1720 }
1721 else
1722 {
1723 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1724 "journals will be used at next startup");
1725 }
1727 pthread_mutex_unlock(&journal_lock);
1729 } /* }}} static void journal_done */
1731 static int journal_write(char *cmd, char *args) /* {{{ */
1732 {
1733 int chars;
1735 if (journal_fh == NULL)
1736 return 0;
1738 pthread_mutex_lock(&journal_lock);
1739 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1740 pthread_mutex_unlock(&journal_lock);
1742 if (chars > 0)
1743 {
1744 pthread_mutex_lock(&stats_lock);
1745 stats_journal_bytes += chars;
1746 pthread_mutex_unlock(&stats_lock);
1747 }
1749 return chars;
1750 } /* }}} static int journal_write */
1752 static int journal_replay (const char *file) /* {{{ */
1753 {
1754 FILE *fh;
1755 int entry_cnt = 0;
1756 int fail_cnt = 0;
1757 uint64_t line = 0;
1758 char entry[CMD_MAX];
1759 time_t now;
1761 if (file == NULL) return 0;
1763 {
1764 char *reason = "unknown error";
1765 int status = 0;
1766 struct stat statbuf;
1768 memset(&statbuf, 0, sizeof(statbuf));
1769 if (stat(file, &statbuf) != 0)
1770 {
1771 if (errno == ENOENT)
1772 return 0;
1774 reason = "stat error";
1775 status = errno;
1776 }
1777 else if (!S_ISREG(statbuf.st_mode))
1778 {
1779 reason = "not a regular file";
1780 status = EPERM;
1781 }
1782 if (statbuf.st_uid != daemon_uid)
1783 {
1784 reason = "not owned by daemon user";
1785 status = EACCES;
1786 }
1787 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1788 {
1789 reason = "must not be user/group writable";
1790 status = EACCES;
1791 }
1793 if (status != 0)
1794 {
1795 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1796 file, rrd_strerror(status), reason);
1797 return 0;
1798 }
1799 }
1801 fh = fopen(file, "r");
1802 if (fh == NULL)
1803 {
1804 if (errno != ENOENT)
1805 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1806 file, rrd_strerror(errno));
1807 return 0;
1808 }
1809 else
1810 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1812 now = time(NULL);
1814 while(!feof(fh))
1815 {
1816 size_t entry_len;
1818 ++line;
1819 if (fgets(entry, sizeof(entry), fh) == NULL)
1820 break;
1821 entry_len = strlen(entry);
1823 /* check \n termination in case journal writing crashed mid-line */
1824 if (entry_len == 0)
1825 continue;
1826 else if (entry[entry_len - 1] != '\n')
1827 {
1828 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1829 ++fail_cnt;
1830 continue;
1831 }
1833 entry[entry_len - 1] = '\0';
1835 if (handle_request(NULL, now, entry, entry_len) == 0)
1836 ++entry_cnt;
1837 else
1838 ++fail_cnt;
1839 }
1841 fclose(fh);
1843 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1844 entry_cnt, fail_cnt);
1846 return entry_cnt > 0 ? 1 : 0;
1847 } /* }}} static int journal_replay */
1849 static void journal_init(void) /* {{{ */
1850 {
1851 int had_journal = 0;
1853 if (journal_cur == NULL) return;
1855 pthread_mutex_lock(&journal_lock);
1857 RRDD_LOG(LOG_INFO, "checking for journal files");
1859 had_journal += journal_replay(journal_old);
1860 had_journal += journal_replay(journal_cur);
1862 /* it must have been a crash. start a flush */
1863 if (had_journal && config_flush_at_shutdown)
1864 flush_old_values(-1);
1866 pthread_mutex_unlock(&journal_lock);
1867 journal_rotate();
1869 RRDD_LOG(LOG_INFO, "journal processing complete");
1871 } /* }}} static void journal_init */
1873 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1874 {
1875 assert(sock != NULL);
1877 free(sock->rbuf); sock->rbuf = NULL;
1878 free(sock->wbuf); sock->wbuf = NULL;
1879 free(sock);
1880 } /* }}} void free_listen_socket */
1882 static void close_connection(listen_socket_t *sock) /* {{{ */
1883 {
1884 if (sock->fd >= 0)
1885 {
1886 close(sock->fd);
1887 sock->fd = -1;
1888 }
1890 free_listen_socket(sock);
1892 } /* }}} void close_connection */
1894 static void *connection_thread_main (void *args) /* {{{ */
1895 {
1896 listen_socket_t *sock;
1897 int i;
1898 int fd;
1900 sock = (listen_socket_t *) args;
1901 fd = sock->fd;
1903 /* init read buffers */
1904 sock->next_read = sock->next_cmd = 0;
1905 sock->rbuf = malloc(RBUF_SIZE);
1906 if (sock->rbuf == NULL)
1907 {
1908 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1909 close_connection(sock);
1910 return NULL;
1911 }
1913 pthread_mutex_lock (&connection_threads_lock);
1914 {
1915 pthread_t *temp;
1917 temp = (pthread_t *) realloc (connection_threads,
1918 sizeof (pthread_t) * (connection_threads_num + 1));
1919 if (temp == NULL)
1920 {
1921 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1922 }
1923 else
1924 {
1925 connection_threads = temp;
1926 connection_threads[connection_threads_num] = pthread_self ();
1927 connection_threads_num++;
1928 }
1929 }
1930 pthread_mutex_unlock (&connection_threads_lock);
1932 while (do_shutdown == 0)
1933 {
1934 char *cmd;
1935 ssize_t cmd_len;
1936 ssize_t rbytes;
1937 time_t now;
1939 struct pollfd pollfd;
1940 int status;
1942 pollfd.fd = fd;
1943 pollfd.events = POLLIN | POLLPRI;
1944 pollfd.revents = 0;
1946 status = poll (&pollfd, 1, /* timeout = */ 500);
1947 if (do_shutdown)
1948 break;
1949 else if (status == 0) /* timeout */
1950 continue;
1951 else if (status < 0) /* error */
1952 {
1953 status = errno;
1954 if (status != EINTR)
1955 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1956 continue;
1957 }
1959 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1960 break;
1961 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1962 {
1963 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1964 "poll(2) returned something unexpected: %#04hx",
1965 pollfd.revents);
1966 break;
1967 }
1969 rbytes = read(fd, sock->rbuf + sock->next_read,
1970 RBUF_SIZE - sock->next_read);
1971 if (rbytes < 0)
1972 {
1973 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1974 break;
1975 }
1976 else if (rbytes == 0)
1977 break; /* eof */
1979 sock->next_read += rbytes;
1981 if (sock->batch_start)
1982 now = sock->batch_start;
1983 else
1984 now = time(NULL);
1986 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1987 {
1988 status = handle_request (sock, now, cmd, cmd_len+1);
1989 if (status != 0)
1990 goto out_close;
1991 }
1992 }
1994 out_close:
1995 close_connection(sock);
1997 /* Remove this thread from the connection threads list */
1998 pthread_mutex_lock (&connection_threads_lock);
1999 {
2000 pthread_t self;
2001 pthread_t *temp;
2003 /* Find out own index in the array */
2004 self = pthread_self ();
2005 for (i = 0; i < connection_threads_num; i++)
2006 if (pthread_equal (connection_threads[i], self) != 0)
2007 break;
2008 assert (i < connection_threads_num);
2010 /* Move the trailing threads forward. */
2011 if (i < (connection_threads_num - 1))
2012 {
2013 memmove (connection_threads + i,
2014 connection_threads + i + 1,
2015 sizeof (pthread_t) * (connection_threads_num - i - 1));
2016 }
2018 connection_threads_num--;
2020 temp = realloc(connection_threads,
2021 sizeof(*connection_threads) * connection_threads_num);
2022 if (connection_threads_num > 0 && temp == NULL)
2023 RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2024 else
2025 connection_threads = temp;
2026 }
2027 pthread_mutex_unlock (&connection_threads_lock);
2029 return (NULL);
2030 } /* }}} void *connection_thread_main */
2032 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2033 {
2034 int fd;
2035 struct sockaddr_un sa;
2036 listen_socket_t *temp;
2037 int status;
2038 const char *path;
2040 path = sock->addr;
2041 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2042 path += strlen("unix:");
2044 temp = (listen_socket_t *) realloc (listen_fds,
2045 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2046 if (temp == NULL)
2047 {
2048 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2049 return (-1);
2050 }
2051 listen_fds = temp;
2052 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2054 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2055 if (fd < 0)
2056 {
2057 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2058 rrd_strerror(errno));
2059 return (-1);
2060 }
2062 memset (&sa, 0, sizeof (sa));
2063 sa.sun_family = AF_UNIX;
2064 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2066 /* if we've gotten this far, we own the pid file. any daemon started
2067 * with the same args must not be alive. therefore, ensure that we can
2068 * create the socket...
2069 */
2070 unlink(path);
2072 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2073 if (status != 0)
2074 {
2075 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2076 path, rrd_strerror(errno));
2077 close (fd);
2078 return (-1);
2079 }
2081 status = listen (fd, /* backlog = */ 10);
2082 if (status != 0)
2083 {
2084 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2085 path, rrd_strerror(errno));
2086 close (fd);
2087 unlink (path);
2088 return (-1);
2089 }
2091 listen_fds[listen_fds_num].fd = fd;
2092 listen_fds[listen_fds_num].family = PF_UNIX;
2093 strncpy(listen_fds[listen_fds_num].addr, path,
2094 sizeof (listen_fds[listen_fds_num].addr) - 1);
2095 listen_fds_num++;
2097 return (0);
2098 } /* }}} int open_listen_socket_unix */
/* Open listening TCP sockets for sock->addr.
 *
 * Accepted address forms:
 *   "[v6addr]" or "[v6addr]:port"   bracketed IPv6 address, optional port
 *   "host:port", "a.b.c.d:port"     exactly one ':' separates the port
 *   anything else                   passed to getaddrinfo() unchanged
 *                                   (e.g. "host", "a.b.c.d", "::1")
 * RRDCACHED_DEFAULT_PORT is used when no port is given.  One socket is
 * opened for every address getaddrinfo() returns.  Addresses that cannot be
 * bound or listened on are skipped with a message on stderr.
 *
 * Returns -1 if the address is malformed or cannot be resolved, else 0,
 * even when no socket could be opened; the caller checks listen_fds_num. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Hostname or IPv4 address with an optional ":port".  A bare IPv6
     * address contains several colons and is passed through unchanged.
     * (strrchr is the standard spelling of the legacy rindex.) */
    char *colon = strchr (addr, ':');

    if ((colon != NULL) && (colon == strrchr (addr, ':')))
    {
      *colon = 0;
      port = colon + 1;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    /* like a bind failure, a listen failure only skips this address */
    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2222 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2223 {
2224 assert(sock != NULL);
2225 assert(sock->addr != NULL);
2227 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2228 || sock->addr[0] == '/')
2229 return (open_listen_socket_unix(sock));
2230 else
2231 return (open_listen_socket_network(sock));
2232 } /* }}} int open_listen_socket */
2234 static int close_listen_sockets (void) /* {{{ */
2235 {
2236 size_t i;
2238 for (i = 0; i < listen_fds_num; i++)
2239 {
2240 close (listen_fds[i].fd);
2242 if (listen_fds[i].family == PF_UNIX)
2243 unlink(listen_fds[i].addr);
2244 }
2246 free (listen_fds);
2247 listen_fds = NULL;
2248 listen_fds_num = 0;
2250 return (0);
2251 } /* }}} int close_listen_sockets */
2253 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2254 {
2255 struct pollfd *pollfds;
2256 int pollfds_num;
2257 int status;
2258 int i;
2260 if (listen_fds_num < 1)
2261 {
2262 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2263 return (NULL);
2264 }
2266 pollfds_num = listen_fds_num;
2267 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2268 if (pollfds == NULL)
2269 {
2270 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2271 return (NULL);
2272 }
2273 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2275 RRDD_LOG(LOG_INFO, "listening for connections");
2277 while (do_shutdown == 0)
2278 {
2279 for (i = 0; i < pollfds_num; i++)
2280 {
2281 pollfds[i].fd = listen_fds[i].fd;
2282 pollfds[i].events = POLLIN | POLLPRI;
2283 pollfds[i].revents = 0;
2284 }
2286 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2287 if (do_shutdown)
2288 break;
2289 else if (status == 0) /* timeout */
2290 continue;
2291 else if (status < 0) /* error */
2292 {
2293 status = errno;
2294 if (status != EINTR)
2295 {
2296 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2297 }
2298 continue;
2299 }
2301 for (i = 0; i < pollfds_num; i++)
2302 {
2303 listen_socket_t *client_sock;
2304 struct sockaddr_storage client_sa;
2305 socklen_t client_sa_size;
2306 pthread_t tid;
2307 pthread_attr_t attr;
2309 if (pollfds[i].revents == 0)
2310 continue;
2312 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2313 {
2314 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2315 "poll(2) returned something unexpected for listen FD #%i.",
2316 pollfds[i].fd);
2317 continue;
2318 }
2320 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2321 if (client_sock == NULL)
2322 {
2323 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2324 continue;
2325 }
2326 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2328 client_sa_size = sizeof (client_sa);
2329 client_sock->fd = accept (pollfds[i].fd,
2330 (struct sockaddr *) &client_sa, &client_sa_size);
2331 if (client_sock->fd < 0)
2332 {
2333 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2334 free(client_sock);
2335 continue;
2336 }
2338 pthread_attr_init (&attr);
2339 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2341 status = pthread_create (&tid, &attr, connection_thread_main,
2342 client_sock);
2343 if (status != 0)
2344 {
2345 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2346 close_connection(client_sock);
2347 continue;
2348 }
2349 } /* for (pollfds_num) */
2350 } /* while (do_shutdown == 0) */
2352 RRDD_LOG(LOG_INFO, "starting shutdown");
2354 close_listen_sockets ();
2356 pthread_mutex_lock (&connection_threads_lock);
2357 while (connection_threads_num > 0)
2358 {
2359 pthread_t wait_for;
2361 wait_for = connection_threads[0];
2363 pthread_mutex_unlock (&connection_threads_lock);
2364 pthread_join (wait_for, /* retval = */ NULL);
2365 pthread_mutex_lock (&connection_threads_lock);
2366 }
2367 pthread_mutex_unlock (&connection_threads_lock);
2369 free(pollfds);
2371 return (NULL);
2372 } /* }}} void *listen_thread_main */
2374 static int daemonize (void) /* {{{ */
2375 {
2376 int pid_fd;
2377 char *base_dir;
2379 daemon_uid = geteuid();
2381 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2382 if (pid_fd < 0)
2383 pid_fd = check_pidfile();
2384 if (pid_fd < 0)
2385 return pid_fd;
2387 /* open all the listen sockets */
2388 if (config_listen_address_list_len > 0)
2389 {
2390 for (int i = 0; i < config_listen_address_list_len; i++)
2391 {
2392 open_listen_socket (config_listen_address_list[i]);
2393 free_listen_socket (config_listen_address_list[i]);
2394 }
2396 free(config_listen_address_list);
2397 }
2398 else
2399 {
2400 listen_socket_t sock;
2401 memset(&sock, 0, sizeof(sock));
2402 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2403 open_listen_socket (&sock);
2404 }
2406 if (listen_fds_num < 1)
2407 {
2408 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2409 goto error;
2410 }
2412 if (!stay_foreground)
2413 {
2414 pid_t child;
2416 child = fork ();
2417 if (child < 0)
2418 {
2419 fprintf (stderr, "daemonize: fork(2) failed.\n");
2420 goto error;
2421 }
2422 else if (child > 0)
2423 exit(0);
2425 /* Become session leader */
2426 setsid ();
2428 /* Open the first three file descriptors to /dev/null */
2429 close (2);
2430 close (1);
2431 close (0);
2433 open ("/dev/null", O_RDWR);
2434 dup (0);
2435 dup (0);
2436 } /* if (!stay_foreground) */
2438 /* Change into the /tmp directory. */
2439 base_dir = (config_base_dir != NULL)
2440 ? config_base_dir
2441 : "/tmp";
2443 if (chdir (base_dir) != 0)
2444 {
2445 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2446 goto error;
2447 }
2449 install_signal_handlers();
2451 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2452 RRDD_LOG(LOG_INFO, "starting up");
2454 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2455 (GDestroyNotify) free_cache_item);
2456 if (cache_tree == NULL)
2457 {
2458 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2459 goto error;
2460 }
2462 return write_pidfile (pid_fd);
2464 error:
2465 remove_pidfile();
2466 return -1;
2467 } /* }}} int daemonize */
/*
 * cleanup: shut the daemon down; called from main().
 *
 * Increments do_shutdown and signals cache_cond, then waits for queue_thread
 * to exit. After that it removes the PID file, frees the configuration
 * strings and journal paths, destroys cache_tree (under cache_lock), logs
 * "goodbye" and closes syslog. Always returns 0.
 *
 * The statement order matters: the shutdown flag must be set before the
 * condition is signalled, and the queue thread must have exited before the
 * cache tree is destroyed.
 */
static int cleanup (void) /* {{{ */
{
  /* NOTE(review): presumably the queue thread, waiting on cache_cond, checks
   * do_shutdown after wakeup and exits -- confirm in queue_thread_main. */
  do_shutdown++;

  pthread_cond_signal (&cache_cond);
  pthread_join (queue_thread, /* return = */ NULL);

  remove_pidfile ();

  free(config_base_dir);
  free(config_pid_file);
  free(journal_cur);
  free(journal_old);

  /* NOTE(review): cache_lock is taken and never released; presumably this is
   * deliberate, so no other thread can use the tree while or after it is
   * freed, given that the process exits right afterwards -- confirm. */
  pthread_mutex_lock(&cache_lock);
  g_tree_destroy(cache_tree);

  RRDD_LOG(LOG_INFO, "goodbye");
  closelog ();

  return (0);
} /* }}} int cleanup */
2492 static int read_options (int argc, char **argv) /* {{{ */
2493 {
2494 int option;
2495 int status = 0;
2497 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2498 {
2499 switch (option)
2500 {
2501 case 'g':
2502 stay_foreground=1;
2503 break;
2505 case 'L':
2506 case 'l':
2507 {
2508 listen_socket_t **temp;
2509 listen_socket_t *new;
2511 new = malloc(sizeof(listen_socket_t));
2512 if (new == NULL)
2513 {
2514 fprintf(stderr, "read_options: malloc failed.\n");
2515 return(2);
2516 }
2517 memset(new, 0, sizeof(listen_socket_t));
2519 temp = (listen_socket_t **) realloc (config_listen_address_list,
2520 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2521 if (temp == NULL)
2522 {
2523 fprintf (stderr, "read_options: realloc failed.\n");
2524 return (2);
2525 }
2526 config_listen_address_list = temp;
2528 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2529 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2531 temp[config_listen_address_list_len] = new;
2532 config_listen_address_list_len++;
2533 }
2534 break;
2536 case 'f':
2537 {
2538 int temp;
2540 temp = atoi (optarg);
2541 if (temp > 0)
2542 config_flush_interval = temp;
2543 else
2544 {
2545 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2546 status = 3;
2547 }
2548 }
2549 break;
2551 case 'w':
2552 {
2553 int temp;
2555 temp = atoi (optarg);
2556 if (temp > 0)
2557 config_write_interval = temp;
2558 else
2559 {
2560 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2561 status = 2;
2562 }
2563 }
2564 break;
2566 case 'z':
2567 {
2568 int temp;
2570 temp = atoi(optarg);
2571 if (temp > 0)
2572 config_write_jitter = temp;
2573 else
2574 {
2575 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2576 status = 2;
2577 }
2579 break;
2580 }
2582 case 'B':
2583 config_write_base_only = 1;
2584 break;
2586 case 'b':
2587 {
2588 size_t len;
2589 char base_realpath[PATH_MAX];
2591 if (config_base_dir != NULL)
2592 free (config_base_dir);
2593 config_base_dir = strdup (optarg);
2594 if (config_base_dir == NULL)
2595 {
2596 fprintf (stderr, "read_options: strdup failed.\n");
2597 return (3);
2598 }
2600 /* make sure that the base directory is not resolved via
2601 * symbolic links. this makes some performance-enhancing
2602 * assumptions possible (we don't have to resolve paths
2603 * that start with a "/")
2604 */
2605 if (realpath(config_base_dir, base_realpath) == NULL)
2606 {
2607 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2608 return 5;
2609 }
2610 else if (strncmp(config_base_dir,
2611 base_realpath, sizeof(base_realpath)) != 0)
2612 {
2613 fprintf(stderr,
2614 "Base directory (-b) resolved via file system links!\n"
2615 "Please consult rrdcached '-b' documentation!\n"
2616 "Consider specifying the real directory (%s)\n",
2617 base_realpath);
2618 return 5;
2619 }
2621 len = strlen (config_base_dir);
2622 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2623 {
2624 config_base_dir[len - 1] = 0;
2625 len--;
2626 }
2628 if (len < 1)
2629 {
2630 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2631 return (4);
2632 }
2634 _config_base_dir_len = len;
2635 }
2636 break;
2638 case 'p':
2639 {
2640 if (config_pid_file != NULL)
2641 free (config_pid_file);
2642 config_pid_file = strdup (optarg);
2643 if (config_pid_file == NULL)
2644 {
2645 fprintf (stderr, "read_options: strdup failed.\n");
2646 return (3);
2647 }
2648 }
2649 break;
2651 case 'F':
2652 config_flush_at_shutdown = 1;
2653 break;
2655 case 'j':
2656 {
2657 struct stat statbuf;
2658 const char *dir = optarg;
2660 status = stat(dir, &statbuf);
2661 if (status != 0)
2662 {
2663 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2664 return 6;
2665 }
2667 if (!S_ISDIR(statbuf.st_mode)
2668 || access(dir, R_OK|W_OK|X_OK) != 0)
2669 {
2670 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2671 errno ? rrd_strerror(errno) : "");
2672 return 6;
2673 }
2675 journal_cur = malloc(PATH_MAX + 1);
2676 journal_old = malloc(PATH_MAX + 1);
2677 if (journal_cur == NULL || journal_old == NULL)
2678 {
2679 fprintf(stderr, "malloc failure for journal files\n");
2680 return 6;
2681 }
2682 else
2683 {
2684 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2685 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2686 }
2687 }
2688 break;
2690 case 'h':
2691 case '?':
2692 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2693 "\n"
2694 "Usage: rrdcached [options]\n"
2695 "\n"
2696 "Valid options are:\n"
2697 " -l <address> Socket address to listen to.\n"
2698 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2699 " -w <seconds> Interval in which to write data.\n"
2700 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2701 " -f <seconds> Interval in which to flush dead data.\n"
2702 " -p <file> Location of the PID-file.\n"
2703 " -b <dir> Base directory to change to.\n"
2704 " -B Restrict file access to paths within -b <dir>\n"
2705 " -g Do not fork and run in the foreground.\n"
2706 " -j <dir> Directory in which to create the journal files.\n"
2707 " -F Always flush all updates at shutdown\n"
2708 "\n"
2709 "For more information and a detailed description of all options "
2710 "please refer\n"
2711 "to the rrdcached(1) manual page.\n",
2712 VERSION);
2713 status = -1;
2714 break;
2715 } /* switch (option) */
2716 } /* while (getopt) */
2718 /* advise the user when values are not sane */
2719 if (config_flush_interval < 2 * config_write_interval)
2720 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2721 " 2x write interval (-w) !\n");
2722 if (config_write_jitter > config_write_interval)
2723 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2724 " write interval (-w) !\n");
2726 if (config_write_base_only && config_base_dir == NULL)
2727 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2728 " Consult the rrdcached documentation\n");
2730 if (journal_cur == NULL)
2731 config_flush_at_shutdown = 1;
2733 return (status);
2734 } /* }}} int read_options */
2736 int main (int argc, char **argv)
2737 {
2738 int status;
2740 status = read_options (argc, argv);
2741 if (status != 0)
2742 {
2743 if (status < 0)
2744 status = 0;
2745 return (status);
2746 }
2748 status = daemonize ();
2749 if (status != 0)
2750 {
2751 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2752 return (1);
2753 }
2755 journal_init();
2757 /* start the queue thread */
2758 memset (&queue_thread, 0, sizeof (queue_thread));
2759 status = pthread_create (&queue_thread,
2760 NULL, /* attr */
2761 queue_thread_main,
2762 NULL); /* args */
2763 if (status != 0)
2764 {
2765 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2766 cleanup();
2767 return (1);
2768 }
2770 listen_thread_main (NULL);
2771 cleanup ();
2773 return (0);
2774 } /* int main */
2776 /*
2777 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2778 */