bee21a2d7e6e248cb49f36b9983694892caed058
1 /**
2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * Authors:
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
22 **/
24 #if 0
25 /*
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
27 * possible.
28 */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63 * Now for some includes..
64 */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
92 #include <glib-2.0/glib.h>
93 /* }}} */
/* Logging helper: forwards the severity and the printf-style arguments
 * straight to syslog(3). */
#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)

/* On compilers that do not define __GNUC__, make `__attribute__((...))'
 * annotations expand to nothing so the declarations below still compile. */
#ifndef __GNUC__
# define __attribute__(x) /**/
#endif
/*
 * Types
 */

/* Privilege level attached to a socket. has_privilege() compares these with
 * `>=', so PRIV_HIGH satisfies any request that PRIV_LOW satisfies. */
typedef enum
{
  PRIV_LOW,
  PRIV_HIGH
} socket_privilege;

/* Result code passed to send_response(). */
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;

/* Per-socket state, including the buffered command input and the staged
 * response output. */
struct listen_socket_s
{
  int fd;                     /* file descriptor responses are written to */
  char addr[PATH_MAX + 1];
  int family;
  socket_privilege privilege; /* checked by has_privilege() */

  /* state for BATCH processing */
  time_t batch_start;         /* non-zero while in BATCH mode (see send_response) */
  int batch_cmd;              /* reported as the status number of batch errors */

  /* buffered IO */
  char *rbuf;                 /* read buffer */
  off_t next_cmd;             /* offset in rbuf where the next command starts */
  off_t next_read;            /* offset in rbuf just past the buffered data */

  char *wbuf;                 /* NUL-terminated pending response text, or NULL */
  ssize_t wbuf_len;           /* number of bytes in wbuf, excluding the NUL */
};
typedef struct listen_socket_s listen_socket_t;

/* One cached RRD file together with its not-yet-written update strings. */
struct cache_item_s;
typedef struct cache_item_s cache_item_t;
struct cache_item_s
{
  char *file;                 /* file name; also used as the key in cache_tree */
  char **values;              /* pending update strings */
  int values_num;             /* number of entries in `values' */
  time_t last_flush_time;     /* set by wipe_ci_values() (plus optional jitter) */
  time_t last_update_stamp;   /* NOTE(review): not used in the visible code */
#define CI_FLAGS_IN_TREE (1<<0)
#define CI_FLAGS_IN_QUEUE (1<<1)
  int flags;                  /* bitmask of CI_FLAGS_* */
  pthread_cond_t flushed;     /* broadcast after the item is written or forgotten */
  cache_item_t *prev;         /* neighbours in the doubly linked update queue */
  cache_item_t *next;
};

/* User data handed to tree_callback_flush() through g_tree_foreach(). */
struct callback_flush_data_s
{
  time_t now;                 /* time at which the scan started */
  time_t abs_timeout;         /* items last flushed at or before this are enqueued */
  char **keys;                /* keys of idle entries to be removed from the tree */
  size_t keys_num;            /* number of entries in `keys' */
};
typedef struct callback_flush_data_s callback_flush_data_t;

/* Which end of the update queue enqueue_cache_item() inserts at. */
enum queue_side_e
{
  HEAD,
  TAIL
};
typedef enum queue_side_e queue_side_t;

/* max length of socket command or response */
#define CMD_MAX 4096
#define RBUF_SIZE (CMD_MAX*2)
/*
 * Variables
 */
static int stay_foreground = 0;
static uid_t daemon_uid;

static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;

/* Incremented by the signal handlers; non-zero requests a shutdown. */
static int do_shutdown = 0;

static pthread_t queue_thread;

static pthread_t *connection_threads = NULL;
static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static int connection_threads_num = 0;

/* Cache stuff */
/* Tree of cache_item_t keyed by file name, plus the doubly linked queue of
 * items waiting to be written. Both are protected by `cache_lock';
 * `cache_cond' is signalled when the queue changes or a shutdown starts. */
static GTree *cache_tree = NULL;
static cache_item_t *cache_queue_head = NULL;
static cache_item_t *cache_queue_tail = NULL;
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;

/* Maximum age (seconds) of cached values before they are enqueued. */
static int config_write_interval = 300;
/* Upper bound (exclusive) of the random delay added by wipe_ci_values(). */
static int config_write_jitter = 0;
/* Seconds between flush passes; also the idle time after which an empty
 * cache entry is forgotten. */
static int config_flush_interval = 3600;
/* Set by SIGUSR1, cleared by SIGUSR2; flush everything before exiting. */
static int config_flush_at_shutdown = 0;
/* PID file path; NULL selects LOCALSTATEDIR "/run/rrdcached.pid". */
static char *config_pid_file = NULL;
/* Base directory used by get_abs_path() and check_file_access(). */
static char *config_base_dir = NULL;
static size_t _config_base_dir_len = 0;
/* Non-zero restricts socket clients to files below config_base_dir. */
static int config_write_base_only = 0;

static listen_socket_t **config_listen_address_list = NULL;
static int config_listen_address_list_len = 0;

/* Counters reported by the STATS command; guarded by `stats_lock'. */
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static uint64_t stats_flush_received = 0;
static uint64_t stats_updates_written = 0;
static uint64_t stats_data_sets_written = 0;
static uint64_t stats_journal_bytes = 0;
static uint64_t stats_journal_rotate = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Journaled updates */
static char *journal_cur = NULL;
static char *journal_old = NULL;
static FILE *journal_fh = NULL;
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
static int journal_write(char *cmd, char *args);
static void journal_done(void);
static void journal_rotate(void);
224 /*
225 * Functions
226 */
227 static void sig_common (const char *sig) /* {{{ */
228 {
229 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230 do_shutdown++;
231 pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
/* SIGINT: request a shutdown. */
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "INT";

  sig_common(name);
} /* }}} void sig_int_handler */
/* SIGTERM: request a shutdown. */
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
{
  static const char name[] = "TERM";

  sig_common(name);
} /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
245 {
246 config_flush_at_shutdown = 1;
247 sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
251 {
252 config_flush_at_shutdown = 0;
253 sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
257 {
258 /* These structures are static, because `sigaction' behaves weird if the are
259 * overwritten.. */
260 static struct sigaction sa_int;
261 static struct sigaction sa_term;
262 static struct sigaction sa_pipe;
263 static struct sigaction sa_usr1;
264 static struct sigaction sa_usr2;
266 /* Install signal handlers */
267 memset (&sa_int, 0, sizeof (sa_int));
268 sa_int.sa_handler = sig_int_handler;
269 sigaction (SIGINT, &sa_int, NULL);
271 memset (&sa_term, 0, sizeof (sa_term));
272 sa_term.sa_handler = sig_term_handler;
273 sigaction (SIGTERM, &sa_term, NULL);
275 memset (&sa_pipe, 0, sizeof (sa_pipe));
276 sa_pipe.sa_handler = SIG_IGN;
277 sigaction (SIGPIPE, &sa_pipe, NULL);
279 memset (&sa_pipe, 0, sizeof (sa_usr1));
280 sa_usr1.sa_handler = sig_usr1_handler;
281 sigaction (SIGUSR1, &sa_usr1, NULL);
283 memset (&sa_usr2, 0, sizeof (sa_usr2));
284 sa_usr2.sa_handler = sig_usr2_handler;
285 sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(char *action, int oflag) /* {{{ */
290 {
291 int fd;
292 char *file;
294 file = (config_pid_file != NULL)
295 ? config_pid_file
296 : LOCALSTATEDIR "/run/rrdcached.pid";
298 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
299 if (fd < 0)
300 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
301 action, file, rrd_strerror(errno));
303 return(fd);
304 } /* }}} static int open_pidfile */
/* check existing pid file to see whether a daemon is running.
 *
 * Returns an open, truncated descriptor for the PID file when the recorded
 * process is gone (stale file) or is this very process. Returns a negative
 * value when the file cannot be opened or read, holds no valid PID, or
 * names another process that we can signal. The descriptor is closed on
 * every failure path. */
static int check_pidfile(void) /* {{{ */
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* leave room for the terminator: read() does not NUL-terminate */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
    close(pid_fd);
    return -1;
  }

  /* rewind and empty the file so the caller can write the new PID */
  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) != 0)
    fprintf(stderr, "rrdcached: could not truncate pid file (%s)\n",
            rrd_strerror(errno));

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", pid);

  return pid_fd;
} /* }}} static int check_pidfile */
/* Write the current process id, followed by a newline, to the already open
 * descriptor `fd'. The descriptor is closed in every case.
 * Returns 0 on success, -1 if no stdio stream could be attached. */
static int write_pidfile (int fd) /* {{{ */
{
  FILE *stream = fdopen (fd, "w");

  if (stream == NULL)
  {
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
    close(fd);
    return (-1);
  }

  fprintf (stream, "%i\n", (int) getpid ());
  fclose (stream);

  return (0);
} /* }}} int write_pidfile */
365 static int remove_pidfile (void) /* {{{ */
366 {
367 char *file;
368 int status;
370 file = (config_pid_file != NULL)
371 ? config_pid_file
372 : LOCALSTATEDIR "/run/rrdcached.pid";
374 status = unlink (file);
375 if (status == 0)
376 return (0);
377 return (errno);
378 } /* }}} int remove_pidfile */
380 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
381 {
382 char *eol;
384 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
385 sock->next_read - sock->next_cmd);
387 if (eol == NULL)
388 {
389 /* no commands left, move remainder back to front of rbuf */
390 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
391 sock->next_read - sock->next_cmd);
392 sock->next_read -= sock->next_cmd;
393 sock->next_cmd = 0;
394 *len = 0;
395 return NULL;
396 }
397 else
398 {
399 char *cmd = sock->rbuf + sock->next_cmd;
400 *eol = '\0';
402 sock->next_cmd = eol - sock->rbuf + 1;
404 if (eol > sock->rbuf && *(eol-1) == '\r')
405 *(--eol) = '\0'; /* handle "\r\n" EOL */
407 *len = eol - cmd;
409 return cmd;
410 }
412 /* NOTREACHED */
413 assert(1==0);
414 }
416 /* add the characters directly to the write buffer */
417 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
418 {
419 char *new_buf;
421 assert(sock != NULL);
423 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
424 if (new_buf == NULL)
425 {
426 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
427 return -1;
428 }
430 strncpy(new_buf + sock->wbuf_len, str, len + 1);
432 sock->wbuf = new_buf;
433 sock->wbuf_len += len;
435 return 0;
436 } /* }}} static int add_to_wbuf */
438 /* add the text to the "extra" info that's sent after the status line */
439 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
440 {
441 va_list argp;
442 char buffer[CMD_MAX];
443 int len;
445 if (sock == NULL) return 0; /* journal replay mode */
446 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
448 va_start(argp, fmt);
449 #ifdef HAVE_VSNPRINTF
450 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
451 #else
452 len = vsprintf(buffer, fmt, argp);
453 #endif
454 va_end(argp);
455 if (len < 0)
456 {
457 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
458 return -1;
459 }
461 return add_to_wbuf(sock, buffer, len);
462 } /* }}} static int add_response_info */
/* Return the number of '\n' characters in the NUL-terminated string `str'.
 * A NULL pointer counts as zero lines. */
static int count_lines(char *str) /* {{{ */
{
  int total = 0;
  char *cursor;

  if (str == NULL)
    return 0;

  for (cursor = strchr(str, '\n');
       cursor != NULL;
       cursor = strchr(cursor + 1, '\n'))
    total++;

  return total;
} /* }}} static int count_lines */
480 /* send the response back to the user.
481 * returns 0 on success, -1 on error
482 * write buffer is always zeroed after this call */
483 static int send_response (listen_socket_t *sock, response_code rc,
484 char *fmt, ...) /* {{{ */
485 {
486 va_list argp;
487 char buffer[CMD_MAX];
488 int lines;
489 ssize_t wrote;
490 int rclen, len;
492 if (sock == NULL) return rc; /* journal replay mode */
494 if (sock->batch_start)
495 {
496 if (rc == RESP_OK)
497 return rc; /* no response on success during BATCH */
498 lines = sock->batch_cmd;
499 }
500 else if (rc == RESP_OK)
501 lines = count_lines(sock->wbuf);
502 else
503 lines = -1;
505 rclen = sprintf(buffer, "%d ", lines);
506 va_start(argp, fmt);
507 #ifdef HAVE_VSNPRINTF
508 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
509 #else
510 len = vsprintf(buffer+rclen, fmt, argp);
511 #endif
512 va_end(argp);
513 if (len < 0)
514 return -1;
516 len += rclen;
518 /* append the result to the wbuf, don't write to the user */
519 if (sock->batch_start)
520 return add_to_wbuf(sock, buffer, len);
522 /* first write must be complete */
523 if (len != write(sock->fd, buffer, len))
524 {
525 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
526 return -1;
527 }
529 if (sock->wbuf != NULL && rc == RESP_OK)
530 {
531 wrote = 0;
532 while (wrote < sock->wbuf_len)
533 {
534 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
535 if (wb <= 0)
536 {
537 RRDD_LOG(LOG_INFO, "send_response: could not write results");
538 return -1;
539 }
540 wrote += wb;
541 }
542 }
544 free(sock->wbuf); sock->wbuf = NULL;
545 sock->wbuf_len = 0;
547 return 0;
548 } /* }}} */
550 static void wipe_ci_values(cache_item_t *ci, time_t when)
551 {
552 ci->values = NULL;
553 ci->values_num = 0;
555 ci->last_flush_time = when;
556 if (config_write_jitter > 0)
557 ci->last_flush_time += (random() % config_write_jitter);
558 }
560 /* remove_from_queue
561 * remove a "cache_item_t" item from the queue.
562 * must hold 'cache_lock' when calling this
563 */
564 static void remove_from_queue(cache_item_t *ci) /* {{{ */
565 {
566 if (ci == NULL) return;
567 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
569 if (ci->prev == NULL)
570 cache_queue_head = ci->next; /* reset head */
571 else
572 ci->prev->next = ci->next;
574 if (ci->next == NULL)
575 cache_queue_tail = ci->prev; /* reset the tail */
576 else
577 ci->next->prev = ci->prev;
579 ci->next = ci->prev = NULL;
580 ci->flags &= ~CI_FLAGS_IN_QUEUE;
581 } /* }}} static void remove_from_queue */
583 /* remove an entry from the tree and free all its resources.
584 * must hold 'cache lock' while calling this.
585 * returns 0 on success, otherwise errno */
586 static int forget_file(const char *file)
587 {
588 cache_item_t *ci;
590 ci = g_tree_lookup(cache_tree, file);
591 if (ci == NULL)
592 return ENOENT;
594 g_tree_remove (cache_tree, file);
595 remove_from_queue(ci);
597 for (int i=0; i < ci->values_num; i++)
598 free(ci->values[i]);
600 free (ci->values);
601 free (ci->file);
603 /* in case anyone is waiting */
604 pthread_cond_broadcast(&ci->flushed);
606 free (ci);
608 return 0;
609 } /* }}} static int forget_file */
611 /*
612 * enqueue_cache_item:
613 * `cache_lock' must be acquired before calling this function!
614 */
615 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
616 queue_side_t side)
617 {
618 if (ci == NULL)
619 return (-1);
621 if (ci->values_num == 0)
622 return (0);
624 if (side == HEAD)
625 {
626 if (cache_queue_head == ci)
627 return 0;
629 /* remove if further down in queue */
630 remove_from_queue(ci);
632 ci->prev = NULL;
633 ci->next = cache_queue_head;
634 if (ci->next != NULL)
635 ci->next->prev = ci;
636 cache_queue_head = ci;
638 if (cache_queue_tail == NULL)
639 cache_queue_tail = cache_queue_head;
640 }
641 else /* (side == TAIL) */
642 {
643 /* We don't move values back in the list.. */
644 if (ci->flags & CI_FLAGS_IN_QUEUE)
645 return (0);
647 assert (ci->next == NULL);
648 assert (ci->prev == NULL);
650 ci->prev = cache_queue_tail;
652 if (cache_queue_tail == NULL)
653 cache_queue_head = ci;
654 else
655 cache_queue_tail->next = ci;
657 cache_queue_tail = ci;
658 }
660 ci->flags |= CI_FLAGS_IN_QUEUE;
662 pthread_cond_broadcast(&cache_cond);
663 pthread_mutex_lock (&stats_lock);
664 stats_queue_length++;
665 pthread_mutex_unlock (&stats_lock);
667 return (0);
668 } /* }}} int enqueue_cache_item */
670 /*
671 * tree_callback_flush:
672 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
673 * while this is in progress.
674 */
675 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
676 gpointer data)
677 {
678 cache_item_t *ci;
679 callback_flush_data_t *cfd;
681 ci = (cache_item_t *) value;
682 cfd = (callback_flush_data_t *) data;
684 if (ci->flags & CI_FLAGS_IN_QUEUE)
685 return FALSE;
687 if ((ci->last_flush_time <= cfd->abs_timeout)
688 && (ci->values_num > 0))
689 {
690 enqueue_cache_item (ci, TAIL);
691 }
692 else if ((do_shutdown != 0)
693 && (ci->values_num > 0))
694 {
695 enqueue_cache_item (ci, TAIL);
696 }
697 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
698 && (ci->values_num <= 0))
699 {
700 char **temp;
702 temp = (char **) realloc (cfd->keys,
703 sizeof (char *) * (cfd->keys_num + 1));
704 if (temp == NULL)
705 {
706 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
707 return (FALSE);
708 }
709 cfd->keys = temp;
710 /* Make really sure this points to the _same_ place */
711 assert ((char *) key == ci->file);
712 cfd->keys[cfd->keys_num] = (char *) key;
713 cfd->keys_num++;
714 }
716 return (FALSE);
717 } /* }}} gboolean tree_callback_flush */
719 static int flush_old_values (int max_age)
720 {
721 callback_flush_data_t cfd;
722 size_t k;
724 memset (&cfd, 0, sizeof (cfd));
725 /* Pass the current time as user data so that we don't need to call
726 * `time' for each node. */
727 cfd.now = time (NULL);
728 cfd.keys = NULL;
729 cfd.keys_num = 0;
731 if (max_age > 0)
732 cfd.abs_timeout = cfd.now - max_age;
733 else
734 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
736 /* `tree_callback_flush' will return the keys of all values that haven't
737 * been touched in the last `config_flush_interval' seconds in `cfd'.
738 * The char*'s in this array point to the same memory as ci->file, so we
739 * don't need to free them separately. */
740 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
742 for (k = 0; k < cfd.keys_num; k++)
743 {
744 /* should never fail, since we have held the cache_lock
745 * the entire time */
746 assert( forget_file(cfd.keys[k]) == 0 );
747 }
749 if (cfd.keys != NULL)
750 {
751 free (cfd.keys);
752 cfd.keys = NULL;
753 }
755 return (0);
756 } /* int flush_old_values */
758 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
759 {
760 struct timeval now;
761 struct timespec next_flush;
762 int final_flush = 0; /* make sure we only flush once on shutdown */
764 gettimeofday (&now, NULL);
765 next_flush.tv_sec = now.tv_sec + config_flush_interval;
766 next_flush.tv_nsec = 1000 * now.tv_usec;
768 pthread_mutex_lock (&cache_lock);
769 while ((do_shutdown == 0) || (cache_queue_head != NULL))
770 {
771 cache_item_t *ci;
772 char *file;
773 char **values;
774 int values_num;
775 int status;
776 int i;
778 /* First, check if it's time to do the cache flush. */
779 gettimeofday (&now, NULL);
780 if ((now.tv_sec > next_flush.tv_sec)
781 || ((now.tv_sec == next_flush.tv_sec)
782 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
783 {
784 /* Flush all values that haven't been written in the last
785 * `config_write_interval' seconds. */
786 flush_old_values (config_write_interval);
788 /* Determine the time of the next cache flush. */
789 next_flush.tv_sec =
790 now.tv_sec + next_flush.tv_sec % config_flush_interval;
792 /* unlock the cache while we rotate so we don't block incoming
793 * updates if the fsync() blocks on disk I/O */
794 pthread_mutex_unlock(&cache_lock);
795 journal_rotate();
796 pthread_mutex_lock(&cache_lock);
797 }
799 /* Now, check if there's something to store away. If not, wait until
800 * something comes in or it's time to do the cache flush. if we are
801 * shutting down, do not wait around. */
802 if (cache_queue_head == NULL && !do_shutdown)
803 {
804 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
805 if ((status != 0) && (status != ETIMEDOUT))
806 {
807 RRDD_LOG (LOG_ERR, "queue_thread_main: "
808 "pthread_cond_timedwait returned %i.", status);
809 }
810 }
812 /* We're about to shut down */
813 if (do_shutdown != 0 && !final_flush++)
814 {
815 if (config_flush_at_shutdown)
816 flush_old_values (-1); /* flush everything */
817 else
818 break;
819 }
821 /* Check if a value has arrived. This may be NULL if we timed out or there
822 * was an interrupt such as a signal. */
823 if (cache_queue_head == NULL)
824 continue;
826 ci = cache_queue_head;
828 /* copy the relevant parts */
829 file = strdup (ci->file);
830 if (file == NULL)
831 {
832 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
833 continue;
834 }
836 assert(ci->values != NULL);
837 assert(ci->values_num > 0);
839 values = ci->values;
840 values_num = ci->values_num;
842 wipe_ci_values(ci, time(NULL));
843 remove_from_queue(ci);
845 pthread_mutex_lock (&stats_lock);
846 assert (stats_queue_length > 0);
847 stats_queue_length--;
848 pthread_mutex_unlock (&stats_lock);
850 pthread_mutex_unlock (&cache_lock);
852 rrd_clear_error ();
853 status = rrd_update_r (file, NULL, values_num, (void *) values);
854 if (status != 0)
855 {
856 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
857 "rrd_update_r (%s) failed with status %i. (%s)",
858 file, status, rrd_get_error());
859 }
861 journal_write("wrote", file);
862 pthread_cond_broadcast(&ci->flushed);
864 for (i = 0; i < values_num; i++)
865 free (values[i]);
867 free(values);
868 free(file);
870 if (status == 0)
871 {
872 pthread_mutex_lock (&stats_lock);
873 stats_updates_written++;
874 stats_data_sets_written += values_num;
875 pthread_mutex_unlock (&stats_lock);
876 }
878 pthread_mutex_lock (&cache_lock);
880 /* We're about to shut down */
881 if (do_shutdown != 0 && !final_flush++)
882 {
883 if (config_flush_at_shutdown)
884 flush_old_values (-1); /* flush everything */
885 else
886 break;
887 }
888 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
889 pthread_mutex_unlock (&cache_lock);
891 if (config_flush_at_shutdown)
892 {
893 assert(cache_queue_head == NULL);
894 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
895 }
897 journal_done();
899 return (NULL);
900 } /* }}} void *queue_thread_main */
/* Split the next field off a NUL-terminated command buffer.
 *
 * Fields are separated by a single space or by the terminating NUL. A
 * backslash makes the following character part of the field. The field is
 * unescaped in place, so the buffer is modified.
 *
 * On success returns 0, stores the start of the field in `*field_ret' and
 * advances `*buffer_ret' / shrinks `*buffer_size_ret' past the field and
 * its separator. Returns -1 (outputs untouched) if the buffer is empty or
 * no terminator is found within `*buffer_size_ret' bytes. */
static int buffer_get_field (char **buffer_ret, /* {{{ */
                             size_t *buffer_size_ret, char **field_ret)
{
  char *src = *buffer_ret;
  char *dst = *buffer_ret;   /* the field is rewritten over the input */
  size_t total = *buffer_size_ret;
  size_t in = 0;
  size_t out = 0;

  if (total <= 0)
    return (-1);

  /* This is ensured by `handle_request'. */
  assert (src[total - 1] == '\0');

  while (in < total)
  {
    char c = src[in];

    /* Check for end-of-field or end-of-buffer */
    if (c == ' ' || c == '\0')
    {
      dst[out] = 0;
      in++;

      *buffer_ret = src + in;
      *buffer_size_ret = total - in;
      *field_ret = dst;
      return (0);
    }

    /* Handle escaped characters. */
    if (c == '\\')
    {
      if (in >= (total - 1))
        break;
      in++;
      c = src[in];
    }

    dst[out] = c;
    out++;
    in++;
  } /* while (in < total) */

  return (-1);
} /* }}} int buffer_get_field */
965 /* if we're restricting writes to the base directory,
966 * check whether the file falls within the dir
967 * returns 1 if OK, otherwise 0
968 */
969 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
970 {
971 assert(file != NULL);
973 if (!config_write_base_only
974 || sock == NULL /* journal replay */
975 || config_base_dir == NULL)
976 return 1;
978 if (strstr(file, "../") != NULL) goto err;
980 /* relative paths without "../" are ok */
981 if (*file != '/') return 1;
983 /* file must be of the format base + "/" + <1+ char filename> */
984 if (strlen(file) < _config_base_dir_len + 2) goto err;
985 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
986 if (*(file + _config_base_dir_len) != '/') goto err;
988 return 1;
990 err:
991 if (sock != NULL && sock->fd >= 0)
992 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
994 return 0;
995 } /* }}} static int check_file_access */
997 /* when using a base dir, convert relative paths to absolute paths.
998 * if necessary, modifies the "filename" pointer to point
999 * to the new path created in "tmp". "tmp" is provided
1000 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1001 *
1002 * this allows us to optimize for the expected case (absolute path)
1003 * with a no-op.
1004 */
1005 static void get_abs_path(char **filename, char *tmp)
1006 {
1007 assert(tmp != NULL);
1008 assert(filename != NULL && *filename != NULL);
1010 if (config_base_dir == NULL || **filename == '/')
1011 return;
1013 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1014 *filename = tmp;
1015 } /* }}} static int get_abs_path */
1017 /* returns 1 if we have the required privilege level,
1018 * otherwise issue an error to the user on sock */
1019 static int has_privilege (listen_socket_t *sock, /* {{{ */
1020 socket_privilege priv)
1021 {
1022 if (sock == NULL) /* journal replay */
1023 return 1;
1025 if (sock->privilege >= priv)
1026 return 1;
1028 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1029 } /* }}} static int has_privilege */
1031 static int flush_file (const char *filename) /* {{{ */
1032 {
1033 cache_item_t *ci;
1035 pthread_mutex_lock (&cache_lock);
1037 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1038 if (ci == NULL)
1039 {
1040 pthread_mutex_unlock (&cache_lock);
1041 return (ENOENT);
1042 }
1044 if (ci->values_num > 0)
1045 {
1046 /* Enqueue at head */
1047 enqueue_cache_item (ci, HEAD);
1048 pthread_cond_wait(&ci->flushed, &cache_lock);
1049 }
1051 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1052 * may have been purged during our cond_wait() */
1054 pthread_mutex_unlock(&cache_lock);
1056 return (0);
1057 } /* }}} int flush_file */
1059 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1060 char *buffer, size_t buffer_size)
1061 {
1062 int status;
1063 char **help_text;
1064 char *command;
1066 char *help_help[2] =
1067 {
1068 "Command overview\n"
1069 ,
1070 "HELP [<command>]\n"
1071 "FLUSH <filename>\n"
1072 "FLUSHALL\n"
1073 "PENDING <filename>\n"
1074 "FORGET <filename>\n"
1075 "UPDATE <filename> <values> [<values> ...]\n"
1076 "BATCH\n"
1077 "STATS\n"
1078 };
1080 char *help_flush[2] =
1081 {
1082 "Help for FLUSH\n"
1083 ,
1084 "Usage: FLUSH <filename>\n"
1085 "\n"
1086 "Adds the given filename to the head of the update queue and returns\n"
1087 "after is has been dequeued.\n"
1088 };
1090 char *help_flushall[2] =
1091 {
1092 "Help for FLUSHALL\n"
1093 ,
1094 "Usage: FLUSHALL\n"
1095 "\n"
1096 "Triggers writing of all pending updates. Returns immediately.\n"
1097 };
1099 char *help_pending[2] =
1100 {
1101 "Help for PENDING\n"
1102 ,
1103 "Usage: PENDING <filename>\n"
1104 "\n"
1105 "Shows any 'pending' updates for a file, in order.\n"
1106 "The updates shown have not yet been written to the underlying RRD file.\n"
1107 };
1109 char *help_forget[2] =
1110 {
1111 "Help for FORGET\n"
1112 ,
1113 "Usage: FORGET <filename>\n"
1114 "\n"
1115 "Removes the file completely from the cache.\n"
1116 "Any pending updates for the file will be lost.\n"
1117 };
1119 char *help_update[2] =
1120 {
1121 "Help for UPDATE\n"
1122 ,
1123 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1124 "\n"
1125 "Adds the given file to the internal cache if it is not yet known and\n"
1126 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1127 "for details.\n"
1128 "\n"
1129 "Each <values> has the following form:\n"
1130 " <values> = <time>:<value>[:<value>[...]]\n"
1131 "See the rrdupdate(1) manpage for details.\n"
1132 };
1134 char *help_stats[2] =
1135 {
1136 "Help for STATS\n"
1137 ,
1138 "Usage: STATS\n"
1139 "\n"
1140 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1141 "a description of the values.\n"
1142 };
1144 char *help_batch[2] =
1145 {
1146 "Help for BATCH\n"
1147 ,
1148 "The 'BATCH' command permits the client to initiate a bulk load\n"
1149 " of commands to rrdcached.\n"
1150 "\n"
1151 "Usage:\n"
1152 "\n"
1153 " client: BATCH\n"
1154 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1155 " client: command #1\n"
1156 " client: command #2\n"
1157 " client: ... and so on\n"
1158 " client: .\n"
1159 " server: 2 errors\n"
1160 " server: 7 message for command #7\n"
1161 " server: 9 message for command #9\n"
1162 "\n"
1163 "For more information, consult the rrdcached(1) documentation.\n"
1164 };
1166 status = buffer_get_field (&buffer, &buffer_size, &command);
1167 if (status != 0)
1168 help_text = help_help;
1169 else
1170 {
1171 if (strcasecmp (command, "update") == 0)
1172 help_text = help_update;
1173 else if (strcasecmp (command, "flush") == 0)
1174 help_text = help_flush;
1175 else if (strcasecmp (command, "flushall") == 0)
1176 help_text = help_flushall;
1177 else if (strcasecmp (command, "pending") == 0)
1178 help_text = help_pending;
1179 else if (strcasecmp (command, "forget") == 0)
1180 help_text = help_forget;
1181 else if (strcasecmp (command, "stats") == 0)
1182 help_text = help_stats;
1183 else if (strcasecmp (command, "batch") == 0)
1184 help_text = help_batch;
1185 else
1186 help_text = help_help;
1187 }
1189 add_response_info(sock, help_text[1]);
1190 return send_response(sock, RESP_OK, help_text[0]);
1191 } /* }}} int handle_request_help */
1193 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1194 {
1195 uint64_t copy_queue_length;
1196 uint64_t copy_updates_received;
1197 uint64_t copy_flush_received;
1198 uint64_t copy_updates_written;
1199 uint64_t copy_data_sets_written;
1200 uint64_t copy_journal_bytes;
1201 uint64_t copy_journal_rotate;
1203 uint64_t tree_nodes_number;
1204 uint64_t tree_depth;
1206 pthread_mutex_lock (&stats_lock);
1207 copy_queue_length = stats_queue_length;
1208 copy_updates_received = stats_updates_received;
1209 copy_flush_received = stats_flush_received;
1210 copy_updates_written = stats_updates_written;
1211 copy_data_sets_written = stats_data_sets_written;
1212 copy_journal_bytes = stats_journal_bytes;
1213 copy_journal_rotate = stats_journal_rotate;
1214 pthread_mutex_unlock (&stats_lock);
1216 pthread_mutex_lock (&cache_lock);
1217 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1218 tree_depth = (uint64_t) g_tree_height (cache_tree);
1219 pthread_mutex_unlock (&cache_lock);
1221 add_response_info(sock,
1222 "QueueLength: %"PRIu64"\n", copy_queue_length);
1223 add_response_info(sock,
1224 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1225 add_response_info(sock,
1226 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1227 add_response_info(sock,
1228 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1229 add_response_info(sock,
1230 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1231 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1232 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1233 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1234 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1236 send_response(sock, RESP_OK, "Statistics follow\n");
1238 return (0);
1239 } /* }}} int handle_request_stats */
1241 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1242 char *buffer, size_t buffer_size)
1243 {
1244 char *file, file_tmp[PATH_MAX];
1245 int status;
1247 status = buffer_get_field (&buffer, &buffer_size, &file);
1248 if (status != 0)
1249 {
1250 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1251 }
1252 else
1253 {
1254 pthread_mutex_lock(&stats_lock);
1255 stats_flush_received++;
1256 pthread_mutex_unlock(&stats_lock);
1258 get_abs_path(&file, file_tmp);
1259 if (!check_file_access(file, sock)) return 0;
1261 status = flush_file (file);
1262 if (status == 0)
1263 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1264 else if (status == ENOENT)
1265 {
1266 /* no file in our tree; see whether it exists at all */
1267 struct stat statbuf;
1269 memset(&statbuf, 0, sizeof(statbuf));
1270 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1271 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1272 else
1273 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1274 }
1275 else if (status < 0)
1276 return send_response(sock, RESP_ERR, "Internal error.\n");
1277 else
1278 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1279 }
1281 /* NOTREACHED */
1282 assert(1==0);
1283 } /* }}} int handle_request_flush */
1285 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1286 {
1287 int status;
1289 status = has_privilege(sock, PRIV_HIGH);
1290 if (status <= 0)
1291 return status;
1293 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1295 pthread_mutex_lock(&cache_lock);
1296 flush_old_values(-1);
1297 pthread_mutex_unlock(&cache_lock);
1299 return send_response(sock, RESP_OK, "Started flush.\n");
1300 } /* }}} static int handle_request_flushall */
1302 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1303 char *buffer, size_t buffer_size)
1304 {
1305 int status;
1306 char *file, file_tmp[PATH_MAX];
1307 cache_item_t *ci;
1309 status = buffer_get_field(&buffer, &buffer_size, &file);
1310 if (status != 0)
1311 return send_response(sock, RESP_ERR,
1312 "Usage: PENDING <filename>\n");
1314 status = has_privilege(sock, PRIV_HIGH);
1315 if (status <= 0)
1316 return status;
1318 get_abs_path(&file, file_tmp);
1320 pthread_mutex_lock(&cache_lock);
1321 ci = g_tree_lookup(cache_tree, file);
1322 if (ci == NULL)
1323 {
1324 pthread_mutex_unlock(&cache_lock);
1325 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1326 }
1328 for (int i=0; i < ci->values_num; i++)
1329 add_response_info(sock, "%s\n", ci->values[i]);
1331 pthread_mutex_unlock(&cache_lock);
1332 return send_response(sock, RESP_OK, "updates pending\n");
1333 } /* }}} static int handle_request_pending */
1335 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1336 char *buffer, size_t buffer_size)
1337 {
1338 int status;
1339 char *file, file_tmp[PATH_MAX];
1341 status = buffer_get_field(&buffer, &buffer_size, &file);
1342 if (status != 0)
1343 return send_response(sock, RESP_ERR,
1344 "Usage: FORGET <filename>\n");
1346 status = has_privilege(sock, PRIV_HIGH);
1347 if (status <= 0)
1348 return status;
1350 get_abs_path(&file, file_tmp);
1351 if (!check_file_access(file, sock)) return 0;
1353 pthread_mutex_lock(&cache_lock);
1354 status = forget_file(file);
1355 pthread_mutex_unlock(&cache_lock);
1357 if (status == 0)
1358 {
1359 if (sock != NULL)
1360 journal_write("forget", file);
1362 return send_response(sock, RESP_OK, "Gone!\n");
1363 }
1364 else
1365 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1366 status < 0 ? "Internal error" : rrd_strerror(status));
1368 /* NOTREACHED */
1369 assert(1==0);
1370 } /* }}} static int handle_request_forget */
1372 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1373 time_t now,
1374 char *buffer, size_t buffer_size)
1375 {
1376 char *file, file_tmp[PATH_MAX];
1377 int values_num = 0;
1378 int bad_timestamps = 0;
1379 int status;
1380 char orig_buf[CMD_MAX];
1382 cache_item_t *ci;
1384 status = has_privilege(sock, PRIV_HIGH);
1385 if (status <= 0)
1386 return status;
1388 /* save it for the journal later */
1389 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1391 status = buffer_get_field (&buffer, &buffer_size, &file);
1392 if (status != 0)
1393 return send_response(sock, RESP_ERR,
1394 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1396 pthread_mutex_lock(&stats_lock);
1397 stats_updates_received++;
1398 pthread_mutex_unlock(&stats_lock);
1400 get_abs_path(&file, file_tmp);
1401 if (!check_file_access(file, sock)) return 0;
1403 pthread_mutex_lock (&cache_lock);
1404 ci = g_tree_lookup (cache_tree, file);
1406 if (ci == NULL) /* {{{ */
1407 {
1408 struct stat statbuf;
1410 /* don't hold the lock while we setup; stat(2) might block */
1411 pthread_mutex_unlock(&cache_lock);
1413 memset (&statbuf, 0, sizeof (statbuf));
1414 status = stat (file, &statbuf);
1415 if (status != 0)
1416 {
1417 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1419 status = errno;
1420 if (status == ENOENT)
1421 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1422 else
1423 return send_response(sock, RESP_ERR,
1424 "stat failed with error %i.\n", status);
1425 }
1426 if (!S_ISREG (statbuf.st_mode))
1427 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1429 if (access(file, R_OK|W_OK) != 0)
1430 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1431 file, rrd_strerror(errno));
1433 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1434 if (ci == NULL)
1435 {
1436 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1438 return send_response(sock, RESP_ERR, "malloc failed.\n");
1439 }
1440 memset (ci, 0, sizeof (cache_item_t));
1442 ci->file = strdup (file);
1443 if (ci->file == NULL)
1444 {
1445 free (ci);
1446 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1448 return send_response(sock, RESP_ERR, "strdup failed.\n");
1449 }
1451 wipe_ci_values(ci, now);
1452 ci->flags = CI_FLAGS_IN_TREE;
1453 ci->flushed = PTHREAD_COND_INITIALIZER;
1455 pthread_mutex_lock(&cache_lock);
1456 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1457 } /* }}} */
1458 assert (ci != NULL);
1460 /* don't re-write updates in replay mode */
1461 if (sock != NULL)
1462 journal_write("update", orig_buf);
1464 while (buffer_size > 0)
1465 {
1466 char **temp;
1467 char *value;
1468 time_t stamp;
1469 char *eostamp;
1471 status = buffer_get_field (&buffer, &buffer_size, &value);
1472 if (status != 0)
1473 {
1474 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1475 break;
1476 }
1478 /* make sure update time is always moving forward */
1479 stamp = strtol(value, &eostamp, 10);
1480 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1481 {
1482 ++bad_timestamps;
1483 add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1484 continue;
1485 }
1486 else if (stamp <= ci->last_update_stamp)
1487 {
1488 ++bad_timestamps;
1489 add_response_info(sock,
1490 "illegal attempt to update using time %ld when"
1491 " last update time is %ld (minimum one second step)\n",
1492 stamp, ci->last_update_stamp);
1493 continue;
1494 }
1495 else
1496 ci->last_update_stamp = stamp;
1498 temp = (char **) realloc (ci->values,
1499 sizeof (char *) * (ci->values_num + 1));
1500 if (temp == NULL)
1501 {
1502 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1503 continue;
1504 }
1505 ci->values = temp;
1507 ci->values[ci->values_num] = strdup (value);
1508 if (ci->values[ci->values_num] == NULL)
1509 {
1510 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1511 continue;
1512 }
1513 ci->values_num++;
1515 values_num++;
1516 }
1518 if (((now - ci->last_flush_time) >= config_write_interval)
1519 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1520 && (ci->values_num > 0))
1521 {
1522 enqueue_cache_item (ci, TAIL);
1523 }
1525 pthread_mutex_unlock (&cache_lock);
1527 if (values_num < 1)
1528 {
1529 /* journal replay mode */
1530 if (sock == NULL) return RESP_ERR;
1532 /* if we had only one update attempt, then return the full
1533 error message... try to get the most information out
1534 of the limited error space allowed by the protocol
1535 */
1536 if (bad_timestamps == 1)
1537 return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1538 else
1539 return send_response(sock, RESP_ERR,
1540 "No values updated (%d bad timestamps).\n",
1541 bad_timestamps);
1542 }
1543 else
1544 return send_response(sock, RESP_OK,
1545 "errors, enqueued %i value(s).\n", values_num);
1547 /* NOTREACHED */
1548 assert(1==0);
1550 } /* }}} int handle_request_update */
1552 /* we came across a "WROTE" entry during journal replay.
1553 * throw away any values that we have accumulated for this file
1554 */
1555 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1556 {
1557 int i;
1558 cache_item_t *ci;
1559 const char *file = buffer;
1561 pthread_mutex_lock(&cache_lock);
1563 ci = g_tree_lookup(cache_tree, file);
1564 if (ci == NULL)
1565 {
1566 pthread_mutex_unlock(&cache_lock);
1567 return (0);
1568 }
1570 if (ci->values)
1571 {
1572 for (i=0; i < ci->values_num; i++)
1573 free(ci->values[i]);
1575 free(ci->values);
1576 }
1578 wipe_ci_values(ci, now);
1579 remove_from_queue(ci);
1581 pthread_mutex_unlock(&cache_lock);
1582 return (0);
1583 } /* }}} int handle_request_wrote */
1585 /* start "BATCH" processing */
1586 static int batch_start (listen_socket_t *sock) /* {{{ */
1587 {
1588 int status;
1589 if (sock->batch_start)
1590 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1592 status = send_response(sock, RESP_OK,
1593 "Go ahead. End with dot '.' on its own line.\n");
1594 sock->batch_start = time(NULL);
1595 sock->batch_cmd = 0;
1597 return status;
1598 } /* }}} static int batch_start */
1600 /* finish "BATCH" processing and return results to the client */
1601 static int batch_done (listen_socket_t *sock) /* {{{ */
1602 {
1603 assert(sock->batch_start);
1604 sock->batch_start = 0;
1605 sock->batch_cmd = 0;
1606 return send_response(sock, RESP_OK, "errors\n");
1607 } /* }}} static int batch_done */
1609 /* if sock==NULL, we are in journal replay mode */
1610 static int handle_request (listen_socket_t *sock, /* {{{ */
1611 time_t now,
1612 char *buffer, size_t buffer_size)
1613 {
1614 char *buffer_ptr;
1615 char *command;
1616 int status;
1618 assert (buffer[buffer_size - 1] == '\0');
1620 buffer_ptr = buffer;
1621 command = NULL;
1622 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1623 if (status != 0)
1624 {
1625 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1626 return (-1);
1627 }
1629 if (sock != NULL && sock->batch_start)
1630 sock->batch_cmd++;
1632 if (strcasecmp (command, "update") == 0)
1633 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1634 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1635 {
1636 /* this is only valid in replay mode */
1637 return (handle_request_wrote (buffer_ptr, now));
1638 }
1639 else if (strcasecmp (command, "flush") == 0)
1640 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1641 else if (strcasecmp (command, "flushall") == 0)
1642 return (handle_request_flushall(sock));
1643 else if (strcasecmp (command, "pending") == 0)
1644 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1645 else if (strcasecmp (command, "forget") == 0)
1646 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1647 else if (strcasecmp (command, "stats") == 0)
1648 return (handle_request_stats (sock));
1649 else if (strcasecmp (command, "help") == 0)
1650 return (handle_request_help (sock, buffer_ptr, buffer_size));
1651 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1652 return batch_start(sock);
1653 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1654 return batch_done(sock);
1655 else
1656 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1658 /* NOTREACHED */
1659 assert(1==0);
1660 } /* }}} int handle_request */
1662 /* MUST NOT hold journal_lock before calling this */
1663 static void journal_rotate(void) /* {{{ */
1664 {
1665 FILE *old_fh = NULL;
1666 int new_fd;
1668 if (journal_cur == NULL || journal_old == NULL)
1669 return;
1671 pthread_mutex_lock(&journal_lock);
1673 /* we rotate this way (rename before close) so that the we can release
1674 * the journal lock as fast as possible. Journal writes to the new
1675 * journal can proceed immediately after the new file is opened. The
1676 * fclose can then block without affecting new updates.
1677 */
1678 if (journal_fh != NULL)
1679 {
1680 old_fh = journal_fh;
1681 journal_fh = NULL;
1682 rename(journal_cur, journal_old);
1683 ++stats_journal_rotate;
1684 }
1686 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1687 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1688 if (new_fd >= 0)
1689 {
1690 journal_fh = fdopen(new_fd, "a");
1691 if (journal_fh == NULL)
1692 close(new_fd);
1693 }
1695 pthread_mutex_unlock(&journal_lock);
1697 if (old_fh != NULL)
1698 fclose(old_fh);
1700 if (journal_fh == NULL)
1701 {
1702 RRDD_LOG(LOG_CRIT,
1703 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1704 journal_cur, rrd_strerror(errno));
1706 RRDD_LOG(LOG_ERR,
1707 "JOURNALING DISABLED: All values will be flushed at shutdown");
1708 config_flush_at_shutdown = 1;
1709 }
1711 } /* }}} static void journal_rotate */
1713 static void journal_done(void) /* {{{ */
1714 {
1715 if (journal_cur == NULL)
1716 return;
1718 pthread_mutex_lock(&journal_lock);
1719 if (journal_fh != NULL)
1720 {
1721 fclose(journal_fh);
1722 journal_fh = NULL;
1723 }
1725 if (config_flush_at_shutdown)
1726 {
1727 RRDD_LOG(LOG_INFO, "removing journals");
1728 unlink(journal_old);
1729 unlink(journal_cur);
1730 }
1731 else
1732 {
1733 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1734 "journals will be used at next startup");
1735 }
1737 pthread_mutex_unlock(&journal_lock);
1739 } /* }}} static void journal_done */
1741 static int journal_write(char *cmd, char *args) /* {{{ */
1742 {
1743 int chars;
1745 if (journal_fh == NULL)
1746 return 0;
1748 pthread_mutex_lock(&journal_lock);
1749 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1750 pthread_mutex_unlock(&journal_lock);
1752 if (chars > 0)
1753 {
1754 pthread_mutex_lock(&stats_lock);
1755 stats_journal_bytes += chars;
1756 pthread_mutex_unlock(&stats_lock);
1757 }
1759 return chars;
1760 } /* }}} static int journal_write */
1762 static int journal_replay (const char *file) /* {{{ */
1763 {
1764 FILE *fh;
1765 int entry_cnt = 0;
1766 int fail_cnt = 0;
1767 uint64_t line = 0;
1768 char entry[CMD_MAX];
1769 time_t now;
1771 if (file == NULL) return 0;
1773 {
1774 char *reason;
1775 int status = 0;
1776 struct stat statbuf;
1778 memset(&statbuf, 0, sizeof(statbuf));
1779 if (stat(file, &statbuf) != 0)
1780 {
1781 if (errno == ENOENT)
1782 return 0;
1784 reason = "stat error";
1785 status = errno;
1786 }
1787 else if (!S_ISREG(statbuf.st_mode))
1788 {
1789 reason = "not a regular file";
1790 status = EPERM;
1791 }
1792 if (statbuf.st_uid != daemon_uid)
1793 {
1794 reason = "not owned by daemon user";
1795 status = EACCES;
1796 }
1797 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1798 {
1799 reason = "must not be user/group writable";
1800 status = EACCES;
1801 }
1803 if (status != 0)
1804 {
1805 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1806 file, rrd_strerror(status), reason);
1807 return 0;
1808 }
1809 }
1811 fh = fopen(file, "r");
1812 if (fh == NULL)
1813 {
1814 if (errno != ENOENT)
1815 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1816 file, rrd_strerror(errno));
1817 return 0;
1818 }
1819 else
1820 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1822 now = time(NULL);
1824 while(!feof(fh))
1825 {
1826 size_t entry_len;
1828 ++line;
1829 if (fgets(entry, sizeof(entry), fh) == NULL)
1830 break;
1831 entry_len = strlen(entry);
1833 /* check \n termination in case journal writing crashed mid-line */
1834 if (entry_len == 0)
1835 continue;
1836 else if (entry[entry_len - 1] != '\n')
1837 {
1838 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1839 ++fail_cnt;
1840 continue;
1841 }
1843 entry[entry_len - 1] = '\0';
1845 if (handle_request(NULL, now, entry, entry_len) == 0)
1846 ++entry_cnt;
1847 else
1848 ++fail_cnt;
1849 }
1851 fclose(fh);
1853 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1854 entry_cnt, fail_cnt);
1856 return entry_cnt > 0 ? 1 : 0;
1857 } /* }}} static int journal_replay */
1859 static void journal_init(void) /* {{{ */
1860 {
1861 int had_journal = 0;
1863 if (journal_cur == NULL) return;
1865 pthread_mutex_lock(&journal_lock);
1867 RRDD_LOG(LOG_INFO, "checking for journal files");
1869 had_journal += journal_replay(journal_old);
1870 had_journal += journal_replay(journal_cur);
1872 /* it must have been a crash. start a flush */
1873 if (had_journal && config_flush_at_shutdown)
1874 flush_old_values(-1);
1876 pthread_mutex_unlock(&journal_lock);
1877 journal_rotate();
1879 RRDD_LOG(LOG_INFO, "journal processing complete");
1881 } /* }}} static void journal_init */
1883 static void close_connection(listen_socket_t *sock)
1884 {
1885 close(sock->fd) ; sock->fd = -1;
1886 free(sock->rbuf); sock->rbuf = NULL;
1887 free(sock->wbuf); sock->wbuf = NULL;
1889 free(sock);
1890 }
1892 static void *connection_thread_main (void *args) /* {{{ */
1893 {
1894 pthread_t self;
1895 listen_socket_t *sock;
1896 int i;
1897 int fd;
1899 sock = (listen_socket_t *) args;
1900 fd = sock->fd;
1902 /* init read buffers */
1903 sock->next_read = sock->next_cmd = 0;
1904 sock->rbuf = malloc(RBUF_SIZE);
1905 if (sock->rbuf == NULL)
1906 {
1907 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1908 close_connection(sock);
1909 return NULL;
1910 }
1912 pthread_mutex_lock (&connection_threads_lock);
1913 {
1914 pthread_t *temp;
1916 temp = (pthread_t *) realloc (connection_threads,
1917 sizeof (pthread_t) * (connection_threads_num + 1));
1918 if (temp == NULL)
1919 {
1920 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1921 }
1922 else
1923 {
1924 connection_threads = temp;
1925 connection_threads[connection_threads_num] = pthread_self ();
1926 connection_threads_num++;
1927 }
1928 }
1929 pthread_mutex_unlock (&connection_threads_lock);
1931 while (do_shutdown == 0)
1932 {
1933 char *cmd;
1934 ssize_t cmd_len;
1935 ssize_t rbytes;
1936 time_t now;
1938 struct pollfd pollfd;
1939 int status;
1941 pollfd.fd = fd;
1942 pollfd.events = POLLIN | POLLPRI;
1943 pollfd.revents = 0;
1945 status = poll (&pollfd, 1, /* timeout = */ 500);
1946 if (do_shutdown)
1947 break;
1948 else if (status == 0) /* timeout */
1949 continue;
1950 else if (status < 0) /* error */
1951 {
1952 status = errno;
1953 if (status != EINTR)
1954 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1955 continue;
1956 }
1958 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1959 break;
1960 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1961 {
1962 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1963 "poll(2) returned something unexpected: %#04hx",
1964 pollfd.revents);
1965 break;
1966 }
1968 rbytes = read(fd, sock->rbuf + sock->next_read,
1969 RBUF_SIZE - sock->next_read);
1970 if (rbytes < 0)
1971 {
1972 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1973 break;
1974 }
1975 else if (rbytes == 0)
1976 break; /* eof */
1978 sock->next_read += rbytes;
1980 if (sock->batch_start)
1981 now = sock->batch_start;
1982 else
1983 now = time(NULL);
1985 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1986 {
1987 status = handle_request (sock, now, cmd, cmd_len+1);
1988 if (status != 0)
1989 goto out_close;
1990 }
1991 }
1993 out_close:
1994 close_connection(sock);
1996 self = pthread_self ();
1997 /* Remove this thread from the connection threads list */
1998 pthread_mutex_lock (&connection_threads_lock);
1999 /* Find out own index in the array */
2000 for (i = 0; i < connection_threads_num; i++)
2001 if (pthread_equal (connection_threads[i], self) != 0)
2002 break;
2003 assert (i < connection_threads_num);
2005 /* Move the trailing threads forward. */
2006 if (i < (connection_threads_num - 1))
2007 {
2008 memmove (connection_threads + i,
2009 connection_threads + i + 1,
2010 sizeof (pthread_t) * (connection_threads_num - i - 1));
2011 }
2013 connection_threads_num--;
2014 pthread_mutex_unlock (&connection_threads_lock);
2016 return (NULL);
2017 } /* }}} void *connection_thread_main */
2019 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2020 {
2021 int fd;
2022 struct sockaddr_un sa;
2023 listen_socket_t *temp;
2024 int status;
2025 const char *path;
2027 path = sock->addr;
2028 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2029 path += strlen("unix:");
2031 temp = (listen_socket_t *) realloc (listen_fds,
2032 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2033 if (temp == NULL)
2034 {
2035 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2036 return (-1);
2037 }
2038 listen_fds = temp;
2039 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2041 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2042 if (fd < 0)
2043 {
2044 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2045 rrd_strerror(errno));
2046 return (-1);
2047 }
2049 memset (&sa, 0, sizeof (sa));
2050 sa.sun_family = AF_UNIX;
2051 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2053 /* if we've gotten this far, we own the pid file. any daemon started
2054 * with the same args must not be alive. therefore, ensure that we can
2055 * create the socket...
2056 */
2057 unlink(path);
2059 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2060 if (status != 0)
2061 {
2062 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2063 path, rrd_strerror(errno));
2064 close (fd);
2065 return (-1);
2066 }
2068 status = listen (fd, /* backlog = */ 10);
2069 if (status != 0)
2070 {
2071 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2072 path, rrd_strerror(errno));
2073 close (fd);
2074 unlink (path);
2075 return (-1);
2076 }
2078 listen_fds[listen_fds_num].fd = fd;
2079 listen_fds[listen_fds_num].family = PF_UNIX;
2080 strncpy(listen_fds[listen_fds_num].addr, path,
2081 sizeof (listen_fds[listen_fds_num].addr) - 1);
2082 listen_fds_num++;
2084 return (0);
2085 } /* }}} int open_listen_socket_unix */
/*
 * Opens listen sockets for a network address and appends them to
 * listen_fds, one per address returned by getaddrinfo().
 *
 * sock->addr may be "[ipv6]", "[ipv6]:port", "host", or "host:port" (the
 * port is only split off from names containing a '.').
 * RRDCACHED_DEFAULT_PORT is used when no port is given.
 *
 * Returns 0 once the address list has been processed, even if individual
 * addresses failed. Returns -1 on a malformed address, a getaddrinfo()
 * failure, or a listen() failure.
 */
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  /* work on a copy: the address is split in place below */
  strncpy (addr_copy, sock->addr, sizeof (addr_copy));
  addr_copy[sizeof (addr_copy) - 1] = 0;
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
  {
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }
  ai_res = NULL;
  status = getaddrinfo (addr,
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
                        &ai_hints, &ai_res);
  if (status != 0)
  {
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
             addr, gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    int fd;
    listen_socket_t *temp;
    int one = 1;

    temp = (listen_socket_t *) realloc (listen_fds,
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
    if (temp == NULL)
    {
      fprintf (stderr,
               "rrdcached: open_listen_socket_network: realloc failed.\n");
      continue;
    }
    listen_fds = temp;
    /* start from the caller's settings; fd and family are set below */
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));

    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0)
    {
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
               rrd_strerror(errno));
      continue;
    }

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      continue;
    }

    status = listen (fd, /* backlog = */ 10);
    if (status != 0)
    {
      fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
               sock->addr, rrd_strerror(errno));
      close (fd);
      freeaddrinfo (ai_res);
      return (-1);
    }

    listen_fds[listen_fds_num].fd = fd;
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
    listen_fds_num++;
  } /* for (ai_ptr) */

  /* the result list belongs to us and must be released */
  freeaddrinfo (ai_res);

  return (0);
} /* }}} static int open_listen_socket_network */
2207 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2208 {
2209 assert(sock != NULL);
2210 assert(sock->addr != NULL);
2212 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2213 || sock->addr[0] == '/')
2214 return (open_listen_socket_unix(sock));
2215 else
2216 return (open_listen_socket_network(sock));
2217 } /* }}} int open_listen_socket */
2219 static int close_listen_sockets (void) /* {{{ */
2220 {
2221 size_t i;
2223 for (i = 0; i < listen_fds_num; i++)
2224 {
2225 close (listen_fds[i].fd);
2227 if (listen_fds[i].family == PF_UNIX)
2228 unlink(listen_fds[i].addr);
2229 }
2231 free (listen_fds);
2232 listen_fds = NULL;
2233 listen_fds_num = 0;
2235 return (0);
2236 } /* }}} int close_listen_sockets */
2238 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2239 {
2240 struct pollfd *pollfds;
2241 int pollfds_num;
2242 int status;
2243 int i;
2245 if (listen_fds_num < 1)
2246 {
2247 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2248 return (NULL);
2249 }
2251 pollfds_num = listen_fds_num;
2252 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2253 if (pollfds == NULL)
2254 {
2255 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2256 return (NULL);
2257 }
2258 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2260 RRDD_LOG(LOG_INFO, "listening for connections");
2262 while (do_shutdown == 0)
2263 {
2264 assert (pollfds_num == ((int) listen_fds_num));
2265 for (i = 0; i < pollfds_num; i++)
2266 {
2267 pollfds[i].fd = listen_fds[i].fd;
2268 pollfds[i].events = POLLIN | POLLPRI;
2269 pollfds[i].revents = 0;
2270 }
2272 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2273 if (do_shutdown)
2274 break;
2275 else if (status == 0) /* timeout */
2276 continue;
2277 else if (status < 0) /* error */
2278 {
2279 status = errno;
2280 if (status != EINTR)
2281 {
2282 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2283 }
2284 continue;
2285 }
2287 for (i = 0; i < pollfds_num; i++)
2288 {
2289 listen_socket_t *client_sock;
2290 struct sockaddr_storage client_sa;
2291 socklen_t client_sa_size;
2292 pthread_t tid;
2293 pthread_attr_t attr;
2295 if (pollfds[i].revents == 0)
2296 continue;
2298 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2299 {
2300 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2301 "poll(2) returned something unexpected for listen FD #%i.",
2302 pollfds[i].fd);
2303 continue;
2304 }
2306 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2307 if (client_sock == NULL)
2308 {
2309 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2310 continue;
2311 }
2312 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2314 client_sa_size = sizeof (client_sa);
2315 client_sock->fd = accept (pollfds[i].fd,
2316 (struct sockaddr *) &client_sa, &client_sa_size);
2317 if (client_sock->fd < 0)
2318 {
2319 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2320 free(client_sock);
2321 continue;
2322 }
2324 pthread_attr_init (&attr);
2325 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2327 status = pthread_create (&tid, &attr, connection_thread_main,
2328 client_sock);
2329 if (status != 0)
2330 {
2331 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2332 close_connection(client_sock);
2333 continue;
2334 }
2335 } /* for (pollfds_num) */
2336 } /* while (do_shutdown == 0) */
2338 RRDD_LOG(LOG_INFO, "starting shutdown");
2340 close_listen_sockets ();
2342 pthread_mutex_lock (&connection_threads_lock);
2343 while (connection_threads_num > 0)
2344 {
2345 pthread_t wait_for;
2347 wait_for = connection_threads[0];
2349 pthread_mutex_unlock (&connection_threads_lock);
2350 pthread_join (wait_for, /* retval = */ NULL);
2351 pthread_mutex_lock (&connection_threads_lock);
2352 }
2353 pthread_mutex_unlock (&connection_threads_lock);
2355 return (NULL);
2356 } /* }}} void *listen_thread_main */
2358 static int daemonize (void) /* {{{ */
2359 {
2360 int pid_fd;
2361 char *base_dir;
2363 daemon_uid = geteuid();
2365 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2366 if (pid_fd < 0)
2367 pid_fd = check_pidfile();
2368 if (pid_fd < 0)
2369 return pid_fd;
2371 /* open all the listen sockets */
2372 if (config_listen_address_list_len > 0)
2373 {
2374 for (int i = 0; i < config_listen_address_list_len; i++)
2375 open_listen_socket (config_listen_address_list[i]);
2376 }
2377 else
2378 {
2379 listen_socket_t sock;
2380 memset(&sock, 0, sizeof(sock));
2381 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2382 open_listen_socket (&sock);
2383 }
2385 if (listen_fds_num < 1)
2386 {
2387 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2388 goto error;
2389 }
2391 if (!stay_foreground)
2392 {
2393 pid_t child;
2395 child = fork ();
2396 if (child < 0)
2397 {
2398 fprintf (stderr, "daemonize: fork(2) failed.\n");
2399 goto error;
2400 }
2401 else if (child > 0)
2402 exit(0);
2404 /* Become session leader */
2405 setsid ();
2407 /* Open the first three file descriptors to /dev/null */
2408 close (2);
2409 close (1);
2410 close (0);
2412 open ("/dev/null", O_RDWR);
2413 dup (0);
2414 dup (0);
2415 } /* if (!stay_foreground) */
2417 /* Change into the /tmp directory. */
2418 base_dir = (config_base_dir != NULL)
2419 ? config_base_dir
2420 : "/tmp";
2422 if (chdir (base_dir) != 0)
2423 {
2424 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2425 goto error;
2426 }
2428 install_signal_handlers();
2430 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2431 RRDD_LOG(LOG_INFO, "starting up");
2433 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2434 if (cache_tree == NULL)
2435 {
2436 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2437 goto error;
2438 }
2440 return write_pidfile (pid_fd);
2442 error:
2443 remove_pidfile();
2444 return -1;
2445 } /* }}} int daemonize */
2447 static int cleanup (void) /* {{{ */
2448 {
2449 do_shutdown++;
2451 pthread_cond_signal (&cache_cond);
2452 pthread_join (queue_thread, /* return = */ NULL);
2454 remove_pidfile ();
2456 RRDD_LOG(LOG_INFO, "goodbye");
2457 closelog ();
2459 return (0);
2460 } /* }}} int cleanup */
2462 static int read_options (int argc, char **argv) /* {{{ */
2463 {
2464 int option;
2465 int status = 0;
2467 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2468 {
2469 switch (option)
2470 {
2471 case 'g':
2472 stay_foreground=1;
2473 break;
2475 case 'L':
2476 case 'l':
2477 {
2478 listen_socket_t **temp;
2479 listen_socket_t *new;
2481 new = malloc(sizeof(listen_socket_t));
2482 if (new == NULL)
2483 {
2484 fprintf(stderr, "read_options: malloc failed.\n");
2485 return(2);
2486 }
2487 memset(new, 0, sizeof(listen_socket_t));
2489 temp = (listen_socket_t **) realloc (config_listen_address_list,
2490 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2491 if (temp == NULL)
2492 {
2493 fprintf (stderr, "read_options: realloc failed.\n");
2494 return (2);
2495 }
2496 config_listen_address_list = temp;
2498 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2499 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2501 temp[config_listen_address_list_len] = new;
2502 config_listen_address_list_len++;
2503 }
2504 break;
2506 case 'f':
2507 {
2508 int temp;
2510 temp = atoi (optarg);
2511 if (temp > 0)
2512 config_flush_interval = temp;
2513 else
2514 {
2515 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2516 status = 3;
2517 }
2518 }
2519 break;
2521 case 'w':
2522 {
2523 int temp;
2525 temp = atoi (optarg);
2526 if (temp > 0)
2527 config_write_interval = temp;
2528 else
2529 {
2530 fprintf (stderr, "Invalid write interval: %s\n", optarg);
2531 status = 2;
2532 }
2533 }
2534 break;
2536 case 'z':
2537 {
2538 int temp;
2540 temp = atoi(optarg);
2541 if (temp > 0)
2542 config_write_jitter = temp;
2543 else
2544 {
2545 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2546 status = 2;
2547 }
2549 break;
2550 }
2552 case 'B':
2553 config_write_base_only = 1;
2554 break;
2556 case 'b':
2557 {
2558 size_t len;
2559 char base_realpath[PATH_MAX];
2561 if (config_base_dir != NULL)
2562 free (config_base_dir);
2563 config_base_dir = strdup (optarg);
2564 if (config_base_dir == NULL)
2565 {
2566 fprintf (stderr, "read_options: strdup failed.\n");
2567 return (3);
2568 }
2570 /* make sure that the base directory is not resolved via
2571 * symbolic links. this makes some performance-enhancing
2572 * assumptions possible (we don't have to resolve paths
2573 * that start with a "/")
2574 */
2575 if (realpath(config_base_dir, base_realpath) == NULL)
2576 {
2577 fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2578 return 5;
2579 }
2580 else if (strncmp(config_base_dir,
2581 base_realpath, sizeof(base_realpath)) != 0)
2582 {
2583 fprintf(stderr,
2584 "Base directory (-b) resolved via file system links!\n"
2585 "Please consult rrdcached '-b' documentation!\n"
2586 "Consider specifying the real directory (%s)\n",
2587 base_realpath);
2588 return 5;
2589 }
2591 len = strlen (config_base_dir);
2592 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2593 {
2594 config_base_dir[len - 1] = 0;
2595 len--;
2596 }
2598 if (len < 1)
2599 {
2600 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2601 return (4);
2602 }
2604 _config_base_dir_len = len;
2605 }
2606 break;
2608 case 'p':
2609 {
2610 if (config_pid_file != NULL)
2611 free (config_pid_file);
2612 config_pid_file = strdup (optarg);
2613 if (config_pid_file == NULL)
2614 {
2615 fprintf (stderr, "read_options: strdup failed.\n");
2616 return (3);
2617 }
2618 }
2619 break;
2621 case 'F':
2622 config_flush_at_shutdown = 1;
2623 break;
2625 case 'j':
2626 {
2627 struct stat statbuf;
2628 const char *dir = optarg;
2630 status = stat(dir, &statbuf);
2631 if (status != 0)
2632 {
2633 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2634 return 6;
2635 }
2637 if (!S_ISDIR(statbuf.st_mode)
2638 || access(dir, R_OK|W_OK|X_OK) != 0)
2639 {
2640 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2641 errno ? rrd_strerror(errno) : "");
2642 return 6;
2643 }
2645 journal_cur = malloc(PATH_MAX + 1);
2646 journal_old = malloc(PATH_MAX + 1);
2647 if (journal_cur == NULL || journal_old == NULL)
2648 {
2649 fprintf(stderr, "malloc failure for journal files\n");
2650 return 6;
2651 }
2652 else
2653 {
2654 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2655 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2656 }
2657 }
2658 break;
2660 case 'h':
2661 case '?':
2662 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2663 "\n"
2664 "Usage: rrdcached [options]\n"
2665 "\n"
2666 "Valid options are:\n"
2667 " -l <address> Socket address to listen to.\n"
2668 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2669 " -w <seconds> Interval in which to write data.\n"
2670 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2671 " -f <seconds> Interval in which to flush dead data.\n"
2672 " -p <file> Location of the PID-file.\n"
2673 " -b <dir> Base directory to change to.\n"
2674 " -B Restrict file access to paths within -b <dir>\n"
2675 " -g Do not fork and run in the foreground.\n"
2676 " -j <dir> Directory in which to create the journal files.\n"
2677 " -F Always flush all updates at shutdown\n"
2678 "\n"
2679 "For more information and a detailed description of all options "
2680 "please refer\n"
2681 "to the rrdcached(1) manual page.\n",
2682 VERSION);
2683 status = -1;
2684 break;
2685 } /* switch (option) */
2686 } /* while (getopt) */
2688 /* advise the user when values are not sane */
2689 if (config_flush_interval < 2 * config_write_interval)
2690 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2691 " 2x write interval (-w) !\n");
2692 if (config_write_jitter > config_write_interval)
2693 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2694 " write interval (-w) !\n");
2696 if (config_write_base_only && config_base_dir == NULL)
2697 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2698 " Consult the rrdcached documentation\n");
2700 if (journal_cur == NULL)
2701 config_flush_at_shutdown = 1;
2703 return (status);
2704 } /* }}} int read_options */
2706 int main (int argc, char **argv)
2707 {
2708 int status;
2710 status = read_options (argc, argv);
2711 if (status != 0)
2712 {
2713 if (status < 0)
2714 status = 0;
2715 return (status);
2716 }
2718 status = daemonize ();
2719 if (status != 0)
2720 {
2721 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2722 return (1);
2723 }
2725 journal_init();
2727 /* start the queue thread */
2728 memset (&queue_thread, 0, sizeof (queue_thread));
2729 status = pthread_create (&queue_thread,
2730 NULL, /* attr */
2731 queue_thread_main,
2732 NULL); /* args */
2733 if (status != 0)
2734 {
2735 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2736 cleanup();
2737 return (1);
2738 }
2740 listen_thread_main (NULL);
2741 cleanup ();
2743 return (0);
2744 } /* int main */
2746 /*
2747 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2748 */