Code

fix win32 distributables
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
67 #include "rrd_tool.h"
68 #include "rrd_client.h"
69 #include "unused.h"
71 #include <stdlib.h>
73 #ifndef WIN32
74 #ifdef HAVE_STDINT_H
75 #  include <stdint.h>
76 #endif
77 #include <unistd.h>
78 #include <strings.h>
79 #include <inttypes.h>
80 #include <sys/socket.h>
82 #else
84 #endif
85 #include <stdio.h>
86 #include <string.h>
88 #include <sys/types.h>
89 #include <sys/stat.h>
90 #include <dirent.h>
91 #include <fcntl.h>
92 #include <signal.h>
93 #include <sys/un.h>
94 #include <netdb.h>
95 #include <poll.h>
96 #include <syslog.h>
97 #include <pthread.h>
98 #include <errno.h>
99 #include <assert.h>
100 #include <sys/time.h>
101 #include <time.h>
102 #include <libgen.h>
103 #include <grp.h>
105 #ifdef HAVE_LIBWRAP
106 #include <tcpd.h>
107 #endif /* HAVE_LIBWRAP */
109 #include <glib-2.0/glib.h>
110 /* }}} */
112 #define RRDD_LOG(severity, ...) \
113   do { \
114     if (stay_foreground) { \
115       fprintf(stderr, __VA_ARGS__); \
116       fprintf(stderr, "\n"); } \
117     syslog ((severity), __VA_ARGS__); \
118   } while (0)
120 /*
121  * Types
122  */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125 struct listen_socket_s
127   int fd;
128   char addr[PATH_MAX + 1];
129   int family;
131   /* state for BATCH processing */
132   time_t batch_start;
133   int batch_cmd;
135   /* buffered IO */
136   char *rbuf;
137   off_t next_cmd;
138   off_t next_read;
140   char *wbuf;
141   ssize_t wbuf_len;
143   uint32_t permissions;
145   gid_t  socket_group;
146   mode_t socket_permissions;
147 };
148 typedef struct listen_socket_s listen_socket_t;
150 struct command_s;
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
154                         time_t UNUSED(now),\
155                         char  UNUSED(*buffer),\
156                         size_t UNUSED(buffer_size)
158 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
159                         DISPATCH_PROTO
161 struct command_s {
162   char   *cmd;
163   int (*handler)(HANDLER_PROTO);
165   char  context;                /* where we expect to see it */
166 #define CMD_CONTEXT_CLIENT      (1<<0)
167 #define CMD_CONTEXT_BATCH       (1<<1)
168 #define CMD_CONTEXT_JOURNAL     (1<<2)
169 #define CMD_CONTEXT_ANY         (0x7f)
171   char *syntax;
172   char *help;
173 };
175 struct cache_item_s;
176 typedef struct cache_item_s cache_item_t;
177 struct cache_item_s
179   char *file;
180   char **values;
181   size_t values_num;            /* number of valid pointers */
182   size_t values_alloc;          /* number of allocated pointers */
183   time_t last_flush_time;
184   double last_update_stamp;
185 #define CI_FLAGS_IN_TREE  (1<<0)
186 #define CI_FLAGS_IN_QUEUE (1<<1)
187   int flags;
188   pthread_cond_t  flushed;
189   cache_item_t *prev;
190   cache_item_t *next;
191 };
193 struct callback_flush_data_s
195   time_t now;
196   time_t abs_timeout;
197   char **keys;
198   size_t keys_num;
199 };
200 typedef struct callback_flush_data_s callback_flush_data_t;
202 enum queue_side_e
204   HEAD,
205   TAIL
206 };
207 typedef enum queue_side_e queue_side_t;
209 /* describe a set of journal files */
210 typedef struct {
211   char **files;
212   size_t files_num;
213 } journal_set;
215 #define RBUF_SIZE (RRD_CMD_MAX*2)
217 /*
218  * Variables
219  */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 static listen_socket_t default_socket;
228 enum {
229   RUNNING,              /* normal operation */
230   FLUSHING,             /* flushing remaining values */
231   SHUTDOWN              /* shutting down */
232 } state = RUNNING;
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
245 /* Cache stuff */
246 static GTree          *cache_tree = NULL;
247 static cache_item_t   *cache_queue_head = NULL;
248 static cache_item_t   *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
251 static int config_write_interval = 300;
252 static int config_write_jitter   = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
259 static size_t config_alloc_chunk = 1;
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
273 static int opt_no_overwrite = 0; /* default for the daemon */
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL;         /* current journal file handle */
282 static long  journal_size = 0;          /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
292 /* 
293  * Functions
294  */
295 static void sig_common (const char *sig) /* {{{ */
297   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298   state = FLUSHING;
299   pthread_cond_broadcast(&flush_cond);
300   pthread_cond_broadcast(&queue_cond);
301 } /* }}} void sig_common */
303 static void sig_int_handler (int UNUSED(s)) /* {{{ */
305   sig_common("INT");
306 } /* }}} void sig_int_handler */
308 static void sig_term_handler (int UNUSED(s)) /* {{{ */
310   sig_common("TERM");
311 } /* }}} void sig_term_handler */
313 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
315   config_flush_at_shutdown = 1;
316   sig_common("USR1");
317 } /* }}} void sig_usr1_handler */
319 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
321   config_flush_at_shutdown = 0;
322   sig_common("USR2");
323 } /* }}} void sig_usr2_handler */
325 static void install_signal_handlers(void) /* {{{ */
327   /* These structures are static, because `sigaction' behaves weird if the are
328    * overwritten.. */
329   static struct sigaction sa_int;
330   static struct sigaction sa_term;
331   static struct sigaction sa_pipe;
332   static struct sigaction sa_usr1;
333   static struct sigaction sa_usr2;
335   /* Install signal handlers */
336   memset (&sa_int, 0, sizeof (sa_int));
337   sa_int.sa_handler = sig_int_handler;
338   sigaction (SIGINT, &sa_int, NULL);
340   memset (&sa_term, 0, sizeof (sa_term));
341   sa_term.sa_handler = sig_term_handler;
342   sigaction (SIGTERM, &sa_term, NULL);
344   memset (&sa_pipe, 0, sizeof (sa_pipe));
345   sa_pipe.sa_handler = SIG_IGN;
346   sigaction (SIGPIPE, &sa_pipe, NULL);
348   memset (&sa_pipe, 0, sizeof (sa_usr1));
349   sa_usr1.sa_handler = sig_usr1_handler;
350   sigaction (SIGUSR1, &sa_usr1, NULL);
352   memset (&sa_usr2, 0, sizeof (sa_usr2));
353   sa_usr2.sa_handler = sig_usr2_handler;
354   sigaction (SIGUSR2, &sa_usr2, NULL);
356 } /* }}} void install_signal_handlers */
358 static int open_pidfile(char *action, int oflag) /* {{{ */
360   int fd;
361   const char *file;
362   char *file_copy, *dir;
364   file = (config_pid_file != NULL)
365     ? config_pid_file
366     : LOCALSTATEDIR "/run/rrdcached.pid";
368   /* dirname may modify its argument */
369   file_copy = strdup(file);
370   if (file_copy == NULL)
371   {
372     fprintf(stderr, "rrdcached: strdup(): %s\n",
373         rrd_strerror(errno));
374     return -1;
375   }
377   dir = dirname(file_copy);
378   if (rrd_mkdir_p(dir, 0777) != 0)
379   {
380     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
381         dir, rrd_strerror(errno));
382     return -1;
383   }
385   free(file_copy);
387   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
388   if (fd < 0)
389     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
390             action, file, rrd_strerror(errno));
392   return(fd);
393 } /* }}} static int open_pidfile */
395 /* check existing pid file to see whether a daemon is running */
396 static int check_pidfile(void)
398   int pid_fd;
399   pid_t pid;
400   char pid_str[16];
402   pid_fd = open_pidfile("open", O_RDWR);
403   if (pid_fd < 0)
404     return pid_fd;
406   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
407     return -1;
409   pid = atoi(pid_str);
410   if (pid <= 0)
411     return -1;
413   /* another running process that we can signal COULD be
414    * a competing rrdcached */
415   if (pid != getpid() && kill(pid, 0) == 0)
416   {
417     fprintf(stderr,
418             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
419     close(pid_fd);
420     return -1;
421   }
423   lseek(pid_fd, 0, SEEK_SET);
424   if (ftruncate(pid_fd, 0) == -1)
425   {
426     fprintf(stderr,
427             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
428     close(pid_fd);
429     return -1;
430   }
432   fprintf(stderr,
433           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
434           "rrdcached: starting normally.\n", pid);
436   return pid_fd;
437 } /* }}} static int check_pidfile */
439 static int write_pidfile (int fd) /* {{{ */
441   pid_t pid;
442   FILE *fh;
444   pid = getpid ();
446   fh = fdopen (fd, "w");
447   if (fh == NULL)
448   {
449     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
450     close(fd);
451     return (-1);
452   }
454   fprintf (fh, "%i\n", (int) pid);
455   fclose (fh);
457   return (0);
458 } /* }}} int write_pidfile */
460 static int remove_pidfile (void) /* {{{ */
462   char *file;
463   int status;
465   file = (config_pid_file != NULL)
466     ? config_pid_file
467     : LOCALSTATEDIR "/run/rrdcached.pid";
469   status = unlink (file);
470   if (status == 0)
471     return (0);
472   return (errno);
473 } /* }}} int remove_pidfile */
475 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
477   char *eol;
479   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
480                sock->next_read - sock->next_cmd);
482   if (eol == NULL)
483   {
484     /* no commands left, move remainder back to front of rbuf */
485     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
486             sock->next_read - sock->next_cmd);
487     sock->next_read -= sock->next_cmd;
488     sock->next_cmd = 0;
489     *len = 0;
490     return NULL;
491   }
492   else
493   {
494     char *cmd = sock->rbuf + sock->next_cmd;
495     *eol = '\0';
497     sock->next_cmd = eol - sock->rbuf + 1;
499     if (eol > sock->rbuf && *(eol-1) == '\r')
500       *(--eol) = '\0'; /* handle "\r\n" EOL */
502     *len = eol - cmd;
504     return cmd;
505   }
507   /* NOTREACHED */
508   assert(1==0);
509 } /* }}} char *next_cmd */
511 /* add the characters directly to the write buffer */
512 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
514   char *new_buf;
516   assert(sock != NULL);
518   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
519   if (new_buf == NULL)
520   {
521     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
522     return -1;
523   }
525   strncpy(new_buf + sock->wbuf_len, str, len + 1);
527   sock->wbuf = new_buf;
528   sock->wbuf_len += len;
530   return 0;
531 } /* }}} static int add_to_wbuf */
533 /* add the text to the "extra" info that's sent after the status line */
534 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
536   va_list argp;
537   char buffer[RRD_CMD_MAX];
538   int len;
540   if (JOURNAL_REPLAY(sock)) return 0;
541   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
543   va_start(argp, fmt);
544 #ifdef HAVE_VSNPRINTF
545   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
546 #else
547   len = vsprintf(buffer, fmt, argp);
548 #endif
549   va_end(argp);
550   if (len < 0)
551   {
552     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
553     return -1;
554   }
556   return add_to_wbuf(sock, buffer, len);
557 } /* }}} static int add_response_info */
559 static int count_lines(char *str) /* {{{ */
561   int lines = 0;
563   if (str != NULL)
564   {
565     while ((str = strchr(str, '\n')) != NULL)
566     {
567       ++lines;
568       ++str;
569     }
570   }
572   return lines;
573 } /* }}} static int count_lines */
575 /* send the response back to the user.
576  * returns 0 on success, -1 on error
577  * write buffer is always zeroed after this call */
578 static int send_response (listen_socket_t *sock, response_code rc,
579                           char *fmt, ...) /* {{{ */
581   va_list argp;
582   char buffer[RRD_CMD_MAX];
583   int lines;
584   ssize_t wrote;
585   int rclen, len;
587   if (JOURNAL_REPLAY(sock)) return rc;
589   if (sock->batch_start)
590   {
591     if (rc == RESP_OK)
592       return rc; /* no response on success during BATCH */
593     lines = sock->batch_cmd;
594   }
595   else if (rc == RESP_OK)
596     lines = count_lines(sock->wbuf);
597   else
598     lines = -1;
600   rclen = sprintf(buffer, "%d ", lines);
601   va_start(argp, fmt);
602 #ifdef HAVE_VSNPRINTF
603   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
604 #else
605   len = vsprintf(buffer+rclen, fmt, argp);
606 #endif
607   va_end(argp);
608   if (len < 0)
609     return -1;
611   len += rclen;
613   /* append the result to the wbuf, don't write to the user */
614   if (sock->batch_start)
615     return add_to_wbuf(sock, buffer, len);
617   /* first write must be complete */
618   if (len != write(sock->fd, buffer, len))
619   {
620     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
621     return -1;
622   }
624   if (sock->wbuf != NULL && rc == RESP_OK)
625   {
626     wrote = 0;
627     while (wrote < sock->wbuf_len)
628     {
629       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
630       if (wb <= 0)
631       {
632         RRDD_LOG(LOG_INFO, "send_response: could not write results");
633         return -1;
634       }
635       wrote += wb;
636     }
637   }
639   free(sock->wbuf); sock->wbuf = NULL;
640   sock->wbuf_len = 0;
642   return 0;
643 } /* }}} */
645 static void wipe_ci_values(cache_item_t *ci, time_t when)
647   ci->values = NULL;
648   ci->values_num = 0;
649   ci->values_alloc = 0;
651   ci->last_flush_time = when;
652   if (config_write_jitter > 0)
653     ci->last_flush_time += (rrd_random() % config_write_jitter);
656 /* remove_from_queue
657  * remove a "cache_item_t" item from the queue.
658  * must hold 'cache_lock' when calling this
659  */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
662   if (ci == NULL) return;
663   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
665   if (ci->prev == NULL)
666     cache_queue_head = ci->next; /* reset head */
667   else
668     ci->prev->next = ci->next;
670   if (ci->next == NULL)
671     cache_queue_tail = ci->prev; /* reset the tail */
672   else
673     ci->next->prev = ci->prev;
675   ci->next = ci->prev = NULL;
676   ci->flags &= ~CI_FLAGS_IN_QUEUE;
678   pthread_mutex_lock (&stats_lock);
679   assert (stats_queue_length > 0);
680   stats_queue_length--;
681   pthread_mutex_unlock (&stats_lock);
683 } /* }}} static void remove_from_queue */
685 /* free the resources associated with the cache_item_t
686  * must hold cache_lock when calling this function
687  */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
690   if (ci == NULL) return NULL;
692   remove_from_queue(ci);
694   for (size_t i=0; i < ci->values_num; i++)
695     free(ci->values[i]);
697   free (ci->values);
698   free (ci->file);
700   /* in case anyone is waiting */
701   pthread_cond_broadcast(&ci->flushed);
702   pthread_cond_destroy(&ci->flushed);
704   free (ci);
706   return NULL;
707 } /* }}} static void *free_cache_item */
709 /*
710  * enqueue_cache_item:
711  * `cache_lock' must be acquired before calling this function!
712  */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714     queue_side_t side)
716   if (ci == NULL)
717     return (-1);
719   if (ci->values_num == 0)
720     return (0);
722   if (side == HEAD)
723   {
724     if (cache_queue_head == ci)
725       return 0;
727     /* remove if further down in queue */
728     remove_from_queue(ci);
730     ci->prev = NULL;
731     ci->next = cache_queue_head;
732     if (ci->next != NULL)
733       ci->next->prev = ci;
734     cache_queue_head = ci;
736     if (cache_queue_tail == NULL)
737       cache_queue_tail = cache_queue_head;
738   }
739   else /* (side == TAIL) */
740   {
741     /* We don't move values back in the list.. */
742     if (ci->flags & CI_FLAGS_IN_QUEUE)
743       return (0);
745     assert (ci->next == NULL);
746     assert (ci->prev == NULL);
748     ci->prev = cache_queue_tail;
750     if (cache_queue_tail == NULL)
751       cache_queue_head = ci;
752     else
753       cache_queue_tail->next = ci;
755     cache_queue_tail = ci;
756   }
758   ci->flags |= CI_FLAGS_IN_QUEUE;
760   pthread_cond_signal(&queue_cond);
761   pthread_mutex_lock (&stats_lock);
762   stats_queue_length++;
763   pthread_mutex_unlock (&stats_lock);
765   return (0);
766 } /* }}} int enqueue_cache_item */
768 /*
769  * tree_callback_flush:
770  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771  * while this is in progress.
772  */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774     gpointer data)
776   cache_item_t *ci;
777   callback_flush_data_t *cfd;
779   ci = (cache_item_t *) value;
780   cfd = (callback_flush_data_t *) data;
782   if (ci->flags & CI_FLAGS_IN_QUEUE)
783     return FALSE;
785   if (ci->values_num > 0
786       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787   {
788     enqueue_cache_item (ci, TAIL);
789   }
790   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791       && (ci->values_num <= 0))
792   {
793     assert ((char *) key == ci->file);
794     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795     {
796       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797       return (FALSE);
798     }
799   }
801   return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
804 static int flush_old_values (int max_age)
806   callback_flush_data_t cfd;
807   size_t k;
809   memset (&cfd, 0, sizeof (cfd));
810   /* Pass the current time as user data so that we don't need to call
811    * `time' for each node. */
812   cfd.now = time (NULL);
813   cfd.keys = NULL;
814   cfd.keys_num = 0;
816   if (max_age > 0)
817     cfd.abs_timeout = cfd.now - max_age;
818   else
819     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
821   /* `tree_callback_flush' will return the keys of all values that haven't
822    * been touched in the last `config_flush_interval' seconds in `cfd'.
823    * The char*'s in this array point to the same memory as ci->file, so we
824    * don't need to free them separately. */
825   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
827   for (k = 0; k < cfd.keys_num; k++)
828   {
829     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830     /* should never fail, since we have held the cache_lock
831      * the entire time */
832     assert(status == TRUE);
833   }
835   if (cfd.keys != NULL)
836   {
837     free (cfd.keys);
838     cfd.keys = NULL;
839   }
841   return (0);
842 } /* int flush_old_values */
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
846   struct timeval now;
847   struct timespec next_flush;
848   int status;
850   gettimeofday (&now, NULL);
851   next_flush.tv_sec = now.tv_sec + config_flush_interval;
852   next_flush.tv_nsec = 1000 * now.tv_usec;
854   pthread_mutex_lock(&cache_lock);
856   while (state == RUNNING)
857   {
858     gettimeofday (&now, NULL);
859     if ((now.tv_sec > next_flush.tv_sec)
860         || ((now.tv_sec == next_flush.tv_sec)
861           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862     {
863       RRDD_LOG(LOG_DEBUG, "flushing old values");
865       /* Determine the time of the next cache flush. */
866       next_flush.tv_sec = now.tv_sec + config_flush_interval;
868       /* Flush all values that haven't been written in the last
869        * `config_write_interval' seconds. */
870       flush_old_values (config_write_interval);
872       /* unlock the cache while we rotate so we don't block incoming
873        * updates if the fsync() blocks on disk I/O */
874       pthread_mutex_unlock(&cache_lock);
875       journal_rotate();
876       pthread_mutex_lock(&cache_lock);
877     }
879     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880     if (status != 0 && status != ETIMEDOUT)
881     {
882       RRDD_LOG (LOG_ERR, "flush_thread_main: "
883                 "pthread_cond_timedwait returned %i.", status);
884     }
885   }
887   if (config_flush_at_shutdown)
888     flush_old_values (-1); /* flush everything */
890   state = SHUTDOWN;
892   pthread_mutex_unlock(&cache_lock);
894   return NULL;
895 } /* void *flush_thread_main */
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
899   pthread_mutex_lock (&cache_lock);
901   while (state != SHUTDOWN
902          || (cache_queue_head != NULL && config_flush_at_shutdown))
903   {
904     cache_item_t *ci;
905     char *file;
906     char **values;
907     size_t values_num;
908     int status;
910     /* Now, check if there's something to store away. If not, wait until
911      * something comes in. */
912     if (cache_queue_head == NULL)
913     {
914       status = pthread_cond_wait (&queue_cond, &cache_lock);
915       if ((status != 0) && (status != ETIMEDOUT))
916       {
917         RRDD_LOG (LOG_ERR, "queue_thread_main: "
918             "pthread_cond_wait returned %i.", status);
919       }
920     }
922     /* Check if a value has arrived. This may be NULL if we timed out or there
923      * was an interrupt such as a signal. */
924     if (cache_queue_head == NULL)
925       continue;
927     ci = cache_queue_head;
929     /* copy the relevant parts */
930     file = strdup (ci->file);
931     if (file == NULL)
932     {
933       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934       continue;
935     }
937     assert(ci->values != NULL);
938     assert(ci->values_num > 0);
940     values = ci->values;
941     values_num = ci->values_num;
943     wipe_ci_values(ci, time(NULL));
944     remove_from_queue(ci);
946     pthread_mutex_unlock (&cache_lock);
948     rrd_clear_error ();
949     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950     if (status != 0)
951     {
952       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953           "rrd_update_r (%s) failed with status %i. (%s)",
954           file, status, rrd_get_error());
955     }
957     journal_write("wrote", file);
959     /* Search again in the tree.  It's possible someone issued a "FORGET"
960      * while we were writing the update values. */
961     pthread_mutex_lock(&cache_lock);
962     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963     if (ci)
964       pthread_cond_broadcast(&ci->flushed);
965     pthread_mutex_unlock(&cache_lock);
967     if (status == 0)
968     {
969       pthread_mutex_lock (&stats_lock);
970       stats_updates_written++;
971       stats_data_sets_written += values_num;
972       pthread_mutex_unlock (&stats_lock);
973     }
975     rrd_free_ptrs((void ***) &values, &values_num);
976     free(file);
978     pthread_mutex_lock (&cache_lock);
979   }
980   pthread_mutex_unlock (&cache_lock);
982   return (NULL);
983 } /* }}} void *queue_thread_main */
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986     size_t *buffer_size_ret, char **field_ret)
988   char *buffer;
989   size_t buffer_pos;
990   size_t buffer_size;
991   char *field;
992   size_t field_size;
993   int status;
995   buffer = *buffer_ret;
996   buffer_pos = 0;
997   buffer_size = *buffer_size_ret;
998   field = *buffer_ret;
999   field_size = 0;
1001   if (buffer_size <= 0)
1002     return (-1);
1004   /* This is ensured by `handle_request'. */
1005   assert (buffer[buffer_size - 1] == '\0');
1007   status = -1;
1008   while (buffer_pos < buffer_size)
1009   {
1010     /* Check for end-of-field or end-of-buffer */
1011     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012     {
1013       field[field_size] = 0;
1014       field_size++;
1015       buffer_pos++;
1016       status = 0;
1017       break;
1018     }
1019     /* Handle escaped characters. */
1020     else if (buffer[buffer_pos] == '\\')
1021     {
1022       if (buffer_pos >= (buffer_size - 1))
1023         break;
1024       buffer_pos++;
1025       field[field_size] = buffer[buffer_pos];
1026       field_size++;
1027       buffer_pos++;
1028     }
1029     /* Normal operation */ 
1030     else
1031     {
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036   } /* while (buffer_pos < buffer_size) */
1038   if (status != 0)
1039     return (status);
1041   *buffer_ret = buffer + buffer_pos;
1042   *buffer_size_ret = buffer_size - buffer_pos;
1043   *field_ret = field;
1045   return (0);
1046 } /* }}} int buffer_get_field */
1048 /* if we're restricting writes to the base directory,
1049  * check whether the file falls within the dir
1050  * returns 1 if OK, otherwise 0
1051  */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1054   assert(file != NULL);
1056   if (!config_write_base_only
1057       || JOURNAL_REPLAY(sock)
1058       || config_base_dir == NULL)
1059     return 1;
1061   if (strstr(file, "../") != NULL) goto err;
1063   /* relative paths without "../" are ok */
1064   if (*file != '/') return 1;
1066   /* file must be of the format base + "/" + <1+ char filename> */
1067   if (strlen(file) < _config_base_dir_len + 2) goto err;
1068   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069   if (*(file + _config_base_dir_len) != '/') goto err;
1071   return 1;
1073 err:
1074   if (sock != NULL && sock->fd >= 0)
1075     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1077   return 0;
1078 } /* }}} static int check_file_access */
1080 /* when using a base dir, convert relative paths to absolute paths.
1081  * if necessary, modifies the "filename" pointer to point
1082  * to the new path created in "tmp".  "tmp" is provided
1083  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084  *
1085  * this allows us to optimize for the expected case (absolute path)
1086  * with a no-op.
1087  */
1088 static void get_abs_path(char **filename, char *tmp)
1090   assert(tmp != NULL);
1091   assert(filename != NULL && *filename != NULL);
1093   if (config_base_dir == NULL || **filename == '/')
1094     return;
1096   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097   *filename = tmp;
1098 } /* }}} static int get_abs_path */
1100 static int flush_file (const char *filename) /* {{{ */
1102   cache_item_t *ci;
1104   pthread_mutex_lock (&cache_lock);
1106   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107   if (ci == NULL)
1108   {
1109     pthread_mutex_unlock (&cache_lock);
1110     return (ENOENT);
1111   }
1113   if (ci->values_num > 0)
1114   {
1115     /* Enqueue at head */
1116     enqueue_cache_item (ci, HEAD);
1117     pthread_cond_wait(&ci->flushed, &cache_lock);
1118   }
1120   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1121    * may have been purged during our cond_wait() */
1123   pthread_mutex_unlock(&cache_lock);
1125   return (0);
1126 } /* }}} int flush_file */
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1130   char *err = "Syntax error.\n";
1132   if (cmd && cmd->syntax)
1133     err = cmd->syntax;
1135   return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1140   uint64_t copy_queue_length;
1141   uint64_t copy_updates_received;
1142   uint64_t copy_flush_received;
1143   uint64_t copy_updates_written;
1144   uint64_t copy_data_sets_written;
1145   uint64_t copy_journal_bytes;
1146   uint64_t copy_journal_rotate;
1148   uint64_t tree_nodes_number;
1149   uint64_t tree_depth;
1151   pthread_mutex_lock (&stats_lock);
1152   copy_queue_length       = stats_queue_length;
1153   copy_updates_received   = stats_updates_received;
1154   copy_flush_received     = stats_flush_received;
1155   copy_updates_written    = stats_updates_written;
1156   copy_data_sets_written  = stats_data_sets_written;
1157   copy_journal_bytes      = stats_journal_bytes;
1158   copy_journal_rotate     = stats_journal_rotate;
1159   pthread_mutex_unlock (&stats_lock);
1161   pthread_mutex_lock (&cache_lock);
1162   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1164   pthread_mutex_unlock (&cache_lock);
1166   add_response_info(sock,
1167                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1168   add_response_info(sock,
1169                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170   add_response_info(sock,
1171                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172   add_response_info(sock,
1173                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174   add_response_info(sock,
1175                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1181   send_response(sock, RESP_OK, "Statistics follow\n");
1183   return (0);
1184 } /* }}} int handle_request_stats */
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1188   char *file, file_tmp[PATH_MAX];
1189   int status;
1191   status = buffer_get_field (&buffer, &buffer_size, &file);
1192   if (status != 0)
1193   {
1194     return syntax_error(sock,cmd);
1195   }
1196   else
1197   {
1198     pthread_mutex_lock(&stats_lock);
1199     stats_flush_received++;
1200     pthread_mutex_unlock(&stats_lock);
1202     get_abs_path(&file, file_tmp);
1203     if (!check_file_access(file, sock)) return 0;
1205     status = flush_file (file);
1206     if (status == 0)
1207       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208     else if (status == ENOENT)
1209     {
1210       /* no file in our tree; see whether it exists at all */
1211       struct stat statbuf;
1213       memset(&statbuf, 0, sizeof(statbuf));
1214       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216       else
1217         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218     }
1219     else if (status < 0)
1220       return send_response(sock, RESP_ERR, "Internal error.\n");
1221     else
1222       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223   }
1225   /* NOTREACHED */
1226   assert(1==0);
1227 } /* }}} int handle_request_flush */
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1231   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1233   pthread_mutex_lock(&cache_lock);
1234   flush_old_values(-1);
1235   pthread_mutex_unlock(&cache_lock);
1237   return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1242   int status;
1243   char *file, file_tmp[PATH_MAX];
1244   cache_item_t *ci;
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1250   get_abs_path(&file, file_tmp);
1252   pthread_mutex_lock(&cache_lock);
1253   ci = g_tree_lookup(cache_tree, file);
1254   if (ci == NULL)
1255   {
1256     pthread_mutex_unlock(&cache_lock);
1257     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258   }
1260   for (size_t i=0; i < ci->values_num; i++)
1261     add_response_info(sock, "%s\n", ci->values[i]);
1263   pthread_mutex_unlock(&cache_lock);
1264   return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1269   int status;
1270   gboolean found;
1271   char *file, file_tmp[PATH_MAX];
1273   status = buffer_get_field(&buffer, &buffer_size, &file);
1274   if (status != 0)
1275     return syntax_error(sock,cmd);
1277   get_abs_path(&file, file_tmp);
1278   if (!check_file_access(file, sock)) return 0;
1280   pthread_mutex_lock(&cache_lock);
1281   found = g_tree_remove(cache_tree, file);
1282   pthread_mutex_unlock(&cache_lock);
1284   if (found == TRUE)
1285   {
1286     if (!JOURNAL_REPLAY(sock))
1287       journal_write("forget", file);
1289     return send_response(sock, RESP_OK, "Gone!\n");
1290   }
1291   else
1292     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1294   /* NOTREACHED */
1295   assert(1==0);
1296 } /* }}} static int handle_request_forget */
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1300   cache_item_t *ci;
1302   pthread_mutex_lock(&cache_lock);
1304   ci = cache_queue_head;
1305   while (ci != NULL)
1306   {
1307     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308     ci = ci->next;
1309   }
1311   pthread_mutex_unlock(&cache_lock);
1313   return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1318   char *file, file_tmp[PATH_MAX];
1319   int values_num = 0;
1320   int status;
1321   char orig_buf[RRD_CMD_MAX];
1323   cache_item_t *ci;
1325   /* save it for the journal later */
1326   if (!JOURNAL_REPLAY(sock))
1327     strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return syntax_error(sock,cmd);
1333   pthread_mutex_lock(&stats_lock);
1334   stats_updates_received++;
1335   pthread_mutex_unlock(&stats_lock);
1337   get_abs_path(&file, file_tmp);
1338   if (!check_file_access(file, sock)) return 0;
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346     cache_item_t *tmp;
1348     /* don't hold the lock while we setup; stat(2) might block */
1349     pthread_mutex_unlock(&cache_lock);
1351     memset (&statbuf, 0, sizeof (statbuf));
1352     status = stat (file, &statbuf);
1353     if (status != 0)
1354     {
1355       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357       status = errno;
1358       if (status == ENOENT)
1359         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360       else
1361         return send_response(sock, RESP_ERR,
1362                              "stat failed with error %i.\n", status);
1363     }
1364     if (!S_ISREG (statbuf.st_mode))
1365       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367     if (access(file, R_OK|W_OK) != 0)
1368       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369                            file, rrd_strerror(errno));
1371     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372     if (ci == NULL)
1373     {
1374       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376       return send_response(sock, RESP_ERR, "malloc failed.\n");
1377     }
1378     memset (ci, 0, sizeof (cache_item_t));
1380     ci->file = strdup (file);
1381     if (ci->file == NULL)
1382     {
1383       free (ci);
1384       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386       return send_response(sock, RESP_ERR, "strdup failed.\n");
1387     }
1389     wipe_ci_values(ci, now);
1390     ci->flags = CI_FLAGS_IN_TREE;
1391     pthread_cond_init(&ci->flushed, NULL);
1393     pthread_mutex_lock(&cache_lock);
1395     /* another UPDATE might have added this entry in the meantime */
1396     tmp = g_tree_lookup (cache_tree, file);
1397     if (tmp == NULL)
1398       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399     else
1400     {
1401       free_cache_item (ci);
1402       ci = tmp;
1403     }
1405     /* state may have changed while we were unlocked */
1406     if (state == SHUTDOWN)
1407       return -1;
1408   } /* }}} */
1409   assert (ci != NULL);
1411   /* don't re-write updates in replay mode */
1412   if (!JOURNAL_REPLAY(sock))
1413     journal_write("update", orig_buf);
1415   while (buffer_size > 0)
1416   {
1417     char *value;
1418     double stamp;
1419     char *eostamp;
1421     status = buffer_get_field (&buffer, &buffer_size, &value);
1422     if (status != 0)
1423     {
1424       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425       break;
1426     }
1428     /* make sure update time is always moving forward. We use double here since
1429        update does support subsecond precision for timestamps ... */
1430     stamp = strtod(value, &eostamp);
1431     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1432     {
1433       pthread_mutex_unlock(&cache_lock);
1434       return send_response(sock, RESP_ERR,
1435                            "Cannot find timestamp in '%s'!\n", value);
1436     }
1437     else if (stamp <= ci->last_update_stamp)
1438     {
1439       pthread_mutex_unlock(&cache_lock);
1440       return send_response(sock, RESP_ERR,
1441                            "illegal attempt to update using time %lf when last"
1442                            " update time is %lf (minimum one second step)\n",
1443                            stamp, ci->last_update_stamp);
1444     }
1445     else
1446       ci->last_update_stamp = stamp;
1448     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1449                               &ci->values_alloc, config_alloc_chunk))
1450     {
1451       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1452       continue;
1453     }
1455     values_num++;
1456   }
1458   if (((now - ci->last_flush_time) >= config_write_interval)
1459       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1460       && (ci->values_num > 0))
1461   {
1462     enqueue_cache_item (ci, TAIL);
1463   }
1465   pthread_mutex_unlock (&cache_lock);
1467   if (values_num < 1)
1468     return send_response(sock, RESP_ERR, "No values updated.\n");
1469   else
1470     return send_response(sock, RESP_OK,
1471                          "errors, enqueued %i value(s).\n", values_num);
1473   /* NOTREACHED */
1474   assert(1==0);
1476 } /* }}} int handle_request_update */
1478 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1480   char *file, file_tmp[PATH_MAX];
1481   char *cf;
1483   char *start_str;
1484   char *end_str;
1485   time_t start_tm;
1486   time_t end_tm;
1488   unsigned long step;
1489   unsigned long ds_cnt;
1490   char **ds_namv;
1491   rrd_value_t *data;
1493   int status;
1494   unsigned long i;
1495   time_t t;
1496   rrd_value_t *data_ptr;
1498   file = NULL;
1499   cf = NULL;
1500   start_str = NULL;
1501   end_str = NULL;
1503   /* Read the arguments */
1504   do /* while (0) */
1505   {
1506     status = buffer_get_field (&buffer, &buffer_size, &file);
1507     if (status != 0)
1508       break;
1510     status = buffer_get_field (&buffer, &buffer_size, &cf);
1511     if (status != 0)
1512       break;
1514     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1515     if (status != 0)
1516     {
1517       start_str = NULL;
1518       status = 0;
1519       break;
1520     }
1522     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1523     if (status != 0)
1524     {
1525       end_str = NULL;
1526       status = 0;
1527       break;
1528     }
1529   } while (0);
1531   if (status != 0)
1532     return (syntax_error(sock,cmd));
1534   get_abs_path(&file, file_tmp);
1535   if (!check_file_access(file, sock)) return 0;
1537   status = flush_file (file);
1538   if ((status != 0) && (status != ENOENT))
1539     return (send_response (sock, RESP_ERR,
1540           "flush_file (%s) failed with status %i.\n", file, status));
1542   t = time (NULL); /* "now" */
1544   /* Parse start time */
1545   if (start_str != NULL)
1546   {
1547     char *endptr;
1548     long value;
1550     endptr = NULL;
1551     errno = 0;
1552     value = strtol (start_str, &endptr, /* base = */ 0);
1553     if ((endptr == start_str) || (errno != 0))
1554       return (send_response(sock, RESP_ERR,
1555             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1556             start_str));
1558     if (value > 0)
1559       start_tm = (time_t) value;
1560     else
1561       start_tm = (time_t) (t + value);
1562   }
1563   else
1564   {
1565     start_tm = t - 86400;
1566   }
1568   /* Parse end time */
1569   if (end_str != NULL)
1570   {
1571     char *endptr;
1572     long value;
1574     endptr = NULL;
1575     errno = 0;
1576     value = strtol (end_str, &endptr, /* base = */ 0);
1577     if ((endptr == end_str) || (errno != 0))
1578       return (send_response(sock, RESP_ERR,
1579             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1580             end_str));
1582     if (value > 0)
1583       end_tm = (time_t) value;
1584     else
1585       end_tm = (time_t) (t + value);
1586   }
1587   else
1588   {
1589     end_tm = t;
1590   }
1592   step = -1;
1593   ds_cnt = 0;
1594   ds_namv = NULL;
1595   data = NULL;
1597   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1598       &ds_cnt, &ds_namv, &data);
1599   if (status != 0)
1600     return (send_response(sock, RESP_ERR,
1601           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1603   add_response_info (sock, "FlushVersion: %lu\n", 1);
1604   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1605   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1606   add_response_info (sock, "Step: %lu\n", step);
1607   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1609 #define SSTRCAT(buffer,str,buffer_fill) do { \
1610     size_t str_len = strlen (str); \
1611     if ((buffer_fill + str_len) > sizeof (buffer)) \
1612       str_len = sizeof (buffer) - buffer_fill; \
1613     if (str_len > 0) { \
1614       strncpy (buffer + buffer_fill, str, str_len); \
1615       buffer_fill += str_len; \
1616       assert (buffer_fill <= sizeof (buffer)); \
1617       if (buffer_fill == sizeof (buffer)) \
1618         buffer[buffer_fill - 1] = 0; \
1619       else \
1620         buffer[buffer_fill] = 0; \
1621     } \
1622   } while (0)
1624   { /* Add list of DS names */
1625     char linebuf[1024];
1626     size_t linebuf_fill;
1628     memset (linebuf, 0, sizeof (linebuf));
1629     linebuf_fill = 0;
1630     for (i = 0; i < ds_cnt; i++)
1631     {
1632       if (i > 0)
1633         SSTRCAT (linebuf, " ", linebuf_fill);
1634       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1635       rrd_freemem(ds_namv[i]);
1636     }
1637     rrd_freemem(ds_namv);
1638     add_response_info (sock, "DSName: %s\n", linebuf);
1639   }
1641   /* Add the actual data */
1642   assert (step > 0);
1643   data_ptr = data;
1644   for (t = start_tm + step; t <= end_tm; t += step)
1645   {
1646     char linebuf[1024];
1647     size_t linebuf_fill;
1648     char tmp[128];
1650     memset (linebuf, 0, sizeof (linebuf));
1651     linebuf_fill = 0;
1652     for (i = 0; i < ds_cnt; i++)
1653     {
1654       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1655       tmp[sizeof (tmp) - 1] = 0;
1656       SSTRCAT (linebuf, tmp, linebuf_fill);
1658       data_ptr++;
1659     }
1661     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1662   } /* for (t) */
1663   rrd_freemem(data);
1665   return (send_response (sock, RESP_OK, "Success\n"));
1666 #undef SSTRCAT
1667 } /* }}} int handle_request_fetch */
1669 /* we came across a "WROTE" entry during journal replay.
1670  * throw away any values that we have accumulated for this file
1671  */
1672 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1674   cache_item_t *ci;
1675   const char *file = buffer;
1677   pthread_mutex_lock(&cache_lock);
1679   ci = g_tree_lookup(cache_tree, file);
1680   if (ci == NULL)
1681   {
1682     pthread_mutex_unlock(&cache_lock);
1683     return (0);
1684   }
1686   if (ci->values)
1687     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1689   wipe_ci_values(ci, now);
1690   remove_from_queue(ci);
1692   pthread_mutex_unlock(&cache_lock);
1693   return (0);
1694 } /* }}} int handle_request_wrote */
1696 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1698   char *file, file_tmp[PATH_MAX];
1699   int status;
1700   rrd_info_t *info;
1702   /* obtain filename */
1703   status = buffer_get_field(&buffer, &buffer_size, &file);
1704   if (status != 0)
1705     return syntax_error(sock,cmd);
1706   /* get full pathname */
1707   get_abs_path(&file, file_tmp);
1708   if (!check_file_access(file, sock)) {
1709     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1710   }
1711   /* get data */
1712   rrd_clear_error ();
1713   info = rrd_info_r(file);
1714   if(!info) {
1715     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1716   }
1717   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1718       switch (data->type) {
1719       case RD_I_VAL:
1720           if (isnan(data->value.u_val))
1721               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1722           else
1723               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1724           break;
1725       case RD_I_CNT:
1726           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1727           break;
1728       case RD_I_INT:
1729           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1730           break;
1731       case RD_I_STR:
1732           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1733           break;
1734       case RD_I_BLO:
1735           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1736           break;
1737       }
1738   }
1740   rrd_info_free(info);
1742   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1743 } /* }}} static int handle_request_info  */
1745 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1747   char *i, *file, file_tmp[PATH_MAX];
1748   int status;
1749   int idx;
1750   time_t t;
1752   /* obtain filename */
1753   status = buffer_get_field(&buffer, &buffer_size, &file);
1754   if (status != 0)
1755     return syntax_error(sock,cmd);
1756   /* get full pathname */
1757   get_abs_path(&file, file_tmp);
1758   if (!check_file_access(file, sock)) {
1759     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1760   }
1762   status = buffer_get_field(&buffer, &buffer_size, &i);
1763   if (status != 0)
1764     return syntax_error(sock,cmd);
1765   idx = atoi(i);
1766   if(idx<0) { 
1767     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1768   }
1770   /* get data */
1771   rrd_clear_error ();
1772   t = rrd_first_r(file,idx);
1773   if(t<1) {
1774     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1775   }
1776   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1777 } /* }}} static int handle_request_first  */
1780 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1782   char *file, file_tmp[PATH_MAX];
1783   int status;
1784   time_t t, from_file, step;
1785   rrd_file_t * rrd_file;
1786   cache_item_t * ci;
1787   rrd_t rrd; 
1789   /* obtain filename */
1790   status = buffer_get_field(&buffer, &buffer_size, &file);
1791   if (status != 0)
1792     return syntax_error(sock,cmd);
1793   /* get full pathname */
1794   get_abs_path(&file, file_tmp);
1795   if (!check_file_access(file, sock)) {
1796     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1797   }
1798   rrd_clear_error();
1799   rrd_init(&rrd);
1800   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1801   if(!rrd_file) {
1802     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1803   }
1804   from_file = rrd.live_head->last_up;
1805   step = rrd.stat_head->pdp_step;
1806   rrd_close(rrd_file);
1807   pthread_mutex_lock(&cache_lock);
1808   ci = g_tree_lookup(cache_tree, file);
1809   if (ci)
1810     t = ci->last_update_stamp;
1811   else
1812     t = from_file;
1813   pthread_mutex_unlock(&cache_lock);
1814   t -= t % step;
1815   rrd_free(&rrd);
1816   if(t<1) {
1817     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1818   }
1819   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1820 } /* }}} static int handle_request_last  */
1822 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1824   char *file, file_tmp[PATH_MAX];
1825   char *tok;
1826   int ac = 0;
1827   char *av[128];
1828   int status;
1829   unsigned long step = 300;
1830   time_t last_up = time(NULL)-10;
1831   int no_overwrite = opt_no_overwrite;
1834   /* obtain filename */
1835   status = buffer_get_field(&buffer, &buffer_size, &file);
1836   if (status != 0)
1837     return syntax_error(sock,cmd);
1838   /* get full pathname */
1839   get_abs_path(&file, file_tmp);
1840   if (!check_file_access(file, sock)) {
1841     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1842   }
1843   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1845   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1846     if( ! strncmp(tok,"-b",2) ) {
1847       status = buffer_get_field(&buffer, &buffer_size, &tok );
1848       if (status != 0) return syntax_error(sock,cmd);
1849       last_up = (time_t) atol(tok);
1850       continue;
1851     }
1852     if( ! strncmp(tok,"-s",2) ) {
1853       status = buffer_get_field(&buffer, &buffer_size, &tok );
1854       if (status != 0) return syntax_error(sock,cmd);
1855       step = atol(tok);
1856       continue;
1857     }
1858     if( ! strncmp(tok,"-O",2) ) {
1859       no_overwrite = 1;
1860       continue;
1861     }
1862     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1863     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1864     return syntax_error(sock,cmd);
1865   }
1866   if(step<1) {
1867     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1868   }
1869   if (last_up < 3600 * 24 * 365 * 10) {
1870     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1871   }
1873   rrd_clear_error ();
1874   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1876   if(!status) {
1877     return send_response(sock, RESP_OK, "RRD created OK\n");
1878   }
1879   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1880 } /* }}} static int handle_request_create  */
1882 /* start "BATCH" processing */
1883 static int batch_start (HANDLER_PROTO) /* {{{ */
1885   int status;
1886   if (sock->batch_start)
1887     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1889   status = send_response(sock, RESP_OK,
1890                          "Go ahead.  End with dot '.' on its own line.\n");
1891   sock->batch_start = time(NULL);
1892   sock->batch_cmd = 0;
1894   return status;
1895 } /* }}} static int batch_start */
1897 /* finish "BATCH" processing and return results to the client */
1898 static int batch_done (HANDLER_PROTO) /* {{{ */
1900   assert(sock->batch_start);
1901   sock->batch_start = 0;
1902   sock->batch_cmd  = 0;
1903   return send_response(sock, RESP_OK, "errors\n");
1904 } /* }}} static int batch_done */
1906 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1908   return -1;
1909 } /* }}} static int handle_request_quit */
1911 static command_t list_of_commands[] = { /* {{{ */
1912   {
1913     "UPDATE",
1914     handle_request_update,
1915     CMD_CONTEXT_ANY,
1916     "UPDATE <filename> <values> [<values> ...]\n"
1917     ,
1918     "Adds the given file to the internal cache if it is not yet known and\n"
1919     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1920     "for details.\n"
1921     "\n"
1922     "Each <values> has the following form:\n"
1923     "  <values> = <time>:<value>[:<value>[...]]\n"
1924     "See the rrdupdate(1) manpage for details.\n"
1925   },
1926   {
1927     "WROTE",
1928     handle_request_wrote,
1929     CMD_CONTEXT_JOURNAL,
1930     NULL,
1931     NULL
1932   },
1933   {
1934     "FLUSH",
1935     handle_request_flush,
1936     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1937     "FLUSH <filename>\n"
1938     ,
1939     "Adds the given filename to the head of the update queue and returns\n"
1940     "after it has been dequeued.\n"
1941   },
1942   {
1943     "FLUSHALL",
1944     handle_request_flushall,
1945     CMD_CONTEXT_CLIENT,
1946     "FLUSHALL\n"
1947     ,
1948     "Triggers writing of all pending updates.  Returns immediately.\n"
1949   },
1950   {
1951     "PENDING",
1952     handle_request_pending,
1953     CMD_CONTEXT_CLIENT,
1954     "PENDING <filename>\n"
1955     ,
1956     "Shows any 'pending' updates for a file, in order.\n"
1957     "The updates shown have not yet been written to the underlying RRD file.\n"
1958   },
1959   {
1960     "FORGET",
1961     handle_request_forget,
1962     CMD_CONTEXT_ANY,
1963     "FORGET <filename>\n"
1964     ,
1965     "Removes the file completely from the cache.\n"
1966     "Any pending updates for the file will be lost.\n"
1967   },
1968   {
1969     "QUEUE",
1970     handle_request_queue,
1971     CMD_CONTEXT_CLIENT,
1972     "QUEUE\n"
1973     ,
1974         "Shows all files in the output queue.\n"
1975     "The output is zero or more lines in the following format:\n"
1976     "(where <num_vals> is the number of values to be written)\n"
1977     "\n"
1978     "<num_vals> <filename>\n"
1979   },
1980   {
1981     "STATS",
1982     handle_request_stats,
1983     CMD_CONTEXT_CLIENT,
1984     "STATS\n"
1985     ,
1986     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1987     "a description of the values.\n"
1988   },
1989   {
1990     "HELP",
1991     handle_request_help,
1992     CMD_CONTEXT_CLIENT,
1993     "HELP [<command>]\n",
1994     NULL, /* special! */
1995   },
1996   {
1997     "BATCH",
1998     batch_start,
1999     CMD_CONTEXT_CLIENT,
2000     "BATCH\n"
2001     ,
2002     "The 'BATCH' command permits the client to initiate a bulk load\n"
2003     "   of commands to rrdcached.\n"
2004     "\n"
2005     "Usage:\n"
2006     "\n"
2007     "    client: BATCH\n"
2008     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2009     "    client: command #1\n"
2010     "    client: command #2\n"
2011     "    client: ... and so on\n"
2012     "    client: .\n"
2013     "    server: 2 errors\n"
2014     "    server: 7 message for command #7\n"
2015     "    server: 9 message for command #9\n"
2016     "\n"
2017     "For more information, consult the rrdcached(1) documentation.\n"
2018   },
2019   {
2020     ".",   /* BATCH terminator */
2021     batch_done,
2022     CMD_CONTEXT_BATCH,
2023     NULL,
2024     NULL
2025   },
2026   {
2027     "FETCH",
2028     handle_request_fetch,
2029     CMD_CONTEXT_CLIENT,
2030     "FETCH <file> <CF> [<start> [<end>]]\n"
2031     ,
2032     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2033   },
2034   {
2035     "INFO",
2036     handle_request_info,
2037     CMD_CONTEXT_CLIENT,
2038     "INFO <filename>\n",
2039     "The INFO command retrieves information about a specified RRD file.\n"
2040     "This is returned in standard rrdinfo format, a sequence of lines\n"
2041     "with the format <keyname> = <value>\n"
2042     "Note that this is the data as of the last update of the RRD file itself,\n"
2043     "not the last time data was received via rrdcached, so there may be pending\n"
2044     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2045   },
2046   {
2047     "FIRST",
2048     handle_request_first,
2049     CMD_CONTEXT_CLIENT,
2050     "FIRST <filename> <rra index>\n",
2051     "The FIRST command retrieves the first data time for a specified RRA in\n"
2052     "an RRD file.\n"
2053   },
2054   {
2055     "LAST",
2056     handle_request_last,
2057     CMD_CONTEXT_CLIENT,
2058     "LAST <filename>\n",
2059     "The LAST command retrieves the last update time for a specified RRD file.\n"
2060     "Note that this is the time of the last update of the RRD file itself, not\n"
2061     "the last time data was received via rrdcached, so there may be pending\n"
2062     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2063   },
2064   {
2065     "CREATE",
2066     handle_request_create,
2067     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2068     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2069     "The CREATE command will create an RRD file, overwriting any existing file\n"
2070     "unless the -O option is given or rrdcached was started with the -O option.\n"
2071     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2072     "not acceptable) and the step is in seconds (default is 300).\n"
2073     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2074   },
2075   {
2076     "QUIT",
2077     handle_request_quit,
2078     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2079     "QUIT\n"
2080     ,
2081     "Disconnect from rrdcached.\n"
2082   }
2083 }; /* }}} command_t list_of_commands[] */
2084 static size_t list_of_commands_len = sizeof (list_of_commands)
2085   / sizeof (list_of_commands[0]);
2087 static command_t *find_command(char *cmd)
2089   size_t i;
2091   for (i = 0; i < list_of_commands_len; i++)
2092     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2093       return (&list_of_commands[i]);
2094   return NULL;
2097 /* We currently use the index in the `list_of_commands' array as a bit position
2098  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2099  * outside these functions so that switching to a more elegant storage method
2100  * is easily possible. */
2101 static ssize_t find_command_index (const char *cmd) /* {{{ */
2103   size_t i;
2105   for (i = 0; i < list_of_commands_len; i++)
2106     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2107       return ((ssize_t) i);
2108   return (-1);
2109 } /* }}} ssize_t find_command_index */
2111 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2112     const char *cmd)
2114   ssize_t i;
2116   if (JOURNAL_REPLAY(sock))
2117     return (1);
2119   if (cmd == NULL)
2120     return (-1);
2122   if ((strcasecmp ("QUIT", cmd) == 0)
2123       || (strcasecmp ("HELP", cmd) == 0))
2124     return (1);
2125   else if (strcmp (".", cmd) == 0)
2126     cmd = "BATCH";
2128   i = find_command_index (cmd);
2129   if (i < 0)
2130     return (-1);
2131   assert (i < 32);
2133   if ((sock->permissions & (1 << i)) != 0)
2134     return (1);
2135   return (0);
2136 } /* }}} int socket_permission_check */
2138 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2139     const char *cmd)
2141   ssize_t i;
2143   i = find_command_index (cmd);
2144   if (i < 0)
2145     return (-1);
2146   assert (i < 32);
2148   sock->permissions |= (1 << i);
2149   return (0);
2150 } /* }}} int socket_permission_add */
2152 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2154   sock->permissions = 0;
2155 } /* }}} socket_permission_clear */
2157 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2158     listen_socket_t *src)
2160   dest->permissions = src->permissions;
2161 } /* }}} socket_permission_copy */
2163 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2165   size_t i;
2167   sock->permissions = 0;
2168   for (i = 0; i < list_of_commands_len; i++)
2169     sock->permissions |= (1 << i);
2170 } /* }}} void socket_permission_set_all */
2172 /* check whether commands are received in the expected context */
2173 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2175   if (JOURNAL_REPLAY(sock))
2176     return (cmd->context & CMD_CONTEXT_JOURNAL);
2177   else if (sock->batch_start)
2178     return (cmd->context & CMD_CONTEXT_BATCH);
2179   else
2180     return (cmd->context & CMD_CONTEXT_CLIENT);
2182   /* NOTREACHED */
2183   assert(1==0);
2186 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2188   int status;
2189   char *cmd_str;
2190   char *resp_txt;
2191   command_t *help = NULL;
2193   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2194   if (status == 0)
2195     help = find_command(cmd_str);
2197   if (help && (help->syntax || help->help))
2198   {
2199     char tmp[RRD_CMD_MAX];
2201     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2202     resp_txt = tmp;
2204     if (help->syntax)
2205       add_response_info(sock, "Usage: %s\n", help->syntax);
2207     if (help->help)
2208       add_response_info(sock, "%s\n", help->help);
2209   }
2210   else
2211   {
2212     size_t i;
2214     resp_txt = "Command overview\n";
2216     for (i = 0; i < list_of_commands_len; i++)
2217     {
2218       if (list_of_commands[i].syntax == NULL)
2219         continue;
2220       add_response_info (sock, "%s", list_of_commands[i].syntax);
2221     }
2222   }
2224   return send_response(sock, RESP_OK, resp_txt);
2225 } /* }}} int handle_request_help */
2227 static int handle_request (DISPATCH_PROTO) /* {{{ */
2229   char *buffer_ptr = buffer;
2230   char *cmd_str = NULL;
2231   command_t *cmd = NULL;
2232   int status;
2234   assert (buffer[buffer_size - 1] == '\0');
2236   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2237   if (status != 0)
2238   {
2239     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2240     return (-1);
2241   }
2243   if (sock != NULL && sock->batch_start)
2244     sock->batch_cmd++;
2246   cmd = find_command(cmd_str);
2247   if (!cmd)
2248     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2250   if (!socket_permission_check (sock, cmd->cmd))
2251     return send_response(sock, RESP_ERR, "Permission denied.\n");
2253   if (!command_check_context(sock, cmd))
2254     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2256   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2257 } /* }}} int handle_request */
2259 static void journal_set_free (journal_set *js) /* {{{ */
2261   if (js == NULL)
2262     return;
2264   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2266   free(js);
2267 } /* }}} journal_set_free */
2269 static void journal_set_remove (journal_set *js) /* {{{ */
2271   if (js == NULL)
2272     return;
2274   for (uint i=0; i < js->files_num; i++)
2275   {
2276     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2277     unlink(js->files[i]);
2278   }
2279 } /* }}} journal_set_remove */
2281 /* close current journal file handle.
2282  * MUST hold journal_lock before calling */
2283 static void journal_close(void) /* {{{ */
2285   if (journal_fh != NULL)
2286   {
2287     if (fclose(journal_fh) != 0)
2288       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2289   }
2291   journal_fh = NULL;
2292   journal_size = 0;
2293 } /* }}} journal_close */
2295 /* MUST hold journal_lock before calling */
2296 static void journal_new_file(void) /* {{{ */
2298   struct timeval now;
2299   int  new_fd;
2300   char new_file[PATH_MAX + 1];
2302   assert(journal_dir != NULL);
2303   assert(journal_cur != NULL);
2305   journal_close();
2307   gettimeofday(&now, NULL);
2308   /* this format assures that the files sort in strcmp() order */
2309   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2310            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2312   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2313                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2314   if (new_fd < 0)
2315     goto error;
2317   journal_fh = fdopen(new_fd, "a");
2318   if (journal_fh == NULL)
2319     goto error;
2321   journal_size = ftell(journal_fh);
2322   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2324   /* record the file in the journal set */
2325   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2327   return;
2329 error:
2330   RRDD_LOG(LOG_CRIT,
2331            "JOURNALING DISABLED: Error while trying to create %s : %s",
2332            new_file, rrd_strerror(errno));
2333   RRDD_LOG(LOG_CRIT,
2334            "JOURNALING DISABLED: All values will be flushed at shutdown");
2336   close(new_fd);
2337   config_flush_at_shutdown = 1;
2339 } /* }}} journal_new_file */
2341 /* MUST NOT hold journal_lock before calling this */
2342 static void journal_rotate(void) /* {{{ */
2344   journal_set *old_js = NULL;
2346   if (journal_dir == NULL)
2347     return;
2349   RRDD_LOG(LOG_DEBUG, "rotating journals");
2351   pthread_mutex_lock(&stats_lock);
2352   ++stats_journal_rotate;
2353   pthread_mutex_unlock(&stats_lock);
2355   pthread_mutex_lock(&journal_lock);
2357   journal_close();
2359   /* rotate the journal sets */
2360   old_js = journal_old;
2361   journal_old = journal_cur;
2362   journal_cur = calloc(1, sizeof(journal_set));
2364   if (journal_cur != NULL)
2365     journal_new_file();
2366   else
2367     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2369   pthread_mutex_unlock(&journal_lock);
2371   journal_set_remove(old_js);
2372   journal_set_free  (old_js);
2374 } /* }}} static void journal_rotate */
2376 /* MUST hold journal_lock when calling */
2377 static void journal_done(void) /* {{{ */
2379   if (journal_cur == NULL)
2380     return;
2382   journal_close();
2384   if (config_flush_at_shutdown)
2385   {
2386     RRDD_LOG(LOG_INFO, "removing journals");
2387     journal_set_remove(journal_old);
2388     journal_set_remove(journal_cur);
2389   }
2390   else
2391   {
2392     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2393              "journals will be used at next startup");
2394   }
2396   journal_set_free(journal_cur);
2397   journal_set_free(journal_old);
2398   free(journal_dir);
2400 } /* }}} static void journal_done */
2402 static int journal_write(char *cmd, char *args) /* {{{ */
2404   int chars;
2406   if (journal_fh == NULL)
2407     return 0;
2409   pthread_mutex_lock(&journal_lock);
2410   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2411   journal_size += chars;
2413   if (journal_size > JOURNAL_MAX)
2414     journal_new_file();
2416   pthread_mutex_unlock(&journal_lock);
2418   if (chars > 0)
2419   {
2420     pthread_mutex_lock(&stats_lock);
2421     stats_journal_bytes += chars;
2422     pthread_mutex_unlock(&stats_lock);
2423   }
2425   return chars;
2426 } /* }}} static int journal_write */
2428 static int journal_replay (const char *file) /* {{{ */
2430   FILE *fh;
2431   int entry_cnt = 0;
2432   int fail_cnt = 0;
2433   uint64_t line = 0;
2434   char entry[RRD_CMD_MAX];
2435   time_t now;
2437   if (file == NULL) return 0;
2439   {
2440     char *reason = "unknown error";
2441     int status = 0;
2442     struct stat statbuf;
2444     memset(&statbuf, 0, sizeof(statbuf));
2445     if (stat(file, &statbuf) != 0)
2446     {
2447       reason = "stat error";
2448       status = errno;
2449     }
2450     else if (!S_ISREG(statbuf.st_mode))
2451     {
2452       reason = "not a regular file";
2453       status = EPERM;
2454     }
2455     if (statbuf.st_uid != daemon_uid)
2456     {
2457       reason = "not owned by daemon user";
2458       status = EACCES;
2459     }
2460     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2461     {
2462       reason = "must not be user/group writable";
2463       status = EACCES;
2464     }
2466     if (status != 0)
2467     {
2468       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2469                file, rrd_strerror(status), reason);
2470       return 0;
2471     }
2472   }
2474   fh = fopen(file, "r");
2475   if (fh == NULL)
2476   {
2477     if (errno != ENOENT)
2478       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2479                file, rrd_strerror(errno));
2480     return 0;
2481   }
2482   else
2483     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2485   now = time(NULL);
2487   while(!feof(fh))
2488   {
2489     size_t entry_len;
2491     ++line;
2492     if (fgets(entry, sizeof(entry), fh) == NULL)
2493       break;
2494     entry_len = strlen(entry);
2496     /* check \n termination in case journal writing crashed mid-line */
2497     if (entry_len == 0)
2498       continue;
2499     else if (entry[entry_len - 1] != '\n')
2500     {
2501       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2502       ++fail_cnt;
2503       continue;
2504     }
2506     entry[entry_len - 1] = '\0';
2508     if (handle_request(NULL, now, entry, entry_len) == 0)
2509       ++entry_cnt;
2510     else
2511       ++fail_cnt;
2512   }
2514   fclose(fh);
2516   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2517            entry_cnt, fail_cnt);
2519   return entry_cnt > 0 ? 1 : 0;
2520 } /* }}} static int journal_replay */
2522 static int journal_sort(const void *v1, const void *v2)
2524   char **jn1 = (char **) v1;
2525   char **jn2 = (char **) v2;
2527   return strcmp(*jn1,*jn2);
2530 static void journal_init(void) /* {{{ */
2532   int had_journal = 0;
2533   DIR *dir;
2534   struct dirent *dent;
2535   char path[PATH_MAX+1];
2537   if (journal_dir == NULL) return;
2539   pthread_mutex_lock(&journal_lock);
2541   journal_cur = calloc(1, sizeof(journal_set));
2542   if (journal_cur == NULL)
2543   {
2544     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2545     return;
2546   }
2548   RRDD_LOG(LOG_INFO, "checking for journal files");
2550   /* Handle old journal files during transition.  This gives them the
2551    * correct sort order.  TODO: remove after first release
2552    */
2553   {
2554     char old_path[PATH_MAX+1];
2555     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2556     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2557     rename(old_path, path);
2559     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2560     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2561     rename(old_path, path);
2562   }
2564   dir = opendir(journal_dir);
2565   if (!dir) {
2566     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2567     return;
2568   }
2569   while ((dent = readdir(dir)) != NULL)
2570   {
2571     /* looks like a journal file? */
2572     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2573       continue;
2575     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2577     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2578     {
2579       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2580                dent->d_name);
2581       break;
2582     }
2583   }
2584   closedir(dir);
2586   qsort(journal_cur->files, journal_cur->files_num,
2587         sizeof(journal_cur->files[0]), journal_sort);
2589   for (uint i=0; i < journal_cur->files_num; i++)
2590     had_journal += journal_replay(journal_cur->files[i]);
2592   journal_new_file();
2594   /* it must have been a crash.  start a flush */
2595   if (had_journal && config_flush_at_shutdown)
2596     flush_old_values(-1);
2598   pthread_mutex_unlock(&journal_lock);
2600   RRDD_LOG(LOG_INFO, "journal processing complete");
2602 } /* }}} static void journal_init */
2604 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2606   assert(sock != NULL);
2608   free(sock->rbuf);  sock->rbuf = NULL;
2609   free(sock->wbuf);  sock->wbuf = NULL;
2610   free(sock);
2611 } /* }}} void free_listen_socket */
2613 static void close_connection(listen_socket_t *sock) /* {{{ */
2615   if (sock->fd >= 0)
2616   {
2617     close(sock->fd);
2618     sock->fd = -1;
2619   }
2621   free_listen_socket(sock);
2623 } /* }}} void close_connection */
2625 static void *connection_thread_main (void *args) /* {{{ */
2627   listen_socket_t *sock;
2628   int fd;
2630   sock = (listen_socket_t *) args;
2631   fd = sock->fd;
2633   /* init read buffers */
2634   sock->next_read = sock->next_cmd = 0;
2635   sock->rbuf = malloc(RBUF_SIZE);
2636   if (sock->rbuf == NULL)
2637   {
2638     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2639     close_connection(sock);
2640     return NULL;
2641   }
2643   pthread_mutex_lock (&connection_threads_lock);
2644 #ifdef HAVE_LIBWRAP
2645   /* LIBWRAP does not support multiple threads! By putting this code
2646      inside pthread_mutex_lock we do not have to worry about request_info
2647      getting overwritten by another thread.
2648   */
2649   struct request_info req;
2650   request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2651   fromhost(&req);
2652   if(!hosts_access(&req)) {
2653     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2654     pthread_mutex_unlock (&connection_threads_lock);
2655     close_connection(sock);
2656     return NULL;
2657   }
2658 #endif /* HAVE_LIBWRAP */
2659   connection_threads_num++;
2660   pthread_mutex_unlock (&connection_threads_lock);
2662   while (state == RUNNING)
2663   {
2664     char *cmd;
2665     ssize_t cmd_len;
2666     ssize_t rbytes;
2667     time_t now;
2669     struct pollfd pollfd;
2670     int status;
2672     pollfd.fd = fd;
2673     pollfd.events = POLLIN | POLLPRI;
2674     pollfd.revents = 0;
2676     status = poll (&pollfd, 1, /* timeout = */ 500);
2677     if (state != RUNNING)
2678       break;
2679     else if (status == 0) /* timeout */
2680       continue;
2681     else if (status < 0) /* error */
2682     {
2683       status = errno;
2684       if (status != EINTR)
2685         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2686       continue;
2687     }
2689     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2690       break;
2691     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2692     {
2693       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2694           "poll(2) returned something unexpected: %#04hx",
2695           pollfd.revents);
2696       break;
2697     }
2699     rbytes = read(fd, sock->rbuf + sock->next_read,
2700                   RBUF_SIZE - sock->next_read);
2701     if (rbytes < 0)
2702     {
2703       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2704       break;
2705     }
2706     else if (rbytes == 0)
2707       break; /* eof */
2709     sock->next_read += rbytes;
2711     if (sock->batch_start)
2712       now = sock->batch_start;
2713     else
2714       now = time(NULL);
2716     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2717     {
2718       status = handle_request (sock, now, cmd, cmd_len+1);
2719       if (status != 0)
2720         goto out_close;
2721     }
2722   }
2724 out_close:
2725   close_connection(sock);
2727   /* Remove this thread from the connection threads list */
2728   pthread_mutex_lock (&connection_threads_lock);
2729   connection_threads_num--;
2730   if (connection_threads_num <= 0)
2731     pthread_cond_broadcast(&connection_threads_done);
2732   pthread_mutex_unlock (&connection_threads_lock);
2734   return (NULL);
2735 } /* }}} void *connection_thread_main */
2737 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2739   int fd;
2740   struct sockaddr_un sa;
2741   listen_socket_t *temp;
2742   int status;
2743   const char *path;
2744   char *path_copy, *dir;
2746   path = sock->addr;
2747   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2748     path += strlen("unix:");
2750   /* dirname may modify its argument */
2751   path_copy = strdup(path);
2752   if (path_copy == NULL)
2753   {
2754     fprintf(stderr, "rrdcached: strdup(): %s\n",
2755         rrd_strerror(errno));
2756     return (-1);
2757   }
2759   dir = dirname(path_copy);
2760   if (rrd_mkdir_p(dir, 0777) != 0)
2761   {
2762     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2763         dir, rrd_strerror(errno));
2764     return (-1);
2765   }
2767   free(path_copy);
2769   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2770       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2771   if (temp == NULL)
2772   {
2773     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2774     return (-1);
2775   }
2776   listen_fds = temp;
2777   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2779   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2780   if (fd < 0)
2781   {
2782     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2783              rrd_strerror(errno));
2784     return (-1);
2785   }
2787   memset (&sa, 0, sizeof (sa));
2788   sa.sun_family = AF_UNIX;
2789   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2791   /* if we've gotten this far, we own the pid file.  any daemon started
2792    * with the same args must not be alive.  therefore, ensure that we can
2793    * create the socket...
2794    */
2795   unlink(path);
2797   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2798   if (status != 0)
2799   {
2800     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2801              path, rrd_strerror(errno));
2802     close (fd);
2803     return (-1);
2804   }
2806   /* tweak the sockets group ownership */
2807   if (sock->socket_group != (gid_t)-1)
2808   {
2809     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2810          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2811     {
2812       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2813     }
2814   }
2816   if (sock->socket_permissions != (mode_t)-1)
2817   {
2818     if (chmod(path, sock->socket_permissions) != 0)
2819       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2820           (unsigned int)sock->socket_permissions, strerror(errno));
2821   }
2823   status = listen (fd, /* backlog = */ 10);
2824   if (status != 0)
2825   {
2826     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2827              path, rrd_strerror(errno));
2828     close (fd);
2829     unlink (path);
2830     return (-1);
2831   }
2833   listen_fds[listen_fds_num].fd = fd;
2834   listen_fds[listen_fds_num].family = PF_UNIX;
2835   strncpy(listen_fds[listen_fds_num].addr, path,
2836           sizeof (listen_fds[listen_fds_num].addr) - 1);
2837   listen_fds_num++;
2839   return (0);
2840 } /* }}} int open_listen_socket_unix */
2842 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2844   struct addrinfo ai_hints;
2845   struct addrinfo *ai_res;
2846   struct addrinfo *ai_ptr;
2847   char addr_copy[NI_MAXHOST];
2848   char *addr;
2849   char *port;
2850   int status;
2852   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2853   addr_copy[sizeof (addr_copy) - 1] = 0;
2854   addr = addr_copy;
2856   memset (&ai_hints, 0, sizeof (ai_hints));
2857   ai_hints.ai_flags = 0;
2858 #ifdef AI_ADDRCONFIG
2859   ai_hints.ai_flags |= AI_ADDRCONFIG;
2860 #endif
2861   ai_hints.ai_family = AF_UNSPEC;
2862   ai_hints.ai_socktype = SOCK_STREAM;
2864   port = NULL;
2865   if (*addr == '[') /* IPv6+port format */
2866   {
2867     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2868     addr++;
2870     port = strchr (addr, ']');
2871     if (port == NULL)
2872     {
2873       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2874       return (-1);
2875     }
2876     *port = 0;
2877     port++;
2879     if (*port == ':')
2880       port++;
2881     else if (*port == 0)
2882       port = NULL;
2883     else
2884     {
2885       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2886       return (-1);
2887     }
2888   } /* if (*addr == '[') */
2889   else
2890   {
2891     port = rindex(addr, ':');
2892     if (port != NULL)
2893     {
2894       *port = 0;
2895       port++;
2896     }
2897   }
2898   ai_res = NULL;
2899   status = getaddrinfo (addr,
2900                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2901                         &ai_hints, &ai_res);
2902   if (status != 0)
2903   {
2904     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2905              addr, gai_strerror (status));
2906     return (-1);
2907   }
2909   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2910   {
2911     int fd;
2912     listen_socket_t *temp;
2913     int one = 1;
2915     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2916         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2917     if (temp == NULL)
2918     {
2919       fprintf (stderr,
2920                "rrdcached: open_listen_socket_network: realloc failed.\n");
2921       continue;
2922     }
2923     listen_fds = temp;
2924     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2926     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2927     if (fd < 0)
2928     {
2929       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2930                rrd_strerror(errno));
2931       continue;
2932     }
2934     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2936     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2937     if (status != 0)
2938     {
2939       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2940                sock->addr, rrd_strerror(errno));
2941       close (fd);
2942       continue;
2943     }
2945     status = listen (fd, /* backlog = */ 10);
2946     if (status != 0)
2947     {
2948       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2949                sock->addr, rrd_strerror(errno));
2950       close (fd);
2951       freeaddrinfo(ai_res);
2952       return (-1);
2953     }
2955     listen_fds[listen_fds_num].fd = fd;
2956     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2957     listen_fds_num++;
2958   } /* for (ai_ptr) */
2960   freeaddrinfo(ai_res);
2961   return (0);
2962 } /* }}} static int open_listen_socket_network */
2964 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2966   assert(sock != NULL);
2967   assert(sock->addr != NULL);
2969   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2970       || sock->addr[0] == '/')
2971     return (open_listen_socket_unix(sock));
2972   else
2973     return (open_listen_socket_network(sock));
2974 } /* }}} int open_listen_socket */
2976 static int close_listen_sockets (void) /* {{{ */
2978   size_t i;
2980   for (i = 0; i < listen_fds_num; i++)
2981   {
2982     close (listen_fds[i].fd);
2984     if (listen_fds[i].family == PF_UNIX)
2985       unlink(listen_fds[i].addr);
2986   }
2988   free (listen_fds);
2989   listen_fds = NULL;
2990   listen_fds_num = 0;
2992   return (0);
2993 } /* }}} int close_listen_sockets */
2995 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2997   struct pollfd *pollfds;
2998   int pollfds_num;
2999   int status;
3000   int i;
3002   if (listen_fds_num < 1)
3003   {
3004     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3005     return (NULL);
3006   }
3008   pollfds_num = listen_fds_num;
3009   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3010   if (pollfds == NULL)
3011   {
3012     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3013     return (NULL);
3014   }
3015   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3017   RRDD_LOG(LOG_INFO, "listening for connections");
3019   while (state == RUNNING)
3020   {
3021     for (i = 0; i < pollfds_num; i++)
3022     {
3023       pollfds[i].fd = listen_fds[i].fd;
3024       pollfds[i].events = POLLIN | POLLPRI;
3025       pollfds[i].revents = 0;
3026     }
3028     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3029     if (state != RUNNING)
3030       break;
3031     else if (status == 0) /* timeout */
3032       continue;
3033     else if (status < 0) /* error */
3034     {
3035       status = errno;
3036       if (status != EINTR)
3037       {
3038         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3039       }
3040       continue;
3041     }
3043     for (i = 0; i < pollfds_num; i++)
3044     {
3045       listen_socket_t *client_sock;
3046       struct sockaddr_storage client_sa;
3047       socklen_t client_sa_size;
3048       pthread_t tid;
3049       pthread_attr_t attr;
3051       if (pollfds[i].revents == 0)
3052         continue;
3054       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3055       {
3056         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3057             "poll(2) returned something unexpected for listen FD #%i.",
3058             pollfds[i].fd);
3059         continue;
3060       }
3062       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3063       if (client_sock == NULL)
3064       {
3065         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3066         continue;
3067       }
3068       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3070       client_sa_size = sizeof (client_sa);
3071       client_sock->fd = accept (pollfds[i].fd,
3072           (struct sockaddr *) &client_sa, &client_sa_size);
3073       if (client_sock->fd < 0)
3074       {
3075         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3076         free(client_sock);
3077         continue;
3078       }
3080       pthread_attr_init (&attr);
3081       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3083       status = pthread_create (&tid, &attr, connection_thread_main,
3084                                client_sock);
3085       if (status != 0)
3086       {
3087         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3088         close_connection(client_sock);
3089         continue;
3090       }
3091     } /* for (pollfds_num) */
3092   } /* while (state == RUNNING) */
3094   RRDD_LOG(LOG_INFO, "starting shutdown");
3096   close_listen_sockets ();
3098   pthread_mutex_lock (&connection_threads_lock);
3099   while (connection_threads_num > 0)
3100     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3101   pthread_mutex_unlock (&connection_threads_lock);
3103   free(pollfds);
3105   return (NULL);
3106 } /* }}} void *listen_thread_main */
3108 static int daemonize (void) /* {{{ */
3110   int pid_fd;
3111   char *base_dir;
3113   daemon_uid = geteuid();
3115   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3116   if (pid_fd < 0)
3117     pid_fd = check_pidfile();
3118   if (pid_fd < 0)
3119     return pid_fd;
3121   /* open all the listen sockets */
3122   if (config_listen_address_list_len > 0)
3123   {
3124     for (size_t i = 0; i < config_listen_address_list_len; i++)
3125       open_listen_socket (config_listen_address_list[i]);
3127     rrd_free_ptrs((void ***) &config_listen_address_list,
3128                   &config_listen_address_list_len);
3129   }
3130   else
3131   {
3132     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3133         sizeof(default_socket.addr) - 1);
3134     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3136     if (default_socket.permissions == 0)
3137       socket_permission_set_all (&default_socket);
3139     open_listen_socket (&default_socket);
3140   }
3142   if (listen_fds_num < 1)
3143   {
3144     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3145     goto error;
3146   }
3148   if (!stay_foreground)
3149   {
3150     pid_t child;
3152     child = fork ();
3153     if (child < 0)
3154     {
3155       fprintf (stderr, "daemonize: fork(2) failed.\n");
3156       goto error;
3157     }
3158     else if (child > 0)
3159       exit(0);
3161     /* Become session leader */
3162     setsid ();
3164     /* Open the first three file descriptors to /dev/null */
3165     close (2);
3166     close (1);
3167     close (0);
3169     open ("/dev/null", O_RDWR);
3170     if (dup(0) == -1 || dup(0) == -1){
3171         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3172     }
3173   } /* if (!stay_foreground) */
3175   /* Change into the /tmp directory. */
3176   base_dir = (config_base_dir != NULL)
3177     ? config_base_dir
3178     : "/tmp";
3180   if (chdir (base_dir) != 0)
3181   {
3182     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3183     goto error;
3184   }
3186   install_signal_handlers();
3188   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3189   RRDD_LOG(LOG_INFO, "starting up");
3191   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3192                                 (GDestroyNotify) free_cache_item);
3193   if (cache_tree == NULL)
3194   {
3195     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3196     goto error;
3197   }
3199   return write_pidfile (pid_fd);
3201 error:
3202   remove_pidfile();
3203   return -1;
3204 } /* }}} int daemonize */
3206 static int cleanup (void) /* {{{ */
3208   pthread_cond_broadcast (&flush_cond);
3209   pthread_join (flush_thread, NULL);
3211   pthread_cond_broadcast (&queue_cond);
3212   for (int i = 0; i < config_queue_threads; i++)
3213     pthread_join (queue_threads[i], NULL);
3215   if (config_flush_at_shutdown)
3216   {
3217     assert(cache_queue_head == NULL);
3218     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3219   }
3221   free(queue_threads);
3222   free(config_base_dir);
3224   pthread_mutex_lock(&cache_lock);
3225   g_tree_destroy(cache_tree);
3227   pthread_mutex_lock(&journal_lock);
3228   journal_done();
3230   RRDD_LOG(LOG_INFO, "goodbye");
3231   closelog ();
3233   remove_pidfile ();
3234   free(config_pid_file);
3236   return (0);
3237 } /* }}} int cleanup */
3239 static int read_options (int argc, char **argv) /* {{{ */
3241   int option;
3242   int status = 0;
3244   socket_permission_clear (&default_socket);
3246   default_socket.socket_group = (gid_t)-1;
3247   default_socket.socket_permissions = (mode_t)-1;
3249   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3250   {
3251     switch (option)
3252     {
3253       case 'O':
3254         opt_no_overwrite = 1;
3255         break;
3257       case 'g':
3258         stay_foreground=1;
3259         break;
3261       case 'l':
3262       {
3263         listen_socket_t *new;
3265         new = malloc(sizeof(listen_socket_t));
3266         if (new == NULL)
3267         {
3268           fprintf(stderr, "read_options: malloc failed.\n");
3269           return(2);
3270         }
3271         memset(new, 0, sizeof(listen_socket_t));
3273         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3275         /* Add permissions to the socket {{{ */
3276         if (default_socket.permissions != 0)
3277         {
3278           socket_permission_copy (new, &default_socket);
3279         }
3280         else /* if (default_socket.permissions == 0) */
3281         {
3282           /* Add permission for ALL commands to the socket. */
3283           socket_permission_set_all (new);
3284         }
3285         /* }}} Done adding permissions. */
3287         new->socket_group = default_socket.socket_group;
3288         new->socket_permissions = default_socket.socket_permissions;
3290         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3291                          &config_listen_address_list_len, new))
3292         {
3293           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3294           return (2);
3295         }
3296       }
3297       break;
3299       /* set socket group permissions */
3300       case 's':
3301       {
3302         gid_t group_gid;
3303         struct group *grp;
3305         group_gid = strtoul(optarg, NULL, 10);
3306         if (errno != EINVAL && group_gid>0)
3307         {
3308           /* we were passed a number */
3309           grp = getgrgid(group_gid);
3310         }
3311         else
3312         {
3313           grp = getgrnam(optarg);
3314         }
3316         if (grp)
3317         {
3318           default_socket.socket_group = grp->gr_gid;
3319         }
3320         else
3321         {
3322           /* no idea what the user wanted... */
3323           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3324           return (5);
3325         }
3326       }
3327       break;
3329       /* set socket file permissions */
3330       case 'm':
3331       {
3332         long  tmp;
3333         char *endptr = NULL;
3335         tmp = strtol (optarg, &endptr, 8);
3336         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3337             || (tmp > 07777) || (tmp < 0)) {
3338           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3339               optarg);
3340           return (5);
3341         }
3343         default_socket.socket_permissions = (mode_t)tmp;
3344       }
3345       break;
3347       case 'P':
3348       {
3349         char *optcopy;
3350         char *saveptr;
3351         char *dummy;
3352         char *ptr;
3354         socket_permission_clear (&default_socket);
3356         optcopy = strdup (optarg);
3357         dummy = optcopy;
3358         saveptr = NULL;
3359         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3360         {
3361           dummy = NULL;
3362           status = socket_permission_add (&default_socket, ptr);
3363           if (status != 0)
3364           {
3365             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3366                 "socket failed. Most likely, this permission doesn't "
3367                 "exist. Check your command line.\n", ptr);
3368             status = 4;
3369           }
3370         }
3372         free (optcopy);
3373       }
3374       break;
3376       case 'f':
3377       {
3378         int temp;
3380         temp = atoi (optarg);
3381         if (temp > 0)
3382           config_flush_interval = temp;
3383         else
3384         {
3385           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3386           status = 3;
3387         }
3388       }
3389       break;
3391       case 'w':
3392       {
3393         int temp;
3395         temp = atoi (optarg);
3396         if (temp > 0)
3397           config_write_interval = temp;
3398         else
3399         {
3400           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3401           status = 2;
3402         }
3403       }
3404       break;
3406       case 'z':
3407       {
3408         int temp;
3410         temp = atoi(optarg);
3411         if (temp > 0)
3412           config_write_jitter = temp;
3413         else
3414         {
3415           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3416           status = 2;
3417         }
3419         break;
3420       }
3422       case 't':
3423       {
3424         int threads;
3425         threads = atoi(optarg);
3426         if (threads >= 1)
3427           config_queue_threads = threads;
3428         else
3429         {
3430           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3431           return 1;
3432         }
3433       }
3434       break;
3436       case 'B':
3437         config_write_base_only = 1;
3438         break;
3440       case 'b':
3441       {
3442         size_t len;
3443         char base_realpath[PATH_MAX];
3445         if (config_base_dir != NULL)
3446           free (config_base_dir);
3447         config_base_dir = strdup (optarg);
3448         if (config_base_dir == NULL)
3449         {
3450           fprintf (stderr, "read_options: strdup failed.\n");
3451           return (3);
3452         }
3454         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3455         {
3456           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3457               config_base_dir, rrd_strerror (errno));
3458           return (3);
3459         }
3461         /* make sure that the base directory is not resolved via
3462          * symbolic links.  this makes some performance-enhancing
3463          * assumptions possible (we don't have to resolve paths
3464          * that start with a "/")
3465          */
3466         if (realpath(config_base_dir, base_realpath) == NULL)
3467         {
3468           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3469               "%s\n", config_base_dir, rrd_strerror(errno));
3470           return 5;
3471         }
3473         len = strlen (config_base_dir);
3474         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3475         {
3476           config_base_dir[len - 1] = 0;
3477           len--;
3478         }
3480         if (len < 1)
3481         {
3482           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3483           return (4);
3484         }
3486         _config_base_dir_len = len;
3488         len = strlen (base_realpath);
3489         while ((len > 0) && (base_realpath[len - 1] == '/'))
3490         {
3491           base_realpath[len - 1] = '\0';
3492           len--;
3493         }
3495         if (strncmp(config_base_dir,
3496                          base_realpath, sizeof(base_realpath)) != 0)
3497         {
3498           fprintf(stderr,
3499                   "Base directory (-b) resolved via file system links!\n"
3500                   "Please consult rrdcached '-b' documentation!\n"
3501                   "Consider specifying the real directory (%s)\n",
3502                   base_realpath);
3503           return 5;
3504         }
3505       }
3506       break;
3508       case 'p':
3509       {
3510         if (config_pid_file != NULL)
3511           free (config_pid_file);
3512         config_pid_file = strdup (optarg);
3513         if (config_pid_file == NULL)
3514         {
3515           fprintf (stderr, "read_options: strdup failed.\n");
3516           return (3);
3517         }
3518       }
3519       break;
3521       case 'F':
3522         config_flush_at_shutdown = 1;
3523         break;
3525       case 'j':
3526       {
3527         char journal_dir_actual[PATH_MAX];
3528         journal_dir = realpath((const char *)optarg, journal_dir_actual);
3529         if (journal_dir)
3530         {
3531           // if we were able to properly resolve the path, lets have a copy
3532           // for use outside this block.
3533           journal_dir = strdup(journal_dir);           
3534           status = rrd_mkdir_p(journal_dir, 0777);
3535           if (status != 0)
3536           {
3537             fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3538                     journal_dir, rrd_strerror(errno));
3539             return 6;
3540           }
3541           if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3542           {
3543             fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3544                     errno ? rrd_strerror(errno) : "");
3545             return 6;
3546           }
3547         } else {
3548           fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3549                   errno ? rrd_strerror(errno) : "");
3550           return 6;
3551         }
3552       }
3553       break;
3555       case 'a':
3556       {
3557         int temp = atoi(optarg);
3558         if (temp > 0)
3559           config_alloc_chunk = temp;
3560         else
3561         {
3562           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3563           return 10;
3564         }
3565       }
3566       break;
3568       case 'h':
3569       case '?':
3570         printf ("RRDCacheD %s\n"
3571             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3572             "\n"
3573             "Usage: rrdcached [options]\n"
3574             "\n"
3575             "Valid options are:\n"
3576             "  -l <address>  Socket address to listen to.\n"
3577             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3578             "  -P <perms>    Sets the permissions to assign to all following "
3579                             "sockets\n"
3580             "  -w <seconds>  Interval in which to write data.\n"
3581             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3582             "  -t <threads>  Number of write threads.\n"
3583             "  -f <seconds>  Interval in which to flush dead data.\n"
3584             "  -p <file>     Location of the PID-file.\n"
3585             "  -b <dir>      Base directory to change to.\n"
3586             "  -B            Restrict file access to paths within -b <dir>\n"
3587             "  -g            Do not fork and run in the foreground.\n"
3588             "  -j <dir>      Directory in which to create the journal files.\n"
3589             "  -F            Always flush all updates at shutdown\n"
3590             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3591             "                (the socket will also have read/write permissions "
3592                             "for that group)\n"
3593             "  -m <mode>     File permissions (octal) of all following UNIX "
3594                             "sockets\n"
3595             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3596             "  -O            Do not allow CREATE commands to overwrite existing\n"
3597             "                files, even if asked to.\n"
3598             "\n"
3599             "For more information and a detailed description of all options "
3600             "please refer\n"
3601             "to the rrdcached(1) manual page.\n",
3602             VERSION);
3603         if (option == 'h')
3604           status = -1;
3605         else
3606           status = 1;
3607         break;
3608     } /* switch (option) */
3609   } /* while (getopt) */
3611   /* advise the user when values are not sane */
3612   if (config_flush_interval < 2 * config_write_interval)
3613     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3614             " 2x write interval (-w) !\n");
3615   if (config_write_jitter > config_write_interval)
3616     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3617             " write interval (-w) !\n");
3619   if (config_write_base_only && config_base_dir == NULL)
3620     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3621             "  Consult the rrdcached documentation\n");
3623   if (journal_dir == NULL)
3624     config_flush_at_shutdown = 1;
3626   return (status);
3627 } /* }}} int read_options */
3629 int main (int argc, char **argv)
3631   int status;
3633   status = read_options (argc, argv);
3634   if (status != 0)
3635   {
3636     if (status < 0)
3637       status = 0;
3638     return (status);
3639   }
3641   status = daemonize ();
3642   if (status != 0)
3643   {
3644     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3645     return (1);
3646   }
3648   journal_init();
3650   /* start the queue threads */
3651   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3652   if (queue_threads == NULL)
3653   {
3654     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3655     cleanup();
3656     return (1);
3657   }
3658   for (int i = 0; i < config_queue_threads; i++)
3659   {
3660     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3661     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3662     if (status != 0)
3663     {
3664       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3665       cleanup();
3666       return (1);
3667     }
3668   }
3670   /* start the flush thread */
3671   memset(&flush_thread, 0, sizeof(flush_thread));
3672   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3673   if (status != 0)
3674   {
3675     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3676     cleanup();
3677     return (1);
3678   }
3680   listen_thread_main (NULL);
3681   cleanup ();
3683   return (0);
3684 } /* int main */
3686 /*
3687  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3688  */