1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
110 /*
111 * Types
112 */
113 typedef enum
114 {
115 PRIV_LOW,
116 PRIV_HIGH
117 } socket_privilege;
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
121 struct listen_socket_s
122 {
123 int fd;
124 char addr[PATH_MAX + 1];
125 int family;
126 socket_privilege privilege;
128 /* state for BATCH processing */
129 time_t batch_start;
130 int batch_cmd;
132 /* buffered IO */
133 char *rbuf;
134 off_t next_cmd;
135 off_t next_read;
137 char *wbuf;
138 ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
145 {
146 char *file;
147 char **values;
148 int values_num;
149 time_t last_flush_time;
150 time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153 int flags;
154 pthread_cond_t flushed;
155 cache_item_t *prev;
156 cache_item_t *next;
157 };
159 struct callback_flush_data_s
160 {
161 time_t now;
162 time_t abs_timeout;
163 char **keys;
164 size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
168 enum queue_side_e
169 {
170 HEAD,
171 TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
179 /*
180 * Variables
181 */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree *cache_tree = NULL;
198 static cache_item_t *cache_queue_head = NULL;
199 static cache_item_t *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /*
234 * Functions
235 */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239 do_shutdown++;
240 pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245 sig_common("INT");
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250 sig_common("TERM");
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255 config_flush_at_shutdown = 1;
256 sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261 config_flush_at_shutdown = 0;
262 sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
266 {
267 /* These structures are static, because `sigaction' behaves weird if the are
268 * overwritten.. */
269 static struct sigaction sa_int;
270 static struct sigaction sa_term;
271 static struct sigaction sa_pipe;
272 static struct sigaction sa_usr1;
273 static struct sigaction sa_usr2;
275 /* Install signal handlers */
276 memset (&sa_int, 0, sizeof (sa_int));
277 sa_int.sa_handler = sig_int_handler;
278 sigaction (SIGINT, &sa_int, NULL);
280 memset (&sa_term, 0, sizeof (sa_term));
281 sa_term.sa_handler = sig_term_handler;
282 sigaction (SIGTERM, &sa_term, NULL);
284 memset (&sa_pipe, 0, sizeof (sa_pipe));
285 sa_pipe.sa_handler = SIG_IGN;
286 sigaction (SIGPIPE, &sa_pipe, NULL);
288 memset (&sa_pipe, 0, sizeof (sa_usr1));
289 sa_usr1.sa_handler = sig_usr1_handler;
290 sigaction (SIGUSR1, &sa_usr1, NULL);
292 memset (&sa_usr2, 0, sizeof (sa_usr2));
293 sa_usr2.sa_handler = sig_usr2_handler;
294 sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300 int fd;
301 char *file;
303 file = (config_pid_file != NULL)
304 ? config_pid_file
305 : LOCALSTATEDIR "/run/rrdcached.pid";
307 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308 if (fd < 0)
309 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310 action, file, rrd_strerror(errno));
312 return(fd);
313 } /* }}} static int open_pidfile */
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
317 {
318 int pid_fd;
319 pid_t pid;
320 char pid_str[16];
322 pid_fd = open_pidfile("open", O_RDWR);
323 if (pid_fd < 0)
324 return pid_fd;
326 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327 return -1;
329 pid = atoi(pid_str);
330 if (pid <= 0)
331 return -1;
333 /* another running process that we can signal COULD be
334 * a competing rrdcached */
335 if (pid != getpid() && kill(pid, 0) == 0)
336 {
337 fprintf(stderr,
338 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339 close(pid_fd);
340 return -1;
341 }
343 lseek(pid_fd, 0, SEEK_SET);
344 ftruncate(pid_fd, 0);
346 fprintf(stderr,
347 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348 "rrdcached: starting normally.\n", pid);
350 return pid_fd;
351 } /* }}} static int check_pidfile */
353 static int write_pidfile (int fd) /* {{{ */
354 {
355 pid_t pid;
356 FILE *fh;
358 pid = getpid ();
360 fh = fdopen (fd, "w");
361 if (fh == NULL)
362 {
363 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364 close(fd);
365 return (-1);
366 }
368 fprintf (fh, "%i\n", (int) pid);
369 fclose (fh);
371 return (0);
372 } /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
375 {
376 char *file;
377 int status;
379 file = (config_pid_file != NULL)
380 ? config_pid_file
381 : LOCALSTATEDIR "/run/rrdcached.pid";
383 status = unlink (file);
384 if (status == 0)
385 return (0);
386 return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391 char *eol;
393 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394 sock->next_read - sock->next_cmd);
396 if (eol == NULL)
397 {
398 /* no commands left, move remainder back to front of rbuf */
399 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400 sock->next_read - sock->next_cmd);
401 sock->next_read -= sock->next_cmd;
402 sock->next_cmd = 0;
403 *len = 0;
404 return NULL;
405 }
406 else
407 {
408 char *cmd = sock->rbuf + sock->next_cmd;
409 *eol = '\0';
411 sock->next_cmd = eol - sock->rbuf + 1;
413 if (eol > sock->rbuf && *(eol-1) == '\r')
414 *(--eol) = '\0'; /* handle "\r\n" EOL */
416 *len = eol - cmd;
418 return cmd;
419 }
421 /* NOTREACHED */
422 assert(1==0);
423 }
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428 char *new_buf;
430 assert(sock != NULL);
432 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433 if (new_buf == NULL)
434 {
435 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436 return -1;
437 }
439 strncpy(new_buf + sock->wbuf_len, str, len + 1);
441 sock->wbuf = new_buf;
442 sock->wbuf_len += len;
444 return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450 va_list argp;
451 char buffer[CMD_MAX];
452 int len;
454 if (sock == NULL) return 0; /* journal replay mode */
455 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457 va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461 len = vsprintf(buffer, fmt, argp);
462 #endif
463 va_end(argp);
464 if (len < 0)
465 {
466 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467 return -1;
468 }
470 return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
473 static int count_lines(char *str) /* {{{ */
474 {
475 int lines = 0;
477 if (str != NULL)
478 {
479 while ((str = strchr(str, '\n')) != NULL)
480 {
481 ++lines;
482 ++str;
483 }
484 }
486 return lines;
487 } /* }}} static int count_lines */
489 /* send the response back to the user.
490 * returns 0 on success, -1 on error
491 * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493 char *fmt, ...) /* {{{ */
494 {
495 va_list argp;
496 char buffer[CMD_MAX];
497 int lines;
498 ssize_t wrote;
499 int rclen, len;
501 if (sock == NULL) return rc; /* journal replay mode */
503 if (sock->batch_start)
504 {
505 if (rc == RESP_OK)
506 return rc; /* no response on success during BATCH */
507 lines = sock->batch_cmd;
508 }
509 else if (rc == RESP_OK)
510 lines = count_lines(sock->wbuf);
511 else
512 lines = -1;
514 rclen = sprintf(buffer, "%d ", lines);
515 va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519 len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521 va_end(argp);
522 if (len < 0)
523 return -1;
525 len += rclen;
527 /* append the result to the wbuf, don't write to the user */
528 if (sock->batch_start)
529 return add_to_wbuf(sock, buffer, len);
531 /* first write must be complete */
532 if (len != write(sock->fd, buffer, len))
533 {
534 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535 return -1;
536 }
538 if (sock->wbuf != NULL && rc == RESP_OK)
539 {
540 wrote = 0;
541 while (wrote < sock->wbuf_len)
542 {
543 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544 if (wb <= 0)
545 {
546 RRDD_LOG(LOG_INFO, "send_response: could not write results");
547 return -1;
548 }
549 wrote += wb;
550 }
551 }
553 free(sock->wbuf); sock->wbuf = NULL;
554 sock->wbuf_len = 0;
556 return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561 ci->values = NULL;
562 ci->values_num = 0;
564 ci->last_flush_time = when;
565 if (config_write_jitter > 0)
566 ci->last_flush_time += (random() % config_write_jitter);
567 }
569 /* remove_from_queue
570 * remove a "cache_item_t" item from the queue.
571 * must hold 'cache_lock' when calling this
572 */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575 if (ci == NULL) return;
576 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578 if (ci->prev == NULL)
579 cache_queue_head = ci->next; /* reset head */
580 else
581 ci->prev->next = ci->next;
583 if (ci->next == NULL)
584 cache_queue_tail = ci->prev; /* reset the tail */
585 else
586 ci->next->prev = ci->prev;
588 ci->next = ci->prev = NULL;
589 ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* free the resources associated with the cache_item_t
593 * must hold cache_lock when calling this function
594 */
595 static void *free_cache_item(cache_item_t *ci) /* {{{ */
596 {
597 if (ci == NULL) return NULL;
599 remove_from_queue(ci);
601 for (int i=0; i < ci->values_num; i++)
602 free(ci->values[i]);
604 free (ci->values);
605 free (ci->file);
607 /* in case anyone is waiting */
608 pthread_cond_broadcast(&ci->flushed);
610 free (ci);
612 return NULL;
613 } /* }}} static void *free_cache_item */
615 /*
616 * enqueue_cache_item:
617 * `cache_lock' must be acquired before calling this function!
618 */
619 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
620 queue_side_t side)
621 {
622 if (ci == NULL)
623 return (-1);
625 if (ci->values_num == 0)
626 return (0);
628 if (side == HEAD)
629 {
630 if (cache_queue_head == ci)
631 return 0;
633 /* remove if further down in queue */
634 remove_from_queue(ci);
636 ci->prev = NULL;
637 ci->next = cache_queue_head;
638 if (ci->next != NULL)
639 ci->next->prev = ci;
640 cache_queue_head = ci;
642 if (cache_queue_tail == NULL)
643 cache_queue_tail = cache_queue_head;
644 }
645 else /* (side == TAIL) */
646 {
647 /* We don't move values back in the list.. */
648 if (ci->flags & CI_FLAGS_IN_QUEUE)
649 return (0);
651 assert (ci->next == NULL);
652 assert (ci->prev == NULL);
654 ci->prev = cache_queue_tail;
656 if (cache_queue_tail == NULL)
657 cache_queue_head = ci;
658 else
659 cache_queue_tail->next = ci;
661 cache_queue_tail = ci;
662 }
664 ci->flags |= CI_FLAGS_IN_QUEUE;
666 pthread_cond_broadcast(&cache_cond);
667 pthread_mutex_lock (&stats_lock);
668 stats_queue_length++;
669 pthread_mutex_unlock (&stats_lock);
671 return (0);
672 } /* }}} int enqueue_cache_item */
674 /*
675 * tree_callback_flush:
676 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
677 * while this is in progress.
678 */
679 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
680 gpointer data)
681 {
682 cache_item_t *ci;
683 callback_flush_data_t *cfd;
685 ci = (cache_item_t *) value;
686 cfd = (callback_flush_data_t *) data;
688 if (ci->flags & CI_FLAGS_IN_QUEUE)
689 return FALSE;
691 if ((ci->last_flush_time <= cfd->abs_timeout)
692 && (ci->values_num > 0))
693 {
694 enqueue_cache_item (ci, TAIL);
695 }
696 else if ((do_shutdown != 0)
697 && (ci->values_num > 0))
698 {
699 enqueue_cache_item (ci, TAIL);
700 }
701 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
702 && (ci->values_num <= 0))
703 {
704 char **temp;
706 temp = (char **) realloc (cfd->keys,
707 sizeof (char *) * (cfd->keys_num + 1));
708 if (temp == NULL)
709 {
710 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
711 return (FALSE);
712 }
713 cfd->keys = temp;
714 /* Make really sure this points to the _same_ place */
715 assert ((char *) key == ci->file);
716 cfd->keys[cfd->keys_num] = (char *) key;
717 cfd->keys_num++;
718 }
720 return (FALSE);
721 } /* }}} gboolean tree_callback_flush */
723 static int flush_old_values (int max_age)
724 {
725 callback_flush_data_t cfd;
726 size_t k;
728 memset (&cfd, 0, sizeof (cfd));
729 /* Pass the current time as user data so that we don't need to call
730 * `time' for each node. */
731 cfd.now = time (NULL);
732 cfd.keys = NULL;
733 cfd.keys_num = 0;
735 if (max_age > 0)
736 cfd.abs_timeout = cfd.now - max_age;
737 else
738 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
740 /* `tree_callback_flush' will return the keys of all values that haven't
741 * been touched in the last `config_flush_interval' seconds in `cfd'.
742 * The char*'s in this array point to the same memory as ci->file, so we
743 * don't need to free them separately. */
744 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
746 for (k = 0; k < cfd.keys_num; k++)
747 {
748 /* should never fail, since we have held the cache_lock
749 * the entire time */
750 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
751 }
753 if (cfd.keys != NULL)
754 {
755 free (cfd.keys);
756 cfd.keys = NULL;
757 }
759 return (0);
760 } /* int flush_old_values */
762 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
763 {
764 struct timeval now;
765 struct timespec next_flush;
766 int final_flush = 0; /* make sure we only flush once on shutdown */
768 gettimeofday (&now, NULL);
769 next_flush.tv_sec = now.tv_sec + config_flush_interval;
770 next_flush.tv_nsec = 1000 * now.tv_usec;
772 pthread_mutex_lock (&cache_lock);
773 while ((do_shutdown == 0) || (cache_queue_head != NULL))
774 {
775 cache_item_t *ci;
776 char *file;
777 char **values;
778 int values_num;
779 int status;
780 int i;
782 /* First, check if it's time to do the cache flush. */
783 gettimeofday (&now, NULL);
784 if ((now.tv_sec > next_flush.tv_sec)
785 || ((now.tv_sec == next_flush.tv_sec)
786 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
787 {
788 /* Flush all values that haven't been written in the last
789 * `config_write_interval' seconds. */
790 flush_old_values (config_write_interval);
792 /* Determine the time of the next cache flush. */
793 next_flush.tv_sec =
794 now.tv_sec + next_flush.tv_sec % config_flush_interval;
796 /* unlock the cache while we rotate so we don't block incoming
797 * updates if the fsync() blocks on disk I/O */
798 pthread_mutex_unlock(&cache_lock);
799 journal_rotate();
800 pthread_mutex_lock(&cache_lock);
801 }
803 /* Now, check if there's something to store away. If not, wait until
804 * something comes in or it's time to do the cache flush. if we are
805 * shutting down, do not wait around. */
806 if (cache_queue_head == NULL && !do_shutdown)
807 {
808 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
809 if ((status != 0) && (status != ETIMEDOUT))
810 {
811 RRDD_LOG (LOG_ERR, "queue_thread_main: "
812 "pthread_cond_timedwait returned %i.", status);
813 }
814 }
816 /* We're about to shut down */
817 if (do_shutdown != 0 && !final_flush++)
818 {
819 if (config_flush_at_shutdown)
820 flush_old_values (-1); /* flush everything */
821 else
822 break;
823 }
825 /* Check if a value has arrived. This may be NULL if we timed out or there
826 * was an interrupt such as a signal. */
827 if (cache_queue_head == NULL)
828 continue;
830 ci = cache_queue_head;
832 /* copy the relevant parts */
833 file = strdup (ci->file);
834 if (file == NULL)
835 {
836 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
837 continue;
838 }
840 assert(ci->values != NULL);
841 assert(ci->values_num > 0);
843 values = ci->values;
844 values_num = ci->values_num;
846 wipe_ci_values(ci, time(NULL));
847 remove_from_queue(ci);
849 pthread_mutex_lock (&stats_lock);
850 assert (stats_queue_length > 0);
851 stats_queue_length--;
852 pthread_mutex_unlock (&stats_lock);
854 pthread_mutex_unlock (&cache_lock);
856 rrd_clear_error ();
857 status = rrd_update_r (file, NULL, values_num, (void *) values);
858 if (status != 0)
859 {
860 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
861 "rrd_update_r (%s) failed with status %i. (%s)",
862 file, status, rrd_get_error());
863 }
865 journal_write("wrote", file);
866 pthread_cond_broadcast(&ci->flushed);
868 for (i = 0; i < values_num; i++)
869 free (values[i]);
871 free(values);
872 free(file);
874 if (status == 0)
875 {
876 pthread_mutex_lock (&stats_lock);
877 stats_updates_written++;
878 stats_data_sets_written += values_num;
879 pthread_mutex_unlock (&stats_lock);
880 }
882 pthread_mutex_lock (&cache_lock);
884 /* We're about to shut down */
885 if (do_shutdown != 0 && !final_flush++)
886 {
887 if (config_flush_at_shutdown)
888 flush_old_values (-1); /* flush everything */
889 else
890 break;
891 }
892 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
893 pthread_mutex_unlock (&cache_lock);
895 if (config_flush_at_shutdown)
896 {
897 assert(cache_queue_head == NULL);
898 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
899 }
901 journal_done();
903 return (NULL);
904 } /* }}} void *queue_thread_main */
906 static int buffer_get_field (char **buffer_ret, /* {{{ */
907 size_t *buffer_size_ret, char **field_ret)
908 {
909 char *buffer;
910 size_t buffer_pos;
911 size_t buffer_size;
912 char *field;
913 size_t field_size;
914 int status;
916 buffer = *buffer_ret;
917 buffer_pos = 0;
918 buffer_size = *buffer_size_ret;
919 field = *buffer_ret;
920 field_size = 0;
922 if (buffer_size <= 0)
923 return (-1);
925 /* This is ensured by `handle_request'. */
926 assert (buffer[buffer_size - 1] == '\0');
928 status = -1;
929 while (buffer_pos < buffer_size)
930 {
931 /* Check for end-of-field or end-of-buffer */
932 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
933 {
934 field[field_size] = 0;
935 field_size++;
936 buffer_pos++;
937 status = 0;
938 break;
939 }
940 /* Handle escaped characters. */
941 else if (buffer[buffer_pos] == '\\')
942 {
943 if (buffer_pos >= (buffer_size - 1))
944 break;
945 buffer_pos++;
946 field[field_size] = buffer[buffer_pos];
947 field_size++;
948 buffer_pos++;
949 }
950 /* Normal operation */
951 else
952 {
953 field[field_size] = buffer[buffer_pos];
954 field_size++;
955 buffer_pos++;
956 }
957 } /* while (buffer_pos < buffer_size) */
959 if (status != 0)
960 return (status);
962 *buffer_ret = buffer + buffer_pos;
963 *buffer_size_ret = buffer_size - buffer_pos;
964 *field_ret = field;
966 return (0);
967 } /* }}} int buffer_get_field */
969 /* if we're restricting writes to the base directory,
970 * check whether the file falls within the dir
971 * returns 1 if OK, otherwise 0
972 */
973 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
974 {
975 assert(file != NULL);
977 if (!config_write_base_only
978 || sock == NULL /* journal replay */
979 || config_base_dir == NULL)
980 return 1;
982 if (strstr(file, "../") != NULL) goto err;
984 /* relative paths without "../" are ok */
985 if (*file != '/') return 1;
987 /* file must be of the format base + "/" + <1+ char filename> */
988 if (strlen(file) < _config_base_dir_len + 2) goto err;
989 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
990 if (*(file + _config_base_dir_len) != '/') goto err;
992 return 1;
994 err:
995 if (sock != NULL && sock->fd >= 0)
996 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
998 return 0;
999 } /* }}} static int check_file_access */
1001 /* when using a base dir, convert relative paths to absolute paths.
1002 * if necessary, modifies the "filename" pointer to point
1003 * to the new path created in "tmp". "tmp" is provided
1004 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1005 *
1006 * this allows us to optimize for the expected case (absolute path)
1007 * with a no-op.
1008 */
1009 static void get_abs_path(char **filename, char *tmp)
1010 {
1011 assert(tmp != NULL);
1012 assert(filename != NULL && *filename != NULL);
1014 if (config_base_dir == NULL || **filename == '/')
1015 return;
1017 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1018 *filename = tmp;
1019 } /* }}} static int get_abs_path */
1021 /* returns 1 if we have the required privilege level,
1022 * otherwise issue an error to the user on sock */
1023 static int has_privilege (listen_socket_t *sock, /* {{{ */
1024 socket_privilege priv)
1025 {
1026 if (sock == NULL) /* journal replay */
1027 return 1;
1029 if (sock->privilege >= priv)
1030 return 1;
1032 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 } /* }}} static int has_privilege */
1035 static int flush_file (const char *filename) /* {{{ */
1036 {
1037 cache_item_t *ci;
1039 pthread_mutex_lock (&cache_lock);
1041 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1042 if (ci == NULL)
1043 {
1044 pthread_mutex_unlock (&cache_lock);
1045 return (ENOENT);
1046 }
1048 if (ci->values_num > 0)
1049 {
1050 /* Enqueue at head */
1051 enqueue_cache_item (ci, HEAD);
1052 pthread_cond_wait(&ci->flushed, &cache_lock);
1053 }
1055 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1056 * may have been purged during our cond_wait() */
1058 pthread_mutex_unlock(&cache_lock);
1060 return (0);
1061 } /* }}} int flush_file */
1063 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1064 char *buffer, size_t buffer_size)
1065 {
1066 int status;
1067 char **help_text;
1068 char *command;
1070 char *help_help[2] =
1071 {
1072 "Command overview\n"
1073 ,
1074 "HELP [<command>]\n"
1075 "FLUSH <filename>\n"
1076 "FLUSHALL\n"
1077 "PENDING <filename>\n"
1078 "FORGET <filename>\n"
1079 "UPDATE <filename> <values> [<values> ...]\n"
1080 "BATCH\n"
1081 "STATS\n"
1082 "QUIT\n"
1083 };
1085 char *help_flush[2] =
1086 {
1087 "Help for FLUSH\n"
1088 ,
1089 "Usage: FLUSH <filename>\n"
1090 "\n"
1091 "Adds the given filename to the head of the update queue and returns\n"
1092 "after is has been dequeued.\n"
1093 };
1095 char *help_flushall[2] =
1096 {
1097 "Help for FLUSHALL\n"
1098 ,
1099 "Usage: FLUSHALL\n"
1100 "\n"
1101 "Triggers writing of all pending updates. Returns immediately.\n"
1102 };
1104 char *help_pending[2] =
1105 {
1106 "Help for PENDING\n"
1107 ,
1108 "Usage: PENDING <filename>\n"
1109 "\n"
1110 "Shows any 'pending' updates for a file, in order.\n"
1111 "The updates shown have not yet been written to the underlying RRD file.\n"
1112 };
1114 char *help_forget[2] =
1115 {
1116 "Help for FORGET\n"
1117 ,
1118 "Usage: FORGET <filename>\n"
1119 "\n"
1120 "Removes the file completely from the cache.\n"
1121 "Any pending updates for the file will be lost.\n"
1122 };
1124 char *help_update[2] =
1125 {
1126 "Help for UPDATE\n"
1127 ,
1128 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1129 "\n"
1130 "Adds the given file to the internal cache if it is not yet known and\n"
1131 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1132 "for details.\n"
1133 "\n"
1134 "Each <values> has the following form:\n"
1135 " <values> = <time>:<value>[:<value>[...]]\n"
1136 "See the rrdupdate(1) manpage for details.\n"
1137 };
1139 char *help_stats[2] =
1140 {
1141 "Help for STATS\n"
1142 ,
1143 "Usage: STATS\n"
1144 "\n"
1145 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1146 "a description of the values.\n"
1147 };
1149 char *help_batch[2] =
1150 {
1151 "Help for BATCH\n"
1152 ,
1153 "The 'BATCH' command permits the client to initiate a bulk load\n"
1154 " of commands to rrdcached.\n"
1155 "\n"
1156 "Usage:\n"
1157 "\n"
1158 " client: BATCH\n"
1159 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1160 " client: command #1\n"
1161 " client: command #2\n"
1162 " client: ... and so on\n"
1163 " client: .\n"
1164 " server: 2 errors\n"
1165 " server: 7 message for command #7\n"
1166 " server: 9 message for command #9\n"
1167 "\n"
1168 "For more information, consult the rrdcached(1) documentation.\n"
1169 };
1171 char *help_quit[2] =
1172 {
1173 "Help for QUIT\n"
1174 ,
1175 "Disconnect from rrdcached.\n"
1176 };
1178 status = buffer_get_field (&buffer, &buffer_size, &command);
1179 if (status != 0)
1180 help_text = help_help;
1181 else
1182 {
1183 if (strcasecmp (command, "update") == 0)
1184 help_text = help_update;
1185 else if (strcasecmp (command, "flush") == 0)
1186 help_text = help_flush;
1187 else if (strcasecmp (command, "flushall") == 0)
1188 help_text = help_flushall;
1189 else if (strcasecmp (command, "pending") == 0)
1190 help_text = help_pending;
1191 else if (strcasecmp (command, "forget") == 0)
1192 help_text = help_forget;
1193 else if (strcasecmp (command, "stats") == 0)
1194 help_text = help_stats;
1195 else if (strcasecmp (command, "batch") == 0)
1196 help_text = help_batch;
1197 else if (strcasecmp (command, "quit") == 0)
1198 help_text = help_quit;
1199 else
1200 help_text = help_help;
1201 }
1203 add_response_info(sock, help_text[1]);
1204 return send_response(sock, RESP_OK, help_text[0]);
1205 } /* }}} int handle_request_help */
1207 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1208 {
1209 uint64_t copy_queue_length;
1210 uint64_t copy_updates_received;
1211 uint64_t copy_flush_received;
1212 uint64_t copy_updates_written;
1213 uint64_t copy_data_sets_written;
1214 uint64_t copy_journal_bytes;
1215 uint64_t copy_journal_rotate;
1217 uint64_t tree_nodes_number;
1218 uint64_t tree_depth;
1220 pthread_mutex_lock (&stats_lock);
1221 copy_queue_length = stats_queue_length;
1222 copy_updates_received = stats_updates_received;
1223 copy_flush_received = stats_flush_received;
1224 copy_updates_written = stats_updates_written;
1225 copy_data_sets_written = stats_data_sets_written;
1226 copy_journal_bytes = stats_journal_bytes;
1227 copy_journal_rotate = stats_journal_rotate;
1228 pthread_mutex_unlock (&stats_lock);
1230 pthread_mutex_lock (&cache_lock);
1231 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1232 tree_depth = (uint64_t) g_tree_height (cache_tree);
1233 pthread_mutex_unlock (&cache_lock);
1235 add_response_info(sock,
1236 "QueueLength: %"PRIu64"\n", copy_queue_length);
1237 add_response_info(sock,
1238 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1239 add_response_info(sock,
1240 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1241 add_response_info(sock,
1242 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1243 add_response_info(sock,
1244 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1245 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1246 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1247 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1248 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1250 send_response(sock, RESP_OK, "Statistics follow\n");
1252 return (0);
1253 } /* }}} int handle_request_stats */
1255 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1256 char *buffer, size_t buffer_size)
1257 {
1258 char *file, file_tmp[PATH_MAX];
1259 int status;
1261 status = buffer_get_field (&buffer, &buffer_size, &file);
1262 if (status != 0)
1263 {
1264 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1265 }
1266 else
1267 {
1268 pthread_mutex_lock(&stats_lock);
1269 stats_flush_received++;
1270 pthread_mutex_unlock(&stats_lock);
1272 get_abs_path(&file, file_tmp);
1273 if (!check_file_access(file, sock)) return 0;
1275 status = flush_file (file);
1276 if (status == 0)
1277 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1278 else if (status == ENOENT)
1279 {
1280 /* no file in our tree; see whether it exists at all */
1281 struct stat statbuf;
1283 memset(&statbuf, 0, sizeof(statbuf));
1284 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1285 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1286 else
1287 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1288 }
1289 else if (status < 0)
1290 return send_response(sock, RESP_ERR, "Internal error.\n");
1291 else
1292 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1293 }
1295 /* NOTREACHED */
1296 assert(1==0);
1297 } /* }}} int handle_request_flush */
1299 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1300 {
1301 int status;
1303 status = has_privilege(sock, PRIV_HIGH);
1304 if (status <= 0)
1305 return status;
1307 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1309 pthread_mutex_lock(&cache_lock);
1310 flush_old_values(-1);
1311 pthread_mutex_unlock(&cache_lock);
1313 return send_response(sock, RESP_OK, "Started flush.\n");
1314 } /* }}} static int handle_request_flushall */
1316 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1317 char *buffer, size_t buffer_size)
1318 {
1319 int status;
1320 char *file, file_tmp[PATH_MAX];
1321 cache_item_t *ci;
1323 status = buffer_get_field(&buffer, &buffer_size, &file);
1324 if (status != 0)
1325 return send_response(sock, RESP_ERR,
1326 "Usage: PENDING <filename>\n");
1328 status = has_privilege(sock, PRIV_HIGH);
1329 if (status <= 0)
1330 return status;
1332 get_abs_path(&file, file_tmp);
1334 pthread_mutex_lock(&cache_lock);
1335 ci = g_tree_lookup(cache_tree, file);
1336 if (ci == NULL)
1337 {
1338 pthread_mutex_unlock(&cache_lock);
1339 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1340 }
1342 for (int i=0; i < ci->values_num; i++)
1343 add_response_info(sock, "%s\n", ci->values[i]);
1345 pthread_mutex_unlock(&cache_lock);
1346 return send_response(sock, RESP_OK, "updates pending\n");
1347 } /* }}} static int handle_request_pending */
1349 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1350 char *buffer, size_t buffer_size)
1351 {
1352 int status;
1353 gboolean found;
1354 char *file, file_tmp[PATH_MAX];
1356 status = buffer_get_field(&buffer, &buffer_size, &file);
1357 if (status != 0)
1358 return send_response(sock, RESP_ERR,
1359 "Usage: FORGET <filename>\n");
1361 status = has_privilege(sock, PRIV_HIGH);
1362 if (status <= 0)
1363 return status;
1365 get_abs_path(&file, file_tmp);
1366 if (!check_file_access(file, sock)) return 0;
1368 pthread_mutex_lock(&cache_lock);
1369 found = g_tree_remove(cache_tree, file);
1370 pthread_mutex_unlock(&cache_lock);
1372 if (found == TRUE)
1373 {
1374 if (sock != NULL)
1375 journal_write("forget", file);
1377 return send_response(sock, RESP_OK, "Gone!\n");
1378 }
1379 else
1380 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1382 /* NOTREACHED */
1383 assert(1==0);
1384 } /* }}} static int handle_request_forget */
1386 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1387 time_t now,
1388 char *buffer, size_t buffer_size)
1389 {
1390 char *file, file_tmp[PATH_MAX];
1391 int values_num = 0;
1392 int status;
1393 char orig_buf[CMD_MAX];
1395 cache_item_t *ci;
1397 status = has_privilege(sock, PRIV_HIGH);
1398 if (status <= 0)
1399 return status;
1401 /* save it for the journal later */
1402 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1404 status = buffer_get_field (&buffer, &buffer_size, &file);
1405 if (status != 0)
1406 return send_response(sock, RESP_ERR,
1407 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1409 pthread_mutex_lock(&stats_lock);
1410 stats_updates_received++;
1411 pthread_mutex_unlock(&stats_lock);
1413 get_abs_path(&file, file_tmp);
1414 if (!check_file_access(file, sock)) return 0;
1416 pthread_mutex_lock (&cache_lock);
1417 ci = g_tree_lookup (cache_tree, file);
1419 if (ci == NULL) /* {{{ */
1420 {
1421 struct stat statbuf;
1423 /* don't hold the lock while we setup; stat(2) might block */
1424 pthread_mutex_unlock(&cache_lock);
1426 memset (&statbuf, 0, sizeof (statbuf));
1427 status = stat (file, &statbuf);
1428 if (status != 0)
1429 {
1430 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1432 status = errno;
1433 if (status == ENOENT)
1434 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1435 else
1436 return send_response(sock, RESP_ERR,
1437 "stat failed with error %i.\n", status);
1438 }
1439 if (!S_ISREG (statbuf.st_mode))
1440 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1442 if (access(file, R_OK|W_OK) != 0)
1443 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1444 file, rrd_strerror(errno));
1446 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1447 if (ci == NULL)
1448 {
1449 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1451 return send_response(sock, RESP_ERR, "malloc failed.\n");
1452 }
1453 memset (ci, 0, sizeof (cache_item_t));
1455 ci->file = strdup (file);
1456 if (ci->file == NULL)
1457 {
1458 free (ci);
1459 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1461 return send_response(sock, RESP_ERR, "strdup failed.\n");
1462 }
1464 wipe_ci_values(ci, now);
1465 ci->flags = CI_FLAGS_IN_TREE;
1466 pthread_cond_init(&ci->flushed, NULL);
1468 pthread_mutex_lock(&cache_lock);
1469 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1470 } /* }}} */
1471 assert (ci != NULL);
1473 /* don't re-write updates in replay mode */
1474 if (sock != NULL)
1475 journal_write("update", orig_buf);
1477 while (buffer_size > 0)
1478 {
1479 char **temp;
1480 char *value;
1481 time_t stamp;
1482 char *eostamp;
1484 status = buffer_get_field (&buffer, &buffer_size, &value);
1485 if (status != 0)
1486 {
1487 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1488 break;
1489 }
1491 /* make sure update time is always moving forward */
1492 stamp = strtol(value, &eostamp, 10);
1493 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1494 {
1495 pthread_mutex_unlock(&cache_lock);
1496 return send_response(sock, RESP_ERR,
1497 "Cannot find timestamp in '%s'!\n", value);
1498 }
1499 else if (stamp <= ci->last_update_stamp)
1500 {
1501 pthread_mutex_unlock(&cache_lock);
1502 return send_response(sock, RESP_ERR,
1503 "illegal attempt to update using time %ld when last"
1504 " update time is %ld (minimum one second step)\n",
1505 stamp, ci->last_update_stamp);
1506 }
1507 else
1508 ci->last_update_stamp = stamp;
1510 temp = (char **) realloc (ci->values,
1511 sizeof (char *) * (ci->values_num + 1));
1512 if (temp == NULL)
1513 {
1514 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1515 continue;
1516 }
1517 ci->values = temp;
1519 ci->values[ci->values_num] = strdup (value);
1520 if (ci->values[ci->values_num] == NULL)
1521 {
1522 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1523 continue;
1524 }
1525 ci->values_num++;
1527 values_num++;
1528 }
1530 if (((now - ci->last_flush_time) >= config_write_interval)
1531 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1532 && (ci->values_num > 0))
1533 {
1534 enqueue_cache_item (ci, TAIL);
1535 }
1537 pthread_mutex_unlock (&cache_lock);
1539 if (values_num < 1)
1540 return send_response(sock, RESP_ERR, "No values updated.\n");
1541 else
1542 return send_response(sock, RESP_OK,
1543 "errors, enqueued %i value(s).\n", values_num);
1545 /* NOTREACHED */
1546 assert(1==0);
1548 } /* }}} int handle_request_update */
1550 /* we came across a "WROTE" entry during journal replay.
1551 * throw away any values that we have accumulated for this file
1552 */
1553 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1554 {
1555 int i;
1556 cache_item_t *ci;
1557 const char *file = buffer;
1559 pthread_mutex_lock(&cache_lock);
1561 ci = g_tree_lookup(cache_tree, file);
1562 if (ci == NULL)
1563 {
1564 pthread_mutex_unlock(&cache_lock);
1565 return (0);
1566 }
1568 if (ci->values)
1569 {
1570 for (i=0; i < ci->values_num; i++)
1571 free(ci->values[i]);
1573 free(ci->values);
1574 }
1576 wipe_ci_values(ci, now);
1577 remove_from_queue(ci);
1579 pthread_mutex_unlock(&cache_lock);
1580 return (0);
1581 } /* }}} int handle_request_wrote */
1583 /* start "BATCH" processing */
1584 static int batch_start (listen_socket_t *sock) /* {{{ */
1585 {
1586 int status;
1587 if (sock->batch_start)
1588 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1590 status = send_response(sock, RESP_OK,
1591 "Go ahead. End with dot '.' on its own line.\n");
1592 sock->batch_start = time(NULL);
1593 sock->batch_cmd = 0;
1595 return status;
1596 } /* }}} static int batch_start */
1598 /* finish "BATCH" processing and return results to the client */
1599 static int batch_done (listen_socket_t *sock) /* {{{ */
1600 {
1601 assert(sock->batch_start);
1602 sock->batch_start = 0;
1603 sock->batch_cmd = 0;
1604 return send_response(sock, RESP_OK, "errors\n");
1605 } /* }}} static int batch_done */
1607 /* if sock==NULL, we are in journal replay mode */
1608 static int handle_request (listen_socket_t *sock, /* {{{ */
1609 time_t now,
1610 char *buffer, size_t buffer_size)
1611 {
1612 char *buffer_ptr;
1613 char *command;
1614 int status;
1616 assert (buffer[buffer_size - 1] == '\0');
1618 buffer_ptr = buffer;
1619 command = NULL;
1620 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1621 if (status != 0)
1622 {
1623 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1624 return (-1);
1625 }
1627 if (sock != NULL && sock->batch_start)
1628 sock->batch_cmd++;
1630 if (strcasecmp (command, "update") == 0)
1631 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1632 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1633 {
1634 /* this is only valid in replay mode */
1635 return (handle_request_wrote (buffer_ptr, now));
1636 }
1637 else if (strcasecmp (command, "flush") == 0)
1638 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1639 else if (strcasecmp (command, "flushall") == 0)
1640 return (handle_request_flushall(sock));
1641 else if (strcasecmp (command, "pending") == 0)
1642 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1643 else if (strcasecmp (command, "forget") == 0)
1644 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1645 else if (strcasecmp (command, "stats") == 0)
1646 return (handle_request_stats (sock));
1647 else if (strcasecmp (command, "help") == 0)
1648 return (handle_request_help (sock, buffer_ptr, buffer_size));
1649 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1650 return batch_start(sock);
1651 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1652 return batch_done(sock);
1653 else if (strcasecmp (command, "quit") == 0)
1654 return -1;
1655 else
1656 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1658 /* NOTREACHED */
1659 assert(1==0);
1660 } /* }}} int handle_request */
1662 /* MUST NOT hold journal_lock before calling this */
1663 static void journal_rotate(void) /* {{{ */
1664 {
1665 FILE *old_fh = NULL;
1666 int new_fd;
1668 if (journal_cur == NULL || journal_old == NULL)
1669 return;
1671 pthread_mutex_lock(&journal_lock);
1673 /* we rotate this way (rename before close) so that the we can release
1674 * the journal lock as fast as possible. Journal writes to the new
1675 * journal can proceed immediately after the new file is opened. The
1676 * fclose can then block without affecting new updates.
1677 */
1678 if (journal_fh != NULL)
1679 {
1680 old_fh = journal_fh;
1681 journal_fh = NULL;
1682 rename(journal_cur, journal_old);
1683 ++stats_journal_rotate;
1684 }
1686 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1687 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1688 if (new_fd >= 0)
1689 {
1690 journal_fh = fdopen(new_fd, "a");
1691 if (journal_fh == NULL)
1692 close(new_fd);
1693 }
1695 pthread_mutex_unlock(&journal_lock);
1697 if (old_fh != NULL)
1698 fclose(old_fh);
1700 if (journal_fh == NULL)
1701 {
1702 RRDD_LOG(LOG_CRIT,
1703 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1704 journal_cur, rrd_strerror(errno));
1706 RRDD_LOG(LOG_ERR,
1707 "JOURNALING DISABLED: All values will be flushed at shutdown");
1708 config_flush_at_shutdown = 1;
1709 }
1711 } /* }}} static void journal_rotate */
1713 static void journal_done(void) /* {{{ */
1714 {
1715 if (journal_cur == NULL)
1716 return;
1718 pthread_mutex_lock(&journal_lock);
1719 if (journal_fh != NULL)
1720 {
1721 fclose(journal_fh);
1722 journal_fh = NULL;
1723 }
1725 if (config_flush_at_shutdown)
1726 {
1727 RRDD_LOG(LOG_INFO, "removing journals");
1728 unlink(journal_old);
1729 unlink(journal_cur);
1730 }
1731 else
1732 {
1733 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1734 "journals will be used at next startup");
1735 }
1737 pthread_mutex_unlock(&journal_lock);
1739 } /* }}} static void journal_done */
1741 static int journal_write(char *cmd, char *args) /* {{{ */
1742 {
1743 int chars;
1745 if (journal_fh == NULL)
1746 return 0;
1748 pthread_mutex_lock(&journal_lock);
1749 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1750 pthread_mutex_unlock(&journal_lock);
1752 if (chars > 0)
1753 {
1754 pthread_mutex_lock(&stats_lock);
1755 stats_journal_bytes += chars;
1756 pthread_mutex_unlock(&stats_lock);
1757 }
1759 return chars;
1760 } /* }}} static int journal_write */
1762 static int journal_replay (const char *file) /* {{{ */
1763 {
1764 FILE *fh;
1765 int entry_cnt = 0;
1766 int fail_cnt = 0;
1767 uint64_t line = 0;
1768 char entry[CMD_MAX];
1769 time_t now;
1771 if (file == NULL) return 0;
1773 {
1774 char *reason = "unknown error";
1775 int status = 0;
1776 struct stat statbuf;
1778 memset(&statbuf, 0, sizeof(statbuf));
1779 if (stat(file, &statbuf) != 0)
1780 {
1781 if (errno == ENOENT)
1782 return 0;
1784 reason = "stat error";
1785 status = errno;
1786 }
1787 else if (!S_ISREG(statbuf.st_mode))
1788 {
1789 reason = "not a regular file";
1790 status = EPERM;
1791 }
1792 if (statbuf.st_uid != daemon_uid)
1793 {
1794 reason = "not owned by daemon user";
1795 status = EACCES;
1796 }
1797 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1798 {
1799 reason = "must not be user/group writable";
1800 status = EACCES;
1801 }
1803 if (status != 0)
1804 {
1805 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1806 file, rrd_strerror(status), reason);
1807 return 0;
1808 }
1809 }
1811 fh = fopen(file, "r");
1812 if (fh == NULL)
1813 {
1814 if (errno != ENOENT)
1815 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1816 file, rrd_strerror(errno));
1817 return 0;
1818 }
1819 else
1820 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1822 now = time(NULL);
1824 while(!feof(fh))
1825 {
1826 size_t entry_len;
1828 ++line;
1829 if (fgets(entry, sizeof(entry), fh) == NULL)
1830 break;
1831 entry_len = strlen(entry);
1833 /* check \n termination in case journal writing crashed mid-line */
1834 if (entry_len == 0)
1835 continue;
1836 else if (entry[entry_len - 1] != '\n')
1837 {
1838 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1839 ++fail_cnt;
1840 continue;
1841 }
1843 entry[entry_len - 1] = '\0';
1845 if (handle_request(NULL, now, entry, entry_len) == 0)
1846 ++entry_cnt;
1847 else
1848 ++fail_cnt;
1849 }
1851 fclose(fh);
1853 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1854 entry_cnt, fail_cnt);
1856 return entry_cnt > 0 ? 1 : 0;
1857 } /* }}} static int journal_replay */
1859 static void journal_init(void) /* {{{ */
1860 {
1861 int had_journal = 0;
1863 if (journal_cur == NULL) return;
1865 pthread_mutex_lock(&journal_lock);
1867 RRDD_LOG(LOG_INFO, "checking for journal files");
1869 had_journal += journal_replay(journal_old);
1870 had_journal += journal_replay(journal_cur);
1872 /* it must have been a crash. start a flush */
1873 if (had_journal && config_flush_at_shutdown)
1874 flush_old_values(-1);
1876 pthread_mutex_unlock(&journal_lock);
1877 journal_rotate();
1879 RRDD_LOG(LOG_INFO, "journal processing complete");
1881 } /* }}} static void journal_init */
1883 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1884 {
1885 assert(sock != NULL);
1887 free(sock->rbuf); sock->rbuf = NULL;
1888 free(sock->wbuf); sock->wbuf = NULL;
1889 free(sock);
1890 } /* }}} void free_listen_socket */
1892 static void close_connection(listen_socket_t *sock) /* {{{ */
1893 {
1894 if (sock->fd >= 0)
1895 {
1896 close(sock->fd);
1897 sock->fd = -1;
1898 }
1900 free_listen_socket(sock);
1902 } /* }}} void close_connection */
1904 static void *connection_thread_main (void *args) /* {{{ */
1905 {
1906 listen_socket_t *sock;
1907 int i;
1908 int fd;
1910 sock = (listen_socket_t *) args;
1911 fd = sock->fd;
1913 /* init read buffers */
1914 sock->next_read = sock->next_cmd = 0;
1915 sock->rbuf = malloc(RBUF_SIZE);
1916 if (sock->rbuf == NULL)
1917 {
1918 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1919 close_connection(sock);
1920 return NULL;
1921 }
1923 pthread_mutex_lock (&connection_threads_lock);
1924 {
1925 pthread_t *temp;
1927 temp = (pthread_t *) realloc (connection_threads,
1928 sizeof (pthread_t) * (connection_threads_num + 1));
1929 if (temp == NULL)
1930 {
1931 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1932 }
1933 else
1934 {
1935 connection_threads = temp;
1936 connection_threads[connection_threads_num] = pthread_self ();
1937 connection_threads_num++;
1938 }
1939 }
1940 pthread_mutex_unlock (&connection_threads_lock);
1942 while (do_shutdown == 0)
1943 {
1944 char *cmd;
1945 ssize_t cmd_len;
1946 ssize_t rbytes;
1947 time_t now;
1949 struct pollfd pollfd;
1950 int status;
1952 pollfd.fd = fd;
1953 pollfd.events = POLLIN | POLLPRI;
1954 pollfd.revents = 0;
1956 status = poll (&pollfd, 1, /* timeout = */ 500);
1957 if (do_shutdown)
1958 break;
1959 else if (status == 0) /* timeout */
1960 continue;
1961 else if (status < 0) /* error */
1962 {
1963 status = errno;
1964 if (status != EINTR)
1965 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1966 continue;
1967 }
1969 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1970 break;
1971 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1972 {
1973 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1974 "poll(2) returned something unexpected: %#04hx",
1975 pollfd.revents);
1976 break;
1977 }
1979 rbytes = read(fd, sock->rbuf + sock->next_read,
1980 RBUF_SIZE - sock->next_read);
1981 if (rbytes < 0)
1982 {
1983 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1984 break;
1985 }
1986 else if (rbytes == 0)
1987 break; /* eof */
1989 sock->next_read += rbytes;
1991 if (sock->batch_start)
1992 now = sock->batch_start;
1993 else
1994 now = time(NULL);
1996 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1997 {
1998 status = handle_request (sock, now, cmd, cmd_len+1);
1999 if (status != 0)
2000 goto out_close;
2001 }
2002 }
2004 out_close:
2005 close_connection(sock);
2007 /* Remove this thread from the connection threads list */
2008 pthread_mutex_lock (&connection_threads_lock);
2009 {
2010 pthread_t self;
2011 pthread_t *temp;
2013 /* Find out own index in the array */
2014 self = pthread_self ();
2015 for (i = 0; i < connection_threads_num; i++)
2016 if (pthread_equal (connection_threads[i], self) != 0)
2017 break;
2018 assert (i < connection_threads_num);
2020 /* Move the trailing threads forward. */
2021 if (i < (connection_threads_num - 1))
2022 {
2023 memmove (connection_threads + i,
2024 connection_threads + i + 1,
2025 sizeof (pthread_t) * (connection_threads_num - i - 1));
2026 }
2028 connection_threads_num--;
2030 temp = realloc(connection_threads,
2031 sizeof(*connection_threads) * connection_threads_num);
2032 if (connection_threads_num > 0 && temp == NULL)
2033 RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2034 else
2035 connection_threads = temp;
2036 }
2037 pthread_mutex_unlock (&connection_threads_lock);
2039 return (NULL);
2040 } /* }}} void *connection_thread_main */
2042 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2043 {
2044 int fd;
2045 struct sockaddr_un sa;
2046 listen_socket_t *temp;
2047 int status;
2048 const char *path;
2050 path = sock->addr;
2051 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2052 path += strlen("unix:");
2054 temp = (listen_socket_t *) realloc (listen_fds,
2055 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2056 if (temp == NULL)
2057 {
2058 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2059 return (-1);
2060 }
2061 listen_fds = temp;
2062 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2064 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2065 if (fd < 0)
2066 {
2067 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2068 rrd_strerror(errno));
2069 return (-1);
2070 }
2072 memset (&sa, 0, sizeof (sa));
2073 sa.sun_family = AF_UNIX;
2074 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2076 /* if we've gotten this far, we own the pid file. any daemon started
2077 * with the same args must not be alive. therefore, ensure that we can
2078 * create the socket...
2079 */
2080 unlink(path);
2082 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2083 if (status != 0)
2084 {
2085 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2086 path, rrd_strerror(errno));
2087 close (fd);
2088 return (-1);
2089 }
2091 status = listen (fd, /* backlog = */ 10);
2092 if (status != 0)
2093 {
2094 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2095 path, rrd_strerror(errno));
2096 close (fd);
2097 unlink (path);
2098 return (-1);
2099 }
2101 listen_fds[listen_fds_num].fd = fd;
2102 listen_fds[listen_fds_num].family = PF_UNIX;
2103 strncpy(listen_fds[listen_fds_num].addr, path,
2104 sizeof (listen_fds[listen_fds_num].addr) - 1);
2105 listen_fds_num++;
2107 return (0);
2108 } /* }}} int open_listen_socket_unix */
2110 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2111 {
2112 struct addrinfo ai_hints;
2113 struct addrinfo *ai_res;
2114 struct addrinfo *ai_ptr;
2115 char addr_copy[NI_MAXHOST];
2116 char *addr;
2117 char *port;
2118 int status;
2120 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2121 addr_copy[sizeof (addr_copy) - 1] = 0;
2122 addr = addr_copy;
2124 memset (&ai_hints, 0, sizeof (ai_hints));
2125 ai_hints.ai_flags = 0;
2126 #ifdef AI_ADDRCONFIG
2127 ai_hints.ai_flags |= AI_ADDRCONFIG;
2128 #endif
2129 ai_hints.ai_family = AF_UNSPEC;
2130 ai_hints.ai_socktype = SOCK_STREAM;
2132 port = NULL;
2133 if (*addr == '[') /* IPv6+port format */
2134 {
2135 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2136 addr++;
2138 port = strchr (addr, ']');
2139 if (port == NULL)
2140 {
2141 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2142 return (-1);
2143 }
2144 *port = 0;
2145 port++;
2147 if (*port == ':')
2148 port++;
2149 else if (*port == 0)
2150 port = NULL;
2151 else
2152 {
2153 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2154 return (-1);
2155 }
2156 } /* if (*addr = ']') */
2157 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2158 {
2159 port = rindex(addr, ':');
2160 if (port != NULL)
2161 {
2162 *port = 0;
2163 port++;
2164 }
2165 }
2166 ai_res = NULL;
2167 status = getaddrinfo (addr,
2168 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2169 &ai_hints, &ai_res);
2170 if (status != 0)
2171 {
2172 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2173 addr, gai_strerror (status));
2174 return (-1);
2175 }
2177 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2178 {
2179 int fd;
2180 listen_socket_t *temp;
2181 int one = 1;
2183 temp = (listen_socket_t *) realloc (listen_fds,
2184 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2185 if (temp == NULL)
2186 {
2187 fprintf (stderr,
2188 "rrdcached: open_listen_socket_network: realloc failed.\n");
2189 continue;
2190 }
2191 listen_fds = temp;
2192 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2194 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2195 if (fd < 0)
2196 {
2197 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2198 rrd_strerror(errno));
2199 continue;
2200 }
2202 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2204 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2205 if (status != 0)
2206 {
2207 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2208 sock->addr, rrd_strerror(errno));
2209 close (fd);
2210 continue;
2211 }
2213 status = listen (fd, /* backlog = */ 10);
2214 if (status != 0)
2215 {
2216 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2217 sock->addr, rrd_strerror(errno));
2218 close (fd);
2219 freeaddrinfo(ai_res);
2220 return (-1);
2221 }
2223 listen_fds[listen_fds_num].fd = fd;
2224 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2225 listen_fds_num++;
2226 } /* for (ai_ptr) */
2228 freeaddrinfo(ai_res);
2229 return (0);
2230 } /* }}} static int open_listen_socket_network */
2232 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2233 {
2234 assert(sock != NULL);
2235 assert(sock->addr != NULL);
2237 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2238 || sock->addr[0] == '/')
2239 return (open_listen_socket_unix(sock));
2240 else
2241 return (open_listen_socket_network(sock));
2242 } /* }}} int open_listen_socket */
2244 static int close_listen_sockets (void) /* {{{ */
2245 {
2246 size_t i;
2248 for (i = 0; i < listen_fds_num; i++)
2249 {
2250 close (listen_fds[i].fd);
2252 if (listen_fds[i].family == PF_UNIX)
2253 unlink(listen_fds[i].addr);
2254 }
2256 free (listen_fds);
2257 listen_fds = NULL;
2258 listen_fds_num = 0;
2260 return (0);
2261 } /* }}} int close_listen_sockets */
2263 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2264 {
2265 struct pollfd *pollfds;
2266 int pollfds_num;
2267 int status;
2268 int i;
2270 if (listen_fds_num < 1)
2271 {
2272 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2273 return (NULL);
2274 }
2276 pollfds_num = listen_fds_num;
2277 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2278 if (pollfds == NULL)
2279 {
2280 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2281 return (NULL);
2282 }
2283 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2285 RRDD_LOG(LOG_INFO, "listening for connections");
2287 while (do_shutdown == 0)
2288 {
2289 for (i = 0; i < pollfds_num; i++)
2290 {
2291 pollfds[i].fd = listen_fds[i].fd;
2292 pollfds[i].events = POLLIN | POLLPRI;
2293 pollfds[i].revents = 0;
2294 }
2296 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2297 if (do_shutdown)
2298 break;
2299 else if (status == 0) /* timeout */
2300 continue;
2301 else if (status < 0) /* error */
2302 {
2303 status = errno;
2304 if (status != EINTR)
2305 {
2306 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2307 }
2308 continue;
2309 }
2311 for (i = 0; i < pollfds_num; i++)
2312 {
2313 listen_socket_t *client_sock;
2314 struct sockaddr_storage client_sa;
2315 socklen_t client_sa_size;
2316 pthread_t tid;
2317 pthread_attr_t attr;
2319 if (pollfds[i].revents == 0)
2320 continue;
2322 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2323 {
2324 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2325 "poll(2) returned something unexpected for listen FD #%i.",
2326 pollfds[i].fd);
2327 continue;
2328 }
2330 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2331 if (client_sock == NULL)
2332 {
2333 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2334 continue;
2335 }
2336 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2338 client_sa_size = sizeof (client_sa);
2339 client_sock->fd = accept (pollfds[i].fd,
2340 (struct sockaddr *) &client_sa, &client_sa_size);
2341 if (client_sock->fd < 0)
2342 {
2343 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2344 free(client_sock);
2345 continue;
2346 }
2348 pthread_attr_init (&attr);
2349 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2351 status = pthread_create (&tid, &attr, connection_thread_main,
2352 client_sock);
2353 if (status != 0)
2354 {
2355 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2356 close_connection(client_sock);
2357 continue;
2358 }
2359 } /* for (pollfds_num) */
2360 } /* while (do_shutdown == 0) */
2362 RRDD_LOG(LOG_INFO, "starting shutdown");
2364 close_listen_sockets ();
2366 pthread_mutex_lock (&connection_threads_lock);
2367 while (connection_threads_num > 0)
2368 {
2369 pthread_t wait_for;
2371 wait_for = connection_threads[0];
2373 pthread_mutex_unlock (&connection_threads_lock);
2374 pthread_join (wait_for, /* retval = */ NULL);
2375 pthread_mutex_lock (&connection_threads_lock);
2376 }
2377 pthread_mutex_unlock (&connection_threads_lock);
2379 free(pollfds);
2381 return (NULL);
2382 } /* }}} void *listen_thread_main */
2384 static int daemonize (void) /* {{{ */
2385 {
2386 int pid_fd;
2387 char *base_dir;
2389 daemon_uid = geteuid();
2391 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2392 if (pid_fd < 0)
2393 pid_fd = check_pidfile();
2394 if (pid_fd < 0)
2395 return pid_fd;
2397 /* open all the listen sockets */
2398 if (config_listen_address_list_len > 0)
2399 {
2400 for (int i = 0; i < config_listen_address_list_len; i++)
2401 {
2402 open_listen_socket (config_listen_address_list[i]);
2403 free_listen_socket (config_listen_address_list[i]);
2404 }
2406 free(config_listen_address_list);
2407 }
2408 else
2409 {
2410 listen_socket_t sock;
2411 memset(&sock, 0, sizeof(sock));
2412 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2413 open_listen_socket (&sock);
2414 }
2416 if (listen_fds_num < 1)
2417 {
2418 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2419 goto error;
2420 }
2422 if (!stay_foreground)
2423 {
2424 pid_t child;
2426 child = fork ();
2427 if (child < 0)
2428 {
2429 fprintf (stderr, "daemonize: fork(2) failed.\n");
2430 goto error;
2431 }
2432 else if (child > 0)
2433 exit(0);
2435 /* Become session leader */
2436 setsid ();
2438 /* Open the first three file descriptors to /dev/null */
2439 close (2);
2440 close (1);
2441 close (0);
2443 open ("/dev/null", O_RDWR);
2444 dup (0);
2445 dup (0);
2446 } /* if (!stay_foreground) */
2448 /* Change into the /tmp directory. */
2449 base_dir = (config_base_dir != NULL)
2450 ? config_base_dir
2451 : "/tmp";
2453 if (chdir (base_dir) != 0)
2454 {
2455 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2456 goto error;
2457 }
2459 install_signal_handlers();
2461 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2462 RRDD_LOG(LOG_INFO, "starting up");
2464 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2465 (GDestroyNotify) free_cache_item);
2466 if (cache_tree == NULL)
2467 {
2468 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2469 goto error;
2470 }
2472 return write_pidfile (pid_fd);
2474 error:
2475 remove_pidfile();
2476 return -1;
2477 } /* }}} int daemonize */
2479 static int cleanup (void) /* {{{ */
2480 {
2481 do_shutdown++;
2483 pthread_cond_signal (&cache_cond);
2484 pthread_join (queue_thread, /* return = */ NULL);
2486 remove_pidfile ();
2488 free(config_base_dir);
2489 free(config_pid_file);
2490 free(journal_cur);
2491 free(journal_old);
2493 pthread_mutex_lock(&cache_lock);
2494 g_tree_destroy(cache_tree);
2496 RRDD_LOG(LOG_INFO, "goodbye");
2497 closelog ();
2499 return (0);
2500 } /* }}} int cleanup */
2502 static int read_options (int argc, char **argv) /* {{{ */
2503 {
2504 int option;
2505 int status = 0;
2507 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2508 {
2509 switch (option)
2510 {
2511 case 'g':
2512 stay_foreground=1;
2513 break;
2515 case 'L':
2516 case 'l':
2517 {
2518 listen_socket_t **temp;
2519 listen_socket_t *new;
2521 new = malloc(sizeof(listen_socket_t));
2522 if (new == NULL)
2523 {
2524 fprintf(stderr, "read_options: malloc failed.\n");
2525 return(2);
2526 }
2527 memset(new, 0, sizeof(listen_socket_t));
2529 temp = (listen_socket_t **) realloc (config_listen_address_list,
2530 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2531 if (temp == NULL)
2532 {
2533 fprintf (stderr, "read_options: realloc failed.\n");
2534 return (2);
2535 }
2536 config_listen_address_list = temp;
2538 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2539 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2541 temp[config_listen_address_list_len] = new;
2542 config_listen_address_list_len++;
2543 }
2544 break;
2546 case 'f':
2547 {
2548 int temp;
2550 temp = atoi (optarg);
2551 if (temp > 0)
2552 config_flush_interval = temp;
2553 else
2554 {
2555 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2556 status = 3;
2557 }
2558 }
2559 break;
2561 case 'w':
2562 {
2563 int temp;
2565 temp = atoi (optarg);
2566 if (temp > 0)
2567 config_write_interval = temp;
2568 else
2569 {
2570 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2571 status = 2;
2572 }
2573 }
2574 break;
2576 case 'z':
2577 {
2578 int temp;
2580 temp = atoi(optarg);
2581 if (temp > 0)
2582 config_write_jitter = temp;
2583 else
2584 {
2585 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2586 status = 2;
2587 }
2589 break;
2590 }
2592 case 'B':
2593 config_write_base_only = 1;
2594 break;
2596 case 'b':
2597 {
2598 size_t len;
2599 char base_realpath[PATH_MAX];
2601 if (config_base_dir != NULL)
2602 free (config_base_dir);
2603 config_base_dir = strdup (optarg);
2604 if (config_base_dir == NULL)
2605 {
2606 fprintf (stderr, "read_options: strdup failed.\n");
2607 return (3);
2608 }
2610 /* make sure that the base directory is not resolved via
2611 * symbolic links. this makes some performance-enhancing
2612 * assumptions possible (we don't have to resolve paths
2613 * that start with a "/")
2614 */
2615 if (realpath(config_base_dir, base_realpath) == NULL)
2616 {
2617 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2618 return 5;
2619 }
2620 else if (strncmp(config_base_dir,
2621 base_realpath, sizeof(base_realpath)) != 0)
2622 {
2623 fprintf(stderr,
2624 "Base directory (-b) resolved via file system links!\n"
2625 "Please consult rrdcached '-b' documentation!\n"
2626 "Consider specifying the real directory (%s)\n",
2627 base_realpath);
2628 return 5;
2629 }
2631 len = strlen (config_base_dir);
2632 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2633 {
2634 config_base_dir[len - 1] = 0;
2635 len--;
2636 }
2638 if (len < 1)
2639 {
2640 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2641 return (4);
2642 }
2644 _config_base_dir_len = len;
2645 }
2646 break;
2648 case 'p':
2649 {
2650 if (config_pid_file != NULL)
2651 free (config_pid_file);
2652 config_pid_file = strdup (optarg);
2653 if (config_pid_file == NULL)
2654 {
2655 fprintf (stderr, "read_options: strdup failed.\n");
2656 return (3);
2657 }
2658 }
2659 break;
2661 case 'F':
2662 config_flush_at_shutdown = 1;
2663 break;
2665 case 'j':
2666 {
2667 struct stat statbuf;
2668 const char *dir = optarg;
2670 status = stat(dir, &statbuf);
2671 if (status != 0)
2672 {
2673 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2674 return 6;
2675 }
2677 if (!S_ISDIR(statbuf.st_mode)
2678 || access(dir, R_OK|W_OK|X_OK) != 0)
2679 {
2680 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2681 errno ? rrd_strerror(errno) : "");
2682 return 6;
2683 }
2685 journal_cur = malloc(PATH_MAX + 1);
2686 journal_old = malloc(PATH_MAX + 1);
2687 if (journal_cur == NULL || journal_old == NULL)
2688 {
2689 fprintf(stderr, "malloc failure for journal files\n");
2690 return 6;
2691 }
2692 else
2693 {
2694 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2695 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2696 }
2697 }
2698 break;
2700 case 'h':
2701 case '?':
2702 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2703 "\n"
2704 "Usage: rrdcached [options]\n"
2705 "\n"
2706 "Valid options are:\n"
2707 " -l <address> Socket address to listen to.\n"
2708 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2709 " -w <seconds> Interval in which to write data.\n"
2710 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2711 " -f <seconds> Interval in which to flush dead data.\n"
2712 " -p <file> Location of the PID-file.\n"
2713 " -b <dir> Base directory to change to.\n"
2714 " -B Restrict file access to paths within -b <dir>\n"
2715 " -g Do not fork and run in the foreground.\n"
2716 " -j <dir> Directory in which to create the journal files.\n"
2717 " -F Always flush all updates at shutdown\n"
2718 "\n"
2719 "For more information and a detailed description of all options "
2720 "please refer\n"
2721 "to the rrdcached(1) manual page.\n",
2722 VERSION);
2723 status = -1;
2724 break;
2725 } /* switch (option) */
2726 } /* while (getopt) */
2728 /* advise the user when values are not sane */
2729 if (config_flush_interval < 2 * config_write_interval)
2730 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2731 " 2x write interval (-w) !\n");
2732 if (config_write_jitter > config_write_interval)
2733 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2734 " write interval (-w) !\n");
2736 if (config_write_base_only && config_base_dir == NULL)
2737 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2738 " Consult the rrdcached documentation\n");
2740 if (journal_cur == NULL)
2741 config_flush_at_shutdown = 1;
2743 return (status);
2744 } /* }}} int read_options */
2746 int main (int argc, char **argv)
2747 {
2748 int status;
2750 status = read_options (argc, argv);
2751 if (status != 0)
2752 {
2753 if (status < 0)
2754 status = 0;
2755 return (status);
2756 }
2758 status = daemonize ();
2759 if (status != 0)
2760 {
2761 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2762 return (1);
2763 }
2765 journal_init();
2767 /* start the queue thread */
2768 memset (&queue_thread, 0, sizeof (queue_thread));
2769 status = pthread_create (&queue_thread,
2770 NULL, /* attr */
2771 queue_thread_main,
2772 NULL); /* args */
2773 if (status != 0)
2774 {
2775 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2776 cleanup();
2777 return (1);
2778 }
2780 listen_thread_main (NULL);
2781 cleanup ();
2783 return (0);
2784 } /* int main */
2786 /*
2787 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2788 */