68216cb5ee4525f92c01809b5f512c5285b31259
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
101 #include <glib-2.0/glib.h>
102 /* }}} */
/* All daemon logging goes through syslog(3); `severity' is a LOG_* priority
 * and the remaining arguments are a printf-style format and its values. */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* Compilers other than GCC-compatible ones do not know __attribute__;
 * expand it to nothing there. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
/*
 * Types
 */
/* Privilege level attached to a socket. has_privilege() compares levels
 * with `>=', so PRIV_HIGH must stay greater than PRIV_LOW. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Outcome passed to send_response(). Outside of BATCH mode RESP_OK is
 * reported with the number of following info lines, RESP_ERR with -1. */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;

/* Per-socket state. Besides the listening parameters it carries BATCH and
 * buffered-IO state, so it is presumably also used for accepted client
 * connections -- TODO confirm against the accept code. */
struct listen_socket_s
{
  int fd;
  char addr[PATH_MAX + 1];
  int family;
  socket_privilege privilege;

  /* state for BATCH processing */
  time_t batch_start;  /* non-zero while the client is inside a BATCH */
  int batch_cmd;       /* reported as the line number of BATCH error replies */

  /* buffered IO */
  char *rbuf;          /* received bytes, split into commands by next_cmd() */
  off_t next_cmd;      /* offset in rbuf of the next unparsed command */
  off_t next_read;     /* offset in rbuf just past the valid data */

  char *wbuf;          /* response text accumulated by add_to_wbuf() */
  ssize_t wbuf_len;    /* length of wbuf, not counting the terminating NUL */
};
typedef struct listen_socket_s listen_socket_t;
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
/* One cached RRD file: the updates received for it that have not been
 * written yet, plus its links in the write queue. */
struct cache_item_s
{
  char *file;               /* RRD file name; also the key in cache_tree */
  char **values;            /* pending update strings for rrd_update_r() */
  int values_num;           /* number of entries in `values' */
  time_t last_flush_time;   /* when values were last taken for writing (+ jitter) */
  time_t last_update_stamp; /* not used in this part of the file; presumably the
                             * time of the newest update -- TODO confirm */
#define CI_FLAGS_IN_TREE  (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1) /* linked into the cache_queue_head/tail list */
  int flags;
  pthread_cond_t flushed;   /* broadcast once the values were written or the
                             * item is freed; FLUSH waits on it */
  cache_item_t *prev;       /* write-queue links, NULL when not queued */
  cache_item_t *next;
};

/* Accumulator that flush_old_values() passes through g_tree_foreach() to
 * tree_callback_flush(). */
struct callback_flush_data_s
{
  time_t now;          /* time of this flush run */
  time_t abs_timeout;  /* items last flushed at or before this get queued */
  char **keys;         /* keys (aliases of ci->file) of idle, empty items to drop */
  size_t keys_num;     /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;

/* Which end of the write queue enqueue_cache_item() inserts at. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;

/* max length of socket command or response */
#define CMD_MAX 4096
/* size of a connection's read buffer: twice CMD_MAX */
#define RBUF_SIZE (CMD_MAX*2)
179 /*
180 * Variables
181 */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
188 static int do_shutdown = 0;
190 static pthread_t *queue_threads;
191 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
192 static int config_queue_threads = 4;
194 static pthread_t flush_thread;
195 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
197 static pthread_t *connection_threads = NULL;
198 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
199 static int connection_threads_num = 0;
201 /* Cache stuff */
202 static GTree *cache_tree = NULL;
203 static cache_item_t *cache_queue_head = NULL;
204 static cache_item_t *cache_queue_tail = NULL;
205 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
207 static int config_write_interval = 300;
208 static int config_write_jitter = 0;
209 static int config_flush_interval = 3600;
210 static int config_flush_at_shutdown = 0;
211 static char *config_pid_file = NULL;
212 static char *config_base_dir = NULL;
213 static size_t _config_base_dir_len = 0;
214 static int config_write_base_only = 0;
216 static listen_socket_t **config_listen_address_list = NULL;
217 static int config_listen_address_list_len = 0;
219 static uint64_t stats_queue_length = 0;
220 static uint64_t stats_updates_received = 0;
221 static uint64_t stats_flush_received = 0;
222 static uint64_t stats_updates_written = 0;
223 static uint64_t stats_data_sets_written = 0;
224 static uint64_t stats_journal_bytes = 0;
225 static uint64_t stats_journal_rotate = 0;
226 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
228 /* Journaled updates */
229 static char *journal_cur = NULL;
230 static char *journal_old = NULL;
231 static FILE *journal_fh = NULL;
232 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
233 static int journal_write(char *cmd, char *args);
234 static void journal_done(void);
235 static void journal_rotate(void);
237 /*
238 * Functions
239 */
240 static void sig_common (const char *sig) /* {{{ */
241 {
242 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
243 do_shutdown++;
244 pthread_cond_broadcast(&flush_cond);
245 pthread_cond_broadcast(&queue_cond);
246 } /* }}} void sig_common */
248 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
249 {
250 sig_common("INT");
251 } /* }}} void sig_int_handler */
253 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
254 {
255 sig_common("TERM");
256 } /* }}} void sig_term_handler */
258 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
259 {
260 config_flush_at_shutdown = 1;
261 sig_common("USR1");
262 } /* }}} void sig_usr1_handler */
264 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
265 {
266 config_flush_at_shutdown = 0;
267 sig_common("USR2");
268 } /* }}} void sig_usr2_handler */
270 static void install_signal_handlers(void) /* {{{ */
271 {
272 /* These structures are static, because `sigaction' behaves weird if the are
273 * overwritten.. */
274 static struct sigaction sa_int;
275 static struct sigaction sa_term;
276 static struct sigaction sa_pipe;
277 static struct sigaction sa_usr1;
278 static struct sigaction sa_usr2;
280 /* Install signal handlers */
281 memset (&sa_int, 0, sizeof (sa_int));
282 sa_int.sa_handler = sig_int_handler;
283 sigaction (SIGINT, &sa_int, NULL);
285 memset (&sa_term, 0, sizeof (sa_term));
286 sa_term.sa_handler = sig_term_handler;
287 sigaction (SIGTERM, &sa_term, NULL);
289 memset (&sa_pipe, 0, sizeof (sa_pipe));
290 sa_pipe.sa_handler = SIG_IGN;
291 sigaction (SIGPIPE, &sa_pipe, NULL);
293 memset (&sa_pipe, 0, sizeof (sa_usr1));
294 sa_usr1.sa_handler = sig_usr1_handler;
295 sigaction (SIGUSR1, &sa_usr1, NULL);
297 memset (&sa_usr2, 0, sizeof (sa_usr2));
298 sa_usr2.sa_handler = sig_usr2_handler;
299 sigaction (SIGUSR2, &sa_usr2, NULL);
301 } /* }}} void install_signal_handlers */
303 static int open_pidfile(char *action, int oflag) /* {{{ */
304 {
305 int fd;
306 char *file;
308 file = (config_pid_file != NULL)
309 ? config_pid_file
310 : LOCALSTATEDIR "/run/rrdcached.pid";
312 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
313 if (fd < 0)
314 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
315 action, file, rrd_strerror(errno));
317 return(fd);
318 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running.
 *
 * Returns an open descriptor for the existing PID file, rewound and
 * truncated, when the PID recorded in it is stale (or is our own). The
 * descriptor is ready to be handed to write_pidfile().
 *
 * Returns a negative value, leaving no descriptor open, when:
 *  - the file cannot be opened;
 *  - it is empty or does not hold a positive PID;
 *  - it cannot be truncated;
 *  - it names another live process that we may signal. */
static int check_pidfile(void)
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* keep one byte for the terminator: the file content is not
   * NUL-terminated, but atoi() needs a C string */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
        "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  /* a failed truncate would leave stale digits behind our own PID */
  if (ftruncate(pid_fd, 0) != 0)
  {
    fprintf(stderr, "rrdcached: can't truncate pid file (%s)\n",
        rrd_strerror(errno));
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
      "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
      "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write this process' PID followed by a newline to `fd' and close it.
 *
 * `fd' is wrapped in a stdio stream and closed through fclose(), or closed
 * directly if the stream cannot be created.
 * Returns 0 on success, -1 if fdopen() failed. */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *out = fdopen (fd, "w");

  if (out == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (out, "%i\n", (int) getpid ());
  fclose (out);

  return (0);
} /* }}} int write_pidfile */
379 static int remove_pidfile (void) /* {{{ */
380 {
381 char *file;
382 int status;
384 file = (config_pid_file != NULL)
385 ? config_pid_file
386 : LOCALSTATEDIR "/run/rrdcached.pid";
388 status = unlink (file);
389 if (status == 0)
390 return (0);
391 return (errno);
392 } /* }}} int remove_pidfile */
394 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
395 {
396 char *eol;
398 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
399 sock->next_read - sock->next_cmd);
401 if (eol == NULL)
402 {
403 /* no commands left, move remainder back to front of rbuf */
404 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
405 sock->next_read - sock->next_cmd);
406 sock->next_read -= sock->next_cmd;
407 sock->next_cmd = 0;
408 *len = 0;
409 return NULL;
410 }
411 else
412 {
413 char *cmd = sock->rbuf + sock->next_cmd;
414 *eol = '\0';
416 sock->next_cmd = eol - sock->rbuf + 1;
418 if (eol > sock->rbuf && *(eol-1) == '\r')
419 *(--eol) = '\0'; /* handle "\r\n" EOL */
421 *len = eol - cmd;
423 return cmd;
424 }
426 /* NOTREACHED */
427 assert(1==0);
428 }
430 /* add the characters directly to the write buffer */
431 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
432 {
433 char *new_buf;
435 assert(sock != NULL);
437 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
438 if (new_buf == NULL)
439 {
440 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
441 return -1;
442 }
444 strncpy(new_buf + sock->wbuf_len, str, len + 1);
446 sock->wbuf = new_buf;
447 sock->wbuf_len += len;
449 return 0;
450 } /* }}} static int add_to_wbuf */
452 /* add the text to the "extra" info that's sent after the status line */
453 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
454 {
455 va_list argp;
456 char buffer[CMD_MAX];
457 int len;
459 if (sock == NULL) return 0; /* journal replay mode */
460 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
462 va_start(argp, fmt);
463 #ifdef HAVE_VSNPRINTF
464 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
465 #else
466 len = vsprintf(buffer, fmt, argp);
467 #endif
468 va_end(argp);
469 if (len < 0)
470 {
471 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
472 return -1;
473 }
475 return add_to_wbuf(sock, buffer, len);
476 } /* }}} static int add_response_info */
/* Count the newline characters in the NUL-terminated string `str'.
 * A NULL string counts as zero lines. */
static int count_lines(char *str) /* {{{ */
{
  const char *p;
  int newlines = 0;

  if (str == NULL)
    return 0;

  for (p = str; *p != '\0'; p++)
  {
    if (*p == '\n')
      newlines++;
  }

  return newlines;
} /* }}} static int count_lines */
494 /* send the response back to the user.
495 * returns 0 on success, -1 on error
496 * write buffer is always zeroed after this call */
497 static int send_response (listen_socket_t *sock, response_code rc,
498 char *fmt, ...) /* {{{ */
499 {
500 va_list argp;
501 char buffer[CMD_MAX];
502 int lines;
503 ssize_t wrote;
504 int rclen, len;
506 if (sock == NULL) return rc; /* journal replay mode */
508 if (sock->batch_start)
509 {
510 if (rc == RESP_OK)
511 return rc; /* no response on success during BATCH */
512 lines = sock->batch_cmd;
513 }
514 else if (rc == RESP_OK)
515 lines = count_lines(sock->wbuf);
516 else
517 lines = -1;
519 rclen = sprintf(buffer, "%d ", lines);
520 va_start(argp, fmt);
521 #ifdef HAVE_VSNPRINTF
522 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
523 #else
524 len = vsprintf(buffer+rclen, fmt, argp);
525 #endif
526 va_end(argp);
527 if (len < 0)
528 return -1;
530 len += rclen;
532 /* append the result to the wbuf, don't write to the user */
533 if (sock->batch_start)
534 return add_to_wbuf(sock, buffer, len);
536 /* first write must be complete */
537 if (len != write(sock->fd, buffer, len))
538 {
539 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
540 return -1;
541 }
543 if (sock->wbuf != NULL && rc == RESP_OK)
544 {
545 wrote = 0;
546 while (wrote < sock->wbuf_len)
547 {
548 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
549 if (wb <= 0)
550 {
551 RRDD_LOG(LOG_INFO, "send_response: could not write results");
552 return -1;
553 }
554 wrote += wb;
555 }
556 }
558 free(sock->wbuf); sock->wbuf = NULL;
559 sock->wbuf_len = 0;
561 return 0;
562 } /* }}} */
564 static void wipe_ci_values(cache_item_t *ci, time_t when)
565 {
566 ci->values = NULL;
567 ci->values_num = 0;
569 ci->last_flush_time = when;
570 if (config_write_jitter > 0)
571 ci->last_flush_time += (random() % config_write_jitter);
572 }
574 /* remove_from_queue
575 * remove a "cache_item_t" item from the queue.
576 * must hold 'cache_lock' when calling this
577 */
578 static void remove_from_queue(cache_item_t *ci) /* {{{ */
579 {
580 if (ci == NULL) return;
581 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
583 if (ci->prev == NULL)
584 cache_queue_head = ci->next; /* reset head */
585 else
586 ci->prev->next = ci->next;
588 if (ci->next == NULL)
589 cache_queue_tail = ci->prev; /* reset the tail */
590 else
591 ci->next->prev = ci->prev;
593 ci->next = ci->prev = NULL;
594 ci->flags &= ~CI_FLAGS_IN_QUEUE;
596 pthread_mutex_lock (&stats_lock);
597 assert (stats_queue_length > 0);
598 stats_queue_length--;
599 pthread_mutex_unlock (&stats_lock);
601 } /* }}} static void remove_from_queue */
603 /* free the resources associated with the cache_item_t
604 * must hold cache_lock when calling this function
605 */
606 static void *free_cache_item(cache_item_t *ci) /* {{{ */
607 {
608 if (ci == NULL) return NULL;
610 remove_from_queue(ci);
612 for (int i=0; i < ci->values_num; i++)
613 free(ci->values[i]);
615 free (ci->values);
616 free (ci->file);
618 /* in case anyone is waiting */
619 pthread_cond_broadcast(&ci->flushed);
621 free (ci);
623 return NULL;
624 } /* }}} static void *free_cache_item */
/*
 * enqueue_cache_item:
 * `cache_lock' must be acquired before calling this function!
 *
 * Put `ci' into the write queue, at the HEAD (so it is written next) or at
 * the TAIL. Items without pending values are not queued.
 *
 * HEAD: an item already queued further down is moved to the front.
 * TAIL: an item that is already queued stays where it is.
 *
 * When the item is (re)inserted, it is flagged CI_FLAGS_IN_QUEUE, one
 * waiter on `queue_cond' is signalled, and stats_queue_length is
 * incremented. In the HEAD case the remove_from_queue() call has already
 * decremented the counter if the item was queued, so the count stays
 * balanced.
 *
 * Returns -1 if `ci' is NULL, 0 otherwise.
 */
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
    queue_side_t side)
{
  if (ci == NULL)
    return (-1);

  if (ci->values_num == 0)
    return (0);

  if (side == HEAD)
  {
    /* already first in line: nothing to do */
    if (cache_queue_head == ci)
      return 0;

    /* remove if further down in queue */
    remove_from_queue(ci);

    ci->prev = NULL;
    ci->next = cache_queue_head;
    if (ci->next != NULL)
      ci->next->prev = ci;
    cache_queue_head = ci;

    /* queue was empty: ci is also the last element */
    if (cache_queue_tail == NULL)
      cache_queue_tail = cache_queue_head;
  }
  else /* (side == TAIL) */
  {
    /* We don't move values back in the list.. */
    if (ci->flags & CI_FLAGS_IN_QUEUE)
      return (0);

    /* an unqueued item must not carry stale links */
    assert (ci->next == NULL);
    assert (ci->prev == NULL);

    ci->prev = cache_queue_tail;

    if (cache_queue_tail == NULL)
      cache_queue_head = ci;
    else
      cache_queue_tail->next = ci;

    cache_queue_tail = ci;
  }

  ci->flags |= CI_FLAGS_IN_QUEUE;

  /* wake one queue thread to write the item */
  pthread_cond_signal(&queue_cond);
  pthread_mutex_lock (&stats_lock);
  stats_queue_length++;
  pthread_mutex_unlock (&stats_lock);

  return (0);
} /* }}} int enqueue_cache_item */
685 /*
686 * tree_callback_flush:
687 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
688 * while this is in progress.
689 */
690 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
691 gpointer data)
692 {
693 cache_item_t *ci;
694 callback_flush_data_t *cfd;
696 ci = (cache_item_t *) value;
697 cfd = (callback_flush_data_t *) data;
699 if (ci->flags & CI_FLAGS_IN_QUEUE)
700 return FALSE;
702 if ((ci->last_flush_time <= cfd->abs_timeout)
703 && (ci->values_num > 0))
704 {
705 enqueue_cache_item (ci, TAIL);
706 }
707 else if ((do_shutdown != 0)
708 && (ci->values_num > 0))
709 {
710 enqueue_cache_item (ci, TAIL);
711 }
712 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
713 && (ci->values_num <= 0))
714 {
715 char **temp;
717 temp = (char **) rrd_realloc (cfd->keys,
718 sizeof (char *) * (cfd->keys_num + 1));
719 if (temp == NULL)
720 {
721 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
722 return (FALSE);
723 }
724 cfd->keys = temp;
725 /* Make really sure this points to the _same_ place */
726 assert ((char *) key == ci->file);
727 cfd->keys[cfd->keys_num] = (char *) key;
728 cfd->keys_num++;
729 }
731 return (FALSE);
732 } /* }}} gboolean tree_callback_flush */
734 static int flush_old_values (int max_age)
735 {
736 callback_flush_data_t cfd;
737 size_t k;
739 memset (&cfd, 0, sizeof (cfd));
740 /* Pass the current time as user data so that we don't need to call
741 * `time' for each node. */
742 cfd.now = time (NULL);
743 cfd.keys = NULL;
744 cfd.keys_num = 0;
746 if (max_age > 0)
747 cfd.abs_timeout = cfd.now - max_age;
748 else
749 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
751 /* `tree_callback_flush' will return the keys of all values that haven't
752 * been touched in the last `config_flush_interval' seconds in `cfd'.
753 * The char*'s in this array point to the same memory as ci->file, so we
754 * don't need to free them separately. */
755 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
757 for (k = 0; k < cfd.keys_num; k++)
758 {
759 /* should never fail, since we have held the cache_lock
760 * the entire time */
761 assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
762 }
764 if (cfd.keys != NULL)
765 {
766 free (cfd.keys);
767 cfd.keys = NULL;
768 }
770 return (0);
771 } /* int flush_old_values */
773 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
774 {
775 struct timeval now;
776 struct timespec next_flush;
777 int status;
779 gettimeofday (&now, NULL);
780 next_flush.tv_sec = now.tv_sec + config_flush_interval;
781 next_flush.tv_nsec = 1000 * now.tv_usec;
783 pthread_mutex_lock(&cache_lock);
785 while (!do_shutdown)
786 {
787 gettimeofday (&now, NULL);
788 if ((now.tv_sec > next_flush.tv_sec)
789 || ((now.tv_sec == next_flush.tv_sec)
790 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
791 {
792 /* Flush all values that haven't been written in the last
793 * `config_write_interval' seconds. */
794 flush_old_values (config_write_interval);
796 /* Determine the time of the next cache flush. */
797 next_flush.tv_sec =
798 now.tv_sec + next_flush.tv_sec % config_flush_interval;
800 /* unlock the cache while we rotate so we don't block incoming
801 * updates if the fsync() blocks on disk I/O */
802 pthread_mutex_unlock(&cache_lock);
803 journal_rotate();
804 pthread_mutex_lock(&cache_lock);
805 }
807 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
808 if (status != 0 && status != ETIMEDOUT)
809 {
810 RRDD_LOG (LOG_ERR, "flush_thread_main: "
811 "pthread_cond_timedwait returned %i.", status);
812 }
813 }
815 if (config_flush_at_shutdown)
816 flush_old_values (-1); /* flush everything */
818 pthread_mutex_unlock(&cache_lock);
820 return NULL;
821 } /* void *flush_thread_main */
823 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
824 {
825 pthread_mutex_lock (&cache_lock);
827 while (!do_shutdown
828 || (cache_queue_head != NULL && config_flush_at_shutdown))
829 {
830 cache_item_t *ci;
831 char *file;
832 char **values;
833 int values_num;
834 int status;
835 int i;
837 /* Now, check if there's something to store away. If not, wait until
838 * something comes in. if we are shutting down, do not wait around. */
839 if (cache_queue_head == NULL && !do_shutdown)
840 {
841 status = pthread_cond_wait (&queue_cond, &cache_lock);
842 if ((status != 0) && (status != ETIMEDOUT))
843 {
844 RRDD_LOG (LOG_ERR, "queue_thread_main: "
845 "pthread_cond_wait returned %i.", status);
846 }
847 }
849 /* Check if a value has arrived. This may be NULL if we timed out or there
850 * was an interrupt such as a signal. */
851 if (cache_queue_head == NULL)
852 continue;
854 ci = cache_queue_head;
856 /* copy the relevant parts */
857 file = strdup (ci->file);
858 if (file == NULL)
859 {
860 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
861 continue;
862 }
864 assert(ci->values != NULL);
865 assert(ci->values_num > 0);
867 values = ci->values;
868 values_num = ci->values_num;
870 wipe_ci_values(ci, time(NULL));
871 remove_from_queue(ci);
873 pthread_mutex_unlock (&cache_lock);
875 rrd_clear_error ();
876 status = rrd_update_r (file, NULL, values_num, (void *) values);
877 if (status != 0)
878 {
879 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
880 "rrd_update_r (%s) failed with status %i. (%s)",
881 file, status, rrd_get_error());
882 }
884 journal_write("wrote", file);
885 pthread_cond_broadcast(&ci->flushed);
887 for (i = 0; i < values_num; i++)
888 free (values[i]);
890 free(values);
891 free(file);
893 if (status == 0)
894 {
895 pthread_mutex_lock (&stats_lock);
896 stats_updates_written++;
897 stats_data_sets_written += values_num;
898 pthread_mutex_unlock (&stats_lock);
899 }
901 pthread_mutex_lock (&cache_lock);
902 }
903 pthread_mutex_unlock (&cache_lock);
905 return (NULL);
906 } /* }}} void *queue_thread_main */
/* Split the next space-separated field off the front of a command buffer.
 *
 * *buffer_ret points at *buffer_size_ret bytes whose last byte is NUL (as
 * ensured by `handle_request'). A backslash escapes the following
 * character, including a space.
 *
 * The unescaped field is written in place over the input and
 * NUL-terminated. *field_ret then points at it, and *buffer_ret /
 * *buffer_size_ret are advanced past the separator.
 *
 * Returns 0 on success and -1 on failure, leaving the outputs unchanged.
 * Failure means an empty buffer, or a backslash whose escaped character
 * runs to the end of the buffer. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
    size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  size_t avail = *buffer_size_ret;
  char *dst = src;  /* output overwrites the input it has already consumed */
  size_t in = 0;
  size_t out = 0;

  if (avail == 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[avail - 1] == '\0');

  for (;;)
  {
    char c;

    if (in >= avail)
      return (-1);

    c = src[in];

    /* end of field or end of buffer */
    if (c == ' ' || c == '\0')
    {
      dst[out] = '\0';
      in++;
      break;
    }

    /* escaped character: take the next byte literally */
    if (c == '\\')
    {
      if (in + 1 >= avail)
        return (-1);
      in++;
      c = src[in];
    }

    dst[out++] = c;
    in++;
  }

  *buffer_ret = src + in;
  *buffer_size_ret = avail - in;
  *field_ret = dst;

  return (0);
} /* }}} int buffer_get_field */
971 /* if we're restricting writes to the base directory,
972 * check whether the file falls within the dir
973 * returns 1 if OK, otherwise 0
974 */
975 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
976 {
977 assert(file != NULL);
979 if (!config_write_base_only
980 || sock == NULL /* journal replay */
981 || config_base_dir == NULL)
982 return 1;
984 if (strstr(file, "../") != NULL) goto err;
986 /* relative paths without "../" are ok */
987 if (*file != '/') return 1;
989 /* file must be of the format base + "/" + <1+ char filename> */
990 if (strlen(file) < _config_base_dir_len + 2) goto err;
991 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
992 if (*(file + _config_base_dir_len) != '/') goto err;
994 return 1;
996 err:
997 if (sock != NULL && sock->fd >= 0)
998 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1000 return 0;
1001 } /* }}} static int check_file_access */
1003 /* when using a base dir, convert relative paths to absolute paths.
1004 * if necessary, modifies the "filename" pointer to point
1005 * to the new path created in "tmp". "tmp" is provided
1006 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1007 *
1008 * this allows us to optimize for the expected case (absolute path)
1009 * with a no-op.
1010 */
1011 static void get_abs_path(char **filename, char *tmp)
1012 {
1013 assert(tmp != NULL);
1014 assert(filename != NULL && *filename != NULL);
1016 if (config_base_dir == NULL || **filename == '/')
1017 return;
1019 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1020 *filename = tmp;
1021 } /* }}} static int get_abs_path */
1023 /* returns 1 if we have the required privilege level,
1024 * otherwise issue an error to the user on sock */
1025 static int has_privilege (listen_socket_t *sock, /* {{{ */
1026 socket_privilege priv)
1027 {
1028 if (sock == NULL) /* journal replay */
1029 return 1;
1031 if (sock->privilege >= priv)
1032 return 1;
1034 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1035 } /* }}} static int has_privilege */
1037 static int flush_file (const char *filename) /* {{{ */
1038 {
1039 cache_item_t *ci;
1041 pthread_mutex_lock (&cache_lock);
1043 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1044 if (ci == NULL)
1045 {
1046 pthread_mutex_unlock (&cache_lock);
1047 return (ENOENT);
1048 }
1050 if (ci->values_num > 0)
1051 {
1052 /* Enqueue at head */
1053 enqueue_cache_item (ci, HEAD);
1054 pthread_cond_wait(&ci->flushed, &cache_lock);
1055 }
1057 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1058 * may have been purged during our cond_wait() */
1060 pthread_mutex_unlock(&cache_lock);
1062 return (0);
1063 } /* }}} int flush_file */
1065 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1066 char *buffer, size_t buffer_size)
1067 {
1068 int status;
1069 char **help_text;
1070 char *command;
1072 char *help_help[2] =
1073 {
1074 "Command overview\n"
1075 ,
1076 "HELP [<command>]\n"
1077 "FLUSH <filename>\n"
1078 "FLUSHALL\n"
1079 "PENDING <filename>\n"
1080 "FORGET <filename>\n"
1081 "QUEUE\n"
1082 "UPDATE <filename> <values> [<values> ...]\n"
1083 "BATCH\n"
1084 "STATS\n"
1085 "QUIT\n"
1086 };
1088 char *help_flush[2] =
1089 {
1090 "Help for FLUSH\n"
1091 ,
1092 "Usage: FLUSH <filename>\n"
1093 "\n"
1094 "Adds the given filename to the head of the update queue and returns\n"
1095 "after it has been dequeued.\n"
1096 };
1098 char *help_flushall[2] =
1099 {
1100 "Help for FLUSHALL\n"
1101 ,
1102 "Usage: FLUSHALL\n"
1103 "\n"
1104 "Triggers writing of all pending updates. Returns immediately.\n"
1105 };
1107 char *help_pending[2] =
1108 {
1109 "Help for PENDING\n"
1110 ,
1111 "Usage: PENDING <filename>\n"
1112 "\n"
1113 "Shows any 'pending' updates for a file, in order.\n"
1114 "The updates shown have not yet been written to the underlying RRD file.\n"
1115 };
1117 char *help_forget[2] =
1118 {
1119 "Help for FORGET\n"
1120 ,
1121 "Usage: FORGET <filename>\n"
1122 "\n"
1123 "Removes the file completely from the cache.\n"
1124 "Any pending updates for the file will be lost.\n"
1125 };
1127 char *help_queue[2] =
1128 {
1129 "Help for QUEUE\n"
1130 ,
1131 "Shows all files in the output queue.\n"
1132 "The output is zero or more lines in the following format:\n"
1133 "(where <num_vals> is the number of values to be written)\n"
1134 "\n"
1135 "<num_vals> <filename>\n"
1136 "\n"
1137 };
1139 char *help_update[2] =
1140 {
1141 "Help for UPDATE\n"
1142 ,
1143 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1144 "\n"
1145 "Adds the given file to the internal cache if it is not yet known and\n"
1146 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1147 "for details.\n"
1148 "\n"
1149 "Each <values> has the following form:\n"
1150 " <values> = <time>:<value>[:<value>[...]]\n"
1151 "See the rrdupdate(1) manpage for details.\n"
1152 };
1154 char *help_stats[2] =
1155 {
1156 "Help for STATS\n"
1157 ,
1158 "Usage: STATS\n"
1159 "\n"
1160 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1161 "a description of the values.\n"
1162 };
1164 char *help_batch[2] =
1165 {
1166 "Help for BATCH\n"
1167 ,
1168 "The 'BATCH' command permits the client to initiate a bulk load\n"
1169 " of commands to rrdcached.\n"
1170 "\n"
1171 "Usage:\n"
1172 "\n"
1173 " client: BATCH\n"
1174 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1175 " client: command #1\n"
1176 " client: command #2\n"
1177 " client: ... and so on\n"
1178 " client: .\n"
1179 " server: 2 errors\n"
1180 " server: 7 message for command #7\n"
1181 " server: 9 message for command #9\n"
1182 "\n"
1183 "For more information, consult the rrdcached(1) documentation.\n"
1184 };
1186 char *help_quit[2] =
1187 {
1188 "Help for QUIT\n"
1189 ,
1190 "Disconnect from rrdcached.\n"
1191 };
1193 status = buffer_get_field (&buffer, &buffer_size, &command);
1194 if (status != 0)
1195 help_text = help_help;
1196 else
1197 {
1198 if (strcasecmp (command, "update") == 0)
1199 help_text = help_update;
1200 else if (strcasecmp (command, "flush") == 0)
1201 help_text = help_flush;
1202 else if (strcasecmp (command, "flushall") == 0)
1203 help_text = help_flushall;
1204 else if (strcasecmp (command, "pending") == 0)
1205 help_text = help_pending;
1206 else if (strcasecmp (command, "forget") == 0)
1207 help_text = help_forget;
1208 else if (strcasecmp (command, "queue") == 0)
1209 help_text = help_queue;
1210 else if (strcasecmp (command, "stats") == 0)
1211 help_text = help_stats;
1212 else if (strcasecmp (command, "batch") == 0)
1213 help_text = help_batch;
1214 else if (strcasecmp (command, "quit") == 0)
1215 help_text = help_quit;
1216 else
1217 help_text = help_help;
1218 }
1220 add_response_info(sock, help_text[1]);
1221 return send_response(sock, RESP_OK, help_text[0]);
1222 } /* }}} int handle_request_help */
1224 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1225 {
1226 uint64_t copy_queue_length;
1227 uint64_t copy_updates_received;
1228 uint64_t copy_flush_received;
1229 uint64_t copy_updates_written;
1230 uint64_t copy_data_sets_written;
1231 uint64_t copy_journal_bytes;
1232 uint64_t copy_journal_rotate;
1234 uint64_t tree_nodes_number;
1235 uint64_t tree_depth;
1237 pthread_mutex_lock (&stats_lock);
1238 copy_queue_length = stats_queue_length;
1239 copy_updates_received = stats_updates_received;
1240 copy_flush_received = stats_flush_received;
1241 copy_updates_written = stats_updates_written;
1242 copy_data_sets_written = stats_data_sets_written;
1243 copy_journal_bytes = stats_journal_bytes;
1244 copy_journal_rotate = stats_journal_rotate;
1245 pthread_mutex_unlock (&stats_lock);
1247 pthread_mutex_lock (&cache_lock);
1248 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1249 tree_depth = (uint64_t) g_tree_height (cache_tree);
1250 pthread_mutex_unlock (&cache_lock);
1252 add_response_info(sock,
1253 "QueueLength: %"PRIu64"\n", copy_queue_length);
1254 add_response_info(sock,
1255 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1256 add_response_info(sock,
1257 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1258 add_response_info(sock,
1259 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1260 add_response_info(sock,
1261 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1262 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1263 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1264 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1265 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1267 send_response(sock, RESP_OK, "Statistics follow\n");
1269 return (0);
1270 } /* }}} int handle_request_stats */
1272 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1273 char *buffer, size_t buffer_size)
1274 {
1275 char *file, file_tmp[PATH_MAX];
1276 int status;
1278 status = buffer_get_field (&buffer, &buffer_size, &file);
1279 if (status != 0)
1280 {
1281 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1282 }
1283 else
1284 {
1285 pthread_mutex_lock(&stats_lock);
1286 stats_flush_received++;
1287 pthread_mutex_unlock(&stats_lock);
1289 get_abs_path(&file, file_tmp);
1290 if (!check_file_access(file, sock)) return 0;
1292 status = flush_file (file);
1293 if (status == 0)
1294 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1295 else if (status == ENOENT)
1296 {
1297 /* no file in our tree; see whether it exists at all */
1298 struct stat statbuf;
1300 memset(&statbuf, 0, sizeof(statbuf));
1301 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1302 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1303 else
1304 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1305 }
1306 else if (status < 0)
1307 return send_response(sock, RESP_ERR, "Internal error.\n");
1308 else
1309 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1310 }
1312 /* NOTREACHED */
1313 assert(1==0);
1314 } /* }}} int handle_request_flush */
1316 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1317 {
1318 int status;
1320 status = has_privilege(sock, PRIV_HIGH);
1321 if (status <= 0)
1322 return status;
1324 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1326 pthread_mutex_lock(&cache_lock);
1327 flush_old_values(-1);
1328 pthread_mutex_unlock(&cache_lock);
1330 return send_response(sock, RESP_OK, "Started flush.\n");
1331 } /* }}} static int handle_request_flushall */
1333 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1334 char *buffer, size_t buffer_size)
1335 {
1336 int status;
1337 char *file, file_tmp[PATH_MAX];
1338 cache_item_t *ci;
1340 status = buffer_get_field(&buffer, &buffer_size, &file);
1341 if (status != 0)
1342 return send_response(sock, RESP_ERR,
1343 "Usage: PENDING <filename>\n");
1345 status = has_privilege(sock, PRIV_HIGH);
1346 if (status <= 0)
1347 return status;
1349 get_abs_path(&file, file_tmp);
1351 pthread_mutex_lock(&cache_lock);
1352 ci = g_tree_lookup(cache_tree, file);
1353 if (ci == NULL)
1354 {
1355 pthread_mutex_unlock(&cache_lock);
1356 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1357 }
1359 for (int i=0; i < ci->values_num; i++)
1360 add_response_info(sock, "%s\n", ci->values[i]);
1362 pthread_mutex_unlock(&cache_lock);
1363 return send_response(sock, RESP_OK, "updates pending\n");
1364 } /* }}} static int handle_request_pending */
1366 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1367 char *buffer, size_t buffer_size)
1368 {
1369 int status;
1370 gboolean found;
1371 char *file, file_tmp[PATH_MAX];
1373 status = buffer_get_field(&buffer, &buffer_size, &file);
1374 if (status != 0)
1375 return send_response(sock, RESP_ERR,
1376 "Usage: FORGET <filename>\n");
1378 status = has_privilege(sock, PRIV_HIGH);
1379 if (status <= 0)
1380 return status;
1382 get_abs_path(&file, file_tmp);
1383 if (!check_file_access(file, sock)) return 0;
1385 pthread_mutex_lock(&cache_lock);
1386 found = g_tree_remove(cache_tree, file);
1387 pthread_mutex_unlock(&cache_lock);
1389 if (found == TRUE)
1390 {
1391 if (sock != NULL)
1392 journal_write("forget", file);
1394 return send_response(sock, RESP_OK, "Gone!\n");
1395 }
1396 else
1397 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1399 /* NOTREACHED */
1400 assert(1==0);
1401 } /* }}} static int handle_request_forget */
1403 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1404 {
1405 cache_item_t *ci;
1407 pthread_mutex_lock(&cache_lock);
1409 ci = cache_queue_head;
1410 while (ci != NULL)
1411 {
1412 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1413 ci = ci->next;
1414 }
1416 pthread_mutex_unlock(&cache_lock);
1418 return send_response(sock, RESP_OK, "in queue.\n");
1419 } /* }}} int handle_request_queue */
1421 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1422 time_t now,
1423 char *buffer, size_t buffer_size)
1424 {
1425 char *file, file_tmp[PATH_MAX];
1426 int values_num = 0;
1427 int status;
1428 char orig_buf[CMD_MAX];
1430 cache_item_t *ci;
1432 status = has_privilege(sock, PRIV_HIGH);
1433 if (status <= 0)
1434 return status;
1436 /* save it for the journal later */
1437 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1439 status = buffer_get_field (&buffer, &buffer_size, &file);
1440 if (status != 0)
1441 return send_response(sock, RESP_ERR,
1442 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1444 pthread_mutex_lock(&stats_lock);
1445 stats_updates_received++;
1446 pthread_mutex_unlock(&stats_lock);
1448 get_abs_path(&file, file_tmp);
1449 if (!check_file_access(file, sock)) return 0;
1451 pthread_mutex_lock (&cache_lock);
1452 ci = g_tree_lookup (cache_tree, file);
1454 if (ci == NULL) /* {{{ */
1455 {
1456 struct stat statbuf;
1458 /* don't hold the lock while we setup; stat(2) might block */
1459 pthread_mutex_unlock(&cache_lock);
1461 memset (&statbuf, 0, sizeof (statbuf));
1462 status = stat (file, &statbuf);
1463 if (status != 0)
1464 {
1465 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1467 status = errno;
1468 if (status == ENOENT)
1469 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1470 else
1471 return send_response(sock, RESP_ERR,
1472 "stat failed with error %i.\n", status);
1473 }
1474 if (!S_ISREG (statbuf.st_mode))
1475 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1477 if (access(file, R_OK|W_OK) != 0)
1478 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1479 file, rrd_strerror(errno));
1481 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1482 if (ci == NULL)
1483 {
1484 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1486 return send_response(sock, RESP_ERR, "malloc failed.\n");
1487 }
1488 memset (ci, 0, sizeof (cache_item_t));
1490 ci->file = strdup (file);
1491 if (ci->file == NULL)
1492 {
1493 free (ci);
1494 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1496 return send_response(sock, RESP_ERR, "strdup failed.\n");
1497 }
1499 wipe_ci_values(ci, now);
1500 ci->flags = CI_FLAGS_IN_TREE;
1501 pthread_cond_init(&ci->flushed, NULL);
1503 pthread_mutex_lock(&cache_lock);
1504 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1505 } /* }}} */
1506 assert (ci != NULL);
1508 /* don't re-write updates in replay mode */
1509 if (sock != NULL)
1510 journal_write("update", orig_buf);
1512 while (buffer_size > 0)
1513 {
1514 char **temp;
1515 char *value;
1516 time_t stamp;
1517 char *eostamp;
1519 status = buffer_get_field (&buffer, &buffer_size, &value);
1520 if (status != 0)
1521 {
1522 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1523 break;
1524 }
1526 /* make sure update time is always moving forward */
1527 stamp = strtol(value, &eostamp, 10);
1528 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1529 {
1530 pthread_mutex_unlock(&cache_lock);
1531 return send_response(sock, RESP_ERR,
1532 "Cannot find timestamp in '%s'!\n", value);
1533 }
1534 else if (stamp <= ci->last_update_stamp)
1535 {
1536 pthread_mutex_unlock(&cache_lock);
1537 return send_response(sock, RESP_ERR,
1538 "illegal attempt to update using time %ld when last"
1539 " update time is %ld (minimum one second step)\n",
1540 stamp, ci->last_update_stamp);
1541 }
1542 else
1543 ci->last_update_stamp = stamp;
1545 temp = (char **) rrd_realloc (ci->values,
1546 sizeof (char *) * (ci->values_num + 1));
1547 if (temp == NULL)
1548 {
1549 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1550 continue;
1551 }
1552 ci->values = temp;
1554 ci->values[ci->values_num] = strdup (value);
1555 if (ci->values[ci->values_num] == NULL)
1556 {
1557 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1558 continue;
1559 }
1560 ci->values_num++;
1562 values_num++;
1563 }
1565 if (((now - ci->last_flush_time) >= config_write_interval)
1566 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1567 && (ci->values_num > 0))
1568 {
1569 enqueue_cache_item (ci, TAIL);
1570 }
1572 pthread_mutex_unlock (&cache_lock);
1574 if (values_num < 1)
1575 return send_response(sock, RESP_ERR, "No values updated.\n");
1576 else
1577 return send_response(sock, RESP_OK,
1578 "errors, enqueued %i value(s).\n", values_num);
1580 /* NOTREACHED */
1581 assert(1==0);
1583 } /* }}} int handle_request_update */
1585 /* we came across a "WROTE" entry during journal replay.
1586 * throw away any values that we have accumulated for this file
1587 */
1588 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1589 {
1590 int i;
1591 cache_item_t *ci;
1592 const char *file = buffer;
1594 pthread_mutex_lock(&cache_lock);
1596 ci = g_tree_lookup(cache_tree, file);
1597 if (ci == NULL)
1598 {
1599 pthread_mutex_unlock(&cache_lock);
1600 return (0);
1601 }
1603 if (ci->values)
1604 {
1605 for (i=0; i < ci->values_num; i++)
1606 free(ci->values[i]);
1608 free(ci->values);
1609 }
1611 wipe_ci_values(ci, now);
1612 remove_from_queue(ci);
1614 pthread_mutex_unlock(&cache_lock);
1615 return (0);
1616 } /* }}} int handle_request_wrote */
1618 /* start "BATCH" processing */
1619 static int batch_start (listen_socket_t *sock) /* {{{ */
1620 {
1621 int status;
1622 if (sock->batch_start)
1623 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1625 status = send_response(sock, RESP_OK,
1626 "Go ahead. End with dot '.' on its own line.\n");
1627 sock->batch_start = time(NULL);
1628 sock->batch_cmd = 0;
1630 return status;
1631 } /* }}} static int batch_start */
1633 /* finish "BATCH" processing and return results to the client */
1634 static int batch_done (listen_socket_t *sock) /* {{{ */
1635 {
1636 assert(sock->batch_start);
1637 sock->batch_start = 0;
1638 sock->batch_cmd = 0;
1639 return send_response(sock, RESP_OK, "errors\n");
1640 } /* }}} static int batch_done */
1642 /* if sock==NULL, we are in journal replay mode */
1643 static int handle_request (listen_socket_t *sock, /* {{{ */
1644 time_t now,
1645 char *buffer, size_t buffer_size)
1646 {
1647 char *buffer_ptr;
1648 char *command;
1649 int status;
1651 assert (buffer[buffer_size - 1] == '\0');
1653 buffer_ptr = buffer;
1654 command = NULL;
1655 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1656 if (status != 0)
1657 {
1658 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1659 return (-1);
1660 }
1662 if (sock != NULL && sock->batch_start)
1663 sock->batch_cmd++;
1665 if (strcasecmp (command, "update") == 0)
1666 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1667 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1668 {
1669 /* this is only valid in replay mode */
1670 return (handle_request_wrote (buffer_ptr, now));
1671 }
1672 else if (strcasecmp (command, "flush") == 0)
1673 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1674 else if (strcasecmp (command, "flushall") == 0)
1675 return (handle_request_flushall(sock));
1676 else if (strcasecmp (command, "pending") == 0)
1677 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1678 else if (strcasecmp (command, "forget") == 0)
1679 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1680 else if (strcasecmp (command, "queue") == 0)
1681 return (handle_request_queue(sock));
1682 else if (strcasecmp (command, "stats") == 0)
1683 return (handle_request_stats (sock));
1684 else if (strcasecmp (command, "help") == 0)
1685 return (handle_request_help (sock, buffer_ptr, buffer_size));
1686 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1687 return batch_start(sock);
1688 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1689 return batch_done(sock);
1690 else if (strcasecmp (command, "quit") == 0)
1691 return -1;
1692 else
1693 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1695 /* NOTREACHED */
1696 assert(1==0);
1697 } /* }}} int handle_request */
1699 /* MUST NOT hold journal_lock before calling this */
1700 static void journal_rotate(void) /* {{{ */
1701 {
1702 FILE *old_fh = NULL;
1703 int new_fd;
1705 if (journal_cur == NULL || journal_old == NULL)
1706 return;
1708 pthread_mutex_lock(&journal_lock);
1710 /* we rotate this way (rename before close) so that the we can release
1711 * the journal lock as fast as possible. Journal writes to the new
1712 * journal can proceed immediately after the new file is opened. The
1713 * fclose can then block without affecting new updates.
1714 */
1715 if (journal_fh != NULL)
1716 {
1717 old_fh = journal_fh;
1718 journal_fh = NULL;
1719 rename(journal_cur, journal_old);
1720 ++stats_journal_rotate;
1721 }
1723 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1724 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1725 if (new_fd >= 0)
1726 {
1727 journal_fh = fdopen(new_fd, "a");
1728 if (journal_fh == NULL)
1729 close(new_fd);
1730 }
1732 pthread_mutex_unlock(&journal_lock);
1734 if (old_fh != NULL)
1735 fclose(old_fh);
1737 if (journal_fh == NULL)
1738 {
1739 RRDD_LOG(LOG_CRIT,
1740 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1741 journal_cur, rrd_strerror(errno));
1743 RRDD_LOG(LOG_ERR,
1744 "JOURNALING DISABLED: All values will be flushed at shutdown");
1745 config_flush_at_shutdown = 1;
1746 }
1748 } /* }}} static void journal_rotate */
1750 static void journal_done(void) /* {{{ */
1751 {
1752 if (journal_cur == NULL)
1753 return;
1755 pthread_mutex_lock(&journal_lock);
1756 if (journal_fh != NULL)
1757 {
1758 fclose(journal_fh);
1759 journal_fh = NULL;
1760 }
1762 if (config_flush_at_shutdown)
1763 {
1764 RRDD_LOG(LOG_INFO, "removing journals");
1765 unlink(journal_old);
1766 unlink(journal_cur);
1767 }
1768 else
1769 {
1770 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1771 "journals will be used at next startup");
1772 }
1774 pthread_mutex_unlock(&journal_lock);
1776 } /* }}} static void journal_done */
1778 static int journal_write(char *cmd, char *args) /* {{{ */
1779 {
1780 int chars;
1782 if (journal_fh == NULL)
1783 return 0;
1785 pthread_mutex_lock(&journal_lock);
1786 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1787 pthread_mutex_unlock(&journal_lock);
1789 if (chars > 0)
1790 {
1791 pthread_mutex_lock(&stats_lock);
1792 stats_journal_bytes += chars;
1793 pthread_mutex_unlock(&stats_lock);
1794 }
1796 return chars;
1797 } /* }}} static int journal_write */
1799 static int journal_replay (const char *file) /* {{{ */
1800 {
1801 FILE *fh;
1802 int entry_cnt = 0;
1803 int fail_cnt = 0;
1804 uint64_t line = 0;
1805 char entry[CMD_MAX];
1806 time_t now;
1808 if (file == NULL) return 0;
1810 {
1811 char *reason = "unknown error";
1812 int status = 0;
1813 struct stat statbuf;
1815 memset(&statbuf, 0, sizeof(statbuf));
1816 if (stat(file, &statbuf) != 0)
1817 {
1818 if (errno == ENOENT)
1819 return 0;
1821 reason = "stat error";
1822 status = errno;
1823 }
1824 else if (!S_ISREG(statbuf.st_mode))
1825 {
1826 reason = "not a regular file";
1827 status = EPERM;
1828 }
1829 if (statbuf.st_uid != daemon_uid)
1830 {
1831 reason = "not owned by daemon user";
1832 status = EACCES;
1833 }
1834 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1835 {
1836 reason = "must not be user/group writable";
1837 status = EACCES;
1838 }
1840 if (status != 0)
1841 {
1842 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1843 file, rrd_strerror(status), reason);
1844 return 0;
1845 }
1846 }
1848 fh = fopen(file, "r");
1849 if (fh == NULL)
1850 {
1851 if (errno != ENOENT)
1852 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1853 file, rrd_strerror(errno));
1854 return 0;
1855 }
1856 else
1857 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1859 now = time(NULL);
1861 while(!feof(fh))
1862 {
1863 size_t entry_len;
1865 ++line;
1866 if (fgets(entry, sizeof(entry), fh) == NULL)
1867 break;
1868 entry_len = strlen(entry);
1870 /* check \n termination in case journal writing crashed mid-line */
1871 if (entry_len == 0)
1872 continue;
1873 else if (entry[entry_len - 1] != '\n')
1874 {
1875 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1876 ++fail_cnt;
1877 continue;
1878 }
1880 entry[entry_len - 1] = '\0';
1882 if (handle_request(NULL, now, entry, entry_len) == 0)
1883 ++entry_cnt;
1884 else
1885 ++fail_cnt;
1886 }
1888 fclose(fh);
1890 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1891 entry_cnt, fail_cnt);
1893 return entry_cnt > 0 ? 1 : 0;
1894 } /* }}} static int journal_replay */
1896 static void journal_init(void) /* {{{ */
1897 {
1898 int had_journal = 0;
1900 if (journal_cur == NULL) return;
1902 pthread_mutex_lock(&journal_lock);
1904 RRDD_LOG(LOG_INFO, "checking for journal files");
1906 had_journal += journal_replay(journal_old);
1907 had_journal += journal_replay(journal_cur);
1909 /* it must have been a crash. start a flush */
1910 if (had_journal && config_flush_at_shutdown)
1911 flush_old_values(-1);
1913 pthread_mutex_unlock(&journal_lock);
1914 journal_rotate();
1916 RRDD_LOG(LOG_INFO, "journal processing complete");
1918 } /* }}} static void journal_init */
1920 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1921 {
1922 assert(sock != NULL);
1924 free(sock->rbuf); sock->rbuf = NULL;
1925 free(sock->wbuf); sock->wbuf = NULL;
1926 free(sock);
1927 } /* }}} void free_listen_socket */
1929 static void close_connection(listen_socket_t *sock) /* {{{ */
1930 {
1931 if (sock->fd >= 0)
1932 {
1933 close(sock->fd);
1934 sock->fd = -1;
1935 }
1937 free_listen_socket(sock);
1939 } /* }}} void close_connection */
1941 static void *connection_thread_main (void *args) /* {{{ */
1942 {
1943 listen_socket_t *sock;
1944 int i;
1945 int fd;
1947 sock = (listen_socket_t *) args;
1948 fd = sock->fd;
1950 /* init read buffers */
1951 sock->next_read = sock->next_cmd = 0;
1952 sock->rbuf = malloc(RBUF_SIZE);
1953 if (sock->rbuf == NULL)
1954 {
1955 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1956 close_connection(sock);
1957 return NULL;
1958 }
1960 pthread_mutex_lock (&connection_threads_lock);
1961 {
1962 pthread_t *temp;
1964 temp = (pthread_t *) rrd_realloc (connection_threads,
1965 sizeof (pthread_t) * (connection_threads_num + 1));
1966 if (temp == NULL)
1967 {
1968 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1969 }
1970 else
1971 {
1972 connection_threads = temp;
1973 connection_threads[connection_threads_num] = pthread_self ();
1974 connection_threads_num++;
1975 }
1976 }
1977 pthread_mutex_unlock (&connection_threads_lock);
1979 while (do_shutdown == 0)
1980 {
1981 char *cmd;
1982 ssize_t cmd_len;
1983 ssize_t rbytes;
1984 time_t now;
1986 struct pollfd pollfd;
1987 int status;
1989 pollfd.fd = fd;
1990 pollfd.events = POLLIN | POLLPRI;
1991 pollfd.revents = 0;
1993 status = poll (&pollfd, 1, /* timeout = */ 500);
1994 if (do_shutdown)
1995 break;
1996 else if (status == 0) /* timeout */
1997 continue;
1998 else if (status < 0) /* error */
1999 {
2000 status = errno;
2001 if (status != EINTR)
2002 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2003 continue;
2004 }
2006 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2007 break;
2008 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2009 {
2010 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2011 "poll(2) returned something unexpected: %#04hx",
2012 pollfd.revents);
2013 break;
2014 }
2016 rbytes = read(fd, sock->rbuf + sock->next_read,
2017 RBUF_SIZE - sock->next_read);
2018 if (rbytes < 0)
2019 {
2020 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2021 break;
2022 }
2023 else if (rbytes == 0)
2024 break; /* eof */
2026 sock->next_read += rbytes;
2028 if (sock->batch_start)
2029 now = sock->batch_start;
2030 else
2031 now = time(NULL);
2033 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2034 {
2035 status = handle_request (sock, now, cmd, cmd_len+1);
2036 if (status != 0)
2037 goto out_close;
2038 }
2039 }
2041 out_close:
2042 close_connection(sock);
2044 /* Remove this thread from the connection threads list */
2045 pthread_mutex_lock (&connection_threads_lock);
2046 {
2047 pthread_t self;
2048 pthread_t *temp;
2050 /* Find out own index in the array */
2051 self = pthread_self ();
2052 for (i = 0; i < connection_threads_num; i++)
2053 if (pthread_equal (connection_threads[i], self) != 0)
2054 break;
2055 assert (i < connection_threads_num);
2057 /* Move the trailing threads forward. */
2058 if (i < (connection_threads_num - 1))
2059 {
2060 memmove (connection_threads + i,
2061 connection_threads + i + 1,
2062 sizeof (pthread_t) * (connection_threads_num - i - 1));
2063 }
2065 connection_threads_num--;
2067 temp = rrd_realloc(connection_threads,
2068 sizeof(*connection_threads) * connection_threads_num);
2069 if (connection_threads_num > 0 && temp == NULL)
2070 RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2071 else
2072 connection_threads = temp;
2073 }
2074 pthread_mutex_unlock (&connection_threads_lock);
2076 return (NULL);
2077 } /* }}} void *connection_thread_main */
2079 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2080 {
2081 int fd;
2082 struct sockaddr_un sa;
2083 listen_socket_t *temp;
2084 int status;
2085 const char *path;
2087 path = sock->addr;
2088 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2089 path += strlen("unix:");
2091 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2092 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2093 if (temp == NULL)
2094 {
2095 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2096 return (-1);
2097 }
2098 listen_fds = temp;
2099 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2101 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2102 if (fd < 0)
2103 {
2104 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2105 rrd_strerror(errno));
2106 return (-1);
2107 }
2109 memset (&sa, 0, sizeof (sa));
2110 sa.sun_family = AF_UNIX;
2111 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2113 /* if we've gotten this far, we own the pid file. any daemon started
2114 * with the same args must not be alive. therefore, ensure that we can
2115 * create the socket...
2116 */
2117 unlink(path);
2119 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2120 if (status != 0)
2121 {
2122 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2123 path, rrd_strerror(errno));
2124 close (fd);
2125 return (-1);
2126 }
2128 status = listen (fd, /* backlog = */ 10);
2129 if (status != 0)
2130 {
2131 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2132 path, rrd_strerror(errno));
2133 close (fd);
2134 unlink (path);
2135 return (-1);
2136 }
2138 listen_fds[listen_fds_num].fd = fd;
2139 listen_fds[listen_fds_num].family = PF_UNIX;
2140 strncpy(listen_fds[listen_fds_num].addr, path,
2141 sizeof (listen_fds[listen_fds_num].addr) - 1);
2142 listen_fds_num++;
2144 return (0);
2145 } /* }}} int open_listen_socket_unix */
/* Resolve sock->addr and listen on every resulting TCP address.
 *
 * Accepted forms are "[ipv6]", "[ipv6]:port", "host", "host:port",
 * "1.2.3.4" and "1.2.3.4:port".  A bare IPv6 address (which always has
 * at least two colons) is used unsplit.  RRDCACHED_DEFAULT_PORT is used
 * when no port is given.  Each socket that is successfully bound and
 * listening is appended to listen_fds; per-address failures are reported
 * on stderr and the next address is tried.
 * Returns 0 if at least one socket was opened, -1 otherwise. */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;
  int opened = 0;

  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Hostname or IPv4, optionally followed by ":port".  Split at the last
     * colon when the address contains a dot, or when it contains exactly
     * one colon; a bare IPv6 address has two or more and is left whole. */
    char *last_colon = strrchr (addr, ':');

    if (last_colon != NULL
        && (strchr (addr, '.') != NULL || strchr (addr, ':') == last_colon))
    {
      *last_colon = 0;
      port = last_colon + 1;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) rrd_realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    /* treated like a bind failure: give the remaining addresses a try */
    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
    opened++;
  } /* for (ai_ptr) */

  freeaddrinfo(ai_res);
  return (opened > 0) ? 0 : -1;
} /* }}} static int open_listen_socket_network */
2269 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2270 {
2271 assert(sock != NULL);
2272 assert(sock->addr != NULL);
2274 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2275 || sock->addr[0] == '/')
2276 return (open_listen_socket_unix(sock));
2277 else
2278 return (open_listen_socket_network(sock));
2279 } /* }}} int open_listen_socket */
2281 static int close_listen_sockets (void) /* {{{ */
2282 {
2283 size_t i;
2285 for (i = 0; i < listen_fds_num; i++)
2286 {
2287 close (listen_fds[i].fd);
2289 if (listen_fds[i].family == PF_UNIX)
2290 unlink(listen_fds[i].addr);
2291 }
2293 free (listen_fds);
2294 listen_fds = NULL;
2295 listen_fds_num = 0;
2297 return (0);
2298 } /* }}} int close_listen_sockets */
2300 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2301 {
2302 struct pollfd *pollfds;
2303 int pollfds_num;
2304 int status;
2305 int i;
2307 if (listen_fds_num < 1)
2308 {
2309 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2310 return (NULL);
2311 }
2313 pollfds_num = listen_fds_num;
2314 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2315 if (pollfds == NULL)
2316 {
2317 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2318 return (NULL);
2319 }
2320 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2322 RRDD_LOG(LOG_INFO, "listening for connections");
2324 while (do_shutdown == 0)
2325 {
2326 for (i = 0; i < pollfds_num; i++)
2327 {
2328 pollfds[i].fd = listen_fds[i].fd;
2329 pollfds[i].events = POLLIN | POLLPRI;
2330 pollfds[i].revents = 0;
2331 }
2333 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2334 if (do_shutdown)
2335 break;
2336 else if (status == 0) /* timeout */
2337 continue;
2338 else if (status < 0) /* error */
2339 {
2340 status = errno;
2341 if (status != EINTR)
2342 {
2343 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2344 }
2345 continue;
2346 }
2348 for (i = 0; i < pollfds_num; i++)
2349 {
2350 listen_socket_t *client_sock;
2351 struct sockaddr_storage client_sa;
2352 socklen_t client_sa_size;
2353 pthread_t tid;
2354 pthread_attr_t attr;
2356 if (pollfds[i].revents == 0)
2357 continue;
2359 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2360 {
2361 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2362 "poll(2) returned something unexpected for listen FD #%i.",
2363 pollfds[i].fd);
2364 continue;
2365 }
2367 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2368 if (client_sock == NULL)
2369 {
2370 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2371 continue;
2372 }
2373 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2375 client_sa_size = sizeof (client_sa);
2376 client_sock->fd = accept (pollfds[i].fd,
2377 (struct sockaddr *) &client_sa, &client_sa_size);
2378 if (client_sock->fd < 0)
2379 {
2380 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2381 free(client_sock);
2382 continue;
2383 }
2385 pthread_attr_init (&attr);
2386 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2388 status = pthread_create (&tid, &attr, connection_thread_main,
2389 client_sock);
2390 if (status != 0)
2391 {
2392 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2393 close_connection(client_sock);
2394 continue;
2395 }
2396 } /* for (pollfds_num) */
2397 } /* while (do_shutdown == 0) */
2399 RRDD_LOG(LOG_INFO, "starting shutdown");
2401 close_listen_sockets ();
2403 pthread_mutex_lock (&connection_threads_lock);
2404 while (connection_threads_num > 0)
2405 {
2406 pthread_t wait_for;
2408 wait_for = connection_threads[0];
2410 pthread_mutex_unlock (&connection_threads_lock);
2411 pthread_join (wait_for, /* retval = */ NULL);
2412 pthread_mutex_lock (&connection_threads_lock);
2413 }
2414 pthread_mutex_unlock (&connection_threads_lock);
2416 free(pollfds);
2418 return (NULL);
2419 } /* }}} void *listen_thread_main */
/* Prepare the process to run as the daemon.  In order, it:
 *  - records the effective uid in daemon_uid;
 *  - acquires the pid file (create, or check_pidfile());
 *  - opens the configured listen sockets, or RRDCACHED_DEFAULT_ADDRESS;
 *  - unless stay_foreground, forks (the parent exits), starts a new
 *    session and points fds 0-2 at /dev/null;
 *  - changes directory, installs signal handlers, opens syslog;
 *  - creates the cache tree.
 * Returns write_pidfile()'s result on success, the negative pid file
 * descriptor if the pid file cannot be acquired, or -1 on other errors
 * (after remove_pidfile()). */
static int daemonize (void) /* {{{ */
{
  int pid_fd;
  char *base_dir;

  daemon_uid = geteuid();

  pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
  if (pid_fd < 0)
    pid_fd = check_pidfile();
  if (pid_fd < 0)
    return pid_fd;

  /* open all the listen sockets */
  if (config_listen_address_list_len > 0)
  {
    /* each configured entry is consumed: opened, then freed */
    for (int i = 0; i < config_listen_address_list_len; i++)
    {
      open_listen_socket (config_listen_address_list[i]);
      free_listen_socket (config_listen_address_list[i]);
    }

    free(config_listen_address_list);
  }
  else
  {
    /* no address configured: fall back to the built-in default */
    listen_socket_t sock;
    memset(&sock, 0, sizeof(sock));
    strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
    open_listen_socket (&sock);
  }

  /* individual open failures are tolerated; having none at all is fatal */
  if (listen_fds_num < 1)
  {
    fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
    goto error;
  }

  if (!stay_foreground)
  {
    pid_t child;

    child = fork ();
    if (child < 0)
    {
      fprintf (stderr, "daemonize: fork(2) failed.\n");
      goto error;
    }
    else if (child > 0)
      exit(0);

    /* Become session leader */
    setsid ();

    /* Open the first three file descriptors to /dev/null */
    close (2);
    close (1);
    close (0);

    /* open() and dup() return the lowest free descriptor, so these
     * calls give back 0, 1 and 2 in turn; keep this order */
    open ("/dev/null", O_RDWR);
    dup (0);
    dup (0);
  } /* if (!stay_foreground) */

  /* Change into the configured base directory, or /tmp if none is set. */
  base_dir = (config_base_dir != NULL)
    ? config_base_dir
    : "/tmp";

  if (chdir (base_dir) != 0)
  {
    fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
    goto error;
  }

  install_signal_handlers();

  openlog ("rrdcached", LOG_PID, LOG_DAEMON);
  RRDD_LOG(LOG_INFO, "starting up");

  /* keys are compared with strcmp(); values are released with
   * free_cache_item() whenever the tree drops or replaces them */
  cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
                                (GDestroyNotify) free_cache_item);
  if (cache_tree == NULL)
  {
    RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
    goto error;
  }

  return write_pidfile (pid_fd);

error:
  remove_pidfile();
  return -1;
} /* }}} int daemonize */
2516 static int cleanup (void) /* {{{ */
2517 {
2518 do_shutdown++;
2520 pthread_cond_broadcast (&flush_cond);
2521 pthread_join (flush_thread, NULL);
2523 pthread_cond_broadcast (&queue_cond);
2524 for (int i = 0; i < config_queue_threads; i++)
2525 pthread_join (queue_threads[i], NULL);
2527 if (config_flush_at_shutdown)
2528 {
2529 assert(cache_queue_head == NULL);
2530 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2531 }
2533 journal_done();
2534 remove_pidfile ();
2536 free(queue_threads);
2537 free(config_base_dir);
2538 free(config_pid_file);
2539 free(journal_cur);
2540 free(journal_old);
2542 pthread_mutex_lock(&cache_lock);
2543 g_tree_destroy(cache_tree);
2545 RRDD_LOG(LOG_INFO, "goodbye");
2546 closelog ();
2548 return (0);
2549 } /* }}} int cleanup */
2551 static int read_options (int argc, char **argv) /* {{{ */
2552 {
2553 int option;
2554 int status = 0;
2556 while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2557 {
2558 switch (option)
2559 {
2560 case 'g':
2561 stay_foreground=1;
2562 break;
2564 case 'L':
2565 case 'l':
2566 {
2567 listen_socket_t **temp;
2568 listen_socket_t *new;
2570 new = malloc(sizeof(listen_socket_t));
2571 if (new == NULL)
2572 {
2573 fprintf(stderr, "read_options: malloc failed.\n");
2574 return(2);
2575 }
2576 memset(new, 0, sizeof(listen_socket_t));
2578 temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2579 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2580 if (temp == NULL)
2581 {
2582 fprintf (stderr, "read_options: realloc failed.\n");
2583 return (2);
2584 }
2585 config_listen_address_list = temp;
2587 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2588 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2590 temp[config_listen_address_list_len] = new;
2591 config_listen_address_list_len++;
2592 }
2593 break;
2595 case 'f':
2596 {
2597 int temp;
2599 temp = atoi (optarg);
2600 if (temp > 0)
2601 config_flush_interval = temp;
2602 else
2603 {
2604 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2605 status = 3;
2606 }
2607 }
2608 break;
2610 case 'w':
2611 {
2612 int temp;
2614 temp = atoi (optarg);
2615 if (temp > 0)
2616 config_write_interval = temp;
2617 else
2618 {
2619 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2620 status = 2;
2621 }
2622 }
2623 break;
2625 case 'z':
2626 {
2627 int temp;
2629 temp = atoi(optarg);
2630 if (temp > 0)
2631 config_write_jitter = temp;
2632 else
2633 {
2634 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2635 status = 2;
2636 }
2638 break;
2639 }
2641 case 't':
2642 {
2643 int threads;
2644 threads = atoi(optarg);
2645 if (threads >= 1)
2646 config_queue_threads = threads;
2647 else
2648 {
2649 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2650 return 1;
2651 }
2652 }
2653 break;
2655 case 'B':
2656 config_write_base_only = 1;
2657 break;
2659 case 'b':
2660 {
2661 size_t len;
2662 char base_realpath[PATH_MAX];
2664 if (config_base_dir != NULL)
2665 free (config_base_dir);
2666 config_base_dir = strdup (optarg);
2667 if (config_base_dir == NULL)
2668 {
2669 fprintf (stderr, "read_options: strdup failed.\n");
2670 return (3);
2671 }
2673 /* make sure that the base directory is not resolved via
2674 * symbolic links. this makes some performance-enhancing
2675 * assumptions possible (we don't have to resolve paths
2676 * that start with a "/")
2677 */
2678 if (realpath(config_base_dir, base_realpath) == NULL)
2679 {
2680 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2681 return 5;
2682 }
2683 else if (strncmp(config_base_dir,
2684 base_realpath, sizeof(base_realpath)) != 0)
2685 {
2686 fprintf(stderr,
2687 "Base directory (-b) resolved via file system links!\n"
2688 "Please consult rrdcached '-b' documentation!\n"
2689 "Consider specifying the real directory (%s)\n",
2690 base_realpath);
2691 return 5;
2692 }
2694 len = strlen (config_base_dir);
2695 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2696 {
2697 config_base_dir[len - 1] = 0;
2698 len--;
2699 }
2701 if (len < 1)
2702 {
2703 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2704 return (4);
2705 }
2707 _config_base_dir_len = len;
2708 }
2709 break;
2711 case 'p':
2712 {
2713 if (config_pid_file != NULL)
2714 free (config_pid_file);
2715 config_pid_file = strdup (optarg);
2716 if (config_pid_file == NULL)
2717 {
2718 fprintf (stderr, "read_options: strdup failed.\n");
2719 return (3);
2720 }
2721 }
2722 break;
2724 case 'F':
2725 config_flush_at_shutdown = 1;
2726 break;
2728 case 'j':
2729 {
2730 struct stat statbuf;
2731 const char *dir = optarg;
2733 status = stat(dir, &statbuf);
2734 if (status != 0)
2735 {
2736 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2737 return 6;
2738 }
2740 if (!S_ISDIR(statbuf.st_mode)
2741 || access(dir, R_OK|W_OK|X_OK) != 0)
2742 {
2743 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2744 errno ? rrd_strerror(errno) : "");
2745 return 6;
2746 }
2748 journal_cur = malloc(PATH_MAX + 1);
2749 journal_old = malloc(PATH_MAX + 1);
2750 if (journal_cur == NULL || journal_old == NULL)
2751 {
2752 fprintf(stderr, "malloc failure for journal files\n");
2753 return 6;
2754 }
2755 else
2756 {
2757 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2758 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2759 }
2760 }
2761 break;
2763 case 'h':
2764 case '?':
2765 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2766 "\n"
2767 "Usage: rrdcached [options]\n"
2768 "\n"
2769 "Valid options are:\n"
2770 " -l <address> Socket address to listen to.\n"
2771 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2772 " -w <seconds> Interval in which to write data.\n"
2773 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2774 " -t <threads> Number of write threads.\n"
2775 " -f <seconds> Interval in which to flush dead data.\n"
2776 " -p <file> Location of the PID-file.\n"
2777 " -b <dir> Base directory to change to.\n"
2778 " -B Restrict file access to paths within -b <dir>\n"
2779 " -g Do not fork and run in the foreground.\n"
2780 " -j <dir> Directory in which to create the journal files.\n"
2781 " -F Always flush all updates at shutdown\n"
2782 "\n"
2783 "For more information and a detailed description of all options "
2784 "please refer\n"
2785 "to the rrdcached(1) manual page.\n",
2786 VERSION);
2787 status = -1;
2788 break;
2789 } /* switch (option) */
2790 } /* while (getopt) */
2792 /* advise the user when values are not sane */
2793 if (config_flush_interval < 2 * config_write_interval)
2794 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2795 " 2x write interval (-w) !\n");
2796 if (config_write_jitter > config_write_interval)
2797 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2798 " write interval (-w) !\n");
2800 if (config_write_base_only && config_base_dir == NULL)
2801 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2802 " Consult the rrdcached documentation\n");
2804 if (journal_cur == NULL)
2805 config_flush_at_shutdown = 1;
2807 return (status);
2808 } /* }}} int read_options */
2810 int main (int argc, char **argv)
2811 {
2812 int status;
2814 status = read_options (argc, argv);
2815 if (status != 0)
2816 {
2817 if (status < 0)
2818 status = 0;
2819 return (status);
2820 }
2822 status = daemonize ();
2823 if (status != 0)
2824 {
2825 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2826 return (1);
2827 }
2829 journal_init();
2831 /* start the queue threads */
2832 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2833 if (queue_threads == NULL)
2834 {
2835 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2836 cleanup();
2837 return (1);
2838 }
2839 for (int i = 0; i < config_queue_threads; i++)
2840 {
2841 memset (&queue_threads[i], 0, sizeof (*queue_threads));
2842 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2843 if (status != 0)
2844 {
2845 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2846 cleanup();
2847 return (1);
2848 }
2849 }
2851 /* start the flush thread */
2852 memset(&flush_thread, 0, sizeof(flush_thread));
2853 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2854 if (status != 0)
2855 {
2856 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2857 cleanup();
2858 return (1);
2859 }
2861 listen_thread_main (NULL);
2862 cleanup ();
2864 return (0);
2865 } /* int main */
2867 /*
2868 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2869 */