Code

add hosts_access support to rrdcached -- Shaun Reitan mailinglists@unix-scripts.com
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
147   char *wbuf;
148   ssize_t wbuf_len;
150   uint32_t permissions;
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
178   char *syntax;
179   char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
186   char *file;
187   char **values;
188   size_t values_num;            /* number of valid pointers */
189   size_t values_alloc;          /* number of allocated pointers */
190   time_t last_flush_time;
191   time_t last_update_stamp;
192 #define CI_FLAGS_IN_TREE  (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194   int flags;
195   pthread_cond_t  flushed;
196   cache_item_t *prev;
197   cache_item_t *next;
198 };
200 struct callback_flush_data_s
202   time_t now;
203   time_t abs_timeout;
204   char **keys;
205   size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
209 enum queue_side_e
211   HEAD,
212   TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
216 /* describe a set of journal files */
217 typedef struct {
218   char **files;
219   size_t files_num;
220 } journal_set;
222 /* max length of socket command or response */
223 #define CMD_MAX 4096
224 #define RBUF_SIZE (CMD_MAX*2)
226 /*
227  * Variables
228  */
229 static int stay_foreground = 0;
230 static uid_t daemon_uid;
232 static listen_socket_t *listen_fds = NULL;
233 static size_t listen_fds_num = 0;
235 static listen_socket_t default_socket;
237 enum {
238   RUNNING,              /* normal operation */
239   FLUSHING,             /* flushing remaining values */
240   SHUTDOWN              /* shutting down */
241 } state = RUNNING;
243 static pthread_t *queue_threads;
244 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
245 static int config_queue_threads = 4;
247 static pthread_t flush_thread;
248 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
250 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
251 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
252 static int connection_threads_num = 0;
254 /* Cache stuff */
255 static GTree          *cache_tree = NULL;
256 static cache_item_t   *cache_queue_head = NULL;
257 static cache_item_t   *cache_queue_tail = NULL;
258 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
260 static int config_write_interval = 300;
261 static int config_write_jitter   = 0;
262 static int config_flush_interval = 3600;
263 static int config_flush_at_shutdown = 0;
264 static char *config_pid_file = NULL;
265 static char *config_base_dir = NULL;
266 static size_t _config_base_dir_len = 0;
267 static int config_write_base_only = 0;
268 static size_t config_alloc_chunk = 1;
270 static listen_socket_t **config_listen_address_list = NULL;
271 static size_t config_listen_address_list_len = 0;
273 static uint64_t stats_queue_length = 0;
274 static uint64_t stats_updates_received = 0;
275 static uint64_t stats_flush_received = 0;
276 static uint64_t stats_updates_written = 0;
277 static uint64_t stats_data_sets_written = 0;
278 static uint64_t stats_journal_bytes = 0;
279 static uint64_t stats_journal_rotate = 0;
280 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
282 static int opt_no_overwrite = 0; /* default for the daemon */
284 /* Journaled updates */
285 #define JOURNAL_REPLAY(s) ((s) == NULL)
286 #define JOURNAL_BASE "rrd.journal"
287 static journal_set *journal_cur = NULL;
288 static journal_set *journal_old = NULL;
289 static char *journal_dir = NULL;
290 static FILE *journal_fh = NULL;         /* current journal file handle */
291 static long  journal_size = 0;          /* current journal size */
292 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
293 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
294 static int journal_write(char *cmd, char *args);
295 static void journal_done(void);
296 static void journal_rotate(void);
298 /* prototypes for forward refernces */
299 static int handle_request_help (HANDLER_PROTO);
301 /* 
302  * Functions
303  */
304 static void sig_common (const char *sig) /* {{{ */
306   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
307   state = FLUSHING;
308   pthread_cond_broadcast(&flush_cond);
309   pthread_cond_broadcast(&queue_cond);
310 } /* }}} void sig_common */
312 static void sig_int_handler (int UNUSED(s)) /* {{{ */
314   sig_common("INT");
315 } /* }}} void sig_int_handler */
317 static void sig_term_handler (int UNUSED(s)) /* {{{ */
319   sig_common("TERM");
320 } /* }}} void sig_term_handler */
322 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
324   config_flush_at_shutdown = 1;
325   sig_common("USR1");
326 } /* }}} void sig_usr1_handler */
328 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
330   config_flush_at_shutdown = 0;
331   sig_common("USR2");
332 } /* }}} void sig_usr2_handler */
334 static void install_signal_handlers(void) /* {{{ */
336   /* These structures are static, because `sigaction' behaves weird if the are
337    * overwritten.. */
338   static struct sigaction sa_int;
339   static struct sigaction sa_term;
340   static struct sigaction sa_pipe;
341   static struct sigaction sa_usr1;
342   static struct sigaction sa_usr2;
344   /* Install signal handlers */
345   memset (&sa_int, 0, sizeof (sa_int));
346   sa_int.sa_handler = sig_int_handler;
347   sigaction (SIGINT, &sa_int, NULL);
349   memset (&sa_term, 0, sizeof (sa_term));
350   sa_term.sa_handler = sig_term_handler;
351   sigaction (SIGTERM, &sa_term, NULL);
353   memset (&sa_pipe, 0, sizeof (sa_pipe));
354   sa_pipe.sa_handler = SIG_IGN;
355   sigaction (SIGPIPE, &sa_pipe, NULL);
357   memset (&sa_pipe, 0, sizeof (sa_usr1));
358   sa_usr1.sa_handler = sig_usr1_handler;
359   sigaction (SIGUSR1, &sa_usr1, NULL);
361   memset (&sa_usr2, 0, sizeof (sa_usr2));
362   sa_usr2.sa_handler = sig_usr2_handler;
363   sigaction (SIGUSR2, &sa_usr2, NULL);
365 } /* }}} void install_signal_handlers */
367 static int open_pidfile(char *action, int oflag) /* {{{ */
369   int fd;
370   const char *file;
371   char *file_copy, *dir;
373   file = (config_pid_file != NULL)
374     ? config_pid_file
375     : LOCALSTATEDIR "/run/rrdcached.pid";
377   /* dirname may modify its argument */
378   file_copy = strdup(file);
379   if (file_copy == NULL)
380   {
381     fprintf(stderr, "rrdcached: strdup(): %s\n",
382         rrd_strerror(errno));
383     return -1;
384   }
386   dir = dirname(file_copy);
387   if (rrd_mkdir_p(dir, 0777) != 0)
388   {
389     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
390         dir, rrd_strerror(errno));
391     return -1;
392   }
394   free(file_copy);
396   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
397   if (fd < 0)
398     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
399             action, file, rrd_strerror(errno));
401   return(fd);
402 } /* }}} static int open_pidfile */
404 /* check existing pid file to see whether a daemon is running */
405 static int check_pidfile(void)
407   int pid_fd;
408   pid_t pid;
409   char pid_str[16];
411   pid_fd = open_pidfile("open", O_RDWR);
412   if (pid_fd < 0)
413     return pid_fd;
415   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
416     return -1;
418   pid = atoi(pid_str);
419   if (pid <= 0)
420     return -1;
422   /* another running process that we can signal COULD be
423    * a competing rrdcached */
424   if (pid != getpid() && kill(pid, 0) == 0)
425   {
426     fprintf(stderr,
427             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
428     close(pid_fd);
429     return -1;
430   }
432   lseek(pid_fd, 0, SEEK_SET);
433   if (ftruncate(pid_fd, 0) == -1)
434   {
435     fprintf(stderr,
436             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
437     close(pid_fd);
438     return -1;
439   }
441   fprintf(stderr,
442           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
443           "rrdcached: starting normally.\n", pid);
445   return pid_fd;
446 } /* }}} static int check_pidfile */
448 static int write_pidfile (int fd) /* {{{ */
450   pid_t pid;
451   FILE *fh;
453   pid = getpid ();
455   fh = fdopen (fd, "w");
456   if (fh == NULL)
457   {
458     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
459     close(fd);
460     return (-1);
461   }
463   fprintf (fh, "%i\n", (int) pid);
464   fclose (fh);
466   return (0);
467 } /* }}} int write_pidfile */
469 static int remove_pidfile (void) /* {{{ */
471   char *file;
472   int status;
474   file = (config_pid_file != NULL)
475     ? config_pid_file
476     : LOCALSTATEDIR "/run/rrdcached.pid";
478   status = unlink (file);
479   if (status == 0)
480     return (0);
481   return (errno);
482 } /* }}} int remove_pidfile */
484 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
486   char *eol;
488   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
489                sock->next_read - sock->next_cmd);
491   if (eol == NULL)
492   {
493     /* no commands left, move remainder back to front of rbuf */
494     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
495             sock->next_read - sock->next_cmd);
496     sock->next_read -= sock->next_cmd;
497     sock->next_cmd = 0;
498     *len = 0;
499     return NULL;
500   }
501   else
502   {
503     char *cmd = sock->rbuf + sock->next_cmd;
504     *eol = '\0';
506     sock->next_cmd = eol - sock->rbuf + 1;
508     if (eol > sock->rbuf && *(eol-1) == '\r')
509       *(--eol) = '\0'; /* handle "\r\n" EOL */
511     *len = eol - cmd;
513     return cmd;
514   }
516   /* NOTREACHED */
517   assert(1==0);
518 } /* }}} char *next_cmd */
520 /* add the characters directly to the write buffer */
521 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
523   char *new_buf;
525   assert(sock != NULL);
527   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
528   if (new_buf == NULL)
529   {
530     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
531     return -1;
532   }
534   strncpy(new_buf + sock->wbuf_len, str, len + 1);
536   sock->wbuf = new_buf;
537   sock->wbuf_len += len;
539   return 0;
540 } /* }}} static int add_to_wbuf */
542 /* add the text to the "extra" info that's sent after the status line */
543 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
545   va_list argp;
546   char buffer[CMD_MAX];
547   int len;
549   if (JOURNAL_REPLAY(sock)) return 0;
550   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
552   va_start(argp, fmt);
553 #ifdef HAVE_VSNPRINTF
554   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
555 #else
556   len = vsprintf(buffer, fmt, argp);
557 #endif
558   va_end(argp);
559   if (len < 0)
560   {
561     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
562     return -1;
563   }
565   return add_to_wbuf(sock, buffer, len);
566 } /* }}} static int add_response_info */
568 static int count_lines(char *str) /* {{{ */
570   int lines = 0;
572   if (str != NULL)
573   {
574     while ((str = strchr(str, '\n')) != NULL)
575     {
576       ++lines;
577       ++str;
578     }
579   }
581   return lines;
582 } /* }}} static int count_lines */
584 /* send the response back to the user.
585  * returns 0 on success, -1 on error
586  * write buffer is always zeroed after this call */
587 static int send_response (listen_socket_t *sock, response_code rc,
588                           char *fmt, ...) /* {{{ */
590   va_list argp;
591   char buffer[CMD_MAX];
592   int lines;
593   ssize_t wrote;
594   int rclen, len;
596   if (JOURNAL_REPLAY(sock)) return rc;
598   if (sock->batch_start)
599   {
600     if (rc == RESP_OK)
601       return rc; /* no response on success during BATCH */
602     lines = sock->batch_cmd;
603   }
604   else if (rc == RESP_OK)
605     lines = count_lines(sock->wbuf);
606   else
607     lines = -1;
609   rclen = sprintf(buffer, "%d ", lines);
610   va_start(argp, fmt);
611 #ifdef HAVE_VSNPRINTF
612   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
613 #else
614   len = vsprintf(buffer+rclen, fmt, argp);
615 #endif
616   va_end(argp);
617   if (len < 0)
618     return -1;
620   len += rclen;
622   /* append the result to the wbuf, don't write to the user */
623   if (sock->batch_start)
624     return add_to_wbuf(sock, buffer, len);
626   /* first write must be complete */
627   if (len != write(sock->fd, buffer, len))
628   {
629     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
630     return -1;
631   }
633   if (sock->wbuf != NULL && rc == RESP_OK)
634   {
635     wrote = 0;
636     while (wrote < sock->wbuf_len)
637     {
638       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
639       if (wb <= 0)
640       {
641         RRDD_LOG(LOG_INFO, "send_response: could not write results");
642         return -1;
643       }
644       wrote += wb;
645     }
646   }
648   free(sock->wbuf); sock->wbuf = NULL;
649   sock->wbuf_len = 0;
651   return 0;
652 } /* }}} */
654 static void wipe_ci_values(cache_item_t *ci, time_t when)
656   ci->values = NULL;
657   ci->values_num = 0;
658   ci->values_alloc = 0;
660   ci->last_flush_time = when;
661   if (config_write_jitter > 0)
662     ci->last_flush_time += (rrd_random() % config_write_jitter);
665 /* remove_from_queue
666  * remove a "cache_item_t" item from the queue.
667  * must hold 'cache_lock' when calling this
668  */
669 static void remove_from_queue(cache_item_t *ci) /* {{{ */
671   if (ci == NULL) return;
672   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
674   if (ci->prev == NULL)
675     cache_queue_head = ci->next; /* reset head */
676   else
677     ci->prev->next = ci->next;
679   if (ci->next == NULL)
680     cache_queue_tail = ci->prev; /* reset the tail */
681   else
682     ci->next->prev = ci->prev;
684   ci->next = ci->prev = NULL;
685   ci->flags &= ~CI_FLAGS_IN_QUEUE;
687   pthread_mutex_lock (&stats_lock);
688   assert (stats_queue_length > 0);
689   stats_queue_length--;
690   pthread_mutex_unlock (&stats_lock);
692 } /* }}} static void remove_from_queue */
694 /* free the resources associated with the cache_item_t
695  * must hold cache_lock when calling this function
696  */
697 static void *free_cache_item(cache_item_t *ci) /* {{{ */
699   if (ci == NULL) return NULL;
701   remove_from_queue(ci);
703   for (size_t i=0; i < ci->values_num; i++)
704     free(ci->values[i]);
706   free (ci->values);
707   free (ci->file);
709   /* in case anyone is waiting */
710   pthread_cond_broadcast(&ci->flushed);
711   pthread_cond_destroy(&ci->flushed);
713   free (ci);
715   return NULL;
716 } /* }}} static void *free_cache_item */
718 /*
719  * enqueue_cache_item:
720  * `cache_lock' must be acquired before calling this function!
721  */
722 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
723     queue_side_t side)
725   if (ci == NULL)
726     return (-1);
728   if (ci->values_num == 0)
729     return (0);
731   if (side == HEAD)
732   {
733     if (cache_queue_head == ci)
734       return 0;
736     /* remove if further down in queue */
737     remove_from_queue(ci);
739     ci->prev = NULL;
740     ci->next = cache_queue_head;
741     if (ci->next != NULL)
742       ci->next->prev = ci;
743     cache_queue_head = ci;
745     if (cache_queue_tail == NULL)
746       cache_queue_tail = cache_queue_head;
747   }
748   else /* (side == TAIL) */
749   {
750     /* We don't move values back in the list.. */
751     if (ci->flags & CI_FLAGS_IN_QUEUE)
752       return (0);
754     assert (ci->next == NULL);
755     assert (ci->prev == NULL);
757     ci->prev = cache_queue_tail;
759     if (cache_queue_tail == NULL)
760       cache_queue_head = ci;
761     else
762       cache_queue_tail->next = ci;
764     cache_queue_tail = ci;
765   }
767   ci->flags |= CI_FLAGS_IN_QUEUE;
769   pthread_cond_signal(&queue_cond);
770   pthread_mutex_lock (&stats_lock);
771   stats_queue_length++;
772   pthread_mutex_unlock (&stats_lock);
774   return (0);
775 } /* }}} int enqueue_cache_item */
777 /*
778  * tree_callback_flush:
779  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
780  * while this is in progress.
781  */
782 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
783     gpointer data)
785   cache_item_t *ci;
786   callback_flush_data_t *cfd;
788   ci = (cache_item_t *) value;
789   cfd = (callback_flush_data_t *) data;
791   if (ci->flags & CI_FLAGS_IN_QUEUE)
792     return FALSE;
794   if (ci->values_num > 0
795       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
796   {
797     enqueue_cache_item (ci, TAIL);
798   }
799   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
800       && (ci->values_num <= 0))
801   {
802     assert ((char *) key == ci->file);
803     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
804     {
805       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
806       return (FALSE);
807     }
808   }
810   return (FALSE);
811 } /* }}} gboolean tree_callback_flush */
813 static int flush_old_values (int max_age)
815   callback_flush_data_t cfd;
816   size_t k;
818   memset (&cfd, 0, sizeof (cfd));
819   /* Pass the current time as user data so that we don't need to call
820    * `time' for each node. */
821   cfd.now = time (NULL);
822   cfd.keys = NULL;
823   cfd.keys_num = 0;
825   if (max_age > 0)
826     cfd.abs_timeout = cfd.now - max_age;
827   else
828     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
830   /* `tree_callback_flush' will return the keys of all values that haven't
831    * been touched in the last `config_flush_interval' seconds in `cfd'.
832    * The char*'s in this array point to the same memory as ci->file, so we
833    * don't need to free them separately. */
834   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
836   for (k = 0; k < cfd.keys_num; k++)
837   {
838     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
839     /* should never fail, since we have held the cache_lock
840      * the entire time */
841     assert(status == TRUE);
842   }
844   if (cfd.keys != NULL)
845   {
846     free (cfd.keys);
847     cfd.keys = NULL;
848   }
850   return (0);
851 } /* int flush_old_values */
853 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
855   struct timeval now;
856   struct timespec next_flush;
857   int status;
859   gettimeofday (&now, NULL);
860   next_flush.tv_sec = now.tv_sec + config_flush_interval;
861   next_flush.tv_nsec = 1000 * now.tv_usec;
863   pthread_mutex_lock(&cache_lock);
865   while (state == RUNNING)
866   {
867     gettimeofday (&now, NULL);
868     if ((now.tv_sec > next_flush.tv_sec)
869         || ((now.tv_sec == next_flush.tv_sec)
870           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
871     {
872       RRDD_LOG(LOG_DEBUG, "flushing old values");
874       /* Determine the time of the next cache flush. */
875       next_flush.tv_sec = now.tv_sec + config_flush_interval;
877       /* Flush all values that haven't been written in the last
878        * `config_write_interval' seconds. */
879       flush_old_values (config_write_interval);
881       /* unlock the cache while we rotate so we don't block incoming
882        * updates if the fsync() blocks on disk I/O */
883       pthread_mutex_unlock(&cache_lock);
884       journal_rotate();
885       pthread_mutex_lock(&cache_lock);
886     }
888     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
889     if (status != 0 && status != ETIMEDOUT)
890     {
891       RRDD_LOG (LOG_ERR, "flush_thread_main: "
892                 "pthread_cond_timedwait returned %i.", status);
893     }
894   }
896   if (config_flush_at_shutdown)
897     flush_old_values (-1); /* flush everything */
899   state = SHUTDOWN;
901   pthread_mutex_unlock(&cache_lock);
903   return NULL;
904 } /* void *flush_thread_main */
906 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
908   pthread_mutex_lock (&cache_lock);
910   while (state != SHUTDOWN
911          || (cache_queue_head != NULL && config_flush_at_shutdown))
912   {
913     cache_item_t *ci;
914     char *file;
915     char **values;
916     size_t values_num;
917     int status;
919     /* Now, check if there's something to store away. If not, wait until
920      * something comes in. */
921     if (cache_queue_head == NULL)
922     {
923       status = pthread_cond_wait (&queue_cond, &cache_lock);
924       if ((status != 0) && (status != ETIMEDOUT))
925       {
926         RRDD_LOG (LOG_ERR, "queue_thread_main: "
927             "pthread_cond_wait returned %i.", status);
928       }
929     }
931     /* Check if a value has arrived. This may be NULL if we timed out or there
932      * was an interrupt such as a signal. */
933     if (cache_queue_head == NULL)
934       continue;
936     ci = cache_queue_head;
938     /* copy the relevant parts */
939     file = strdup (ci->file);
940     if (file == NULL)
941     {
942       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
943       continue;
944     }
946     assert(ci->values != NULL);
947     assert(ci->values_num > 0);
949     values = ci->values;
950     values_num = ci->values_num;
952     wipe_ci_values(ci, time(NULL));
953     remove_from_queue(ci);
955     pthread_mutex_unlock (&cache_lock);
957     rrd_clear_error ();
958     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
959     if (status != 0)
960     {
961       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
962           "rrd_update_r (%s) failed with status %i. (%s)",
963           file, status, rrd_get_error());
964     }
966     journal_write("wrote", file);
968     /* Search again in the tree.  It's possible someone issued a "FORGET"
969      * while we were writing the update values. */
970     pthread_mutex_lock(&cache_lock);
971     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
972     if (ci)
973       pthread_cond_broadcast(&ci->flushed);
974     pthread_mutex_unlock(&cache_lock);
976     if (status == 0)
977     {
978       pthread_mutex_lock (&stats_lock);
979       stats_updates_written++;
980       stats_data_sets_written += values_num;
981       pthread_mutex_unlock (&stats_lock);
982     }
984     rrd_free_ptrs((void ***) &values, &values_num);
985     free(file);
987     pthread_mutex_lock (&cache_lock);
988   }
989   pthread_mutex_unlock (&cache_lock);
991   return (NULL);
992 } /* }}} void *queue_thread_main */
994 static int buffer_get_field (char **buffer_ret, /* {{{ */
995     size_t *buffer_size_ret, char **field_ret)
997   char *buffer;
998   size_t buffer_pos;
999   size_t buffer_size;
1000   char *field;
1001   size_t field_size;
1002   int status;
1004   buffer = *buffer_ret;
1005   buffer_pos = 0;
1006   buffer_size = *buffer_size_ret;
1007   field = *buffer_ret;
1008   field_size = 0;
1010   if (buffer_size <= 0)
1011     return (-1);
1013   /* This is ensured by `handle_request'. */
1014   assert (buffer[buffer_size - 1] == '\0');
1016   status = -1;
1017   while (buffer_pos < buffer_size)
1018   {
1019     /* Check for end-of-field or end-of-buffer */
1020     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1021     {
1022       field[field_size] = 0;
1023       field_size++;
1024       buffer_pos++;
1025       status = 0;
1026       break;
1027     }
1028     /* Handle escaped characters. */
1029     else if (buffer[buffer_pos] == '\\')
1030     {
1031       if (buffer_pos >= (buffer_size - 1))
1032         break;
1033       buffer_pos++;
1034       field[field_size] = buffer[buffer_pos];
1035       field_size++;
1036       buffer_pos++;
1037     }
1038     /* Normal operation */ 
1039     else
1040     {
1041       field[field_size] = buffer[buffer_pos];
1042       field_size++;
1043       buffer_pos++;
1044     }
1045   } /* while (buffer_pos < buffer_size) */
1047   if (status != 0)
1048     return (status);
1050   *buffer_ret = buffer + buffer_pos;
1051   *buffer_size_ret = buffer_size - buffer_pos;
1052   *field_ret = field;
1054   return (0);
1055 } /* }}} int buffer_get_field */
1057 /* if we're restricting writes to the base directory,
1058  * check whether the file falls within the dir
1059  * returns 1 if OK, otherwise 0
1060  */
1061 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1063   assert(file != NULL);
1065   if (!config_write_base_only
1066       || JOURNAL_REPLAY(sock)
1067       || config_base_dir == NULL)
1068     return 1;
1070   if (strstr(file, "../") != NULL) goto err;
1072   /* relative paths without "../" are ok */
1073   if (*file != '/') return 1;
1075   /* file must be of the format base + "/" + <1+ char filename> */
1076   if (strlen(file) < _config_base_dir_len + 2) goto err;
1077   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1078   if (*(file + _config_base_dir_len) != '/') goto err;
1080   return 1;
1082 err:
1083   if (sock != NULL && sock->fd >= 0)
1084     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1086   return 0;
1087 } /* }}} static int check_file_access */
1089 /* when using a base dir, convert relative paths to absolute paths.
1090  * if necessary, modifies the "filename" pointer to point
1091  * to the new path created in "tmp".  "tmp" is provided
1092  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1093  *
1094  * this allows us to optimize for the expected case (absolute path)
1095  * with a no-op.
1096  */
1097 static void get_abs_path(char **filename, char *tmp)
1099   assert(tmp != NULL);
1100   assert(filename != NULL && *filename != NULL);
1102   if (config_base_dir == NULL || **filename == '/')
1103     return;
1105   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1106   *filename = tmp;
1107 } /* }}} static int get_abs_path */
1109 static int flush_file (const char *filename) /* {{{ */
1111   cache_item_t *ci;
1113   pthread_mutex_lock (&cache_lock);
1115   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1116   if (ci == NULL)
1117   {
1118     pthread_mutex_unlock (&cache_lock);
1119     return (ENOENT);
1120   }
1122   if (ci->values_num > 0)
1123   {
1124     /* Enqueue at head */
1125     enqueue_cache_item (ci, HEAD);
1126     pthread_cond_wait(&ci->flushed, &cache_lock);
1127   }
1129   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1130    * may have been purged during our cond_wait() */
1132   pthread_mutex_unlock(&cache_lock);
1134   return (0);
1135 } /* }}} int flush_file */
1137 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1139   char *err = "Syntax error.\n";
1141   if (cmd && cmd->syntax)
1142     err = cmd->syntax;
1144   return send_response(sock, RESP_ERR, "Usage: %s", err);
1145 } /* }}} static int syntax_error() */
1147 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1149   uint64_t copy_queue_length;
1150   uint64_t copy_updates_received;
1151   uint64_t copy_flush_received;
1152   uint64_t copy_updates_written;
1153   uint64_t copy_data_sets_written;
1154   uint64_t copy_journal_bytes;
1155   uint64_t copy_journal_rotate;
1157   uint64_t tree_nodes_number;
1158   uint64_t tree_depth;
1160   pthread_mutex_lock (&stats_lock);
1161   copy_queue_length       = stats_queue_length;
1162   copy_updates_received   = stats_updates_received;
1163   copy_flush_received     = stats_flush_received;
1164   copy_updates_written    = stats_updates_written;
1165   copy_data_sets_written  = stats_data_sets_written;
1166   copy_journal_bytes      = stats_journal_bytes;
1167   copy_journal_rotate     = stats_journal_rotate;
1168   pthread_mutex_unlock (&stats_lock);
1170   pthread_mutex_lock (&cache_lock);
1171   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1172   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1173   pthread_mutex_unlock (&cache_lock);
1175   add_response_info(sock,
1176                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1177   add_response_info(sock,
1178                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1179   add_response_info(sock,
1180                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1181   add_response_info(sock,
1182                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1183   add_response_info(sock,
1184                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1185   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1186   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1187   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1188   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1190   send_response(sock, RESP_OK, "Statistics follow\n");
1192   return (0);
1193 } /* }}} int handle_request_stats */
1195 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1197   char *file, file_tmp[PATH_MAX];
1198   int status;
1200   status = buffer_get_field (&buffer, &buffer_size, &file);
1201   if (status != 0)
1202   {
1203     return syntax_error(sock,cmd);
1204   }
1205   else
1206   {
1207     pthread_mutex_lock(&stats_lock);
1208     stats_flush_received++;
1209     pthread_mutex_unlock(&stats_lock);
1211     get_abs_path(&file, file_tmp);
1212     if (!check_file_access(file, sock)) return 0;
1214     status = flush_file (file);
1215     if (status == 0)
1216       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1217     else if (status == ENOENT)
1218     {
1219       /* no file in our tree; see whether it exists at all */
1220       struct stat statbuf;
1222       memset(&statbuf, 0, sizeof(statbuf));
1223       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1224         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1225       else
1226         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1227     }
1228     else if (status < 0)
1229       return send_response(sock, RESP_ERR, "Internal error.\n");
1230     else
1231       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1232   }
1234   /* NOTREACHED */
1235   assert(1==0);
1236 } /* }}} int handle_request_flush */
1238 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1240   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1242   pthread_mutex_lock(&cache_lock);
1243   flush_old_values(-1);
1244   pthread_mutex_unlock(&cache_lock);
1246   return send_response(sock, RESP_OK, "Started flush.\n");
1247 } /* }}} static int handle_request_flushall */
1249 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1251   int status;
1252   char *file, file_tmp[PATH_MAX];
1253   cache_item_t *ci;
1255   status = buffer_get_field(&buffer, &buffer_size, &file);
1256   if (status != 0)
1257     return syntax_error(sock,cmd);
1259   get_abs_path(&file, file_tmp);
1261   pthread_mutex_lock(&cache_lock);
1262   ci = g_tree_lookup(cache_tree, file);
1263   if (ci == NULL)
1264   {
1265     pthread_mutex_unlock(&cache_lock);
1266     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267   }
1269   for (size_t i=0; i < ci->values_num; i++)
1270     add_response_info(sock, "%s\n", ci->values[i]);
1272   pthread_mutex_unlock(&cache_lock);
1273   return send_response(sock, RESP_OK, "updates pending\n");
1274 } /* }}} static int handle_request_pending */
1276 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1278   int status;
1279   gboolean found;
1280   char *file, file_tmp[PATH_MAX];
1282   status = buffer_get_field(&buffer, &buffer_size, &file);
1283   if (status != 0)
1284     return syntax_error(sock,cmd);
1286   get_abs_path(&file, file_tmp);
1287   if (!check_file_access(file, sock)) return 0;
1289   pthread_mutex_lock(&cache_lock);
1290   found = g_tree_remove(cache_tree, file);
1291   pthread_mutex_unlock(&cache_lock);
1293   if (found == TRUE)
1294   {
1295     if (!JOURNAL_REPLAY(sock))
1296       journal_write("forget", file);
1298     return send_response(sock, RESP_OK, "Gone!\n");
1299   }
1300   else
1301     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1303   /* NOTREACHED */
1304   assert(1==0);
1305 } /* }}} static int handle_request_forget */
1307 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1309   cache_item_t *ci;
1311   pthread_mutex_lock(&cache_lock);
1313   ci = cache_queue_head;
1314   while (ci != NULL)
1315   {
1316     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1317     ci = ci->next;
1318   }
1320   pthread_mutex_unlock(&cache_lock);
1322   return send_response(sock, RESP_OK, "in queue.\n");
1323 } /* }}} int handle_request_queue */
1325 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1327   char *file, file_tmp[PATH_MAX];
1328   int values_num = 0;
1329   int status;
1330   char orig_buf[CMD_MAX];
1332   cache_item_t *ci;
1334   /* save it for the journal later */
1335   if (!JOURNAL_REPLAY(sock))
1336     strncpy(orig_buf, buffer, buffer_size);
1338   status = buffer_get_field (&buffer, &buffer_size, &file);
1339   if (status != 0)
1340     return syntax_error(sock,cmd);
1342   pthread_mutex_lock(&stats_lock);
1343   stats_updates_received++;
1344   pthread_mutex_unlock(&stats_lock);
1346   get_abs_path(&file, file_tmp);
1347   if (!check_file_access(file, sock)) return 0;
1349   pthread_mutex_lock (&cache_lock);
1350   ci = g_tree_lookup (cache_tree, file);
1352   if (ci == NULL) /* {{{ */
1353   {
1354     struct stat statbuf;
1355     cache_item_t *tmp;
1357     /* don't hold the lock while we setup; stat(2) might block */
1358     pthread_mutex_unlock(&cache_lock);
1360     memset (&statbuf, 0, sizeof (statbuf));
1361     status = stat (file, &statbuf);
1362     if (status != 0)
1363     {
1364       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1366       status = errno;
1367       if (status == ENOENT)
1368         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1369       else
1370         return send_response(sock, RESP_ERR,
1371                              "stat failed with error %i.\n", status);
1372     }
1373     if (!S_ISREG (statbuf.st_mode))
1374       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1376     if (access(file, R_OK|W_OK) != 0)
1377       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1378                            file, rrd_strerror(errno));
1380     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1381     if (ci == NULL)
1382     {
1383       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1385       return send_response(sock, RESP_ERR, "malloc failed.\n");
1386     }
1387     memset (ci, 0, sizeof (cache_item_t));
1389     ci->file = strdup (file);
1390     if (ci->file == NULL)
1391     {
1392       free (ci);
1393       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1395       return send_response(sock, RESP_ERR, "strdup failed.\n");
1396     }
1398     wipe_ci_values(ci, now);
1399     ci->flags = CI_FLAGS_IN_TREE;
1400     pthread_cond_init(&ci->flushed, NULL);
1402     pthread_mutex_lock(&cache_lock);
1404     /* another UPDATE might have added this entry in the meantime */
1405     tmp = g_tree_lookup (cache_tree, file);
1406     if (tmp == NULL)
1407       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1408     else
1409     {
1410       free_cache_item (ci);
1411       ci = tmp;
1412     }
1414     /* state may have changed while we were unlocked */
1415     if (state == SHUTDOWN)
1416       return -1;
1417   } /* }}} */
1418   assert (ci != NULL);
1420   /* don't re-write updates in replay mode */
1421   if (!JOURNAL_REPLAY(sock))
1422     journal_write("update", orig_buf);
1424   while (buffer_size > 0)
1425   {
1426     char *value;
1427     time_t stamp;
1428     char *eostamp;
1430     status = buffer_get_field (&buffer, &buffer_size, &value);
1431     if (status != 0)
1432     {
1433       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1434       break;
1435     }
1437     /* make sure update time is always moving forward */
1438     stamp = strtol(value, &eostamp, 10);
1439     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "Cannot find timestamp in '%s'!\n", value);
1444     }
1445     else if (stamp <= ci->last_update_stamp)
1446     {
1447       pthread_mutex_unlock(&cache_lock);
1448       return send_response(sock, RESP_ERR,
1449                            "illegal attempt to update using time %ld when last"
1450                            " update time is %ld (minimum one second step)\n",
1451                            stamp, ci->last_update_stamp);
1452     }
1453     else
1454       ci->last_update_stamp = stamp;
1456     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1457                               &ci->values_alloc, config_alloc_chunk))
1458     {
1459       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1460       continue;
1461     }
1463     values_num++;
1464   }
1466   if (((now - ci->last_flush_time) >= config_write_interval)
1467       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1468       && (ci->values_num > 0))
1469   {
1470     enqueue_cache_item (ci, TAIL);
1471   }
1473   pthread_mutex_unlock (&cache_lock);
1475   if (values_num < 1)
1476     return send_response(sock, RESP_ERR, "No values updated.\n");
1477   else
1478     return send_response(sock, RESP_OK,
1479                          "errors, enqueued %i value(s).\n", values_num);
1481   /* NOTREACHED */
1482   assert(1==0);
1484 } /* }}} int handle_request_update */
1486 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1488   char *file, file_tmp[PATH_MAX];
1489   char *cf;
1491   char *start_str;
1492   char *end_str;
1493   time_t start_tm;
1494   time_t end_tm;
1496   unsigned long step;
1497   unsigned long ds_cnt;
1498   char **ds_namv;
1499   rrd_value_t *data;
1501   int status;
1502   unsigned long i;
1503   time_t t;
1504   rrd_value_t *data_ptr;
1506   file = NULL;
1507   cf = NULL;
1508   start_str = NULL;
1509   end_str = NULL;
1511   /* Read the arguments */
1512   do /* while (0) */
1513   {
1514     status = buffer_get_field (&buffer, &buffer_size, &file);
1515     if (status != 0)
1516       break;
1518     status = buffer_get_field (&buffer, &buffer_size, &cf);
1519     if (status != 0)
1520       break;
1522     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1523     if (status != 0)
1524     {
1525       start_str = NULL;
1526       status = 0;
1527       break;
1528     }
1530     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1531     if (status != 0)
1532     {
1533       end_str = NULL;
1534       status = 0;
1535       break;
1536     }
1537   } while (0);
1539   if (status != 0)
1540     return (syntax_error(sock,cmd));
1542   get_abs_path(&file, file_tmp);
1543   if (!check_file_access(file, sock)) return 0;
1545   status = flush_file (file);
1546   if ((status != 0) && (status != ENOENT))
1547     return (send_response (sock, RESP_ERR,
1548           "flush_file (%s) failed with status %i.\n", file, status));
1550   t = time (NULL); /* "now" */
1552   /* Parse start time */
1553   if (start_str != NULL)
1554   {
1555     char *endptr;
1556     long value;
1558     endptr = NULL;
1559     errno = 0;
1560     value = strtol (start_str, &endptr, /* base = */ 0);
1561     if ((endptr == start_str) || (errno != 0))
1562       return (send_response(sock, RESP_ERR,
1563             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1564             start_str));
1566     if (value > 0)
1567       start_tm = (time_t) value;
1568     else
1569       start_tm = (time_t) (t + value);
1570   }
1571   else
1572   {
1573     start_tm = t - 86400;
1574   }
1576   /* Parse end time */
1577   if (end_str != NULL)
1578   {
1579     char *endptr;
1580     long value;
1582     endptr = NULL;
1583     errno = 0;
1584     value = strtol (end_str, &endptr, /* base = */ 0);
1585     if ((endptr == end_str) || (errno != 0))
1586       return (send_response(sock, RESP_ERR,
1587             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1588             end_str));
1590     if (value > 0)
1591       end_tm = (time_t) value;
1592     else
1593       end_tm = (time_t) (t + value);
1594   }
1595   else
1596   {
1597     end_tm = t;
1598   }
1600   step = -1;
1601   ds_cnt = 0;
1602   ds_namv = NULL;
1603   data = NULL;
1605   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1606       &ds_cnt, &ds_namv, &data);
1607   if (status != 0)
1608     return (send_response(sock, RESP_ERR,
1609           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1611   add_response_info (sock, "FlushVersion: %lu\n", 1);
1612   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1613   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1614   add_response_info (sock, "Step: %lu\n", step);
1615   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1617 #define SSTRCAT(buffer,str,buffer_fill) do { \
1618     size_t str_len = strlen (str); \
1619     if ((buffer_fill + str_len) > sizeof (buffer)) \
1620       str_len = sizeof (buffer) - buffer_fill; \
1621     if (str_len > 0) { \
1622       strncpy (buffer + buffer_fill, str, str_len); \
1623       buffer_fill += str_len; \
1624       assert (buffer_fill <= sizeof (buffer)); \
1625       if (buffer_fill == sizeof (buffer)) \
1626         buffer[buffer_fill - 1] = 0; \
1627       else \
1628         buffer[buffer_fill] = 0; \
1629     } \
1630   } while (0)
1632   { /* Add list of DS names */
1633     char linebuf[1024];
1634     size_t linebuf_fill;
1636     memset (linebuf, 0, sizeof (linebuf));
1637     linebuf_fill = 0;
1638     for (i = 0; i < ds_cnt; i++)
1639     {
1640       if (i > 0)
1641         SSTRCAT (linebuf, " ", linebuf_fill);
1642       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1643       rrd_freemem(ds_namv[i]);
1644     }
1645     rrd_freemem(ds_namv);
1646     add_response_info (sock, "DSName: %s\n", linebuf);
1647   }
1649   /* Add the actual data */
1650   assert (step > 0);
1651   data_ptr = data;
1652   for (t = start_tm + step; t <= end_tm; t += step)
1653   {
1654     char linebuf[1024];
1655     size_t linebuf_fill;
1656     char tmp[128];
1658     memset (linebuf, 0, sizeof (linebuf));
1659     linebuf_fill = 0;
1660     for (i = 0; i < ds_cnt; i++)
1661     {
1662       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1663       tmp[sizeof (tmp) - 1] = 0;
1664       SSTRCAT (linebuf, tmp, linebuf_fill);
1666       data_ptr++;
1667     }
1669     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1670   } /* for (t) */
1671   rrd_freemem(data);
1673   return (send_response (sock, RESP_OK, "Success\n"));
1674 #undef SSTRCAT
1675 } /* }}} int handle_request_fetch */
1677 /* we came across a "WROTE" entry during journal replay.
1678  * throw away any values that we have accumulated for this file
1679  */
1680 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1682   cache_item_t *ci;
1683   const char *file = buffer;
1685   pthread_mutex_lock(&cache_lock);
1687   ci = g_tree_lookup(cache_tree, file);
1688   if (ci == NULL)
1689   {
1690     pthread_mutex_unlock(&cache_lock);
1691     return (0);
1692   }
1694   if (ci->values)
1695     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1697   wipe_ci_values(ci, now);
1698   remove_from_queue(ci);
1700   pthread_mutex_unlock(&cache_lock);
1701   return (0);
1702 } /* }}} int handle_request_wrote */
1704 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1706   char *file, file_tmp[PATH_MAX];
1707   int status;
1708   rrd_info_t *info;
1710   /* obtain filename */
1711   status = buffer_get_field(&buffer, &buffer_size, &file);
1712   if (status != 0)
1713     return syntax_error(sock,cmd);
1714   /* get full pathname */
1715   get_abs_path(&file, file_tmp);
1716   if (!check_file_access(file, sock)) {
1717     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1718   }
1719   /* get data */
1720   rrd_clear_error ();
1721   info = rrd_info_r(file);
1722   if(!info) {
1723     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1724   }
1725   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1726       switch (data->type) {
1727       case RD_I_VAL:
1728           if (isnan(data->value.u_val))
1729               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1730           else
1731               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1732           break;
1733       case RD_I_CNT:
1734           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1735           break;
1736       case RD_I_INT:
1737           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1738           break;
1739       case RD_I_STR:
1740           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1741           break;
1742       case RD_I_BLO:
1743           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1744           break;
1745       }
1746   }
1748   rrd_info_free(info);
1750   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1751 } /* }}} static int handle_request_info  */
1753 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1755   char *i, *file, file_tmp[PATH_MAX];
1756   int status;
1757   int idx;
1758   time_t t;
1760   /* obtain filename */
1761   status = buffer_get_field(&buffer, &buffer_size, &file);
1762   if (status != 0)
1763     return syntax_error(sock,cmd);
1764   /* get full pathname */
1765   get_abs_path(&file, file_tmp);
1766   if (!check_file_access(file, sock)) {
1767     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1768   }
1770   status = buffer_get_field(&buffer, &buffer_size, &i);
1771   if (status != 0)
1772     return syntax_error(sock,cmd);
1773   idx = atoi(i);
1774   if(idx<0) { 
1775     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1776   }
1778   /* get data */
1779   rrd_clear_error ();
1780   t = rrd_first_r(file,idx);
1781   if(t<1) {
1782     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1783   }
1784   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1785 } /* }}} static int handle_request_first  */
1788 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1790   char *file, file_tmp[PATH_MAX];
1791   int status;
1792   time_t t, from_file, step;
1793   rrd_file_t * rrd_file;
1794   cache_item_t * ci;
1795   rrd_t rrd; 
1797   /* obtain filename */
1798   status = buffer_get_field(&buffer, &buffer_size, &file);
1799   if (status != 0)
1800     return syntax_error(sock,cmd);
1801   /* get full pathname */
1802   get_abs_path(&file, file_tmp);
1803   if (!check_file_access(file, sock)) {
1804     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1805   }
1806   rrd_clear_error();
1807   rrd_init(&rrd);
1808   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1809   if(!rrd_file) {
1810     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1811   }
1812   from_file = rrd.live_head->last_up;
1813   step = rrd.stat_head->pdp_step;
1814   rrd_close(rrd_file);
1815   pthread_mutex_lock(&cache_lock);
1816   ci = g_tree_lookup(cache_tree, file);
1817   if (ci)
1818     t = ci->last_update_stamp;
1819   else
1820     t = from_file;
1821   pthread_mutex_unlock(&cache_lock);
1822   t -= t % step;
1823   rrd_free(&rrd);
1824   if(t<1) {
1825     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1826   }
1827   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1828 } /* }}} static int handle_request_last  */
1830 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1832   char *file, file_tmp[PATH_MAX];
1833   char *tok;
1834   int ac = 0;
1835   char *av[128];
1836   int status;
1837   unsigned long step = 300;
1838   time_t last_up = time(NULL)-10;
1839   int no_overwrite = opt_no_overwrite;
1842   /* obtain filename */
1843   status = buffer_get_field(&buffer, &buffer_size, &file);
1844   if (status != 0)
1845     return syntax_error(sock,cmd);
1846   /* get full pathname */
1847   get_abs_path(&file, file_tmp);
1848   if (!check_file_access(file, sock)) {
1849     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1850   }
1851   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1853   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1854     if( ! strncmp(tok,"-b",2) ) {
1855       status = buffer_get_field(&buffer, &buffer_size, &tok );
1856       if (status != 0) return syntax_error(sock,cmd);
1857       last_up = (time_t) atol(tok);
1858       continue;
1859     }
1860     if( ! strncmp(tok,"-s",2) ) {
1861       status = buffer_get_field(&buffer, &buffer_size, &tok );
1862       if (status != 0) return syntax_error(sock,cmd);
1863       step = atol(tok);
1864       continue;
1865     }
1866     if( ! strncmp(tok,"-O",2) ) {
1867       no_overwrite = 1;
1868       continue;
1869     }
1870     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1871     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1872     return syntax_error(sock,cmd);
1873   }
1874   if(step<1) {
1875     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1876   }
1877   if (last_up < 3600 * 24 * 365 * 10) {
1878     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1879   }
1881   rrd_clear_error ();
1882   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1884   if(!status) {
1885     return send_response(sock, RESP_OK, "RRD created OK\n");
1886   }
1887   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1888 } /* }}} static int handle_request_create  */
1890 /* start "BATCH" processing */
1891 static int batch_start (HANDLER_PROTO) /* {{{ */
1893   int status;
1894   if (sock->batch_start)
1895     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1897   status = send_response(sock, RESP_OK,
1898                          "Go ahead.  End with dot '.' on its own line.\n");
1899   sock->batch_start = time(NULL);
1900   sock->batch_cmd = 0;
1902   return status;
1903 } /* }}} static int batch_start */
1905 /* finish "BATCH" processing and return results to the client */
1906 static int batch_done (HANDLER_PROTO) /* {{{ */
1908   assert(sock->batch_start);
1909   sock->batch_start = 0;
1910   sock->batch_cmd  = 0;
1911   return send_response(sock, RESP_OK, "errors\n");
1912 } /* }}} static int batch_done */
1914 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1916   return -1;
1917 } /* }}} static int handle_request_quit */
1919 static command_t list_of_commands[] = { /* {{{ */
1920   {
1921     "UPDATE",
1922     handle_request_update,
1923     CMD_CONTEXT_ANY,
1924     "UPDATE <filename> <values> [<values> ...]\n"
1925     ,
1926     "Adds the given file to the internal cache if it is not yet known and\n"
1927     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1928     "for details.\n"
1929     "\n"
1930     "Each <values> has the following form:\n"
1931     "  <values> = <time>:<value>[:<value>[...]]\n"
1932     "See the rrdupdate(1) manpage for details.\n"
1933   },
1934   {
1935     "WROTE",
1936     handle_request_wrote,
1937     CMD_CONTEXT_JOURNAL,
1938     NULL,
1939     NULL
1940   },
1941   {
1942     "FLUSH",
1943     handle_request_flush,
1944     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1945     "FLUSH <filename>\n"
1946     ,
1947     "Adds the given filename to the head of the update queue and returns\n"
1948     "after it has been dequeued.\n"
1949   },
1950   {
1951     "FLUSHALL",
1952     handle_request_flushall,
1953     CMD_CONTEXT_CLIENT,
1954     "FLUSHALL\n"
1955     ,
1956     "Triggers writing of all pending updates.  Returns immediately.\n"
1957   },
1958   {
1959     "PENDING",
1960     handle_request_pending,
1961     CMD_CONTEXT_CLIENT,
1962     "PENDING <filename>\n"
1963     ,
1964     "Shows any 'pending' updates for a file, in order.\n"
1965     "The updates shown have not yet been written to the underlying RRD file.\n"
1966   },
1967   {
1968     "FORGET",
1969     handle_request_forget,
1970     CMD_CONTEXT_ANY,
1971     "FORGET <filename>\n"
1972     ,
1973     "Removes the file completely from the cache.\n"
1974     "Any pending updates for the file will be lost.\n"
1975   },
1976   {
1977     "QUEUE",
1978     handle_request_queue,
1979     CMD_CONTEXT_CLIENT,
1980     "QUEUE\n"
1981     ,
1982         "Shows all files in the output queue.\n"
1983     "The output is zero or more lines in the following format:\n"
1984     "(where <num_vals> is the number of values to be written)\n"
1985     "\n"
1986     "<num_vals> <filename>\n"
1987   },
1988   {
1989     "STATS",
1990     handle_request_stats,
1991     CMD_CONTEXT_CLIENT,
1992     "STATS\n"
1993     ,
1994     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1995     "a description of the values.\n"
1996   },
1997   {
1998     "HELP",
1999     handle_request_help,
2000     CMD_CONTEXT_CLIENT,
2001     "HELP [<command>]\n",
2002     NULL, /* special! */
2003   },
2004   {
2005     "BATCH",
2006     batch_start,
2007     CMD_CONTEXT_CLIENT,
2008     "BATCH\n"
2009     ,
2010     "The 'BATCH' command permits the client to initiate a bulk load\n"
2011     "   of commands to rrdcached.\n"
2012     "\n"
2013     "Usage:\n"
2014     "\n"
2015     "    client: BATCH\n"
2016     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2017     "    client: command #1\n"
2018     "    client: command #2\n"
2019     "    client: ... and so on\n"
2020     "    client: .\n"
2021     "    server: 2 errors\n"
2022     "    server: 7 message for command #7\n"
2023     "    server: 9 message for command #9\n"
2024     "\n"
2025     "For more information, consult the rrdcached(1) documentation.\n"
2026   },
2027   {
2028     ".",   /* BATCH terminator */
2029     batch_done,
2030     CMD_CONTEXT_BATCH,
2031     NULL,
2032     NULL
2033   },
2034   {
2035     "FETCH",
2036     handle_request_fetch,
2037     CMD_CONTEXT_CLIENT,
2038     "FETCH <file> <CF> [<start> [<end>]]\n"
2039     ,
2040     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2041   },
2042   {
2043     "INFO",
2044     handle_request_info,
2045     CMD_CONTEXT_CLIENT,
2046     "INFO <filename>\n",
2047     "The INFO command retrieves information about a specified RRD file.\n"
2048     "This is returned in standard rrdinfo format, a sequence of lines\n"
2049     "with the format <keyname> = <value>\n"
2050     "Note that this is the data as of the last update of the RRD file itself,\n"
2051     "not the last time data was received via rrdcached, so there may be pending\n"
2052     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2053   },
2054   {
2055     "FIRST",
2056     handle_request_first,
2057     CMD_CONTEXT_CLIENT,
2058     "FIRST <filename> <rra index>\n",
2059     "The FIRST command retrieves the first data time for a specified RRA in\n"
2060     "an RRD file.\n"
2061   },
2062   {
2063     "LAST",
2064     handle_request_last,
2065     CMD_CONTEXT_CLIENT,
2066     "LAST <filename>\n",
2067     "The LAST command retrieves the last update time for a specified RRD file.\n"
2068     "Note that this is the time of the last update of the RRD file itself, not\n"
2069     "the last time data was received via rrdcached, so there may be pending\n"
2070     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2071   },
2072   {
2073     "CREATE",
2074     handle_request_create,
2075     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2076     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2077     "The CREATE command will create an RRD file, overwriting any existing file\n"
2078     "unless the -O option is given or rrdcached was started with the -O option.\n"
2079     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2080     "not acceptable) and the step is in seconds (default is 300).\n"
2081     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2082   },
2083   {
2084     "QUIT",
2085     handle_request_quit,
2086     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2087     "QUIT\n"
2088     ,
2089     "Disconnect from rrdcached.\n"
2090   }
2091 }; /* }}} command_t list_of_commands[] */
2092 static size_t list_of_commands_len = sizeof (list_of_commands)
2093   / sizeof (list_of_commands[0]);
2095 static command_t *find_command(char *cmd)
2097   size_t i;
2099   for (i = 0; i < list_of_commands_len; i++)
2100     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2101       return (&list_of_commands[i]);
2102   return NULL;
2105 /* We currently use the index in the `list_of_commands' array as a bit position
2106  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2107  * outside these functions so that switching to a more elegant storage method
2108  * is easily possible. */
2109 static ssize_t find_command_index (const char *cmd) /* {{{ */
2111   size_t i;
2113   for (i = 0; i < list_of_commands_len; i++)
2114     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2115       return ((ssize_t) i);
2116   return (-1);
2117 } /* }}} ssize_t find_command_index */
2119 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2120     const char *cmd)
2122   ssize_t i;
2124   if (JOURNAL_REPLAY(sock))
2125     return (1);
2127   if (cmd == NULL)
2128     return (-1);
2130   if ((strcasecmp ("QUIT", cmd) == 0)
2131       || (strcasecmp ("HELP", cmd) == 0))
2132     return (1);
2133   else if (strcmp (".", cmd) == 0)
2134     cmd = "BATCH";
2136   i = find_command_index (cmd);
2137   if (i < 0)
2138     return (-1);
2139   assert (i < 32);
2141   if ((sock->permissions & (1 << i)) != 0)
2142     return (1);
2143   return (0);
2144 } /* }}} int socket_permission_check */
2146 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2147     const char *cmd)
2149   ssize_t i;
2151   i = find_command_index (cmd);
2152   if (i < 0)
2153     return (-1);
2154   assert (i < 32);
2156   sock->permissions |= (1 << i);
2157   return (0);
2158 } /* }}} int socket_permission_add */
2160 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2162   sock->permissions = 0;
2163 } /* }}} socket_permission_clear */
2165 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2166     listen_socket_t *src)
2168   dest->permissions = src->permissions;
2169 } /* }}} socket_permission_copy */
2171 /* check whether commands are received in the expected context */
2172 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2174   if (JOURNAL_REPLAY(sock))
2175     return (cmd->context & CMD_CONTEXT_JOURNAL);
2176   else if (sock->batch_start)
2177     return (cmd->context & CMD_CONTEXT_BATCH);
2178   else
2179     return (cmd->context & CMD_CONTEXT_CLIENT);
2181   /* NOTREACHED */
2182   assert(1==0);
2185 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2187   int status;
2188   char *cmd_str;
2189   char *resp_txt;
2190   command_t *help = NULL;
2192   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2193   if (status == 0)
2194     help = find_command(cmd_str);
2196   if (help && (help->syntax || help->help))
2197   {
2198     char tmp[CMD_MAX];
2200     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2201     resp_txt = tmp;
2203     if (help->syntax)
2204       add_response_info(sock, "Usage: %s\n", help->syntax);
2206     if (help->help)
2207       add_response_info(sock, "%s\n", help->help);
2208   }
2209   else
2210   {
2211     size_t i;
2213     resp_txt = "Command overview\n";
2215     for (i = 0; i < list_of_commands_len; i++)
2216     {
2217       if (list_of_commands[i].syntax == NULL)
2218         continue;
2219       add_response_info (sock, "%s", list_of_commands[i].syntax);
2220     }
2221   }
2223   return send_response(sock, RESP_OK, resp_txt);
2224 } /* }}} int handle_request_help */
2226 static int handle_request (DISPATCH_PROTO) /* {{{ */
2228   char *buffer_ptr = buffer;
2229   char *cmd_str = NULL;
2230   command_t *cmd = NULL;
2231   int status;
2233   assert (buffer[buffer_size - 1] == '\0');
2235   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2236   if (status != 0)
2237   {
2238     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2239     return (-1);
2240   }
2242   if (sock != NULL && sock->batch_start)
2243     sock->batch_cmd++;
2245   cmd = find_command(cmd_str);
2246   if (!cmd)
2247     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2249   if (!socket_permission_check (sock, cmd->cmd))
2250     return send_response(sock, RESP_ERR, "Permission denied.\n");
2252   if (!command_check_context(sock, cmd))
2253     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2255   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2256 } /* }}} int handle_request */
2258 static void journal_set_free (journal_set *js) /* {{{ */
2260   if (js == NULL)
2261     return;
2263   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2265   free(js);
2266 } /* }}} journal_set_free */
2268 static void journal_set_remove (journal_set *js) /* {{{ */
2270   if (js == NULL)
2271     return;
2273   for (uint i=0; i < js->files_num; i++)
2274   {
2275     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2276     unlink(js->files[i]);
2277   }
2278 } /* }}} journal_set_remove */
2280 /* close current journal file handle.
2281  * MUST hold journal_lock before calling */
2282 static void journal_close(void) /* {{{ */
2284   if (journal_fh != NULL)
2285   {
2286     if (fclose(journal_fh) != 0)
2287       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2288   }
2290   journal_fh = NULL;
2291   journal_size = 0;
2292 } /* }}} journal_close */
2294 /* MUST hold journal_lock before calling */
2295 static void journal_new_file(void) /* {{{ */
2297   struct timeval now;
2298   int  new_fd;
2299   char new_file[PATH_MAX + 1];
2301   assert(journal_dir != NULL);
2302   assert(journal_cur != NULL);
2304   journal_close();
2306   gettimeofday(&now, NULL);
2307   /* this format assures that the files sort in strcmp() order */
2308   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2309            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2311   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2312                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2313   if (new_fd < 0)
2314     goto error;
2316   journal_fh = fdopen(new_fd, "a");
2317   if (journal_fh == NULL)
2318     goto error;
2320   journal_size = ftell(journal_fh);
2321   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2323   /* record the file in the journal set */
2324   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2326   return;
2328 error:
2329   RRDD_LOG(LOG_CRIT,
2330            "JOURNALING DISABLED: Error while trying to create %s : %s",
2331            new_file, rrd_strerror(errno));
2332   RRDD_LOG(LOG_CRIT,
2333            "JOURNALING DISABLED: All values will be flushed at shutdown");
2335   close(new_fd);
2336   config_flush_at_shutdown = 1;
2338 } /* }}} journal_new_file */
2340 /* MUST NOT hold journal_lock before calling this */
2341 static void journal_rotate(void) /* {{{ */
2343   journal_set *old_js = NULL;
2345   if (journal_dir == NULL)
2346     return;
2348   RRDD_LOG(LOG_DEBUG, "rotating journals");
2350   pthread_mutex_lock(&stats_lock);
2351   ++stats_journal_rotate;
2352   pthread_mutex_unlock(&stats_lock);
2354   pthread_mutex_lock(&journal_lock);
2356   journal_close();
2358   /* rotate the journal sets */
2359   old_js = journal_old;
2360   journal_old = journal_cur;
2361   journal_cur = calloc(1, sizeof(journal_set));
2363   if (journal_cur != NULL)
2364     journal_new_file();
2365   else
2366     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2368   pthread_mutex_unlock(&journal_lock);
2370   journal_set_remove(old_js);
2371   journal_set_free  (old_js);
2373 } /* }}} static void journal_rotate */
2375 /* MUST hold journal_lock when calling */
2376 static void journal_done(void) /* {{{ */
2378   if (journal_cur == NULL)
2379     return;
2381   journal_close();
2383   if (config_flush_at_shutdown)
2384   {
2385     RRDD_LOG(LOG_INFO, "removing journals");
2386     journal_set_remove(journal_old);
2387     journal_set_remove(journal_cur);
2388   }
2389   else
2390   {
2391     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2392              "journals will be used at next startup");
2393   }
2395   journal_set_free(journal_cur);
2396   journal_set_free(journal_old);
2397   free(journal_dir);
2399 } /* }}} static void journal_done */
2401 static int journal_write(char *cmd, char *args) /* {{{ */
2403   int chars;
2405   if (journal_fh == NULL)
2406     return 0;
2408   pthread_mutex_lock(&journal_lock);
2409   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2410   journal_size += chars;
2412   if (journal_size > JOURNAL_MAX)
2413     journal_new_file();
2415   pthread_mutex_unlock(&journal_lock);
2417   if (chars > 0)
2418   {
2419     pthread_mutex_lock(&stats_lock);
2420     stats_journal_bytes += chars;
2421     pthread_mutex_unlock(&stats_lock);
2422   }
2424   return chars;
2425 } /* }}} static int journal_write */
2427 static int journal_replay (const char *file) /* {{{ */
2429   FILE *fh;
2430   int entry_cnt = 0;
2431   int fail_cnt = 0;
2432   uint64_t line = 0;
2433   char entry[CMD_MAX];
2434   time_t now;
2436   if (file == NULL) return 0;
2438   {
2439     char *reason = "unknown error";
2440     int status = 0;
2441     struct stat statbuf;
2443     memset(&statbuf, 0, sizeof(statbuf));
2444     if (stat(file, &statbuf) != 0)
2445     {
2446       reason = "stat error";
2447       status = errno;
2448     }
2449     else if (!S_ISREG(statbuf.st_mode))
2450     {
2451       reason = "not a regular file";
2452       status = EPERM;
2453     }
2454     if (statbuf.st_uid != daemon_uid)
2455     {
2456       reason = "not owned by daemon user";
2457       status = EACCES;
2458     }
2459     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2460     {
2461       reason = "must not be user/group writable";
2462       status = EACCES;
2463     }
2465     if (status != 0)
2466     {
2467       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2468                file, rrd_strerror(status), reason);
2469       return 0;
2470     }
2471   }
2473   fh = fopen(file, "r");
2474   if (fh == NULL)
2475   {
2476     if (errno != ENOENT)
2477       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2478                file, rrd_strerror(errno));
2479     return 0;
2480   }
2481   else
2482     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2484   now = time(NULL);
2486   while(!feof(fh))
2487   {
2488     size_t entry_len;
2490     ++line;
2491     if (fgets(entry, sizeof(entry), fh) == NULL)
2492       break;
2493     entry_len = strlen(entry);
2495     /* check \n termination in case journal writing crashed mid-line */
2496     if (entry_len == 0)
2497       continue;
2498     else if (entry[entry_len - 1] != '\n')
2499     {
2500       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2501       ++fail_cnt;
2502       continue;
2503     }
2505     entry[entry_len - 1] = '\0';
2507     if (handle_request(NULL, now, entry, entry_len) == 0)
2508       ++entry_cnt;
2509     else
2510       ++fail_cnt;
2511   }
2513   fclose(fh);
2515   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2516            entry_cnt, fail_cnt);
2518   return entry_cnt > 0 ? 1 : 0;
2519 } /* }}} static int journal_replay */
2521 static int journal_sort(const void *v1, const void *v2)
2523   char **jn1 = (char **) v1;
2524   char **jn2 = (char **) v2;
2526   return strcmp(*jn1,*jn2);
2529 static void journal_init(void) /* {{{ */
2531   int had_journal = 0;
2532   DIR *dir;
2533   struct dirent *dent;
2534   char path[PATH_MAX+1];
2536   if (journal_dir == NULL) return;
2538   pthread_mutex_lock(&journal_lock);
2540   journal_cur = calloc(1, sizeof(journal_set));
2541   if (journal_cur == NULL)
2542   {
2543     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2544     return;
2545   }
2547   RRDD_LOG(LOG_INFO, "checking for journal files");
2549   /* Handle old journal files during transition.  This gives them the
2550    * correct sort order.  TODO: remove after first release
2551    */
2552   {
2553     char old_path[PATH_MAX+1];
2554     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2555     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2556     rename(old_path, path);
2558     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2559     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2560     rename(old_path, path);
2561   }
2563   dir = opendir(journal_dir);
2564   if (!dir) {
2565     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2566     return;
2567   }
2568   while ((dent = readdir(dir)) != NULL)
2569   {
2570     /* looks like a journal file? */
2571     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2572       continue;
2574     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2576     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2577     {
2578       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2579                dent->d_name);
2580       break;
2581     }
2582   }
2583   closedir(dir);
2585   qsort(journal_cur->files, journal_cur->files_num,
2586         sizeof(journal_cur->files[0]), journal_sort);
2588   for (uint i=0; i < journal_cur->files_num; i++)
2589     had_journal += journal_replay(journal_cur->files[i]);
2591   journal_new_file();
2593   /* it must have been a crash.  start a flush */
2594   if (had_journal && config_flush_at_shutdown)
2595     flush_old_values(-1);
2597   pthread_mutex_unlock(&journal_lock);
2599   RRDD_LOG(LOG_INFO, "journal processing complete");
2601 } /* }}} static void journal_init */
2603 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2605   assert(sock != NULL);
2607   free(sock->rbuf);  sock->rbuf = NULL;
2608   free(sock->wbuf);  sock->wbuf = NULL;
2609   free(sock);
2610 } /* }}} void free_listen_socket */
2612 static void close_connection(listen_socket_t *sock) /* {{{ */
2614   if (sock->fd >= 0)
2615   {
2616     close(sock->fd);
2617     sock->fd = -1;
2618   }
2620   free_listen_socket(sock);
2622 } /* }}} void close_connection */
2624 static void *connection_thread_main (void *args) /* {{{ */
2626   listen_socket_t *sock;
2627   int fd;
2629   sock = (listen_socket_t *) args;
2630   fd = sock->fd;
2632   /* init read buffers */
2633   sock->next_read = sock->next_cmd = 0;
2634   sock->rbuf = malloc(RBUF_SIZE);
2635   if (sock->rbuf == NULL)
2636   {
2637     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2638     close_connection(sock);
2639     return NULL;
2640   }
2642   pthread_mutex_lock (&connection_threads_lock);
2643 #ifdef HAVE_LIBWRAP
2644   /* LIBWRAP does not support multiple threads! By putting this code
2645      inside pthread_mutex_lock we do not have to worry about request_info
2646      getting overwritten by another thread.
2647   */
2648   struct request_info req;
2649   request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2650   fromhost(&req);
2651   if(!hosts_access(&req)) {
2652     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2653     pthread_mutex_unlock (&connection_threads_lock);
2654     close_connection(sock);
2655     return NULL;
2656   }
2657 #endif /* HAVE_LIBWRAP */
2658   connection_threads_num++;
2659   pthread_mutex_unlock (&connection_threads_lock);
2661   while (state == RUNNING)
2662   {
2663     char *cmd;
2664     ssize_t cmd_len;
2665     ssize_t rbytes;
2666     time_t now;
2668     struct pollfd pollfd;
2669     int status;
2671     pollfd.fd = fd;
2672     pollfd.events = POLLIN | POLLPRI;
2673     pollfd.revents = 0;
2675     status = poll (&pollfd, 1, /* timeout = */ 500);
2676     if (state != RUNNING)
2677       break;
2678     else if (status == 0) /* timeout */
2679       continue;
2680     else if (status < 0) /* error */
2681     {
2682       status = errno;
2683       if (status != EINTR)
2684         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2685       continue;
2686     }
2688     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2689       break;
2690     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2691     {
2692       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2693           "poll(2) returned something unexpected: %#04hx",
2694           pollfd.revents);
2695       break;
2696     }
2698     rbytes = read(fd, sock->rbuf + sock->next_read,
2699                   RBUF_SIZE - sock->next_read);
2700     if (rbytes < 0)
2701     {
2702       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2703       break;
2704     }
2705     else if (rbytes == 0)
2706       break; /* eof */
2708     sock->next_read += rbytes;
2710     if (sock->batch_start)
2711       now = sock->batch_start;
2712     else
2713       now = time(NULL);
2715     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2716     {
2717       status = handle_request (sock, now, cmd, cmd_len+1);
2718       if (status != 0)
2719         goto out_close;
2720     }
2721   }
2723 out_close:
2724   close_connection(sock);
2726   /* Remove this thread from the connection threads list */
2727   pthread_mutex_lock (&connection_threads_lock);
2728   connection_threads_num--;
2729   if (connection_threads_num <= 0)
2730     pthread_cond_broadcast(&connection_threads_done);
2731   pthread_mutex_unlock (&connection_threads_lock);
2733   return (NULL);
2734 } /* }}} void *connection_thread_main */
2736 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2738   int fd;
2739   struct sockaddr_un sa;
2740   listen_socket_t *temp;
2741   int status;
2742   const char *path;
2743   char *path_copy, *dir;
2745   path = sock->addr;
2746   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2747     path += strlen("unix:");
2749   /* dirname may modify its argument */
2750   path_copy = strdup(path);
2751   if (path_copy == NULL)
2752   {
2753     fprintf(stderr, "rrdcached: strdup(): %s\n",
2754         rrd_strerror(errno));
2755     return (-1);
2756   }
2758   dir = dirname(path_copy);
2759   if (rrd_mkdir_p(dir, 0777) != 0)
2760   {
2761     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2762         dir, rrd_strerror(errno));
2763     return (-1);
2764   }
2766   free(path_copy);
2768   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2769       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2770   if (temp == NULL)
2771   {
2772     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2773     return (-1);
2774   }
2775   listen_fds = temp;
2776   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2778   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2779   if (fd < 0)
2780   {
2781     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2782              rrd_strerror(errno));
2783     return (-1);
2784   }
2786   memset (&sa, 0, sizeof (sa));
2787   sa.sun_family = AF_UNIX;
2788   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2790   /* if we've gotten this far, we own the pid file.  any daemon started
2791    * with the same args must not be alive.  therefore, ensure that we can
2792    * create the socket...
2793    */
2794   unlink(path);
2796   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2797   if (status != 0)
2798   {
2799     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2800              path, rrd_strerror(errno));
2801     close (fd);
2802     return (-1);
2803   }
2805   /* tweak the sockets group ownership */
2806   if (sock->socket_group != (gid_t)-1)
2807   {
2808     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2809          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2810     {
2811       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2812     }
2813   }
2815   if (sock->socket_permissions != (mode_t)-1)
2816   {
2817     if (chmod(path, sock->socket_permissions) != 0)
2818       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2819           (unsigned int)sock->socket_permissions, strerror(errno));
2820   }
2822   status = listen (fd, /* backlog = */ 10);
2823   if (status != 0)
2824   {
2825     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2826              path, rrd_strerror(errno));
2827     close (fd);
2828     unlink (path);
2829     return (-1);
2830   }
2832   listen_fds[listen_fds_num].fd = fd;
2833   listen_fds[listen_fds_num].family = PF_UNIX;
2834   strncpy(listen_fds[listen_fds_num].addr, path,
2835           sizeof (listen_fds[listen_fds_num].addr) - 1);
2836   listen_fds_num++;
2838   return (0);
2839 } /* }}} int open_listen_socket_unix */
2841 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2843   struct addrinfo ai_hints;
2844   struct addrinfo *ai_res;
2845   struct addrinfo *ai_ptr;
2846   char addr_copy[NI_MAXHOST];
2847   char *addr;
2848   char *port;
2849   int status;
2851   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2852   addr_copy[sizeof (addr_copy) - 1] = 0;
2853   addr = addr_copy;
2855   memset (&ai_hints, 0, sizeof (ai_hints));
2856   ai_hints.ai_flags = 0;
2857 #ifdef AI_ADDRCONFIG
2858   ai_hints.ai_flags |= AI_ADDRCONFIG;
2859 #endif
2860   ai_hints.ai_family = AF_UNSPEC;
2861   ai_hints.ai_socktype = SOCK_STREAM;
2863   port = NULL;
2864   if (*addr == '[') /* IPv6+port format */
2865   {
2866     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2867     addr++;
2869     port = strchr (addr, ']');
2870     if (port == NULL)
2871     {
2872       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2873       return (-1);
2874     }
2875     *port = 0;
2876     port++;
2878     if (*port == ':')
2879       port++;
2880     else if (*port == 0)
2881       port = NULL;
2882     else
2883     {
2884       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2885       return (-1);
2886     }
2887   } /* if (*addr == '[') */
2888   else
2889   {
2890     port = rindex(addr, ':');
2891     if (port != NULL)
2892     {
2893       *port = 0;
2894       port++;
2895     }
2896   }
2897   ai_res = NULL;
2898   status = getaddrinfo (addr,
2899                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2900                         &ai_hints, &ai_res);
2901   if (status != 0)
2902   {
2903     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2904              addr, gai_strerror (status));
2905     return (-1);
2906   }
2908   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2909   {
2910     int fd;
2911     listen_socket_t *temp;
2912     int one = 1;
2914     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2915         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2916     if (temp == NULL)
2917     {
2918       fprintf (stderr,
2919                "rrdcached: open_listen_socket_network: realloc failed.\n");
2920       continue;
2921     }
2922     listen_fds = temp;
2923     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2925     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2926     if (fd < 0)
2927     {
2928       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2929                rrd_strerror(errno));
2930       continue;
2931     }
2933     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2935     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2936     if (status != 0)
2937     {
2938       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2939                sock->addr, rrd_strerror(errno));
2940       close (fd);
2941       continue;
2942     }
2944     status = listen (fd, /* backlog = */ 10);
2945     if (status != 0)
2946     {
2947       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2948                sock->addr, rrd_strerror(errno));
2949       close (fd);
2950       freeaddrinfo(ai_res);
2951       return (-1);
2952     }
2954     listen_fds[listen_fds_num].fd = fd;
2955     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2956     listen_fds_num++;
2957   } /* for (ai_ptr) */
2959   freeaddrinfo(ai_res);
2960   return (0);
2961 } /* }}} static int open_listen_socket_network */
2963 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2965   assert(sock != NULL);
2966   assert(sock->addr != NULL);
2968   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2969       || sock->addr[0] == '/')
2970     return (open_listen_socket_unix(sock));
2971   else
2972     return (open_listen_socket_network(sock));
2973 } /* }}} int open_listen_socket */
2975 static int close_listen_sockets (void) /* {{{ */
2977   size_t i;
2979   for (i = 0; i < listen_fds_num; i++)
2980   {
2981     close (listen_fds[i].fd);
2983     if (listen_fds[i].family == PF_UNIX)
2984       unlink(listen_fds[i].addr);
2985   }
2987   free (listen_fds);
2988   listen_fds = NULL;
2989   listen_fds_num = 0;
2991   return (0);
2992 } /* }}} int close_listen_sockets */
2994 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2996   struct pollfd *pollfds;
2997   int pollfds_num;
2998   int status;
2999   int i;
3001   if (listen_fds_num < 1)
3002   {
3003     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3004     return (NULL);
3005   }
3007   pollfds_num = listen_fds_num;
3008   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3009   if (pollfds == NULL)
3010   {
3011     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3012     return (NULL);
3013   }
3014   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3016   RRDD_LOG(LOG_INFO, "listening for connections");
3018   while (state == RUNNING)
3019   {
3020     for (i = 0; i < pollfds_num; i++)
3021     {
3022       pollfds[i].fd = listen_fds[i].fd;
3023       pollfds[i].events = POLLIN | POLLPRI;
3024       pollfds[i].revents = 0;
3025     }
3027     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3028     if (state != RUNNING)
3029       break;
3030     else if (status == 0) /* timeout */
3031       continue;
3032     else if (status < 0) /* error */
3033     {
3034       status = errno;
3035       if (status != EINTR)
3036       {
3037         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3038       }
3039       continue;
3040     }
3042     for (i = 0; i < pollfds_num; i++)
3043     {
3044       listen_socket_t *client_sock;
3045       struct sockaddr_storage client_sa;
3046       socklen_t client_sa_size;
3047       pthread_t tid;
3048       pthread_attr_t attr;
3050       if (pollfds[i].revents == 0)
3051         continue;
3053       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3054       {
3055         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3056             "poll(2) returned something unexpected for listen FD #%i.",
3057             pollfds[i].fd);
3058         continue;
3059       }
3061       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3062       if (client_sock == NULL)
3063       {
3064         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3065         continue;
3066       }
3067       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3069       client_sa_size = sizeof (client_sa);
3070       client_sock->fd = accept (pollfds[i].fd,
3071           (struct sockaddr *) &client_sa, &client_sa_size);
3072       if (client_sock->fd < 0)
3073       {
3074         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3075         free(client_sock);
3076         continue;
3077       }
3079       pthread_attr_init (&attr);
3080       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3082       status = pthread_create (&tid, &attr, connection_thread_main,
3083                                client_sock);
3084       if (status != 0)
3085       {
3086         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3087         close_connection(client_sock);
3088         continue;
3089       }
3090     } /* for (pollfds_num) */
3091   } /* while (state == RUNNING) */
3093   RRDD_LOG(LOG_INFO, "starting shutdown");
3095   close_listen_sockets ();
3097   pthread_mutex_lock (&connection_threads_lock);
3098   while (connection_threads_num > 0)
3099     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3100   pthread_mutex_unlock (&connection_threads_lock);
3102   free(pollfds);
3104   return (NULL);
3105 } /* }}} void *listen_thread_main */
3107 static int daemonize (void) /* {{{ */
3109   int pid_fd;
3110   char *base_dir;
3112   daemon_uid = geteuid();
3114   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3115   if (pid_fd < 0)
3116     pid_fd = check_pidfile();
3117   if (pid_fd < 0)
3118     return pid_fd;
3120   /* open all the listen sockets */
3121   if (config_listen_address_list_len > 0)
3122   {
3123     for (size_t i = 0; i < config_listen_address_list_len; i++)
3124       open_listen_socket (config_listen_address_list[i]);
3126     rrd_free_ptrs((void ***) &config_listen_address_list,
3127                   &config_listen_address_list_len);
3128   }
3129   else
3130   {
3131     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3132         sizeof(default_socket.addr) - 1);
3133     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3134     open_listen_socket (&default_socket);
3135   }
3137   if (listen_fds_num < 1)
3138   {
3139     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3140     goto error;
3141   }
3143   if (!stay_foreground)
3144   {
3145     pid_t child;
3147     child = fork ();
3148     if (child < 0)
3149     {
3150       fprintf (stderr, "daemonize: fork(2) failed.\n");
3151       goto error;
3152     }
3153     else if (child > 0)
3154       exit(0);
3156     /* Become session leader */
3157     setsid ();
3159     /* Open the first three file descriptors to /dev/null */
3160     close (2);
3161     close (1);
3162     close (0);
3164     open ("/dev/null", O_RDWR);
3165     if (dup(0) == -1 || dup(0) == -1){
3166         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3167     }
3168   } /* if (!stay_foreground) */
3170   /* Change into the /tmp directory. */
3171   base_dir = (config_base_dir != NULL)
3172     ? config_base_dir
3173     : "/tmp";
3175   if (chdir (base_dir) != 0)
3176   {
3177     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3178     goto error;
3179   }
3181   install_signal_handlers();
3183   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3184   RRDD_LOG(LOG_INFO, "starting up");
3186   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3187                                 (GDestroyNotify) free_cache_item);
3188   if (cache_tree == NULL)
3189   {
3190     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3191     goto error;
3192   }
3194   return write_pidfile (pid_fd);
3196 error:
3197   remove_pidfile();
3198   return -1;
3199 } /* }}} int daemonize */
3201 static int cleanup (void) /* {{{ */
3203   pthread_cond_broadcast (&flush_cond);
3204   pthread_join (flush_thread, NULL);
3206   pthread_cond_broadcast (&queue_cond);
3207   for (int i = 0; i < config_queue_threads; i++)
3208     pthread_join (queue_threads[i], NULL);
3210   if (config_flush_at_shutdown)
3211   {
3212     assert(cache_queue_head == NULL);
3213     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3214   }
3216   free(queue_threads);
3217   free(config_base_dir);
3219   pthread_mutex_lock(&cache_lock);
3220   g_tree_destroy(cache_tree);
3222   pthread_mutex_lock(&journal_lock);
3223   journal_done();
3225   RRDD_LOG(LOG_INFO, "goodbye");
3226   closelog ();
3228   remove_pidfile ();
3229   free(config_pid_file);
3231   return (0);
3232 } /* }}} int cleanup */
3234 static int read_options (int argc, char **argv) /* {{{ */
3236   int option;
3237   int status = 0;
3239   socket_permission_clear (&default_socket);
3241   default_socket.socket_group = (gid_t)-1;
3242   default_socket.socket_permissions = (mode_t)-1;
3244   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3245   {
3246     switch (option)
3247     {
3248       case 'O':
3249         opt_no_overwrite = 1;
3250         break;
3252       case 'g':
3253         stay_foreground=1;
3254         break;
3256       case 'l':
3257       {
3258         listen_socket_t *new;
3260         new = malloc(sizeof(listen_socket_t));
3261         if (new == NULL)
3262         {
3263           fprintf(stderr, "read_options: malloc failed.\n");
3264           return(2);
3265         }
3266         memset(new, 0, sizeof(listen_socket_t));
3268         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3270         /* Add permissions to the socket {{{ */
3271         if (default_socket.permissions != 0)
3272         {
3273           socket_permission_copy (new, &default_socket);
3274         }
3275         else /* if (default_socket.permissions == 0) */
3276         {
3277           /* Add permission for ALL commands to the socket. */
3278           size_t i;
3279           for (i = 0; i < list_of_commands_len; i++)
3280           {
3281             status = socket_permission_add (new, list_of_commands[i].cmd);
3282             if (status != 0)
3283             {
3284               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3285                   "socket failed. This should never happen, ever! Sorry.\n",
3286                   list_of_commands[i].cmd);
3287               status = 4;
3288             }
3289           }
3290         }
3291         /* }}} Done adding permissions. */
3293         new->socket_group = default_socket.socket_group;
3294         new->socket_permissions = default_socket.socket_permissions;
3296         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3297                          &config_listen_address_list_len, new))
3298         {
3299           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3300           return (2);
3301         }
3302       }
3303       break;
3305       /* set socket group permissions */
3306       case 's':
3307       {
3308         gid_t group_gid;
3309         struct group *grp;
3311         group_gid = strtoul(optarg, NULL, 10);
3312         if (errno != EINVAL && group_gid>0)
3313         {
3314           /* we were passed a number */
3315           grp = getgrgid(group_gid);
3316         }
3317         else
3318         {
3319           grp = getgrnam(optarg);
3320         }
3322         if (grp)
3323         {
3324           default_socket.socket_group = grp->gr_gid;
3325         }
3326         else
3327         {
3328           /* no idea what the user wanted... */
3329           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3330           return (5);
3331         }
3332       }
3333       break;
3335       /* set socket file permissions */
3336       case 'm':
3337       {
3338         long  tmp;
3339         char *endptr = NULL;
3341         tmp = strtol (optarg, &endptr, 8);
3342         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3343             || (tmp > 07777) || (tmp < 0)) {
3344           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3345               optarg);
3346           return (5);
3347         }
3349         default_socket.socket_permissions = (mode_t)tmp;
3350       }
3351       break;
3353       case 'P':
3354       {
3355         char *optcopy;
3356         char *saveptr;
3357         char *dummy;
3358         char *ptr;
3360         socket_permission_clear (&default_socket);
3362         optcopy = strdup (optarg);
3363         dummy = optcopy;
3364         saveptr = NULL;
3365         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3366         {
3367           dummy = NULL;
3368           status = socket_permission_add (&default_socket, ptr);
3369           if (status != 0)
3370           {
3371             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3372                 "socket failed. Most likely, this permission doesn't "
3373                 "exist. Check your command line.\n", ptr);
3374             status = 4;
3375           }
3376         }
3378         free (optcopy);
3379       }
3380       break;
3382       case 'f':
3383       {
3384         int temp;
3386         temp = atoi (optarg);
3387         if (temp > 0)
3388           config_flush_interval = temp;
3389         else
3390         {
3391           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3392           status = 3;
3393         }
3394       }
3395       break;
3397       case 'w':
3398       {
3399         int temp;
3401         temp = atoi (optarg);
3402         if (temp > 0)
3403           config_write_interval = temp;
3404         else
3405         {
3406           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3407           status = 2;
3408         }
3409       }
3410       break;
3412       case 'z':
3413       {
3414         int temp;
3416         temp = atoi(optarg);
3417         if (temp > 0)
3418           config_write_jitter = temp;
3419         else
3420         {
3421           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3422           status = 2;
3423         }
3425         break;
3426       }
3428       case 't':
3429       {
3430         int threads;
3431         threads = atoi(optarg);
3432         if (threads >= 1)
3433           config_queue_threads = threads;
3434         else
3435         {
3436           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3437           return 1;
3438         }
3439       }
3440       break;
3442       case 'B':
3443         config_write_base_only = 1;
3444         break;
3446       case 'b':
3447       {
3448         size_t len;
3449         char base_realpath[PATH_MAX];
3451         if (config_base_dir != NULL)
3452           free (config_base_dir);
3453         config_base_dir = strdup (optarg);
3454         if (config_base_dir == NULL)
3455         {
3456           fprintf (stderr, "read_options: strdup failed.\n");
3457           return (3);
3458         }
3460         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3461         {
3462           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3463               config_base_dir, rrd_strerror (errno));
3464           return (3);
3465         }
3467         /* make sure that the base directory is not resolved via
3468          * symbolic links.  this makes some performance-enhancing
3469          * assumptions possible (we don't have to resolve paths
3470          * that start with a "/")
3471          */
3472         if (realpath(config_base_dir, base_realpath) == NULL)
3473         {
3474           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3475               "%s\n", config_base_dir, rrd_strerror(errno));
3476           return 5;
3477         }
3479         len = strlen (config_base_dir);
3480         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3481         {
3482           config_base_dir[len - 1] = 0;
3483           len--;
3484         }
3486         if (len < 1)
3487         {
3488           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3489           return (4);
3490         }
3492         _config_base_dir_len = len;
3494         len = strlen (base_realpath);
3495         while ((len > 0) && (base_realpath[len - 1] == '/'))
3496         {
3497           base_realpath[len - 1] = '\0';
3498           len--;
3499         }
3501         if (strncmp(config_base_dir,
3502                          base_realpath, sizeof(base_realpath)) != 0)
3503         {
3504           fprintf(stderr,
3505                   "Base directory (-b) resolved via file system links!\n"
3506                   "Please consult rrdcached '-b' documentation!\n"
3507                   "Consider specifying the real directory (%s)\n",
3508                   base_realpath);
3509           return 5;
3510         }
3511       }
3512       break;
3514       case 'p':
3515       {
3516         if (config_pid_file != NULL)
3517           free (config_pid_file);
3518         config_pid_file = strdup (optarg);
3519         if (config_pid_file == NULL)
3520         {
3521           fprintf (stderr, "read_options: strdup failed.\n");
3522           return (3);
3523         }
3524       }
3525       break;
3527       case 'F':
3528         config_flush_at_shutdown = 1;
3529         break;
3531       case 'j':
3532       {
3533         char journal_dir_actual[PATH_MAX];
3534         const char *dir;
3535         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3537         status = rrd_mkdir_p(dir, 0777);
3538         if (status != 0)
3539         {
3540           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3541               dir, rrd_strerror(errno));
3542           return 6;
3543         }
3545         if (access(dir, R_OK|W_OK|X_OK) != 0)
3546         {
3547           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3548                   errno ? rrd_strerror(errno) : "");
3549           return 6;
3550         }
3551       }
3552       break;
3554       case 'a':
3555       {
3556         int temp = atoi(optarg);
3557         if (temp > 0)
3558           config_alloc_chunk = temp;
3559         else
3560         {
3561           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3562           return 10;
3563         }
3564       }
3565       break;
3567       case 'h':
3568       case '?':
3569         printf ("RRDCacheD %s\n"
3570             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3571             "\n"
3572             "Usage: rrdcached [options]\n"
3573             "\n"
3574             "Valid options are:\n"
3575             "  -l <address>  Socket address to listen to.\n"
3576             "  -P <perms>    Sets the permissions to assign to all following "
3577                             "sockets\n"
3578             "  -w <seconds>  Interval in which to write data.\n"
3579             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3580             "  -t <threads>  Number of write threads.\n"
3581             "  -f <seconds>  Interval in which to flush dead data.\n"
3582             "  -p <file>     Location of the PID-file.\n"
3583             "  -b <dir>      Base directory to change to.\n"
3584             "  -B            Restrict file access to paths within -b <dir>\n"
3585             "  -g            Do not fork and run in the foreground.\n"
3586             "  -j <dir>      Directory in which to create the journal files.\n"
3587             "  -F            Always flush all updates at shutdown\n"
3588             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3589             "                (the socket will also have read/write permissions "
3590                             "for that group)\n"
3591             "  -m <mode>     File permissions (octal) of all following UNIX "
3592                             "sockets\n"
3593             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3594             "  -O            Do not allow CREATE commands to overwrite existing\n"
3595             "                files, even if asked to.\n"
3596             "\n"
3597             "For more information and a detailed description of all options "
3598             "please refer\n"
3599             "to the rrdcached(1) manual page.\n",
3600             VERSION);
3601         if (option == 'h')
3602           status = -1;
3603         else
3604           status = 1;
3605         break;
3606     } /* switch (option) */
3607   } /* while (getopt) */
3609   /* advise the user when values are not sane */
3610   if (config_flush_interval < 2 * config_write_interval)
3611     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3612             " 2x write interval (-w) !\n");
3613   if (config_write_jitter > config_write_interval)
3614     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3615             " write interval (-w) !\n");
3617   if (config_write_base_only && config_base_dir == NULL)
3618     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3619             "  Consult the rrdcached documentation\n");
3621   if (journal_dir == NULL)
3622     config_flush_at_shutdown = 1;
3624   return (status);
3625 } /* }}} int read_options */
3627 int main (int argc, char **argv)
3629   int status;
3631   status = read_options (argc, argv);
3632   if (status != 0)
3633   {
3634     if (status < 0)
3635       status = 0;
3636     return (status);
3637   }
3639   status = daemonize ();
3640   if (status != 0)
3641   {
3642     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3643     return (1);
3644   }
3646   journal_init();
3648   /* start the queue threads */
3649   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3650   if (queue_threads == NULL)
3651   {
3652     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3653     cleanup();
3654     return (1);
3655   }
3656   for (int i = 0; i < config_queue_threads; i++)
3657   {
3658     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3659     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3660     if (status != 0)
3661     {
3662       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3663       cleanup();
3664       return (1);
3665     }
3666   }
3668   /* start the flush thread */
3669   memset(&flush_thread, 0, sizeof(flush_thread));
3670   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3671   if (status != 0)
3672   {
3673     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3674     cleanup();
3675     return (1);
3676   }
3678   listen_thread_main (NULL);
3679   cleanup ();
3681   return (0);
3682 } /* int main */
3684 /*
3685  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3686  */