d56cf06da51ddde4126d302557b9a3915f257244
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
/* All daemon diagnostics are routed through syslog(3). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC-compatible ones do not know __attribute__;
 * expand it to nothing there. */
#if !defined(__GNUC__)
# define __attribute__(x) /**/
#endif
110 /*
111 * Types
112 */
113 typedef enum
114 {
115 PRIV_LOW,
116 PRIV_HIGH
117 } socket_privilege;
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
121 struct listen_socket_s
122 {
123 int fd;
124 char addr[PATH_MAX + 1];
125 int family;
126 socket_privilege privilege;
128 /* state for BATCH processing */
129 time_t batch_start;
130 int batch_cmd;
132 /* buffered IO */
133 char *rbuf;
134 off_t next_cmd;
135 off_t next_read;
137 char *wbuf;
138 ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
145 {
146 char *file;
147 char **values;
148 int values_num;
149 time_t last_flush_time;
150 time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153 int flags;
154 pthread_cond_t flushed;
155 cache_item_t *prev;
156 cache_item_t *next;
157 };
159 struct callback_flush_data_s
160 {
161 time_t now;
162 time_t abs_timeout;
163 char **keys;
164 size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
168 enum queue_side_e
169 {
170 HEAD,
171 TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
179 /*
180 * Variables
181 */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t queue_thread;
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
196 /* Cache stuff */
197 static GTree *cache_tree = NULL;
198 static cache_item_t *cache_queue_head = NULL;
199 static cache_item_t *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
203 static int config_write_interval = 300;
204 static int config_write_jitter = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
233 /*
234 * Functions
235 */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239 do_shutdown++;
240 pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245 sig_common("INT");
246 } /* }}} void sig_int_handler */
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250 sig_common("TERM");
251 } /* }}} void sig_term_handler */
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255 config_flush_at_shutdown = 1;
256 sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261 config_flush_at_shutdown = 0;
262 sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
265 static void install_signal_handlers(void) /* {{{ */
266 {
267 /* These structures are static, because `sigaction' behaves weird if the are
268 * overwritten.. */
269 static struct sigaction sa_int;
270 static struct sigaction sa_term;
271 static struct sigaction sa_pipe;
272 static struct sigaction sa_usr1;
273 static struct sigaction sa_usr2;
275 /* Install signal handlers */
276 memset (&sa_int, 0, sizeof (sa_int));
277 sa_int.sa_handler = sig_int_handler;
278 sigaction (SIGINT, &sa_int, NULL);
280 memset (&sa_term, 0, sizeof (sa_term));
281 sa_term.sa_handler = sig_term_handler;
282 sigaction (SIGTERM, &sa_term, NULL);
284 memset (&sa_pipe, 0, sizeof (sa_pipe));
285 sa_pipe.sa_handler = SIG_IGN;
286 sigaction (SIGPIPE, &sa_pipe, NULL);
288 memset (&sa_pipe, 0, sizeof (sa_usr1));
289 sa_usr1.sa_handler = sig_usr1_handler;
290 sigaction (SIGUSR1, &sa_usr1, NULL);
292 memset (&sa_usr2, 0, sizeof (sa_usr2));
293 sa_usr2.sa_handler = sig_usr2_handler;
294 sigaction (SIGUSR2, &sa_usr2, NULL);
296 } /* }}} void install_signal_handlers */
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300 int fd;
301 char *file;
303 file = (config_pid_file != NULL)
304 ? config_pid_file
305 : LOCALSTATEDIR "/run/rrdcached.pid";
307 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308 if (fd < 0)
309 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310 action, file, rrd_strerror(errno));
312 return(fd);
313 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running
 *
 * Returns an open descriptor for the (now truncated) PID file when the PID
 * recorded in it is not a process we can signal (stale file), or a negative
 * value if the file cannot be opened, read or parsed, or if another process
 * that could be a competing rrdcached is alive. The descriptor is closed on
 * every negative-return path after it was opened. */
static int check_pidfile(void)
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t got;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* read() does not NUL-terminate; keep one byte for the terminator so
   * atoi() always sees a proper C string. */
  got = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (got <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[got] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  /* NOTE(review): the ftruncate() result is not checked; a failure would
   * leave trailing digits of the old PID behind the new one. */
  lseek(pid_fd, 0, SEEK_SET);
  ftruncate(pid_fd, 0);

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
353 static int write_pidfile (int fd) /* {{{ */
354 {
355 pid_t pid;
356 FILE *fh;
358 pid = getpid ();
360 fh = fdopen (fd, "w");
361 if (fh == NULL)
362 {
363 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364 close(fd);
365 return (-1);
366 }
368 fprintf (fh, "%i\n", (int) pid);
369 fclose (fh);
371 return (0);
372 } /* }}} int write_pidfile */
374 static int remove_pidfile (void) /* {{{ */
375 {
376 char *file;
377 int status;
379 file = (config_pid_file != NULL)
380 ? config_pid_file
381 : LOCALSTATEDIR "/run/rrdcached.pid";
383 status = unlink (file);
384 if (status == 0)
385 return (0);
386 return (errno);
387 } /* }}} int remove_pidfile */
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391 char *eol;
393 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394 sock->next_read - sock->next_cmd);
396 if (eol == NULL)
397 {
398 /* no commands left, move remainder back to front of rbuf */
399 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400 sock->next_read - sock->next_cmd);
401 sock->next_read -= sock->next_cmd;
402 sock->next_cmd = 0;
403 *len = 0;
404 return NULL;
405 }
406 else
407 {
408 char *cmd = sock->rbuf + sock->next_cmd;
409 *eol = '\0';
411 sock->next_cmd = eol - sock->rbuf + 1;
413 if (eol > sock->rbuf && *(eol-1) == '\r')
414 *(--eol) = '\0'; /* handle "\r\n" EOL */
416 *len = eol - cmd;
418 return cmd;
419 }
421 /* NOTREACHED */
422 assert(1==0);
423 }
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428 char *new_buf;
430 assert(sock != NULL);
432 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
433 if (new_buf == NULL)
434 {
435 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436 return -1;
437 }
439 strncpy(new_buf + sock->wbuf_len, str, len + 1);
441 sock->wbuf = new_buf;
442 sock->wbuf_len += len;
444 return 0;
445 } /* }}} static int add_to_wbuf */
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450 va_list argp;
451 char buffer[CMD_MAX];
452 int len;
454 if (sock == NULL) return 0; /* journal replay mode */
455 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
457 va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461 len = vsprintf(buffer, fmt, argp);
462 #endif
463 va_end(argp);
464 if (len < 0)
465 {
466 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467 return -1;
468 }
470 return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
/* Number of '\n' characters in `str'; a NULL string has no lines. */
static int count_lines(char *str) /* {{{ */
{
  int n = 0;

  if (str == NULL)
    return 0;

  for (const char *p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      n++;
  }

  return n;
} /* }}} static int count_lines */
489 /* send the response back to the user.
490 * returns 0 on success, -1 on error
491 * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493 char *fmt, ...) /* {{{ */
494 {
495 va_list argp;
496 char buffer[CMD_MAX];
497 int lines;
498 ssize_t wrote;
499 int rclen, len;
501 if (sock == NULL) return rc; /* journal replay mode */
503 if (sock->batch_start)
504 {
505 if (rc == RESP_OK)
506 return rc; /* no response on success during BATCH */
507 lines = sock->batch_cmd;
508 }
509 else if (rc == RESP_OK)
510 lines = count_lines(sock->wbuf);
511 else
512 lines = -1;
514 rclen = sprintf(buffer, "%d ", lines);
515 va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519 len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521 va_end(argp);
522 if (len < 0)
523 return -1;
525 len += rclen;
527 /* append the result to the wbuf, don't write to the user */
528 if (sock->batch_start)
529 return add_to_wbuf(sock, buffer, len);
531 /* first write must be complete */
532 if (len != write(sock->fd, buffer, len))
533 {
534 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535 return -1;
536 }
538 if (sock->wbuf != NULL && rc == RESP_OK)
539 {
540 wrote = 0;
541 while (wrote < sock->wbuf_len)
542 {
543 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544 if (wb <= 0)
545 {
546 RRDD_LOG(LOG_INFO, "send_response: could not write results");
547 return -1;
548 }
549 wrote += wb;
550 }
551 }
553 free(sock->wbuf); sock->wbuf = NULL;
554 sock->wbuf_len = 0;
556 return 0;
557 } /* }}} */
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561 ci->values = NULL;
562 ci->values_num = 0;
564 ci->last_flush_time = when;
565 if (config_write_jitter > 0)
566 ci->last_flush_time += (random() % config_write_jitter);
567 }
569 /* remove_from_queue
570 * remove a "cache_item_t" item from the queue.
571 * must hold 'cache_lock' when calling this
572 */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575 if (ci == NULL) return;
576 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
578 if (ci->prev == NULL)
579 cache_queue_head = ci->next; /* reset head */
580 else
581 ci->prev->next = ci->next;
583 if (ci->next == NULL)
584 cache_queue_tail = ci->prev; /* reset the tail */
585 else
586 ci->next->prev = ci->prev;
588 ci->next = ci->prev = NULL;
589 ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
592 /* free the resources associated with the cache_item_t
593 * must hold cache_lock when calling this function
594 */
595 static void *free_cache_item(cache_item_t *ci) /* {{{ */
596 {
597 if (ci == NULL) return NULL;
599 remove_from_queue(ci);
601 for (int i=0; i < ci->values_num; i++)
602 free(ci->values[i]);
604 free (ci->values);
605 free (ci->file);
607 /* in case anyone is waiting */
608 pthread_cond_broadcast(&ci->flushed);
610 free (ci);
612 return NULL;
613 } /* }}} static void *free_cache_item */
615 /*
616 * enqueue_cache_item:
617 * `cache_lock' must be acquired before calling this function!
618 */
619 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
620 queue_side_t side)
621 {
622 if (ci == NULL)
623 return (-1);
625 if (ci->values_num == 0)
626 return (0);
628 if (side == HEAD)
629 {
630 if (cache_queue_head == ci)
631 return 0;
633 /* remove if further down in queue */
634 remove_from_queue(ci);
636 ci->prev = NULL;
637 ci->next = cache_queue_head;
638 if (ci->next != NULL)
639 ci->next->prev = ci;
640 cache_queue_head = ci;
642 if (cache_queue_tail == NULL)
643 cache_queue_tail = cache_queue_head;
644 }
645 else /* (side == TAIL) */
646 {
647 /* We don't move values back in the list.. */
648 if (ci->flags & CI_FLAGS_IN_QUEUE)
649 return (0);
651 assert (ci->next == NULL);
652 assert (ci->prev == NULL);
654 ci->prev = cache_queue_tail;
656 if (cache_queue_tail == NULL)
657 cache_queue_head = ci;
658 else
659 cache_queue_tail->next = ci;
661 cache_queue_tail = ci;
662 }
664 ci->flags |= CI_FLAGS_IN_QUEUE;
666 pthread_cond_broadcast(&cache_cond);
667 pthread_mutex_lock (&stats_lock);
668 stats_queue_length++;
669 pthread_mutex_unlock (&stats_lock);
671 return (0);
672 } /* }}} int enqueue_cache_item */
674 /*
675 * tree_callback_flush:
676 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
677 * while this is in progress.
678 */
679 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
680 gpointer data)
681 {
682 cache_item_t *ci;
683 callback_flush_data_t *cfd;
685 ci = (cache_item_t *) value;
686 cfd = (callback_flush_data_t *) data;
688 if (ci->flags & CI_FLAGS_IN_QUEUE)
689 return FALSE;
691 if ((ci->last_flush_time <= cfd->abs_timeout)
692 && (ci->values_num > 0))
693 {
694 enqueue_cache_item (ci, TAIL);
695 }
696 else if ((do_shutdown != 0)
697 && (ci->values_num > 0))
698 {
699 enqueue_cache_item (ci, TAIL);
700 }
701 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
702 && (ci->values_num <= 0))
703 {
704 char **temp;
706 temp = (char **) rrd_realloc (cfd->keys,
707 sizeof (char *) * (cfd->keys_num + 1));
708 if (temp == NULL)
709 {
710 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
711 return (FALSE);
712 }
713 cfd->keys = temp;
714 /* Make really sure this points to the _same_ place */
715 assert ((char *) key == ci->file);
716 cfd->keys[cfd->keys_num] = (char *) key;
717 cfd->keys_num++;
718 }
720 return (FALSE);
721 } /* }}} gboolean tree_callback_flush */
723 static int flush_old_values (int max_age)
724 {
725 callback_flush_data_t cfd;
726 size_t k;
728 memset (&cfd, 0, sizeof (cfd));
729 /* Pass the current time as user data so that we don't need to call
730 * `time' for each node. */
731 cfd.now = time (NULL);
732 cfd.keys = NULL;
733 cfd.keys_num = 0;
735 if (max_age > 0)
736 cfd.abs_timeout = cfd.now - max_age;
737 else
738 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
740 /* `tree_callback_flush' will return the keys of all values that haven't
741 * been touched in the last `config_flush_interval' seconds in `cfd'.
742 * The char*'s in this array point to the same memory as ci->file, so we
743 * don't need to free them separately. */
744 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
746 for (k = 0; k < cfd.keys_num; k++)
747 {
748 /* should never fail, since we have held the cache_lock
749 * the entire time */
750 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
751 }
753 if (cfd.keys != NULL)
754 {
755 free (cfd.keys);
756 cfd.keys = NULL;
757 }
759 return (0);
760 } /* int flush_old_values */
762 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
763 {
764 struct timeval now;
765 struct timespec next_flush;
766 int final_flush = 0; /* make sure we only flush once on shutdown */
768 gettimeofday (&now, NULL);
769 next_flush.tv_sec = now.tv_sec + config_flush_interval;
770 next_flush.tv_nsec = 1000 * now.tv_usec;
772 pthread_mutex_lock (&cache_lock);
773 while ((do_shutdown == 0) || (cache_queue_head != NULL))
774 {
775 cache_item_t *ci;
776 char *file;
777 char **values;
778 int values_num;
779 int status;
780 int i;
782 /* First, check if it's time to do the cache flush. */
783 gettimeofday (&now, NULL);
784 if ((now.tv_sec > next_flush.tv_sec)
785 || ((now.tv_sec == next_flush.tv_sec)
786 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
787 {
788 /* Flush all values that haven't been written in the last
789 * `config_write_interval' seconds. */
790 flush_old_values (config_write_interval);
792 /* Determine the time of the next cache flush. */
793 next_flush.tv_sec =
794 now.tv_sec + next_flush.tv_sec % config_flush_interval;
796 /* unlock the cache while we rotate so we don't block incoming
797 * updates if the fsync() blocks on disk I/O */
798 pthread_mutex_unlock(&cache_lock);
799 journal_rotate();
800 pthread_mutex_lock(&cache_lock);
801 }
803 /* Now, check if there's something to store away. If not, wait until
804 * something comes in or it's time to do the cache flush. if we are
805 * shutting down, do not wait around. */
806 if (cache_queue_head == NULL && !do_shutdown)
807 {
808 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
809 if ((status != 0) && (status != ETIMEDOUT))
810 {
811 RRDD_LOG (LOG_ERR, "queue_thread_main: "
812 "pthread_cond_timedwait returned %i.", status);
813 }
814 }
816 /* We're about to shut down */
817 if (do_shutdown != 0 && !final_flush++)
818 {
819 if (config_flush_at_shutdown)
820 flush_old_values (-1); /* flush everything */
821 else
822 break;
823 }
825 /* Check if a value has arrived. This may be NULL if we timed out or there
826 * was an interrupt such as a signal. */
827 if (cache_queue_head == NULL)
828 continue;
830 ci = cache_queue_head;
832 /* copy the relevant parts */
833 file = strdup (ci->file);
834 if (file == NULL)
835 {
836 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
837 continue;
838 }
840 assert(ci->values != NULL);
841 assert(ci->values_num > 0);
843 values = ci->values;
844 values_num = ci->values_num;
846 wipe_ci_values(ci, time(NULL));
847 remove_from_queue(ci);
849 pthread_mutex_lock (&stats_lock);
850 assert (stats_queue_length > 0);
851 stats_queue_length--;
852 pthread_mutex_unlock (&stats_lock);
854 pthread_mutex_unlock (&cache_lock);
856 rrd_clear_error ();
857 status = rrd_update_r (file, NULL, values_num, (void *) values);
858 if (status != 0)
859 {
860 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
861 "rrd_update_r (%s) failed with status %i. (%s)",
862 file, status, rrd_get_error());
863 }
865 journal_write("wrote", file);
866 pthread_cond_broadcast(&ci->flushed);
868 for (i = 0; i < values_num; i++)
869 free (values[i]);
871 free(values);
872 free(file);
874 if (status == 0)
875 {
876 pthread_mutex_lock (&stats_lock);
877 stats_updates_written++;
878 stats_data_sets_written += values_num;
879 pthread_mutex_unlock (&stats_lock);
880 }
882 pthread_mutex_lock (&cache_lock);
884 /* We're about to shut down */
885 if (do_shutdown != 0 && !final_flush++)
886 {
887 if (config_flush_at_shutdown)
888 flush_old_values (-1); /* flush everything */
889 else
890 break;
891 }
892 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
893 pthread_mutex_unlock (&cache_lock);
895 if (config_flush_at_shutdown)
896 {
897 assert(cache_queue_head == NULL);
898 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
899 }
901 journal_done();
903 return (NULL);
904 } /* }}} void *queue_thread_main */
906 static int buffer_get_field (char **buffer_ret, /* {{{ */
907 size_t *buffer_size_ret, char **field_ret)
908 {
909 char *buffer;
910 size_t buffer_pos;
911 size_t buffer_size;
912 char *field;
913 size_t field_size;
914 int status;
916 buffer = *buffer_ret;
917 buffer_pos = 0;
918 buffer_size = *buffer_size_ret;
919 field = *buffer_ret;
920 field_size = 0;
922 if (buffer_size <= 0)
923 return (-1);
925 /* This is ensured by `handle_request'. */
926 assert (buffer[buffer_size - 1] == '\0');
928 status = -1;
929 while (buffer_pos < buffer_size)
930 {
931 /* Check for end-of-field or end-of-buffer */
932 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
933 {
934 field[field_size] = 0;
935 field_size++;
936 buffer_pos++;
937 status = 0;
938 break;
939 }
940 /* Handle escaped characters. */
941 else if (buffer[buffer_pos] == '\\')
942 {
943 if (buffer_pos >= (buffer_size - 1))
944 break;
945 buffer_pos++;
946 field[field_size] = buffer[buffer_pos];
947 field_size++;
948 buffer_pos++;
949 }
950 /* Normal operation */
951 else
952 {
953 field[field_size] = buffer[buffer_pos];
954 field_size++;
955 buffer_pos++;
956 }
957 } /* while (buffer_pos < buffer_size) */
959 if (status != 0)
960 return (status);
962 *buffer_ret = buffer + buffer_pos;
963 *buffer_size_ret = buffer_size - buffer_pos;
964 *field_ret = field;
966 return (0);
967 } /* }}} int buffer_get_field */
969 /* if we're restricting writes to the base directory,
970 * check whether the file falls within the dir
971 * returns 1 if OK, otherwise 0
972 */
973 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
974 {
975 assert(file != NULL);
977 if (!config_write_base_only
978 || sock == NULL /* journal replay */
979 || config_base_dir == NULL)
980 return 1;
982 if (strstr(file, "../") != NULL) goto err;
984 /* relative paths without "../" are ok */
985 if (*file != '/') return 1;
987 /* file must be of the format base + "/" + <1+ char filename> */
988 if (strlen(file) < _config_base_dir_len + 2) goto err;
989 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
990 if (*(file + _config_base_dir_len) != '/') goto err;
992 return 1;
994 err:
995 if (sock != NULL && sock->fd >= 0)
996 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
998 return 0;
999 } /* }}} static int check_file_access */
1001 /* when using a base dir, convert relative paths to absolute paths.
1002 * if necessary, modifies the "filename" pointer to point
1003 * to the new path created in "tmp". "tmp" is provided
1004 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1005 *
1006 * this allows us to optimize for the expected case (absolute path)
1007 * with a no-op.
1008 */
1009 static void get_abs_path(char **filename, char *tmp)
1010 {
1011 assert(tmp != NULL);
1012 assert(filename != NULL && *filename != NULL);
1014 if (config_base_dir == NULL || **filename == '/')
1015 return;
1017 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1018 *filename = tmp;
1019 } /* }}} static int get_abs_path */
1021 /* returns 1 if we have the required privilege level,
1022 * otherwise issue an error to the user on sock */
1023 static int has_privilege (listen_socket_t *sock, /* {{{ */
1024 socket_privilege priv)
1025 {
1026 if (sock == NULL) /* journal replay */
1027 return 1;
1029 if (sock->privilege >= priv)
1030 return 1;
1032 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 } /* }}} static int has_privilege */
1035 static int flush_file (const char *filename) /* {{{ */
1036 {
1037 cache_item_t *ci;
1039 pthread_mutex_lock (&cache_lock);
1041 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1042 if (ci == NULL)
1043 {
1044 pthread_mutex_unlock (&cache_lock);
1045 return (ENOENT);
1046 }
1048 if (ci->values_num > 0)
1049 {
1050 /* Enqueue at head */
1051 enqueue_cache_item (ci, HEAD);
1052 pthread_cond_wait(&ci->flushed, &cache_lock);
1053 }
1055 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1056 * may have been purged during our cond_wait() */
1058 pthread_mutex_unlock(&cache_lock);
1060 return (0);
1061 } /* }}} int flush_file */
1063 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1064 char *buffer, size_t buffer_size)
1065 {
1066 int status;
1067 char **help_text;
1068 char *command;
1070 char *help_help[2] =
1071 {
1072 "Command overview\n"
1073 ,
1074 "HELP [<command>]\n"
1075 "FLUSH <filename>\n"
1076 "FLUSHALL\n"
1077 "PENDING <filename>\n"
1078 "FORGET <filename>\n"
1079 "QUEUE\n"
1080 "UPDATE <filename> <values> [<values> ...]\n"
1081 "BATCH\n"
1082 "STATS\n"
1083 "QUIT\n"
1084 };
1086 char *help_flush[2] =
1087 {
1088 "Help for FLUSH\n"
1089 ,
1090 "Usage: FLUSH <filename>\n"
1091 "\n"
1092 "Adds the given filename to the head of the update queue and returns\n"
1093 "after is has been dequeued.\n"
1094 };
1096 char *help_flushall[2] =
1097 {
1098 "Help for FLUSHALL\n"
1099 ,
1100 "Usage: FLUSHALL\n"
1101 "\n"
1102 "Triggers writing of all pending updates. Returns immediately.\n"
1103 };
1105 char *help_pending[2] =
1106 {
1107 "Help for PENDING\n"
1108 ,
1109 "Usage: PENDING <filename>\n"
1110 "\n"
1111 "Shows any 'pending' updates for a file, in order.\n"
1112 "The updates shown have not yet been written to the underlying RRD file.\n"
1113 };
1115 char *help_forget[2] =
1116 {
1117 "Help for FORGET\n"
1118 ,
1119 "Usage: FORGET <filename>\n"
1120 "\n"
1121 "Removes the file completely from the cache.\n"
1122 "Any pending updates for the file will be lost.\n"
1123 };
1125 char *help_queue[2] =
1126 {
1127 "Help for QUEUE\n"
1128 ,
1129 "Shows all files in the output queue.\n"
1130 "The output is zero or more lines in the following format:\n"
1131 "(where <num_vals> is the number of values to be written)\n"
1132 "\n"
1133 "<num_vals> <filename>\n"
1134 "\n"
1135 };
1137 char *help_update[2] =
1138 {
1139 "Help for UPDATE\n"
1140 ,
1141 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1142 "\n"
1143 "Adds the given file to the internal cache if it is not yet known and\n"
1144 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1145 "for details.\n"
1146 "\n"
1147 "Each <values> has the following form:\n"
1148 " <values> = <time>:<value>[:<value>[...]]\n"
1149 "See the rrdupdate(1) manpage for details.\n"
1150 };
1152 char *help_stats[2] =
1153 {
1154 "Help for STATS\n"
1155 ,
1156 "Usage: STATS\n"
1157 "\n"
1158 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1159 "a description of the values.\n"
1160 };
1162 char *help_batch[2] =
1163 {
1164 "Help for BATCH\n"
1165 ,
1166 "The 'BATCH' command permits the client to initiate a bulk load\n"
1167 " of commands to rrdcached.\n"
1168 "\n"
1169 "Usage:\n"
1170 "\n"
1171 " client: BATCH\n"
1172 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1173 " client: command #1\n"
1174 " client: command #2\n"
1175 " client: ... and so on\n"
1176 " client: .\n"
1177 " server: 2 errors\n"
1178 " server: 7 message for command #7\n"
1179 " server: 9 message for command #9\n"
1180 "\n"
1181 "For more information, consult the rrdcached(1) documentation.\n"
1182 };
1184 char *help_quit[2] =
1185 {
1186 "Help for QUIT\n"
1187 ,
1188 "Disconnect from rrdcached.\n"
1189 };
1191 status = buffer_get_field (&buffer, &buffer_size, &command);
1192 if (status != 0)
1193 help_text = help_help;
1194 else
1195 {
1196 if (strcasecmp (command, "update") == 0)
1197 help_text = help_update;
1198 else if (strcasecmp (command, "flush") == 0)
1199 help_text = help_flush;
1200 else if (strcasecmp (command, "flushall") == 0)
1201 help_text = help_flushall;
1202 else if (strcasecmp (command, "pending") == 0)
1203 help_text = help_pending;
1204 else if (strcasecmp (command, "forget") == 0)
1205 help_text = help_forget;
1206 else if (strcasecmp (command, "queue") == 0)
1207 help_text = help_queue;
1208 else if (strcasecmp (command, "stats") == 0)
1209 help_text = help_stats;
1210 else if (strcasecmp (command, "batch") == 0)
1211 help_text = help_batch;
1212 else if (strcasecmp (command, "quit") == 0)
1213 help_text = help_quit;
1214 else
1215 help_text = help_help;
1216 }
1218 add_response_info(sock, help_text[1]);
1219 return send_response(sock, RESP_OK, help_text[0]);
1220 } /* }}} int handle_request_help */
1222 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1223 {
1224 uint64_t copy_queue_length;
1225 uint64_t copy_updates_received;
1226 uint64_t copy_flush_received;
1227 uint64_t copy_updates_written;
1228 uint64_t copy_data_sets_written;
1229 uint64_t copy_journal_bytes;
1230 uint64_t copy_journal_rotate;
1232 uint64_t tree_nodes_number;
1233 uint64_t tree_depth;
1235 pthread_mutex_lock (&stats_lock);
1236 copy_queue_length = stats_queue_length;
1237 copy_updates_received = stats_updates_received;
1238 copy_flush_received = stats_flush_received;
1239 copy_updates_written = stats_updates_written;
1240 copy_data_sets_written = stats_data_sets_written;
1241 copy_journal_bytes = stats_journal_bytes;
1242 copy_journal_rotate = stats_journal_rotate;
1243 pthread_mutex_unlock (&stats_lock);
1245 pthread_mutex_lock (&cache_lock);
1246 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1247 tree_depth = (uint64_t) g_tree_height (cache_tree);
1248 pthread_mutex_unlock (&cache_lock);
1250 add_response_info(sock,
1251 "QueueLength: %"PRIu64"\n", copy_queue_length);
1252 add_response_info(sock,
1253 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1254 add_response_info(sock,
1255 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1256 add_response_info(sock,
1257 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1258 add_response_info(sock,
1259 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1260 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1261 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1262 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1263 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1265 send_response(sock, RESP_OK, "Statistics follow\n");
1267 return (0);
1268 } /* }}} int handle_request_stats */
1270 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1271 char *buffer, size_t buffer_size)
1272 {
1273 char *file, file_tmp[PATH_MAX];
1274 int status;
1276 status = buffer_get_field (&buffer, &buffer_size, &file);
1277 if (status != 0)
1278 {
1279 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1280 }
1281 else
1282 {
1283 pthread_mutex_lock(&stats_lock);
1284 stats_flush_received++;
1285 pthread_mutex_unlock(&stats_lock);
1287 get_abs_path(&file, file_tmp);
1288 if (!check_file_access(file, sock)) return 0;
1290 status = flush_file (file);
1291 if (status == 0)
1292 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1293 else if (status == ENOENT)
1294 {
1295 /* no file in our tree; see whether it exists at all */
1296 struct stat statbuf;
1298 memset(&statbuf, 0, sizeof(statbuf));
1299 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1300 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1301 else
1302 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1303 }
1304 else if (status < 0)
1305 return send_response(sock, RESP_ERR, "Internal error.\n");
1306 else
1307 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1308 }
1310 /* NOTREACHED */
1311 assert(1==0);
1312 } /* }}} int handle_request_flush */
1314 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1315 {
1316 int status;
1318 status = has_privilege(sock, PRIV_HIGH);
1319 if (status <= 0)
1320 return status;
1322 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1324 pthread_mutex_lock(&cache_lock);
1325 flush_old_values(-1);
1326 pthread_mutex_unlock(&cache_lock);
1328 return send_response(sock, RESP_OK, "Started flush.\n");
1329 } /* }}} static int handle_request_flushall */
1331 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1332 char *buffer, size_t buffer_size)
1333 {
1334 int status;
1335 char *file, file_tmp[PATH_MAX];
1336 cache_item_t *ci;
1338 status = buffer_get_field(&buffer, &buffer_size, &file);
1339 if (status != 0)
1340 return send_response(sock, RESP_ERR,
1341 "Usage: PENDING <filename>\n");
1343 status = has_privilege(sock, PRIV_HIGH);
1344 if (status <= 0)
1345 return status;
1347 get_abs_path(&file, file_tmp);
1349 pthread_mutex_lock(&cache_lock);
1350 ci = g_tree_lookup(cache_tree, file);
1351 if (ci == NULL)
1352 {
1353 pthread_mutex_unlock(&cache_lock);
1354 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1355 }
1357 for (int i=0; i < ci->values_num; i++)
1358 add_response_info(sock, "%s\n", ci->values[i]);
1360 pthread_mutex_unlock(&cache_lock);
1361 return send_response(sock, RESP_OK, "updates pending\n");
1362 } /* }}} static int handle_request_pending */
1364 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1365 char *buffer, size_t buffer_size)
1366 {
1367 int status;
1368 gboolean found;
1369 char *file, file_tmp[PATH_MAX];
1371 status = buffer_get_field(&buffer, &buffer_size, &file);
1372 if (status != 0)
1373 return send_response(sock, RESP_ERR,
1374 "Usage: FORGET <filename>\n");
1376 status = has_privilege(sock, PRIV_HIGH);
1377 if (status <= 0)
1378 return status;
1380 get_abs_path(&file, file_tmp);
1381 if (!check_file_access(file, sock)) return 0;
1383 pthread_mutex_lock(&cache_lock);
1384 found = g_tree_remove(cache_tree, file);
1385 pthread_mutex_unlock(&cache_lock);
1387 if (found == TRUE)
1388 {
1389 if (sock != NULL)
1390 journal_write("forget", file);
1392 return send_response(sock, RESP_OK, "Gone!\n");
1393 }
1394 else
1395 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1397 /* NOTREACHED */
1398 assert(1==0);
1399 } /* }}} static int handle_request_forget */
1401 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1402 {
1403 cache_item_t *ci;
1405 pthread_mutex_lock(&cache_lock);
1407 ci = cache_queue_head;
1408 while (ci != NULL)
1409 {
1410 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1411 ci = ci->next;
1412 }
1414 pthread_mutex_unlock(&cache_lock);
1416 return send_response(sock, RESP_OK, "in queue.\n");
1417 } /* }}} int handle_request_queue */
1419 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1420 time_t now,
1421 char *buffer, size_t buffer_size)
1422 {
1423 char *file, file_tmp[PATH_MAX];
1424 int values_num = 0;
1425 int status;
1426 char orig_buf[CMD_MAX];
1428 cache_item_t *ci;
1430 status = has_privilege(sock, PRIV_HIGH);
1431 if (status <= 0)
1432 return status;
1434 /* save it for the journal later */
1435 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1437 status = buffer_get_field (&buffer, &buffer_size, &file);
1438 if (status != 0)
1439 return send_response(sock, RESP_ERR,
1440 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1442 pthread_mutex_lock(&stats_lock);
1443 stats_updates_received++;
1444 pthread_mutex_unlock(&stats_lock);
1446 get_abs_path(&file, file_tmp);
1447 if (!check_file_access(file, sock)) return 0;
1449 pthread_mutex_lock (&cache_lock);
1450 ci = g_tree_lookup (cache_tree, file);
1452 if (ci == NULL) /* {{{ */
1453 {
1454 struct stat statbuf;
1456 /* don't hold the lock while we setup; stat(2) might block */
1457 pthread_mutex_unlock(&cache_lock);
1459 memset (&statbuf, 0, sizeof (statbuf));
1460 status = stat (file, &statbuf);
1461 if (status != 0)
1462 {
1463 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1465 status = errno;
1466 if (status == ENOENT)
1467 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1468 else
1469 return send_response(sock, RESP_ERR,
1470 "stat failed with error %i.\n", status);
1471 }
1472 if (!S_ISREG (statbuf.st_mode))
1473 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1475 if (access(file, R_OK|W_OK) != 0)
1476 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1477 file, rrd_strerror(errno));
1479 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1480 if (ci == NULL)
1481 {
1482 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1484 return send_response(sock, RESP_ERR, "malloc failed.\n");
1485 }
1486 memset (ci, 0, sizeof (cache_item_t));
1488 ci->file = strdup (file);
1489 if (ci->file == NULL)
1490 {
1491 free (ci);
1492 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1494 return send_response(sock, RESP_ERR, "strdup failed.\n");
1495 }
1497 wipe_ci_values(ci, now);
1498 ci->flags = CI_FLAGS_IN_TREE;
1499 pthread_cond_init(&ci->flushed, NULL);
1501 pthread_mutex_lock(&cache_lock);
1502 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1503 } /* }}} */
1504 assert (ci != NULL);
1506 /* don't re-write updates in replay mode */
1507 if (sock != NULL)
1508 journal_write("update", orig_buf);
1510 while (buffer_size > 0)
1511 {
1512 char **temp;
1513 char *value;
1514 time_t stamp;
1515 char *eostamp;
1517 status = buffer_get_field (&buffer, &buffer_size, &value);
1518 if (status != 0)
1519 {
1520 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1521 break;
1522 }
1524 /* make sure update time is always moving forward */
1525 stamp = strtol(value, &eostamp, 10);
1526 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1527 {
1528 pthread_mutex_unlock(&cache_lock);
1529 return send_response(sock, RESP_ERR,
1530 "Cannot find timestamp in '%s'!\n", value);
1531 }
1532 else if (stamp <= ci->last_update_stamp)
1533 {
1534 pthread_mutex_unlock(&cache_lock);
1535 return send_response(sock, RESP_ERR,
1536 "illegal attempt to update using time %ld when last"
1537 " update time is %ld (minimum one second step)\n",
1538 stamp, ci->last_update_stamp);
1539 }
1540 else
1541 ci->last_update_stamp = stamp;
1543 temp = (char **) rrd_realloc (ci->values,
1544 sizeof (char *) * (ci->values_num + 1));
1545 if (temp == NULL)
1546 {
1547 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1548 continue;
1549 }
1550 ci->values = temp;
1552 ci->values[ci->values_num] = strdup (value);
1553 if (ci->values[ci->values_num] == NULL)
1554 {
1555 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1556 continue;
1557 }
1558 ci->values_num++;
1560 values_num++;
1561 }
1563 if (((now - ci->last_flush_time) >= config_write_interval)
1564 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1565 && (ci->values_num > 0))
1566 {
1567 enqueue_cache_item (ci, TAIL);
1568 }
1570 pthread_mutex_unlock (&cache_lock);
1572 if (values_num < 1)
1573 return send_response(sock, RESP_ERR, "No values updated.\n");
1574 else
1575 return send_response(sock, RESP_OK,
1576 "errors, enqueued %i value(s).\n", values_num);
1578 /* NOTREACHED */
1579 assert(1==0);
1581 } /* }}} int handle_request_update */
1583 /* we came across a "WROTE" entry during journal replay.
1584 * throw away any values that we have accumulated for this file
1585 */
1586 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1587 {
1588 int i;
1589 cache_item_t *ci;
1590 const char *file = buffer;
1592 pthread_mutex_lock(&cache_lock);
1594 ci = g_tree_lookup(cache_tree, file);
1595 if (ci == NULL)
1596 {
1597 pthread_mutex_unlock(&cache_lock);
1598 return (0);
1599 }
1601 if (ci->values)
1602 {
1603 for (i=0; i < ci->values_num; i++)
1604 free(ci->values[i]);
1606 free(ci->values);
1607 }
1609 wipe_ci_values(ci, now);
1610 remove_from_queue(ci);
1612 pthread_mutex_unlock(&cache_lock);
1613 return (0);
1614 } /* }}} int handle_request_wrote */
1616 /* start "BATCH" processing */
1617 static int batch_start (listen_socket_t *sock) /* {{{ */
1618 {
1619 int status;
1620 if (sock->batch_start)
1621 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1623 status = send_response(sock, RESP_OK,
1624 "Go ahead. End with dot '.' on its own line.\n");
1625 sock->batch_start = time(NULL);
1626 sock->batch_cmd = 0;
1628 return status;
1629 } /* }}} static int batch_start */
1631 /* finish "BATCH" processing and return results to the client */
1632 static int batch_done (listen_socket_t *sock) /* {{{ */
1633 {
1634 assert(sock->batch_start);
1635 sock->batch_start = 0;
1636 sock->batch_cmd = 0;
1637 return send_response(sock, RESP_OK, "errors\n");
1638 } /* }}} static int batch_done */
1640 /* if sock==NULL, we are in journal replay mode */
1641 static int handle_request (listen_socket_t *sock, /* {{{ */
1642 time_t now,
1643 char *buffer, size_t buffer_size)
1644 {
1645 char *buffer_ptr;
1646 char *command;
1647 int status;
1649 assert (buffer[buffer_size - 1] == '\0');
1651 buffer_ptr = buffer;
1652 command = NULL;
1653 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1654 if (status != 0)
1655 {
1656 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1657 return (-1);
1658 }
1660 if (sock != NULL && sock->batch_start)
1661 sock->batch_cmd++;
1663 if (strcasecmp (command, "update") == 0)
1664 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1665 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1666 {
1667 /* this is only valid in replay mode */
1668 return (handle_request_wrote (buffer_ptr, now));
1669 }
1670 else if (strcasecmp (command, "flush") == 0)
1671 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1672 else if (strcasecmp (command, "flushall") == 0)
1673 return (handle_request_flushall(sock));
1674 else if (strcasecmp (command, "pending") == 0)
1675 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1676 else if (strcasecmp (command, "forget") == 0)
1677 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1678 else if (strcasecmp (command, "queue") == 0)
1679 return (handle_request_queue(sock));
1680 else if (strcasecmp (command, "stats") == 0)
1681 return (handle_request_stats (sock));
1682 else if (strcasecmp (command, "help") == 0)
1683 return (handle_request_help (sock, buffer_ptr, buffer_size));
1684 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1685 return batch_start(sock);
1686 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1687 return batch_done(sock);
1688 else if (strcasecmp (command, "quit") == 0)
1689 return -1;
1690 else
1691 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1693 /* NOTREACHED */
1694 assert(1==0);
1695 } /* }}} int handle_request */
1697 /* MUST NOT hold journal_lock before calling this */
1698 static void journal_rotate(void) /* {{{ */
1699 {
1700 FILE *old_fh = NULL;
1701 int new_fd;
1703 if (journal_cur == NULL || journal_old == NULL)
1704 return;
1706 pthread_mutex_lock(&journal_lock);
1708 /* we rotate this way (rename before close) so that the we can release
1709 * the journal lock as fast as possible. Journal writes to the new
1710 * journal can proceed immediately after the new file is opened. The
1711 * fclose can then block without affecting new updates.
1712 */
1713 if (journal_fh != NULL)
1714 {
1715 old_fh = journal_fh;
1716 journal_fh = NULL;
1717 rename(journal_cur, journal_old);
1718 ++stats_journal_rotate;
1719 }
1721 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1722 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1723 if (new_fd >= 0)
1724 {
1725 journal_fh = fdopen(new_fd, "a");
1726 if (journal_fh == NULL)
1727 close(new_fd);
1728 }
1730 pthread_mutex_unlock(&journal_lock);
1732 if (old_fh != NULL)
1733 fclose(old_fh);
1735 if (journal_fh == NULL)
1736 {
1737 RRDD_LOG(LOG_CRIT,
1738 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1739 journal_cur, rrd_strerror(errno));
1741 RRDD_LOG(LOG_ERR,
1742 "JOURNALING DISABLED: All values will be flushed at shutdown");
1743 config_flush_at_shutdown = 1;
1744 }
1746 } /* }}} static void journal_rotate */
1748 static void journal_done(void) /* {{{ */
1749 {
1750 if (journal_cur == NULL)
1751 return;
1753 pthread_mutex_lock(&journal_lock);
1754 if (journal_fh != NULL)
1755 {
1756 fclose(journal_fh);
1757 journal_fh = NULL;
1758 }
1760 if (config_flush_at_shutdown)
1761 {
1762 RRDD_LOG(LOG_INFO, "removing journals");
1763 unlink(journal_old);
1764 unlink(journal_cur);
1765 }
1766 else
1767 {
1768 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1769 "journals will be used at next startup");
1770 }
1772 pthread_mutex_unlock(&journal_lock);
1774 } /* }}} static void journal_done */
1776 static int journal_write(char *cmd, char *args) /* {{{ */
1777 {
1778 int chars;
1780 if (journal_fh == NULL)
1781 return 0;
1783 pthread_mutex_lock(&journal_lock);
1784 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1785 pthread_mutex_unlock(&journal_lock);
1787 if (chars > 0)
1788 {
1789 pthread_mutex_lock(&stats_lock);
1790 stats_journal_bytes += chars;
1791 pthread_mutex_unlock(&stats_lock);
1792 }
1794 return chars;
1795 } /* }}} static int journal_write */
1797 static int journal_replay (const char *file) /* {{{ */
1798 {
1799 FILE *fh;
1800 int entry_cnt = 0;
1801 int fail_cnt = 0;
1802 uint64_t line = 0;
1803 char entry[CMD_MAX];
1804 time_t now;
1806 if (file == NULL) return 0;
1808 {
1809 char *reason = "unknown error";
1810 int status = 0;
1811 struct stat statbuf;
1813 memset(&statbuf, 0, sizeof(statbuf));
1814 if (stat(file, &statbuf) != 0)
1815 {
1816 if (errno == ENOENT)
1817 return 0;
1819 reason = "stat error";
1820 status = errno;
1821 }
1822 else if (!S_ISREG(statbuf.st_mode))
1823 {
1824 reason = "not a regular file";
1825 status = EPERM;
1826 }
1827 if (statbuf.st_uid != daemon_uid)
1828 {
1829 reason = "not owned by daemon user";
1830 status = EACCES;
1831 }
1832 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1833 {
1834 reason = "must not be user/group writable";
1835 status = EACCES;
1836 }
1838 if (status != 0)
1839 {
1840 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1841 file, rrd_strerror(status), reason);
1842 return 0;
1843 }
1844 }
1846 fh = fopen(file, "r");
1847 if (fh == NULL)
1848 {
1849 if (errno != ENOENT)
1850 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1851 file, rrd_strerror(errno));
1852 return 0;
1853 }
1854 else
1855 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1857 now = time(NULL);
1859 while(!feof(fh))
1860 {
1861 size_t entry_len;
1863 ++line;
1864 if (fgets(entry, sizeof(entry), fh) == NULL)
1865 break;
1866 entry_len = strlen(entry);
1868 /* check \n termination in case journal writing crashed mid-line */
1869 if (entry_len == 0)
1870 continue;
1871 else if (entry[entry_len - 1] != '\n')
1872 {
1873 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1874 ++fail_cnt;
1875 continue;
1876 }
1878 entry[entry_len - 1] = '\0';
1880 if (handle_request(NULL, now, entry, entry_len) == 0)
1881 ++entry_cnt;
1882 else
1883 ++fail_cnt;
1884 }
1886 fclose(fh);
1888 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1889 entry_cnt, fail_cnt);
1891 return entry_cnt > 0 ? 1 : 0;
1892 } /* }}} static int journal_replay */
1894 static void journal_init(void) /* {{{ */
1895 {
1896 int had_journal = 0;
1898 if (journal_cur == NULL) return;
1900 pthread_mutex_lock(&journal_lock);
1902 RRDD_LOG(LOG_INFO, "checking for journal files");
1904 had_journal += journal_replay(journal_old);
1905 had_journal += journal_replay(journal_cur);
1907 /* it must have been a crash. start a flush */
1908 if (had_journal && config_flush_at_shutdown)
1909 flush_old_values(-1);
1911 pthread_mutex_unlock(&journal_lock);
1912 journal_rotate();
1914 RRDD_LOG(LOG_INFO, "journal processing complete");
1916 } /* }}} static void journal_init */
1918 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1919 {
1920 assert(sock != NULL);
1922 free(sock->rbuf); sock->rbuf = NULL;
1923 free(sock->wbuf); sock->wbuf = NULL;
1924 free(sock);
1925 } /* }}} void free_listen_socket */
1927 static void close_connection(listen_socket_t *sock) /* {{{ */
1928 {
1929 if (sock->fd >= 0)
1930 {
1931 close(sock->fd);
1932 sock->fd = -1;
1933 }
1935 free_listen_socket(sock);
1937 } /* }}} void close_connection */
1939 static void *connection_thread_main (void *args) /* {{{ */
1940 {
1941 listen_socket_t *sock;
1942 int i;
1943 int fd;
1945 sock = (listen_socket_t *) args;
1946 fd = sock->fd;
1948 /* init read buffers */
1949 sock->next_read = sock->next_cmd = 0;
1950 sock->rbuf = malloc(RBUF_SIZE);
1951 if (sock->rbuf == NULL)
1952 {
1953 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1954 close_connection(sock);
1955 return NULL;
1956 }
1958 pthread_mutex_lock (&connection_threads_lock);
1959 {
1960 pthread_t *temp;
1962 temp = (pthread_t *) rrd_realloc (connection_threads,
1963 sizeof (pthread_t) * (connection_threads_num + 1));
1964 if (temp == NULL)
1965 {
1966 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1967 }
1968 else
1969 {
1970 connection_threads = temp;
1971 connection_threads[connection_threads_num] = pthread_self ();
1972 connection_threads_num++;
1973 }
1974 }
1975 pthread_mutex_unlock (&connection_threads_lock);
1977 while (do_shutdown == 0)
1978 {
1979 char *cmd;
1980 ssize_t cmd_len;
1981 ssize_t rbytes;
1982 time_t now;
1984 struct pollfd pollfd;
1985 int status;
1987 pollfd.fd = fd;
1988 pollfd.events = POLLIN | POLLPRI;
1989 pollfd.revents = 0;
1991 status = poll (&pollfd, 1, /* timeout = */ 500);
1992 if (do_shutdown)
1993 break;
1994 else if (status == 0) /* timeout */
1995 continue;
1996 else if (status < 0) /* error */
1997 {
1998 status = errno;
1999 if (status != EINTR)
2000 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2001 continue;
2002 }
2004 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2005 break;
2006 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2007 {
2008 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2009 "poll(2) returned something unexpected: %#04hx",
2010 pollfd.revents);
2011 break;
2012 }
2014 rbytes = read(fd, sock->rbuf + sock->next_read,
2015 RBUF_SIZE - sock->next_read);
2016 if (rbytes < 0)
2017 {
2018 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2019 break;
2020 }
2021 else if (rbytes == 0)
2022 break; /* eof */
2024 sock->next_read += rbytes;
2026 if (sock->batch_start)
2027 now = sock->batch_start;
2028 else
2029 now = time(NULL);
2031 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2032 {
2033 status = handle_request (sock, now, cmd, cmd_len+1);
2034 if (status != 0)
2035 goto out_close;
2036 }
2037 }
2039 out_close:
2040 close_connection(sock);
2042 /* Remove this thread from the connection threads list */
2043 pthread_mutex_lock (&connection_threads_lock);
2044 {
2045 pthread_t self;
2046 pthread_t *temp;
2048 /* Find out own index in the array */
2049 self = pthread_self ();
2050 for (i = 0; i < connection_threads_num; i++)
2051 if (pthread_equal (connection_threads[i], self) != 0)
2052 break;
2053 assert (i < connection_threads_num);
2055 /* Move the trailing threads forward. */
2056 if (i < (connection_threads_num - 1))
2057 {
2058 memmove (connection_threads + i,
2059 connection_threads + i + 1,
2060 sizeof (pthread_t) * (connection_threads_num - i - 1));
2061 }
2063 connection_threads_num--;
2065 temp = rrd_realloc(connection_threads,
2066 sizeof(*connection_threads) * connection_threads_num);
2067 if (connection_threads_num > 0 && temp == NULL)
2068 RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2069 else
2070 connection_threads = temp;
2071 }
2072 pthread_mutex_unlock (&connection_threads_lock);
2074 return (NULL);
2075 } /* }}} void *connection_thread_main */
2077 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2078 {
2079 int fd;
2080 struct sockaddr_un sa;
2081 listen_socket_t *temp;
2082 int status;
2083 const char *path;
2085 path = sock->addr;
2086 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2087 path += strlen("unix:");
2089 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2090 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2091 if (temp == NULL)
2092 {
2093 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2094 return (-1);
2095 }
2096 listen_fds = temp;
2097 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2099 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2100 if (fd < 0)
2101 {
2102 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2103 rrd_strerror(errno));
2104 return (-1);
2105 }
2107 memset (&sa, 0, sizeof (sa));
2108 sa.sun_family = AF_UNIX;
2109 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2111 /* if we've gotten this far, we own the pid file. any daemon started
2112 * with the same args must not be alive. therefore, ensure that we can
2113 * create the socket...
2114 */
2115 unlink(path);
2117 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2118 if (status != 0)
2119 {
2120 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2121 path, rrd_strerror(errno));
2122 close (fd);
2123 return (-1);
2124 }
2126 status = listen (fd, /* backlog = */ 10);
2127 if (status != 0)
2128 {
2129 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2130 path, rrd_strerror(errno));
2131 close (fd);
2132 unlink (path);
2133 return (-1);
2134 }
2136 listen_fds[listen_fds_num].fd = fd;
2137 listen_fds[listen_fds_num].family = PF_UNIX;
2138 strncpy(listen_fds[listen_fds_num].addr, path,
2139 sizeof (listen_fds[listen_fds_num].addr) - 1);
2140 listen_fds_num++;
2142 return (0);
2143 } /* }}} int open_listen_socket_unix */
/* Open listening TCP sockets for `sock->addr' and append them to listen_fds.
 *
 * Address forms:
 *   "[<ipv6>]" or "[<ipv6>]:<port>"  bracketed IPv6 with optional port
 *   "<host>:<port>"                  split at the last ':' when the address
 *                                    contains a '.', or when it contains
 *                                    exactly one ':' (e.g. "localhost:42217")
 *   anything else                    passed to getaddrinfo() unchanged
 *                                    (e.g. a bare IPv6 address "::1")
 * RRDCACHED_DEFAULT_PORT is used when no port is given.  One socket is
 * opened per address returned by getaddrinfo(); addresses that cannot be
 * bound are skipped.  Returns 0, or -1 on a malformed address, a resolver
 * failure, or a listen() failure. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    char *last_colon = strrchr (addr, ':');

    /* Hostname or IPv4 with a port.  A dot-less address with more than one
     * ':' is a bare IPv6 address and must not be split. */
    if (last_colon != NULL
        && (strchr (addr, '.') != NULL || strchr (addr, ':') == last_colon))
    {
      *last_colon = 0;
      port = last_colon + 1;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
      port == NULL ? RRDCACHED_DEFAULT_PORT : port,
      &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
        addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
          "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
          rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
          sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
          sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo(ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (0);
} /* }}} static int open_listen_socket_network */
2267 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2268 {
2269 assert(sock != NULL);
2270 assert(sock->addr != NULL);
2272 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2273 || sock->addr[0] == '/')
2274 return (open_listen_socket_unix(sock));
2275 else
2276 return (open_listen_socket_network(sock));
2277 } /* }}} int open_listen_socket */
2279 static int close_listen_sockets (void) /* {{{ */
2280 {
2281 size_t i;
2283 for (i = 0; i < listen_fds_num; i++)
2284 {
2285 close (listen_fds[i].fd);
2287 if (listen_fds[i].family == PF_UNIX)
2288 unlink(listen_fds[i].addr);
2289 }
2291 free (listen_fds);
2292 listen_fds = NULL;
2293 listen_fds_num = 0;
2295 return (0);
2296 } /* }}} int close_listen_sockets */
2298 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2299 {
2300 struct pollfd *pollfds;
2301 int pollfds_num;
2302 int status;
2303 int i;
2305 if (listen_fds_num < 1)
2306 {
2307 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2308 return (NULL);
2309 }
2311 pollfds_num = listen_fds_num;
2312 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2313 if (pollfds == NULL)
2314 {
2315 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2316 return (NULL);
2317 }
2318 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2320 RRDD_LOG(LOG_INFO, "listening for connections");
2322 while (do_shutdown == 0)
2323 {
2324 for (i = 0; i < pollfds_num; i++)
2325 {
2326 pollfds[i].fd = listen_fds[i].fd;
2327 pollfds[i].events = POLLIN | POLLPRI;
2328 pollfds[i].revents = 0;
2329 }
2331 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2332 if (do_shutdown)
2333 break;
2334 else if (status == 0) /* timeout */
2335 continue;
2336 else if (status < 0) /* error */
2337 {
2338 status = errno;
2339 if (status != EINTR)
2340 {
2341 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2342 }
2343 continue;
2344 }
2346 for (i = 0; i < pollfds_num; i++)
2347 {
2348 listen_socket_t *client_sock;
2349 struct sockaddr_storage client_sa;
2350 socklen_t client_sa_size;
2351 pthread_t tid;
2352 pthread_attr_t attr;
2354 if (pollfds[i].revents == 0)
2355 continue;
2357 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2358 {
2359 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2360 "poll(2) returned something unexpected for listen FD #%i.",
2361 pollfds[i].fd);
2362 continue;
2363 }
2365 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2366 if (client_sock == NULL)
2367 {
2368 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2369 continue;
2370 }
2371 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2373 client_sa_size = sizeof (client_sa);
2374 client_sock->fd = accept (pollfds[i].fd,
2375 (struct sockaddr *) &client_sa, &client_sa_size);
2376 if (client_sock->fd < 0)
2377 {
2378 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2379 free(client_sock);
2380 continue;
2381 }
2383 pthread_attr_init (&attr);
2384 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2386 status = pthread_create (&tid, &attr, connection_thread_main,
2387 client_sock);
2388 if (status != 0)
2389 {
2390 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2391 close_connection(client_sock);
2392 continue;
2393 }
2394 } /* for (pollfds_num) */
2395 } /* while (do_shutdown == 0) */
2397 RRDD_LOG(LOG_INFO, "starting shutdown");
2399 close_listen_sockets ();
2401 pthread_mutex_lock (&connection_threads_lock);
2402 while (connection_threads_num > 0)
2403 {
2404 pthread_t wait_for;
2406 wait_for = connection_threads[0];
2408 pthread_mutex_unlock (&connection_threads_lock);
2409 pthread_join (wait_for, /* retval = */ NULL);
2410 pthread_mutex_lock (&connection_threads_lock);
2411 }
2412 pthread_mutex_unlock (&connection_threads_lock);
2414 free(pollfds);
2416 return (NULL);
2417 } /* }}} void *listen_thread_main */
2419 static int daemonize (void) /* {{{ */
2420 {
2421 int pid_fd;
2422 char *base_dir;
2424 daemon_uid = geteuid();
2426 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2427 if (pid_fd < 0)
2428 pid_fd = check_pidfile();
2429 if (pid_fd < 0)
2430 return pid_fd;
2432 /* open all the listen sockets */
2433 if (config_listen_address_list_len > 0)
2434 {
2435 for (int i = 0; i < config_listen_address_list_len; i++)
2436 {
2437 open_listen_socket (config_listen_address_list[i]);
2438 free_listen_socket (config_listen_address_list[i]);
2439 }
2441 free(config_listen_address_list);
2442 }
2443 else
2444 {
2445 listen_socket_t sock;
2446 memset(&sock, 0, sizeof(sock));
2447 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2448 open_listen_socket (&sock);
2449 }
2451 if (listen_fds_num < 1)
2452 {
2453 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2454 goto error;
2455 }
2457 if (!stay_foreground)
2458 {
2459 pid_t child;
2461 child = fork ();
2462 if (child < 0)
2463 {
2464 fprintf (stderr, "daemonize: fork(2) failed.\n");
2465 goto error;
2466 }
2467 else if (child > 0)
2468 exit(0);
2470 /* Become session leader */
2471 setsid ();
2473 /* Open the first three file descriptors to /dev/null */
2474 close (2);
2475 close (1);
2476 close (0);
2478 open ("/dev/null", O_RDWR);
2479 dup (0);
2480 dup (0);
2481 } /* if (!stay_foreground) */
2483 /* Change into the /tmp directory. */
2484 base_dir = (config_base_dir != NULL)
2485 ? config_base_dir
2486 : "/tmp";
2488 if (chdir (base_dir) != 0)
2489 {
2490 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2491 goto error;
2492 }
2494 install_signal_handlers();
2496 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2497 RRDD_LOG(LOG_INFO, "starting up");
2499 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2500 (GDestroyNotify) free_cache_item);
2501 if (cache_tree == NULL)
2502 {
2503 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2504 goto error;
2505 }
2507 return write_pidfile (pid_fd);
2509 error:
2510 remove_pidfile();
2511 return -1;
2512 } /* }}} int daemonize */
2514 static int cleanup (void) /* {{{ */
2515 {
2516 do_shutdown++;
2518 pthread_cond_signal (&cache_cond);
2519 pthread_join (queue_thread, /* return = */ NULL);
2521 remove_pidfile ();
2523 free(config_base_dir);
2524 free(config_pid_file);
2525 free(journal_cur);
2526 free(journal_old);
2528 pthread_mutex_lock(&cache_lock);
2529 g_tree_destroy(cache_tree);
2531 RRDD_LOG(LOG_INFO, "goodbye");
2532 closelog ();
2534 return (0);
2535 } /* }}} int cleanup */
2537 static int read_options (int argc, char **argv) /* {{{ */
2538 {
2539 int option;
2540 int status = 0;
2542 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2543 {
2544 switch (option)
2545 {
2546 case 'g':
2547 stay_foreground=1;
2548 break;
2550 case 'L':
2551 case 'l':
2552 {
2553 listen_socket_t **temp;
2554 listen_socket_t *new;
2556 new = malloc(sizeof(listen_socket_t));
2557 if (new == NULL)
2558 {
2559 fprintf(stderr, "read_options: malloc failed.\n");
2560 return(2);
2561 }
2562 memset(new, 0, sizeof(listen_socket_t));
2564 temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2565 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2566 if (temp == NULL)
2567 {
2568 fprintf (stderr, "read_options: realloc failed.\n");
2569 return (2);
2570 }
2571 config_listen_address_list = temp;
2573 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2574 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2576 temp[config_listen_address_list_len] = new;
2577 config_listen_address_list_len++;
2578 }
2579 break;
2581 case 'f':
2582 {
2583 int temp;
2585 temp = atoi (optarg);
2586 if (temp > 0)
2587 config_flush_interval = temp;
2588 else
2589 {
2590 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2591 status = 3;
2592 }
2593 }
2594 break;
2596 case 'w':
2597 {
2598 int temp;
2600 temp = atoi (optarg);
2601 if (temp > 0)
2602 config_write_interval = temp;
2603 else
2604 {
2605 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2606 status = 2;
2607 }
2608 }
2609 break;
2611 case 'z':
2612 {
2613 int temp;
2615 temp = atoi(optarg);
2616 if (temp > 0)
2617 config_write_jitter = temp;
2618 else
2619 {
2620 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2621 status = 2;
2622 }
2624 break;
2625 }
2627 case 'B':
2628 config_write_base_only = 1;
2629 break;
2631 case 'b':
2632 {
2633 size_t len;
2634 char base_realpath[PATH_MAX];
2636 if (config_base_dir != NULL)
2637 free (config_base_dir);
2638 config_base_dir = strdup (optarg);
2639 if (config_base_dir == NULL)
2640 {
2641 fprintf (stderr, "read_options: strdup failed.\n");
2642 return (3);
2643 }
2645 /* make sure that the base directory is not resolved via
2646 * symbolic links. this makes some performance-enhancing
2647 * assumptions possible (we don't have to resolve paths
2648 * that start with a "/")
2649 */
2650 if (realpath(config_base_dir, base_realpath) == NULL)
2651 {
2652 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2653 return 5;
2654 }
2655 else if (strncmp(config_base_dir,
2656 base_realpath, sizeof(base_realpath)) != 0)
2657 {
2658 fprintf(stderr,
2659 "Base directory (-b) resolved via file system links!\n"
2660 "Please consult rrdcached '-b' documentation!\n"
2661 "Consider specifying the real directory (%s)\n",
2662 base_realpath);
2663 return 5;
2664 }
2666 len = strlen (config_base_dir);
2667 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2668 {
2669 config_base_dir[len - 1] = 0;
2670 len--;
2671 }
2673 if (len < 1)
2674 {
2675 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2676 return (4);
2677 }
2679 _config_base_dir_len = len;
2680 }
2681 break;
2683 case 'p':
2684 {
2685 if (config_pid_file != NULL)
2686 free (config_pid_file);
2687 config_pid_file = strdup (optarg);
2688 if (config_pid_file == NULL)
2689 {
2690 fprintf (stderr, "read_options: strdup failed.\n");
2691 return (3);
2692 }
2693 }
2694 break;
2696 case 'F':
2697 config_flush_at_shutdown = 1;
2698 break;
2700 case 'j':
2701 {
2702 struct stat statbuf;
2703 const char *dir = optarg;
2705 status = stat(dir, &statbuf);
2706 if (status != 0)
2707 {
2708 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2709 return 6;
2710 }
2712 if (!S_ISDIR(statbuf.st_mode)
2713 || access(dir, R_OK|W_OK|X_OK) != 0)
2714 {
2715 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2716 errno ? rrd_strerror(errno) : "");
2717 return 6;
2718 }
2720 journal_cur = malloc(PATH_MAX + 1);
2721 journal_old = malloc(PATH_MAX + 1);
2722 if (journal_cur == NULL || journal_old == NULL)
2723 {
2724 fprintf(stderr, "malloc failure for journal files\n");
2725 return 6;
2726 }
2727 else
2728 {
2729 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2730 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2731 }
2732 }
2733 break;
2735 case 'h':
2736 case '?':
2737 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2738 "\n"
2739 "Usage: rrdcached [options]\n"
2740 "\n"
2741 "Valid options are:\n"
2742 " -l <address> Socket address to listen to.\n"
2743 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2744 " -w <seconds> Interval in which to write data.\n"
2745 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2746 " -f <seconds> Interval in which to flush dead data.\n"
2747 " -p <file> Location of the PID-file.\n"
2748 " -b <dir> Base directory to change to.\n"
2749 " -B Restrict file access to paths within -b <dir>\n"
2750 " -g Do not fork and run in the foreground.\n"
2751 " -j <dir> Directory in which to create the journal files.\n"
2752 " -F Always flush all updates at shutdown\n"
2753 "\n"
2754 "For more information and a detailed description of all options "
2755 "please refer\n"
2756 "to the rrdcached(1) manual page.\n",
2757 VERSION);
2758 status = -1;
2759 break;
2760 } /* switch (option) */
2761 } /* while (getopt) */
2763 /* advise the user when values are not sane */
2764 if (config_flush_interval < 2 * config_write_interval)
2765 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2766 " 2x write interval (-w) !\n");
2767 if (config_write_jitter > config_write_interval)
2768 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2769 " write interval (-w) !\n");
2771 if (config_write_base_only && config_base_dir == NULL)
2772 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2773 " Consult the rrdcached documentation\n");
2775 if (journal_cur == NULL)
2776 config_flush_at_shutdown = 1;
2778 return (status);
2779 } /* }}} int read_options */
2781 int main (int argc, char **argv)
2782 {
2783 int status;
2785 status = read_options (argc, argv);
2786 if (status != 0)
2787 {
2788 if (status < 0)
2789 status = 0;
2790 return (status);
2791 }
2793 status = daemonize ();
2794 if (status != 0)
2795 {
2796 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2797 return (1);
2798 }
2800 journal_init();
2802 /* start the queue thread */
2803 memset (&queue_thread, 0, sizeof (queue_thread));
2804 status = pthread_create (&queue_thread,
2805 NULL, /* attr */
2806 queue_thread_main,
2807 NULL); /* args */
2808 if (status != 0)
2809 {
2810 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2811 cleanup();
2812 return (1);
2813 }
2815 listen_thread_main (NULL);
2816 cleanup ();
2818 return (0);
2819 } /* int main */
2821 /*
2822 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2823 */