Code

rrd_daemon: after fetching data must be freed ! -- Thorsten von Eicken
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) \
118       fprintf(stderr, __VA_ARGS__); \
119     syslog ((severity), __VA_ARGS__); \
120   } while (0)
122 /*
123  * Types
124  */
125 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127 struct listen_socket_s
129   int fd;
130   char addr[PATH_MAX + 1];
131   int family;
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
142   char *wbuf;
143   ssize_t wbuf_len;
145   uint32_t permissions;
147   gid_t  socket_group;
148   mode_t socket_permissions;
149 };
150 typedef struct listen_socket_s listen_socket_t;
152 struct command_s;
153 typedef struct command_s command_t;
154 /* note: guard against "unused" warnings in the handlers */
155 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
156                         time_t UNUSED(now),\
157                         char  UNUSED(*buffer),\
158                         size_t UNUSED(buffer_size)
160 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
161                         DISPATCH_PROTO
163 struct command_s {
164   char   *cmd;
165   int (*handler)(HANDLER_PROTO);
167   char  context;                /* where we expect to see it */
168 #define CMD_CONTEXT_CLIENT      (1<<0)
169 #define CMD_CONTEXT_BATCH       (1<<1)
170 #define CMD_CONTEXT_JOURNAL     (1<<2)
171 #define CMD_CONTEXT_ANY         (0x7f)
173   char *syntax;
174   char *help;
175 };
177 struct cache_item_s;
178 typedef struct cache_item_s cache_item_t;
179 struct cache_item_s
181   char *file;
182   char **values;
183   size_t values_num;            /* number of valid pointers */
184   size_t values_alloc;          /* number of allocated pointers */
185   time_t last_flush_time;
186   time_t last_update_stamp;
187 #define CI_FLAGS_IN_TREE  (1<<0)
188 #define CI_FLAGS_IN_QUEUE (1<<1)
189   int flags;
190   pthread_cond_t  flushed;
191   cache_item_t *prev;
192   cache_item_t *next;
193 };
195 struct callback_flush_data_s
197   time_t now;
198   time_t abs_timeout;
199   char **keys;
200   size_t keys_num;
201 };
202 typedef struct callback_flush_data_s callback_flush_data_t;
204 enum queue_side_e
206   HEAD,
207   TAIL
208 };
209 typedef enum queue_side_e queue_side_t;
211 /* describe a set of journal files */
212 typedef struct {
213   char **files;
214   size_t files_num;
215 } journal_set;
217 /* max length of socket command or response */
218 #define CMD_MAX 4096
219 #define RBUF_SIZE (CMD_MAX*2)
221 /*
222  * Variables
223  */
224 static int stay_foreground = 0;
225 static uid_t daemon_uid;
227 static listen_socket_t *listen_fds = NULL;
228 static size_t listen_fds_num = 0;
230 enum {
231   RUNNING,              /* normal operation */
232   FLUSHING,             /* flushing remaining values */
233   SHUTDOWN              /* shutting down */
234 } state = RUNNING;
236 static pthread_t *queue_threads;
237 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
238 static int config_queue_threads = 4;
240 static pthread_t flush_thread;
241 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
243 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
244 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
245 static int connection_threads_num = 0;
247 /* Cache stuff */
248 static GTree          *cache_tree = NULL;
249 static cache_item_t   *cache_queue_head = NULL;
250 static cache_item_t   *cache_queue_tail = NULL;
251 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
253 static int config_write_interval = 300;
254 static int config_write_jitter   = 0;
255 static int config_flush_interval = 3600;
256 static int config_flush_at_shutdown = 0;
257 static char *config_pid_file = NULL;
258 static char *config_base_dir = NULL;
259 static size_t _config_base_dir_len = 0;
260 static int config_write_base_only = 0;
261 static size_t config_alloc_chunk = 1;
263 static listen_socket_t **config_listen_address_list = NULL;
264 static size_t config_listen_address_list_len = 0;
266 static uint64_t stats_queue_length = 0;
267 static uint64_t stats_updates_received = 0;
268 static uint64_t stats_flush_received = 0;
269 static uint64_t stats_updates_written = 0;
270 static uint64_t stats_data_sets_written = 0;
271 static uint64_t stats_journal_bytes = 0;
272 static uint64_t stats_journal_rotate = 0;
273 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL;         /* current journal file handle */
282 static long  journal_size = 0;          /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
292 /* 
293  * Functions
294  */
295 static void sig_common (const char *sig) /* {{{ */
297   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298   state = FLUSHING;
299   pthread_cond_broadcast(&flush_cond);
300   pthread_cond_broadcast(&queue_cond);
301 } /* }}} void sig_common */
303 static void sig_int_handler (int UNUSED(s)) /* {{{ */
305   sig_common("INT");
306 } /* }}} void sig_int_handler */
308 static void sig_term_handler (int UNUSED(s)) /* {{{ */
310   sig_common("TERM");
311 } /* }}} void sig_term_handler */
313 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
315   config_flush_at_shutdown = 1;
316   sig_common("USR1");
317 } /* }}} void sig_usr1_handler */
319 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
321   config_flush_at_shutdown = 0;
322   sig_common("USR2");
323 } /* }}} void sig_usr2_handler */
325 static void install_signal_handlers(void) /* {{{ */
327   /* These structures are static, because `sigaction' behaves weird if the are
328    * overwritten.. */
329   static struct sigaction sa_int;
330   static struct sigaction sa_term;
331   static struct sigaction sa_pipe;
332   static struct sigaction sa_usr1;
333   static struct sigaction sa_usr2;
335   /* Install signal handlers */
336   memset (&sa_int, 0, sizeof (sa_int));
337   sa_int.sa_handler = sig_int_handler;
338   sigaction (SIGINT, &sa_int, NULL);
340   memset (&sa_term, 0, sizeof (sa_term));
341   sa_term.sa_handler = sig_term_handler;
342   sigaction (SIGTERM, &sa_term, NULL);
344   memset (&sa_pipe, 0, sizeof (sa_pipe));
345   sa_pipe.sa_handler = SIG_IGN;
346   sigaction (SIGPIPE, &sa_pipe, NULL);
348   memset (&sa_pipe, 0, sizeof (sa_usr1));
349   sa_usr1.sa_handler = sig_usr1_handler;
350   sigaction (SIGUSR1, &sa_usr1, NULL);
352   memset (&sa_usr2, 0, sizeof (sa_usr2));
353   sa_usr2.sa_handler = sig_usr2_handler;
354   sigaction (SIGUSR2, &sa_usr2, NULL);
356 } /* }}} void install_signal_handlers */
358 static int open_pidfile(char *action, int oflag) /* {{{ */
360   int fd;
361   const char *file;
362   char *file_copy, *dir;
364   file = (config_pid_file != NULL)
365     ? config_pid_file
366     : LOCALSTATEDIR "/run/rrdcached.pid";
368   /* dirname may modify its argument */
369   file_copy = strdup(file);
370   if (file_copy == NULL)
371   {
372     fprintf(stderr, "rrdcached: strdup(): %s\n",
373         rrd_strerror(errno));
374     return -1;
375   }
377   dir = dirname(file_copy);
378   if (rrd_mkdir_p(dir, 0777) != 0)
379   {
380     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
381         dir, rrd_strerror(errno));
382     return -1;
383   }
385   free(file_copy);
387   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
388   if (fd < 0)
389     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
390             action, file, rrd_strerror(errno));
392   return(fd);
393 } /* }}} static int open_pidfile */
395 /* check existing pid file to see whether a daemon is running */
396 static int check_pidfile(void)
398   int pid_fd;
399   pid_t pid;
400   char pid_str[16];
402   pid_fd = open_pidfile("open", O_RDWR);
403   if (pid_fd < 0)
404     return pid_fd;
406   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
407     return -1;
409   pid = atoi(pid_str);
410   if (pid <= 0)
411     return -1;
413   /* another running process that we can signal COULD be
414    * a competing rrdcached */
415   if (pid != getpid() && kill(pid, 0) == 0)
416   {
417     fprintf(stderr,
418             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
419     close(pid_fd);
420     return -1;
421   }
423   lseek(pid_fd, 0, SEEK_SET);
424   if (ftruncate(pid_fd, 0) == -1)
425   {
426     fprintf(stderr,
427             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
428     close(pid_fd);
429     return -1;
430   }
432   fprintf(stderr,
433           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
434           "rrdcached: starting normally.\n", pid);
436   return pid_fd;
437 } /* }}} static int check_pidfile */
439 static int write_pidfile (int fd) /* {{{ */
441   pid_t pid;
442   FILE *fh;
444   pid = getpid ();
446   fh = fdopen (fd, "w");
447   if (fh == NULL)
448   {
449     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
450     close(fd);
451     return (-1);
452   }
454   fprintf (fh, "%i\n", (int) pid);
455   fclose (fh);
457   return (0);
458 } /* }}} int write_pidfile */
460 static int remove_pidfile (void) /* {{{ */
462   char *file;
463   int status;
465   file = (config_pid_file != NULL)
466     ? config_pid_file
467     : LOCALSTATEDIR "/run/rrdcached.pid";
469   status = unlink (file);
470   if (status == 0)
471     return (0);
472   return (errno);
473 } /* }}} int remove_pidfile */
475 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
477   char *eol;
479   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
480                sock->next_read - sock->next_cmd);
482   if (eol == NULL)
483   {
484     /* no commands left, move remainder back to front of rbuf */
485     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
486             sock->next_read - sock->next_cmd);
487     sock->next_read -= sock->next_cmd;
488     sock->next_cmd = 0;
489     *len = 0;
490     return NULL;
491   }
492   else
493   {
494     char *cmd = sock->rbuf + sock->next_cmd;
495     *eol = '\0';
497     sock->next_cmd = eol - sock->rbuf + 1;
499     if (eol > sock->rbuf && *(eol-1) == '\r')
500       *(--eol) = '\0'; /* handle "\r\n" EOL */
502     *len = eol - cmd;
504     return cmd;
505   }
507   /* NOTREACHED */
508   assert(1==0);
509 } /* }}} char *next_cmd */
511 /* add the characters directly to the write buffer */
512 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
514   char *new_buf;
516   assert(sock != NULL);
518   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
519   if (new_buf == NULL)
520   {
521     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
522     return -1;
523   }
525   strncpy(new_buf + sock->wbuf_len, str, len + 1);
527   sock->wbuf = new_buf;
528   sock->wbuf_len += len;
530   return 0;
531 } /* }}} static int add_to_wbuf */
533 /* add the text to the "extra" info that's sent after the status line */
534 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
536   va_list argp;
537   char buffer[CMD_MAX];
538   int len;
540   if (JOURNAL_REPLAY(sock)) return 0;
541   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
543   va_start(argp, fmt);
544 #ifdef HAVE_VSNPRINTF
545   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
546 #else
547   len = vsprintf(buffer, fmt, argp);
548 #endif
549   va_end(argp);
550   if (len < 0)
551   {
552     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
553     return -1;
554   }
556   return add_to_wbuf(sock, buffer, len);
557 } /* }}} static int add_response_info */
559 static int count_lines(char *str) /* {{{ */
561   int lines = 0;
563   if (str != NULL)
564   {
565     while ((str = strchr(str, '\n')) != NULL)
566     {
567       ++lines;
568       ++str;
569     }
570   }
572   return lines;
573 } /* }}} static int count_lines */
575 /* send the response back to the user.
576  * returns 0 on success, -1 on error
577  * write buffer is always zeroed after this call */
578 static int send_response (listen_socket_t *sock, response_code rc,
579                           char *fmt, ...) /* {{{ */
581   va_list argp;
582   char buffer[CMD_MAX];
583   int lines;
584   ssize_t wrote;
585   int rclen, len;
587   if (JOURNAL_REPLAY(sock)) return rc;
589   if (sock->batch_start)
590   {
591     if (rc == RESP_OK)
592       return rc; /* no response on success during BATCH */
593     lines = sock->batch_cmd;
594   }
595   else if (rc == RESP_OK)
596     lines = count_lines(sock->wbuf);
597   else
598     lines = -1;
600   rclen = sprintf(buffer, "%d ", lines);
601   va_start(argp, fmt);
602 #ifdef HAVE_VSNPRINTF
603   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
604 #else
605   len = vsprintf(buffer+rclen, fmt, argp);
606 #endif
607   va_end(argp);
608   if (len < 0)
609     return -1;
611   len += rclen;
613   /* append the result to the wbuf, don't write to the user */
614   if (sock->batch_start)
615     return add_to_wbuf(sock, buffer, len);
617   /* first write must be complete */
618   if (len != write(sock->fd, buffer, len))
619   {
620     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
621     return -1;
622   }
624   if (sock->wbuf != NULL && rc == RESP_OK)
625   {
626     wrote = 0;
627     while (wrote < sock->wbuf_len)
628     {
629       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
630       if (wb <= 0)
631       {
632         RRDD_LOG(LOG_INFO, "send_response: could not write results");
633         return -1;
634       }
635       wrote += wb;
636     }
637   }
639   free(sock->wbuf); sock->wbuf = NULL;
640   sock->wbuf_len = 0;
642   return 0;
643 } /* }}} */
645 static void wipe_ci_values(cache_item_t *ci, time_t when)
647   ci->values = NULL;
648   ci->values_num = 0;
649   ci->values_alloc = 0;
651   ci->last_flush_time = when;
652   if (config_write_jitter > 0)
653     ci->last_flush_time += (rrd_random() % config_write_jitter);
656 /* remove_from_queue
657  * remove a "cache_item_t" item from the queue.
658  * must hold 'cache_lock' when calling this
659  */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
662   if (ci == NULL) return;
663   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
665   if (ci->prev == NULL)
666     cache_queue_head = ci->next; /* reset head */
667   else
668     ci->prev->next = ci->next;
670   if (ci->next == NULL)
671     cache_queue_tail = ci->prev; /* reset the tail */
672   else
673     ci->next->prev = ci->prev;
675   ci->next = ci->prev = NULL;
676   ci->flags &= ~CI_FLAGS_IN_QUEUE;
678   pthread_mutex_lock (&stats_lock);
679   assert (stats_queue_length > 0);
680   stats_queue_length--;
681   pthread_mutex_unlock (&stats_lock);
683 } /* }}} static void remove_from_queue */
685 /* free the resources associated with the cache_item_t
686  * must hold cache_lock when calling this function
687  */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
690   if (ci == NULL) return NULL;
692   remove_from_queue(ci);
694   for (size_t i=0; i < ci->values_num; i++)
695     free(ci->values[i]);
697   free (ci->values);
698   free (ci->file);
700   /* in case anyone is waiting */
701   pthread_cond_broadcast(&ci->flushed);
702   pthread_cond_destroy(&ci->flushed);
704   free (ci);
706   return NULL;
707 } /* }}} static void *free_cache_item */
709 /*
710  * enqueue_cache_item:
711  * `cache_lock' must be acquired before calling this function!
712  */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714     queue_side_t side)
716   if (ci == NULL)
717     return (-1);
719   if (ci->values_num == 0)
720     return (0);
722   if (side == HEAD)
723   {
724     if (cache_queue_head == ci)
725       return 0;
727     /* remove if further down in queue */
728     remove_from_queue(ci);
730     ci->prev = NULL;
731     ci->next = cache_queue_head;
732     if (ci->next != NULL)
733       ci->next->prev = ci;
734     cache_queue_head = ci;
736     if (cache_queue_tail == NULL)
737       cache_queue_tail = cache_queue_head;
738   }
739   else /* (side == TAIL) */
740   {
741     /* We don't move values back in the list.. */
742     if (ci->flags & CI_FLAGS_IN_QUEUE)
743       return (0);
745     assert (ci->next == NULL);
746     assert (ci->prev == NULL);
748     ci->prev = cache_queue_tail;
750     if (cache_queue_tail == NULL)
751       cache_queue_head = ci;
752     else
753       cache_queue_tail->next = ci;
755     cache_queue_tail = ci;
756   }
758   ci->flags |= CI_FLAGS_IN_QUEUE;
760   pthread_cond_signal(&queue_cond);
761   pthread_mutex_lock (&stats_lock);
762   stats_queue_length++;
763   pthread_mutex_unlock (&stats_lock);
765   return (0);
766 } /* }}} int enqueue_cache_item */
768 /*
769  * tree_callback_flush:
770  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771  * while this is in progress.
772  */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774     gpointer data)
776   cache_item_t *ci;
777   callback_flush_data_t *cfd;
779   ci = (cache_item_t *) value;
780   cfd = (callback_flush_data_t *) data;
782   if (ci->flags & CI_FLAGS_IN_QUEUE)
783     return FALSE;
785   if (ci->values_num > 0
786       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787   {
788     enqueue_cache_item (ci, TAIL);
789   }
790   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791       && (ci->values_num <= 0))
792   {
793     assert ((char *) key == ci->file);
794     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795     {
796       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797       return (FALSE);
798     }
799   }
801   return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
804 static int flush_old_values (int max_age)
806   callback_flush_data_t cfd;
807   size_t k;
809   memset (&cfd, 0, sizeof (cfd));
810   /* Pass the current time as user data so that we don't need to call
811    * `time' for each node. */
812   cfd.now = time (NULL);
813   cfd.keys = NULL;
814   cfd.keys_num = 0;
816   if (max_age > 0)
817     cfd.abs_timeout = cfd.now - max_age;
818   else
819     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
821   /* `tree_callback_flush' will return the keys of all values that haven't
822    * been touched in the last `config_flush_interval' seconds in `cfd'.
823    * The char*'s in this array point to the same memory as ci->file, so we
824    * don't need to free them separately. */
825   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
827   for (k = 0; k < cfd.keys_num; k++)
828   {
829     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830     /* should never fail, since we have held the cache_lock
831      * the entire time */
832     assert(status == TRUE);
833   }
835   if (cfd.keys != NULL)
836   {
837     free (cfd.keys);
838     cfd.keys = NULL;
839   }
841   return (0);
842 } /* int flush_old_values */
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
846   struct timeval now;
847   struct timespec next_flush;
848   int status;
850   gettimeofday (&now, NULL);
851   next_flush.tv_sec = now.tv_sec + config_flush_interval;
852   next_flush.tv_nsec = 1000 * now.tv_usec;
854   pthread_mutex_lock(&cache_lock);
856   while (state == RUNNING)
857   {
858     gettimeofday (&now, NULL);
859     if ((now.tv_sec > next_flush.tv_sec)
860         || ((now.tv_sec == next_flush.tv_sec)
861           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862     {
863       RRDD_LOG(LOG_DEBUG, "flushing old values");
865       /* Determine the time of the next cache flush. */
866       next_flush.tv_sec = now.tv_sec + config_flush_interval;
868       /* Flush all values that haven't been written in the last
869        * `config_write_interval' seconds. */
870       flush_old_values (config_write_interval);
872       /* unlock the cache while we rotate so we don't block incoming
873        * updates if the fsync() blocks on disk I/O */
874       pthread_mutex_unlock(&cache_lock);
875       journal_rotate();
876       pthread_mutex_lock(&cache_lock);
877     }
879     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880     if (status != 0 && status != ETIMEDOUT)
881     {
882       RRDD_LOG (LOG_ERR, "flush_thread_main: "
883                 "pthread_cond_timedwait returned %i.", status);
884     }
885   }
887   if (config_flush_at_shutdown)
888     flush_old_values (-1); /* flush everything */
890   state = SHUTDOWN;
892   pthread_mutex_unlock(&cache_lock);
894   return NULL;
895 } /* void *flush_thread_main */
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
899   pthread_mutex_lock (&cache_lock);
901   while (state != SHUTDOWN
902          || (cache_queue_head != NULL && config_flush_at_shutdown))
903   {
904     cache_item_t *ci;
905     char *file;
906     char **values;
907     size_t values_num;
908     int status;
910     /* Now, check if there's something to store away. If not, wait until
911      * something comes in. */
912     if (cache_queue_head == NULL)
913     {
914       status = pthread_cond_wait (&queue_cond, &cache_lock);
915       if ((status != 0) && (status != ETIMEDOUT))
916       {
917         RRDD_LOG (LOG_ERR, "queue_thread_main: "
918             "pthread_cond_wait returned %i.", status);
919       }
920     }
922     /* Check if a value has arrived. This may be NULL if we timed out or there
923      * was an interrupt such as a signal. */
924     if (cache_queue_head == NULL)
925       continue;
927     ci = cache_queue_head;
929     /* copy the relevant parts */
930     file = strdup (ci->file);
931     if (file == NULL)
932     {
933       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934       continue;
935     }
937     assert(ci->values != NULL);
938     assert(ci->values_num > 0);
940     values = ci->values;
941     values_num = ci->values_num;
943     wipe_ci_values(ci, time(NULL));
944     remove_from_queue(ci);
946     pthread_mutex_unlock (&cache_lock);
948     rrd_clear_error ();
949     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950     if (status != 0)
951     {
952       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953           "rrd_update_r (%s) failed with status %i. (%s)",
954           file, status, rrd_get_error());
955     }
957     journal_write("wrote", file);
959     /* Search again in the tree.  It's possible someone issued a "FORGET"
960      * while we were writing the update values. */
961     pthread_mutex_lock(&cache_lock);
962     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963     if (ci)
964       pthread_cond_broadcast(&ci->flushed);
965     pthread_mutex_unlock(&cache_lock);
967     if (status == 0)
968     {
969       pthread_mutex_lock (&stats_lock);
970       stats_updates_written++;
971       stats_data_sets_written += values_num;
972       pthread_mutex_unlock (&stats_lock);
973     }
975     rrd_free_ptrs((void ***) &values, &values_num);
976     free(file);
978     pthread_mutex_lock (&cache_lock);
979   }
980   pthread_mutex_unlock (&cache_lock);
982   return (NULL);
983 } /* }}} void *queue_thread_main */
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986     size_t *buffer_size_ret, char **field_ret)
988   char *buffer;
989   size_t buffer_pos;
990   size_t buffer_size;
991   char *field;
992   size_t field_size;
993   int status;
995   buffer = *buffer_ret;
996   buffer_pos = 0;
997   buffer_size = *buffer_size_ret;
998   field = *buffer_ret;
999   field_size = 0;
1001   if (buffer_size <= 0)
1002     return (-1);
1004   /* This is ensured by `handle_request'. */
1005   assert (buffer[buffer_size - 1] == '\0');
1007   status = -1;
1008   while (buffer_pos < buffer_size)
1009   {
1010     /* Check for end-of-field or end-of-buffer */
1011     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012     {
1013       field[field_size] = 0;
1014       field_size++;
1015       buffer_pos++;
1016       status = 0;
1017       break;
1018     }
1019     /* Handle escaped characters. */
1020     else if (buffer[buffer_pos] == '\\')
1021     {
1022       if (buffer_pos >= (buffer_size - 1))
1023         break;
1024       buffer_pos++;
1025       field[field_size] = buffer[buffer_pos];
1026       field_size++;
1027       buffer_pos++;
1028     }
1029     /* Normal operation */ 
1030     else
1031     {
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036   } /* while (buffer_pos < buffer_size) */
1038   if (status != 0)
1039     return (status);
1041   *buffer_ret = buffer + buffer_pos;
1042   *buffer_size_ret = buffer_size - buffer_pos;
1043   *field_ret = field;
1045   return (0);
1046 } /* }}} int buffer_get_field */
1048 /* if we're restricting writes to the base directory,
1049  * check whether the file falls within the dir
1050  * returns 1 if OK, otherwise 0
1051  */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1054   assert(file != NULL);
1056   if (!config_write_base_only
1057       || JOURNAL_REPLAY(sock)
1058       || config_base_dir == NULL)
1059     return 1;
1061   if (strstr(file, "../") != NULL) goto err;
1063   /* relative paths without "../" are ok */
1064   if (*file != '/') return 1;
1066   /* file must be of the format base + "/" + <1+ char filename> */
1067   if (strlen(file) < _config_base_dir_len + 2) goto err;
1068   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069   if (*(file + _config_base_dir_len) != '/') goto err;
1071   return 1;
1073 err:
1074   if (sock != NULL && sock->fd >= 0)
1075     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1077   return 0;
1078 } /* }}} static int check_file_access */
1080 /* when using a base dir, convert relative paths to absolute paths.
1081  * if necessary, modifies the "filename" pointer to point
1082  * to the new path created in "tmp".  "tmp" is provided
1083  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084  *
1085  * this allows us to optimize for the expected case (absolute path)
1086  * with a no-op.
1087  */
1088 static void get_abs_path(char **filename, char *tmp)
1090   assert(tmp != NULL);
1091   assert(filename != NULL && *filename != NULL);
1093   if (config_base_dir == NULL || **filename == '/')
1094     return;
1096   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097   *filename = tmp;
1098 } /* }}} static int get_abs_path */
1100 static int flush_file (const char *filename) /* {{{ */
1102   cache_item_t *ci;
1104   pthread_mutex_lock (&cache_lock);
1106   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107   if (ci == NULL)
1108   {
1109     pthread_mutex_unlock (&cache_lock);
1110     return (ENOENT);
1111   }
1113   if (ci->values_num > 0)
1114   {
1115     /* Enqueue at head */
1116     enqueue_cache_item (ci, HEAD);
1117     pthread_cond_wait(&ci->flushed, &cache_lock);
1118   }
1120   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1121    * may have been purged during our cond_wait() */
1123   pthread_mutex_unlock(&cache_lock);
1125   return (0);
1126 } /* }}} int flush_file */
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1130   char *err = "Syntax error.\n";
1132   if (cmd && cmd->syntax)
1133     err = cmd->syntax;
1135   return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1140   uint64_t copy_queue_length;
1141   uint64_t copy_updates_received;
1142   uint64_t copy_flush_received;
1143   uint64_t copy_updates_written;
1144   uint64_t copy_data_sets_written;
1145   uint64_t copy_journal_bytes;
1146   uint64_t copy_journal_rotate;
1148   uint64_t tree_nodes_number;
1149   uint64_t tree_depth;
1151   pthread_mutex_lock (&stats_lock);
1152   copy_queue_length       = stats_queue_length;
1153   copy_updates_received   = stats_updates_received;
1154   copy_flush_received     = stats_flush_received;
1155   copy_updates_written    = stats_updates_written;
1156   copy_data_sets_written  = stats_data_sets_written;
1157   copy_journal_bytes      = stats_journal_bytes;
1158   copy_journal_rotate     = stats_journal_rotate;
1159   pthread_mutex_unlock (&stats_lock);
1161   pthread_mutex_lock (&cache_lock);
1162   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1164   pthread_mutex_unlock (&cache_lock);
1166   add_response_info(sock,
1167                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1168   add_response_info(sock,
1169                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170   add_response_info(sock,
1171                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172   add_response_info(sock,
1173                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174   add_response_info(sock,
1175                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1181   send_response(sock, RESP_OK, "Statistics follow\n");
1183   return (0);
1184 } /* }}} int handle_request_stats */
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1188   char *file, file_tmp[PATH_MAX];
1189   int status;
1191   status = buffer_get_field (&buffer, &buffer_size, &file);
1192   if (status != 0)
1193   {
1194     return syntax_error(sock,cmd);
1195   }
1196   else
1197   {
1198     pthread_mutex_lock(&stats_lock);
1199     stats_flush_received++;
1200     pthread_mutex_unlock(&stats_lock);
1202     get_abs_path(&file, file_tmp);
1203     if (!check_file_access(file, sock)) return 0;
1205     status = flush_file (file);
1206     if (status == 0)
1207       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208     else if (status == ENOENT)
1209     {
1210       /* no file in our tree; see whether it exists at all */
1211       struct stat statbuf;
1213       memset(&statbuf, 0, sizeof(statbuf));
1214       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216       else
1217         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218     }
1219     else if (status < 0)
1220       return send_response(sock, RESP_ERR, "Internal error.\n");
1221     else
1222       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223   }
1225   /* NOTREACHED */
1226   assert(1==0);
1227 } /* }}} int handle_request_flush */
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1231   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1233   pthread_mutex_lock(&cache_lock);
1234   flush_old_values(-1);
1235   pthread_mutex_unlock(&cache_lock);
1237   return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1242   int status;
1243   char *file, file_tmp[PATH_MAX];
1244   cache_item_t *ci;
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1250   get_abs_path(&file, file_tmp);
1252   pthread_mutex_lock(&cache_lock);
1253   ci = g_tree_lookup(cache_tree, file);
1254   if (ci == NULL)
1255   {
1256     pthread_mutex_unlock(&cache_lock);
1257     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258   }
1260   for (size_t i=0; i < ci->values_num; i++)
1261     add_response_info(sock, "%s\n", ci->values[i]);
1263   pthread_mutex_unlock(&cache_lock);
1264   return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1269   int status;
1270   gboolean found;
1271   char *file, file_tmp[PATH_MAX];
1273   status = buffer_get_field(&buffer, &buffer_size, &file);
1274   if (status != 0)
1275     return syntax_error(sock,cmd);
1277   get_abs_path(&file, file_tmp);
1278   if (!check_file_access(file, sock)) return 0;
1280   pthread_mutex_lock(&cache_lock);
1281   found = g_tree_remove(cache_tree, file);
1282   pthread_mutex_unlock(&cache_lock);
1284   if (found == TRUE)
1285   {
1286     if (!JOURNAL_REPLAY(sock))
1287       journal_write("forget", file);
1289     return send_response(sock, RESP_OK, "Gone!\n");
1290   }
1291   else
1292     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1294   /* NOTREACHED */
1295   assert(1==0);
1296 } /* }}} static int handle_request_forget */
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1300   cache_item_t *ci;
1302   pthread_mutex_lock(&cache_lock);
1304   ci = cache_queue_head;
1305   while (ci != NULL)
1306   {
1307     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308     ci = ci->next;
1309   }
1311   pthread_mutex_unlock(&cache_lock);
1313   return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1318   char *file, file_tmp[PATH_MAX];
1319   int values_num = 0;
1320   int status;
1321   char orig_buf[CMD_MAX];
1323   cache_item_t *ci;
1325   /* save it for the journal later */
1326   if (!JOURNAL_REPLAY(sock))
1327     strncpy(orig_buf, buffer, buffer_size);
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return syntax_error(sock,cmd);
1333   pthread_mutex_lock(&stats_lock);
1334   stats_updates_received++;
1335   pthread_mutex_unlock(&stats_lock);
1337   get_abs_path(&file, file_tmp);
1338   if (!check_file_access(file, sock)) return 0;
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346     cache_item_t *tmp;
1348     /* don't hold the lock while we setup; stat(2) might block */
1349     pthread_mutex_unlock(&cache_lock);
1351     memset (&statbuf, 0, sizeof (statbuf));
1352     status = stat (file, &statbuf);
1353     if (status != 0)
1354     {
1355       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357       status = errno;
1358       if (status == ENOENT)
1359         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360       else
1361         return send_response(sock, RESP_ERR,
1362                              "stat failed with error %i.\n", status);
1363     }
1364     if (!S_ISREG (statbuf.st_mode))
1365       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367     if (access(file, R_OK|W_OK) != 0)
1368       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369                            file, rrd_strerror(errno));
1371     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372     if (ci == NULL)
1373     {
1374       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376       return send_response(sock, RESP_ERR, "malloc failed.\n");
1377     }
1378     memset (ci, 0, sizeof (cache_item_t));
1380     ci->file = strdup (file);
1381     if (ci->file == NULL)
1382     {
1383       free (ci);
1384       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386       return send_response(sock, RESP_ERR, "strdup failed.\n");
1387     }
1389     wipe_ci_values(ci, now);
1390     ci->flags = CI_FLAGS_IN_TREE;
1391     pthread_cond_init(&ci->flushed, NULL);
1393     pthread_mutex_lock(&cache_lock);
1395     /* another UPDATE might have added this entry in the meantime */
1396     tmp = g_tree_lookup (cache_tree, file);
1397     if (tmp == NULL)
1398       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399     else
1400     {
1401       free_cache_item (ci);
1402       ci = tmp;
1403     }
1405     /* state may have changed while we were unlocked */
1406     if (state == SHUTDOWN)
1407       return -1;
1408   } /* }}} */
1409   assert (ci != NULL);
1411   /* don't re-write updates in replay mode */
1412   if (!JOURNAL_REPLAY(sock))
1413     journal_write("update", orig_buf);
1415   while (buffer_size > 0)
1416   {
1417     char *value;
1418     time_t stamp;
1419     char *eostamp;
1421     status = buffer_get_field (&buffer, &buffer_size, &value);
1422     if (status != 0)
1423     {
1424       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425       break;
1426     }
1428     /* make sure update time is always moving forward */
1429     stamp = strtol(value, &eostamp, 10);
1430     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1431     {
1432       pthread_mutex_unlock(&cache_lock);
1433       return send_response(sock, RESP_ERR,
1434                            "Cannot find timestamp in '%s'!\n", value);
1435     }
1436     else if (stamp <= ci->last_update_stamp)
1437     {
1438       pthread_mutex_unlock(&cache_lock);
1439       return send_response(sock, RESP_ERR,
1440                            "illegal attempt to update using time %ld when last"
1441                            " update time is %ld (minimum one second step)\n",
1442                            stamp, ci->last_update_stamp);
1443     }
1444     else
1445       ci->last_update_stamp = stamp;
1447     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1448                               &ci->values_alloc, config_alloc_chunk))
1449     {
1450       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1451       continue;
1452     }
1454     values_num++;
1455   }
1457   if (((now - ci->last_flush_time) >= config_write_interval)
1458       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1459       && (ci->values_num > 0))
1460   {
1461     enqueue_cache_item (ci, TAIL);
1462   }
1464   pthread_mutex_unlock (&cache_lock);
1466   if (values_num < 1)
1467     return send_response(sock, RESP_ERR, "No values updated.\n");
1468   else
1469     return send_response(sock, RESP_OK,
1470                          "errors, enqueued %i value(s).\n", values_num);
1472   /* NOTREACHED */
1473   assert(1==0);
1475 } /* }}} int handle_request_update */
1477 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1479   char *file;
1480   char *cf;
1482   char *start_str;
1483   char *end_str;
1484   time_t start_tm;
1485   time_t end_tm;
1487   unsigned long step;
1488   unsigned long ds_cnt;
1489   char **ds_namv;
1490   rrd_value_t *data;
1492   int status;
1493   unsigned long i;
1494   time_t t;
1495   rrd_value_t *data_ptr;
1497   file = NULL;
1498   cf = NULL;
1499   start_str = NULL;
1500   end_str = NULL;
1502   /* Read the arguments */
1503   do /* while (0) */
1504   {
1505     status = buffer_get_field (&buffer, &buffer_size, &file);
1506     if (status != 0)
1507       break;
1509     status = buffer_get_field (&buffer, &buffer_size, &cf);
1510     if (status != 0)
1511       break;
1513     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1514     if (status != 0)
1515     {
1516       start_str = NULL;
1517       status = 0;
1518       break;
1519     }
1521     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1522     if (status != 0)
1523     {
1524       end_str = NULL;
1525       status = 0;
1526       break;
1527     }
1528   } while (0);
1530   if (status != 0)
1531     return (syntax_error(sock,cmd));
1533   status = flush_file (file);
1534   if ((status != 0) && (status != ENOENT))
1535     return (send_response (sock, RESP_ERR,
1536           "flush_file (%s) failed with status %i.\n", file, status));
1538   t = time (NULL); /* "now" */
1540   /* Parse start time */
1541   if (start_str != NULL)
1542   {
1543     char *endptr;
1544     long value;
1546     endptr = NULL;
1547     errno = 0;
1548     value = strtol (start_str, &endptr, /* base = */ 0);
1549     if ((endptr == start_str) || (errno != 0))
1550       return (send_response(sock, RESP_ERR,
1551             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1552             start_str));
1554     if (value > 0)
1555       start_tm = (time_t) value;
1556     else
1557       start_tm = (time_t) (t + value);
1558   }
1559   else
1560   {
1561     start_tm = t - 86400;
1562   }
1564   /* Parse end time */
1565   if (end_str != NULL)
1566   {
1567     char *endptr;
1568     long value;
1570     endptr = NULL;
1571     errno = 0;
1572     value = strtol (end_str, &endptr, /* base = */ 0);
1573     if ((endptr == end_str) || (errno != 0))
1574       return (send_response(sock, RESP_ERR,
1575             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1576             end_str));
1578     if (value > 0)
1579       end_tm = (time_t) value;
1580     else
1581       end_tm = (time_t) (t + value);
1582   }
1583   else
1584   {
1585     end_tm = t;
1586   }
1588   step = -1;
1589   ds_cnt = 0;
1590   ds_namv = NULL;
1591   data = NULL;
1593   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1594       &ds_cnt, &ds_namv, &data);
1595   if (status != 0)
1596     return (send_response(sock, RESP_ERR,
1597           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1599   add_response_info (sock, "FlushVersion: %lu\n", 1);
1600   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1601   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1602   add_response_info (sock, "Step: %lu\n", step);
1603   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1605 #define SSTRCAT(buffer,str,buffer_fill) do { \
1606     size_t str_len = strlen (str); \
1607     if ((buffer_fill + str_len) > sizeof (buffer)) \
1608       str_len = sizeof (buffer) - buffer_fill; \
1609     if (str_len > 0) { \
1610       strncpy (buffer + buffer_fill, str, str_len); \
1611       buffer_fill += str_len; \
1612       assert (buffer_fill <= sizeof (buffer)); \
1613       if (buffer_fill == sizeof (buffer)) \
1614         buffer[buffer_fill - 1] = 0; \
1615       else \
1616         buffer[buffer_fill] = 0; \
1617     } \
1618   } while (0)
1620   { /* Add list of DS names */
1621     char linebuf[1024];
1622     size_t linebuf_fill;
1624     memset (linebuf, 0, sizeof (linebuf));
1625     linebuf_fill = 0;
1626     for (i = 0; i < ds_cnt; i++)
1627     {
1628       if (i > 0)
1629         SSTRCAT (linebuf, " ", linebuf_fill);
1630       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1631       rrd_freemem(ds_namv[i]);
1632     }
1633     rrd_freemem(ds_namv);
1634     add_response_info (sock, "DSName: %s\n", linebuf);
1635   }
1637   /* Add the actual data */
1638   assert (step > 0);
1639   data_ptr = data;
1640   for (t = start_tm + step; t <= end_tm; t += step)
1641   {
1642     char linebuf[1024];
1643     size_t linebuf_fill;
1644     char tmp[128];
1646     memset (linebuf, 0, sizeof (linebuf));
1647     linebuf_fill = 0;
1648     for (i = 0; i < ds_cnt; i++)
1649     {
1650       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1651       tmp[sizeof (tmp) - 1] = 0;
1652       SSTRCAT (linebuf, tmp, linebuf_fill);
1654       data_ptr++;
1655     }
1657     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1658   } /* for (t) */
1659   rrd_freemem(data);
1661   return (send_response (sock, RESP_OK, "Success\n"));
1662 #undef SSTRCAT
1663 } /* }}} int handle_request_fetch */
1665 /* we came across a "WROTE" entry during journal replay.
1666  * throw away any values that we have accumulated for this file
1667  */
1668 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1670   cache_item_t *ci;
1671   const char *file = buffer;
1673   pthread_mutex_lock(&cache_lock);
1675   ci = g_tree_lookup(cache_tree, file);
1676   if (ci == NULL)
1677   {
1678     pthread_mutex_unlock(&cache_lock);
1679     return (0);
1680   }
1682   if (ci->values)
1683     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1685   wipe_ci_values(ci, now);
1686   remove_from_queue(ci);
1688   pthread_mutex_unlock(&cache_lock);
1689   return (0);
1690 } /* }}} int handle_request_wrote */
1692 /* start "BATCH" processing */
1693 static int batch_start (HANDLER_PROTO) /* {{{ */
1695   int status;
1696   if (sock->batch_start)
1697     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1699   status = send_response(sock, RESP_OK,
1700                          "Go ahead.  End with dot '.' on its own line.\n");
1701   sock->batch_start = time(NULL);
1702   sock->batch_cmd = 0;
1704   return status;
1705 } /* }}} static int batch_start */
1707 /* finish "BATCH" processing and return results to the client */
1708 static int batch_done (HANDLER_PROTO) /* {{{ */
1710   assert(sock->batch_start);
1711   sock->batch_start = 0;
1712   sock->batch_cmd  = 0;
1713   return send_response(sock, RESP_OK, "errors\n");
1714 } /* }}} static int batch_done */
1716 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1718   return -1;
1719 } /* }}} static int handle_request_quit */
1721 static command_t list_of_commands[] = { /* {{{ */
1722   {
1723     "UPDATE",
1724     handle_request_update,
1725     CMD_CONTEXT_ANY,
1726     "UPDATE <filename> <values> [<values> ...]\n"
1727     ,
1728     "Adds the given file to the internal cache if it is not yet known and\n"
1729     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1730     "for details.\n"
1731     "\n"
1732     "Each <values> has the following form:\n"
1733     "  <values> = <time>:<value>[:<value>[...]]\n"
1734     "See the rrdupdate(1) manpage for details.\n"
1735   },
1736   {
1737     "WROTE",
1738     handle_request_wrote,
1739     CMD_CONTEXT_JOURNAL,
1740     NULL,
1741     NULL
1742   },
1743   {
1744     "FLUSH",
1745     handle_request_flush,
1746     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1747     "FLUSH <filename>\n"
1748     ,
1749     "Adds the given filename to the head of the update queue and returns\n"
1750     "after it has been dequeued.\n"
1751   },
1752   {
1753     "FLUSHALL",
1754     handle_request_flushall,
1755     CMD_CONTEXT_CLIENT,
1756     "FLUSHALL\n"
1757     ,
1758     "Triggers writing of all pending updates.  Returns immediately.\n"
1759   },
1760   {
1761     "PENDING",
1762     handle_request_pending,
1763     CMD_CONTEXT_CLIENT,
1764     "PENDING <filename>\n"
1765     ,
1766     "Shows any 'pending' updates for a file, in order.\n"
1767     "The updates shown have not yet been written to the underlying RRD file.\n"
1768   },
1769   {
1770     "FORGET",
1771     handle_request_forget,
1772     CMD_CONTEXT_ANY,
1773     "FORGET <filename>\n"
1774     ,
1775     "Removes the file completely from the cache.\n"
1776     "Any pending updates for the file will be lost.\n"
1777   },
1778   {
1779     "QUEUE",
1780     handle_request_queue,
1781     CMD_CONTEXT_CLIENT,
1782     "QUEUE\n"
1783     ,
1784         "Shows all files in the output queue.\n"
1785     "The output is zero or more lines in the following format:\n"
1786     "(where <num_vals> is the number of values to be written)\n"
1787     "\n"
1788     "<num_vals> <filename>\n"
1789   },
1790   {
1791     "STATS",
1792     handle_request_stats,
1793     CMD_CONTEXT_CLIENT,
1794     "STATS\n"
1795     ,
1796     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1797     "a description of the values.\n"
1798   },
1799   {
1800     "HELP",
1801     handle_request_help,
1802     CMD_CONTEXT_CLIENT,
1803     "HELP [<command>]\n",
1804     NULL, /* special! */
1805   },
1806   {
1807     "BATCH",
1808     batch_start,
1809     CMD_CONTEXT_CLIENT,
1810     "BATCH\n"
1811     ,
1812     "The 'BATCH' command permits the client to initiate a bulk load\n"
1813     "   of commands to rrdcached.\n"
1814     "\n"
1815     "Usage:\n"
1816     "\n"
1817     "    client: BATCH\n"
1818     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1819     "    client: command #1\n"
1820     "    client: command #2\n"
1821     "    client: ... and so on\n"
1822     "    client: .\n"
1823     "    server: 2 errors\n"
1824     "    server: 7 message for command #7\n"
1825     "    server: 9 message for command #9\n"
1826     "\n"
1827     "For more information, consult the rrdcached(1) documentation.\n"
1828   },
1829   {
1830     ".",   /* BATCH terminator */
1831     batch_done,
1832     CMD_CONTEXT_BATCH,
1833     NULL,
1834     NULL
1835   },
1836   {
1837     "FETCH",
1838     handle_request_fetch,
1839     CMD_CONTEXT_CLIENT,
1840     "FETCH <file> <CF> [<start> [<end>]]\n"
1841     ,
1842     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1843   },
1844   {
1845     "QUIT",
1846     handle_request_quit,
1847     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1848     "QUIT\n"
1849     ,
1850     "Disconnect from rrdcached.\n"
1851   }
1852 }; /* }}} command_t list_of_commands[] */
1853 static size_t list_of_commands_len = sizeof (list_of_commands)
1854   / sizeof (list_of_commands[0]);
1856 static command_t *find_command(char *cmd)
1858   size_t i;
1860   for (i = 0; i < list_of_commands_len; i++)
1861     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1862       return (&list_of_commands[i]);
1863   return NULL;
1866 /* We currently use the index in the `list_of_commands' array as a bit position
1867  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1868  * outside these functions so that switching to a more elegant storage method
1869  * is easily possible. */
1870 static ssize_t find_command_index (const char *cmd) /* {{{ */
1872   size_t i;
1874   for (i = 0; i < list_of_commands_len; i++)
1875     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1876       return ((ssize_t) i);
1877   return (-1);
1878 } /* }}} ssize_t find_command_index */
1880 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1881     const char *cmd)
1883   ssize_t i;
1885   if (JOURNAL_REPLAY(sock))
1886     return (1);
1888   if (cmd == NULL)
1889     return (-1);
1891   if ((strcasecmp ("QUIT", cmd) == 0)
1892       || (strcasecmp ("HELP", cmd) == 0))
1893     return (1);
1894   else if (strcmp (".", cmd) == 0)
1895     cmd = "BATCH";
1897   i = find_command_index (cmd);
1898   if (i < 0)
1899     return (-1);
1900   assert (i < 32);
1902   if ((sock->permissions & (1 << i)) != 0)
1903     return (1);
1904   return (0);
1905 } /* }}} int socket_permission_check */
1907 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1908     const char *cmd)
1910   ssize_t i;
1912   i = find_command_index (cmd);
1913   if (i < 0)
1914     return (-1);
1915   assert (i < 32);
1917   sock->permissions |= (1 << i);
1918   return (0);
1919 } /* }}} int socket_permission_add */
1921 /* check whether commands are received in the expected context */
1922 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1924   if (JOURNAL_REPLAY(sock))
1925     return (cmd->context & CMD_CONTEXT_JOURNAL);
1926   else if (sock->batch_start)
1927     return (cmd->context & CMD_CONTEXT_BATCH);
1928   else
1929     return (cmd->context & CMD_CONTEXT_CLIENT);
1931   /* NOTREACHED */
1932   assert(1==0);
1935 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1937   int status;
1938   char *cmd_str;
1939   char *resp_txt;
1940   command_t *help = NULL;
1942   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1943   if (status == 0)
1944     help = find_command(cmd_str);
1946   if (help && (help->syntax || help->help))
1947   {
1948     char tmp[CMD_MAX];
1950     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1951     resp_txt = tmp;
1953     if (help->syntax)
1954       add_response_info(sock, "Usage: %s\n", help->syntax);
1956     if (help->help)
1957       add_response_info(sock, "%s\n", help->help);
1958   }
1959   else
1960   {
1961     size_t i;
1963     resp_txt = "Command overview\n";
1965     for (i = 0; i < list_of_commands_len; i++)
1966     {
1967       if (list_of_commands[i].syntax == NULL)
1968         continue;
1969       add_response_info (sock, "%s", list_of_commands[i].syntax);
1970     }
1971   }
1973   return send_response(sock, RESP_OK, resp_txt);
1974 } /* }}} int handle_request_help */
1976 static int handle_request (DISPATCH_PROTO) /* {{{ */
1978   char *buffer_ptr = buffer;
1979   char *cmd_str = NULL;
1980   command_t *cmd = NULL;
1981   int status;
1983   assert (buffer[buffer_size - 1] == '\0');
1985   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1986   if (status != 0)
1987   {
1988     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1989     return (-1);
1990   }
1992   if (sock != NULL && sock->batch_start)
1993     sock->batch_cmd++;
1995   cmd = find_command(cmd_str);
1996   if (!cmd)
1997     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1999   if (!socket_permission_check (sock, cmd->cmd))
2000     return send_response(sock, RESP_ERR, "Permission denied.\n");
2002   if (!command_check_context(sock, cmd))
2003     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2005   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2006 } /* }}} int handle_request */
2008 static void journal_set_free (journal_set *js) /* {{{ */
2010   if (js == NULL)
2011     return;
2013   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2015   free(js);
2016 } /* }}} journal_set_free */
2018 static void journal_set_remove (journal_set *js) /* {{{ */
2020   if (js == NULL)
2021     return;
2023   for (uint i=0; i < js->files_num; i++)
2024   {
2025     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2026     unlink(js->files[i]);
2027   }
2028 } /* }}} journal_set_remove */
2030 /* close current journal file handle.
2031  * MUST hold journal_lock before calling */
2032 static void journal_close(void) /* {{{ */
2034   if (journal_fh != NULL)
2035   {
2036     if (fclose(journal_fh) != 0)
2037       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2038   }
2040   journal_fh = NULL;
2041   journal_size = 0;
2042 } /* }}} journal_close */
2044 /* MUST hold journal_lock before calling */
2045 static void journal_new_file(void) /* {{{ */
2047   struct timeval now;
2048   int  new_fd;
2049   char new_file[PATH_MAX + 1];
2051   assert(journal_dir != NULL);
2052   assert(journal_cur != NULL);
2054   journal_close();
2056   gettimeofday(&now, NULL);
2057   /* this format assures that the files sort in strcmp() order */
2058   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2059            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2061   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2062                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2063   if (new_fd < 0)
2064     goto error;
2066   journal_fh = fdopen(new_fd, "a");
2067   if (journal_fh == NULL)
2068     goto error;
2070   journal_size = ftell(journal_fh);
2071   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2073   /* record the file in the journal set */
2074   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2076   return;
2078 error:
2079   RRDD_LOG(LOG_CRIT,
2080            "JOURNALING DISABLED: Error while trying to create %s : %s",
2081            new_file, rrd_strerror(errno));
2082   RRDD_LOG(LOG_CRIT,
2083            "JOURNALING DISABLED: All values will be flushed at shutdown");
2085   close(new_fd);
2086   config_flush_at_shutdown = 1;
2088 } /* }}} journal_new_file */
2090 /* MUST NOT hold journal_lock before calling this */
2091 static void journal_rotate(void) /* {{{ */
2093   journal_set *old_js = NULL;
2095   if (journal_dir == NULL)
2096     return;
2098   RRDD_LOG(LOG_DEBUG, "rotating journals");
2100   pthread_mutex_lock(&stats_lock);
2101   ++stats_journal_rotate;
2102   pthread_mutex_unlock(&stats_lock);
2104   pthread_mutex_lock(&journal_lock);
2106   journal_close();
2108   /* rotate the journal sets */
2109   old_js = journal_old;
2110   journal_old = journal_cur;
2111   journal_cur = calloc(1, sizeof(journal_set));
2113   if (journal_cur != NULL)
2114     journal_new_file();
2115   else
2116     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2118   pthread_mutex_unlock(&journal_lock);
2120   journal_set_remove(old_js);
2121   journal_set_free  (old_js);
2123 } /* }}} static void journal_rotate */
2125 /* MUST hold journal_lock when calling */
2126 static void journal_done(void) /* {{{ */
2128   if (journal_cur == NULL)
2129     return;
2131   journal_close();
2133   if (config_flush_at_shutdown)
2134   {
2135     RRDD_LOG(LOG_INFO, "removing journals");
2136     journal_set_remove(journal_old);
2137     journal_set_remove(journal_cur);
2138   }
2139   else
2140   {
2141     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2142              "journals will be used at next startup");
2143   }
2145   journal_set_free(journal_cur);
2146   journal_set_free(journal_old);
2147   free(journal_dir);
2149 } /* }}} static void journal_done */
2151 static int journal_write(char *cmd, char *args) /* {{{ */
2153   int chars;
2155   if (journal_fh == NULL)
2156     return 0;
2158   pthread_mutex_lock(&journal_lock);
2159   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2160   journal_size += chars;
2162   if (journal_size > JOURNAL_MAX)
2163     journal_new_file();
2165   pthread_mutex_unlock(&journal_lock);
2167   if (chars > 0)
2168   {
2169     pthread_mutex_lock(&stats_lock);
2170     stats_journal_bytes += chars;
2171     pthread_mutex_unlock(&stats_lock);
2172   }
2174   return chars;
2175 } /* }}} static int journal_write */
2177 static int journal_replay (const char *file) /* {{{ */
2179   FILE *fh;
2180   int entry_cnt = 0;
2181   int fail_cnt = 0;
2182   uint64_t line = 0;
2183   char entry[CMD_MAX];
2184   time_t now;
2186   if (file == NULL) return 0;
2188   {
2189     char *reason = "unknown error";
2190     int status = 0;
2191     struct stat statbuf;
2193     memset(&statbuf, 0, sizeof(statbuf));
2194     if (stat(file, &statbuf) != 0)
2195     {
2196       reason = "stat error";
2197       status = errno;
2198     }
2199     else if (!S_ISREG(statbuf.st_mode))
2200     {
2201       reason = "not a regular file";
2202       status = EPERM;
2203     }
2204     if (statbuf.st_uid != daemon_uid)
2205     {
2206       reason = "not owned by daemon user";
2207       status = EACCES;
2208     }
2209     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2210     {
2211       reason = "must not be user/group writable";
2212       status = EACCES;
2213     }
2215     if (status != 0)
2216     {
2217       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2218                file, rrd_strerror(status), reason);
2219       return 0;
2220     }
2221   }
2223   fh = fopen(file, "r");
2224   if (fh == NULL)
2225   {
2226     if (errno != ENOENT)
2227       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2228                file, rrd_strerror(errno));
2229     return 0;
2230   }
2231   else
2232     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2234   now = time(NULL);
2236   while(!feof(fh))
2237   {
2238     size_t entry_len;
2240     ++line;
2241     if (fgets(entry, sizeof(entry), fh) == NULL)
2242       break;
2243     entry_len = strlen(entry);
2245     /* check \n termination in case journal writing crashed mid-line */
2246     if (entry_len == 0)
2247       continue;
2248     else if (entry[entry_len - 1] != '\n')
2249     {
2250       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2251       ++fail_cnt;
2252       continue;
2253     }
2255     entry[entry_len - 1] = '\0';
2257     if (handle_request(NULL, now, entry, entry_len) == 0)
2258       ++entry_cnt;
2259     else
2260       ++fail_cnt;
2261   }
2263   fclose(fh);
2265   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2266            entry_cnt, fail_cnt);
2268   return entry_cnt > 0 ? 1 : 0;
2269 } /* }}} static int journal_replay */
2271 static int journal_sort(const void *v1, const void *v2)
2273   char **jn1 = (char **) v1;
2274   char **jn2 = (char **) v2;
2276   return strcmp(*jn1,*jn2);
2279 static void journal_init(void) /* {{{ */
2281   int had_journal = 0;
2282   DIR *dir;
2283   struct dirent *dent;
2284   char path[PATH_MAX+1];
2286   if (journal_dir == NULL) return;
2288   pthread_mutex_lock(&journal_lock);
2290   journal_cur = calloc(1, sizeof(journal_set));
2291   if (journal_cur == NULL)
2292   {
2293     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2294     return;
2295   }
2297   RRDD_LOG(LOG_INFO, "checking for journal files");
2299   /* Handle old journal files during transition.  This gives them the
2300    * correct sort order.  TODO: remove after first release
2301    */
2302   {
2303     char old_path[PATH_MAX+1];
2304     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2305     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2306     rename(old_path, path);
2308     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2309     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2310     rename(old_path, path);
2311   }
2313   dir = opendir(journal_dir);
2314   if (!dir) {
2315     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2316     return;
2317   }
2318   while ((dent = readdir(dir)) != NULL)
2319   {
2320     /* looks like a journal file? */
2321     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2322       continue;
2324     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2326     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2327     {
2328       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2329                dent->d_name);
2330       break;
2331     }
2332   }
2333   closedir(dir);
2335   qsort(journal_cur->files, journal_cur->files_num,
2336         sizeof(journal_cur->files[0]), journal_sort);
2338   for (uint i=0; i < journal_cur->files_num; i++)
2339     had_journal += journal_replay(journal_cur->files[i]);
2341   journal_new_file();
2343   /* it must have been a crash.  start a flush */
2344   if (had_journal && config_flush_at_shutdown)
2345     flush_old_values(-1);
2347   pthread_mutex_unlock(&journal_lock);
2349   RRDD_LOG(LOG_INFO, "journal processing complete");
2351 } /* }}} static void journal_init */
2353 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2355   assert(sock != NULL);
2357   free(sock->rbuf);  sock->rbuf = NULL;
2358   free(sock->wbuf);  sock->wbuf = NULL;
2359   free(sock);
2360 } /* }}} void free_listen_socket */
2362 static void close_connection(listen_socket_t *sock) /* {{{ */
2364   if (sock->fd >= 0)
2365   {
2366     close(sock->fd);
2367     sock->fd = -1;
2368   }
2370   free_listen_socket(sock);
2372 } /* }}} void close_connection */
2374 static void *connection_thread_main (void *args) /* {{{ */
2376   listen_socket_t *sock;
2377   int fd;
2379   sock = (listen_socket_t *) args;
2380   fd = sock->fd;
2382   /* init read buffers */
2383   sock->next_read = sock->next_cmd = 0;
2384   sock->rbuf = malloc(RBUF_SIZE);
2385   if (sock->rbuf == NULL)
2386   {
2387     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2388     close_connection(sock);
2389     return NULL;
2390   }
2392   pthread_mutex_lock (&connection_threads_lock);
2393   connection_threads_num++;
2394   pthread_mutex_unlock (&connection_threads_lock);
2396   while (state == RUNNING)
2397   {
2398     char *cmd;
2399     ssize_t cmd_len;
2400     ssize_t rbytes;
2401     time_t now;
2403     struct pollfd pollfd;
2404     int status;
2406     pollfd.fd = fd;
2407     pollfd.events = POLLIN | POLLPRI;
2408     pollfd.revents = 0;
2410     status = poll (&pollfd, 1, /* timeout = */ 500);
2411     if (state != RUNNING)
2412       break;
2413     else if (status == 0) /* timeout */
2414       continue;
2415     else if (status < 0) /* error */
2416     {
2417       status = errno;
2418       if (status != EINTR)
2419         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2420       continue;
2421     }
2423     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2424       break;
2425     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2426     {
2427       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2428           "poll(2) returned something unexpected: %#04hx",
2429           pollfd.revents);
2430       break;
2431     }
2433     rbytes = read(fd, sock->rbuf + sock->next_read,
2434                   RBUF_SIZE - sock->next_read);
2435     if (rbytes < 0)
2436     {
2437       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2438       break;
2439     }
2440     else if (rbytes == 0)
2441       break; /* eof */
2443     sock->next_read += rbytes;
2445     if (sock->batch_start)
2446       now = sock->batch_start;
2447     else
2448       now = time(NULL);
2450     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2451     {
2452       status = handle_request (sock, now, cmd, cmd_len+1);
2453       if (status != 0)
2454         goto out_close;
2455     }
2456   }
2458 out_close:
2459   close_connection(sock);
2461   /* Remove this thread from the connection threads list */
2462   pthread_mutex_lock (&connection_threads_lock);
2463   connection_threads_num--;
2464   if (connection_threads_num <= 0)
2465     pthread_cond_broadcast(&connection_threads_done);
2466   pthread_mutex_unlock (&connection_threads_lock);
2468   return (NULL);
2469 } /* }}} void *connection_thread_main */
2471 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2473   int fd;
2474   struct sockaddr_un sa;
2475   listen_socket_t *temp;
2476   int status;
2477   const char *path;
2478   char *path_copy, *dir;
2480   path = sock->addr;
2481   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2482     path += strlen("unix:");
2484   /* dirname may modify its argument */
2485   path_copy = strdup(path);
2486   if (path_copy == NULL)
2487   {
2488     fprintf(stderr, "rrdcached: strdup(): %s\n",
2489         rrd_strerror(errno));
2490     return (-1);
2491   }
2493   dir = dirname(path_copy);
2494   if (rrd_mkdir_p(dir, 0777) != 0)
2495   {
2496     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2497         dir, rrd_strerror(errno));
2498     return (-1);
2499   }
2501   free(path_copy);
2503   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2504       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2505   if (temp == NULL)
2506   {
2507     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2508     return (-1);
2509   }
2510   listen_fds = temp;
2511   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2513   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2514   if (fd < 0)
2515   {
2516     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2517              rrd_strerror(errno));
2518     return (-1);
2519   }
2521   memset (&sa, 0, sizeof (sa));
2522   sa.sun_family = AF_UNIX;
2523   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2525   /* if we've gotten this far, we own the pid file.  any daemon started
2526    * with the same args must not be alive.  therefore, ensure that we can
2527    * create the socket...
2528    */
2529   unlink(path);
2531   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2532   if (status != 0)
2533   {
2534     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2535              path, rrd_strerror(errno));
2536     close (fd);
2537     return (-1);
2538   }
2540   /* tweak the sockets group ownership */
2541   if (sock->socket_group != (gid_t)-1)
2542   {
2543     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2544          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2545     {
2546       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2547     }
2548   }
2550   if (sock->socket_permissions != (mode_t)-1)
2551   {
2552     if (chmod(path, sock->socket_permissions) != 0)
2553       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2554           (unsigned int)sock->socket_permissions, strerror(errno));
2555   }
2557   status = listen (fd, /* backlog = */ 10);
2558   if (status != 0)
2559   {
2560     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2561              path, rrd_strerror(errno));
2562     close (fd);
2563     unlink (path);
2564     return (-1);
2565   }
2567   listen_fds[listen_fds_num].fd = fd;
2568   listen_fds[listen_fds_num].family = PF_UNIX;
2569   strncpy(listen_fds[listen_fds_num].addr, path,
2570           sizeof (listen_fds[listen_fds_num].addr) - 1);
2571   listen_fds_num++;
2573   return (0);
2574 } /* }}} int open_listen_socket_unix */
2576 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2578   struct addrinfo ai_hints;
2579   struct addrinfo *ai_res;
2580   struct addrinfo *ai_ptr;
2581   char addr_copy[NI_MAXHOST];
2582   char *addr;
2583   char *port;
2584   int status;
2586   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2587   addr_copy[sizeof (addr_copy) - 1] = 0;
2588   addr = addr_copy;
2590   memset (&ai_hints, 0, sizeof (ai_hints));
2591   ai_hints.ai_flags = 0;
2592 #ifdef AI_ADDRCONFIG
2593   ai_hints.ai_flags |= AI_ADDRCONFIG;
2594 #endif
2595   ai_hints.ai_family = AF_UNSPEC;
2596   ai_hints.ai_socktype = SOCK_STREAM;
2598   port = NULL;
2599   if (*addr == '[') /* IPv6+port format */
2600   {
2601     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2602     addr++;
2604     port = strchr (addr, ']');
2605     if (port == NULL)
2606     {
2607       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2608       return (-1);
2609     }
2610     *port = 0;
2611     port++;
2613     if (*port == ':')
2614       port++;
2615     else if (*port == 0)
2616       port = NULL;
2617     else
2618     {
2619       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2620       return (-1);
2621     }
2622   } /* if (*addr == '[') */
2623   else
2624   {
2625     port = rindex(addr, ':');
2626     if (port != NULL)
2627     {
2628       *port = 0;
2629       port++;
2630     }
2631   }
2632   ai_res = NULL;
2633   status = getaddrinfo (addr,
2634                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2635                         &ai_hints, &ai_res);
2636   if (status != 0)
2637   {
2638     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2639              addr, gai_strerror (status));
2640     return (-1);
2641   }
2643   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2644   {
2645     int fd;
2646     listen_socket_t *temp;
2647     int one = 1;
2649     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2650         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2651     if (temp == NULL)
2652     {
2653       fprintf (stderr,
2654                "rrdcached: open_listen_socket_network: realloc failed.\n");
2655       continue;
2656     }
2657     listen_fds = temp;
2658     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2660     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2661     if (fd < 0)
2662     {
2663       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2664                rrd_strerror(errno));
2665       continue;
2666     }
2668     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2670     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2671     if (status != 0)
2672     {
2673       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2674                sock->addr, rrd_strerror(errno));
2675       close (fd);
2676       continue;
2677     }
2679     status = listen (fd, /* backlog = */ 10);
2680     if (status != 0)
2681     {
2682       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2683                sock->addr, rrd_strerror(errno));
2684       close (fd);
2685       freeaddrinfo(ai_res);
2686       return (-1);
2687     }
2689     listen_fds[listen_fds_num].fd = fd;
2690     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2691     listen_fds_num++;
2692   } /* for (ai_ptr) */
2694   freeaddrinfo(ai_res);
2695   return (0);
2696 } /* }}} static int open_listen_socket_network */
2698 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2700   assert(sock != NULL);
2701   assert(sock->addr != NULL);
2703   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2704       || sock->addr[0] == '/')
2705     return (open_listen_socket_unix(sock));
2706   else
2707     return (open_listen_socket_network(sock));
2708 } /* }}} int open_listen_socket */
2710 static int close_listen_sockets (void) /* {{{ */
2712   size_t i;
2714   for (i = 0; i < listen_fds_num; i++)
2715   {
2716     close (listen_fds[i].fd);
2718     if (listen_fds[i].family == PF_UNIX)
2719       unlink(listen_fds[i].addr);
2720   }
2722   free (listen_fds);
2723   listen_fds = NULL;
2724   listen_fds_num = 0;
2726   return (0);
2727 } /* }}} int close_listen_sockets */
2729 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2731   struct pollfd *pollfds;
2732   int pollfds_num;
2733   int status;
2734   int i;
2736   if (listen_fds_num < 1)
2737   {
2738     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2739     return (NULL);
2740   }
2742   pollfds_num = listen_fds_num;
2743   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2744   if (pollfds == NULL)
2745   {
2746     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2747     return (NULL);
2748   }
2749   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2751   RRDD_LOG(LOG_INFO, "listening for connections");
2753   while (state == RUNNING)
2754   {
2755     for (i = 0; i < pollfds_num; i++)
2756     {
2757       pollfds[i].fd = listen_fds[i].fd;
2758       pollfds[i].events = POLLIN | POLLPRI;
2759       pollfds[i].revents = 0;
2760     }
2762     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2763     if (state != RUNNING)
2764       break;
2765     else if (status == 0) /* timeout */
2766       continue;
2767     else if (status < 0) /* error */
2768     {
2769       status = errno;
2770       if (status != EINTR)
2771       {
2772         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2773       }
2774       continue;
2775     }
2777     for (i = 0; i < pollfds_num; i++)
2778     {
2779       listen_socket_t *client_sock;
2780       struct sockaddr_storage client_sa;
2781       socklen_t client_sa_size;
2782       pthread_t tid;
2783       pthread_attr_t attr;
2785       if (pollfds[i].revents == 0)
2786         continue;
2788       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2789       {
2790         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2791             "poll(2) returned something unexpected for listen FD #%i.",
2792             pollfds[i].fd);
2793         continue;
2794       }
2796       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2797       if (client_sock == NULL)
2798       {
2799         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2800         continue;
2801       }
2802       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2804       client_sa_size = sizeof (client_sa);
2805       client_sock->fd = accept (pollfds[i].fd,
2806           (struct sockaddr *) &client_sa, &client_sa_size);
2807       if (client_sock->fd < 0)
2808       {
2809         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2810         free(client_sock);
2811         continue;
2812       }
2814       pthread_attr_init (&attr);
2815       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2817       status = pthread_create (&tid, &attr, connection_thread_main,
2818                                client_sock);
2819       if (status != 0)
2820       {
2821         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2822         close_connection(client_sock);
2823         continue;
2824       }
2825     } /* for (pollfds_num) */
2826   } /* while (state == RUNNING) */
2828   RRDD_LOG(LOG_INFO, "starting shutdown");
2830   close_listen_sockets ();
2832   pthread_mutex_lock (&connection_threads_lock);
2833   while (connection_threads_num > 0)
2834     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2835   pthread_mutex_unlock (&connection_threads_lock);
2837   free(pollfds);
2839   return (NULL);
2840 } /* }}} void *listen_thread_main */
2842 static int daemonize (void) /* {{{ */
2844   int pid_fd;
2845   char *base_dir;
2847   daemon_uid = geteuid();
2849   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2850   if (pid_fd < 0)
2851     pid_fd = check_pidfile();
2852   if (pid_fd < 0)
2853     return pid_fd;
2855   /* open all the listen sockets */
2856   if (config_listen_address_list_len > 0)
2857   {
2858     for (size_t i = 0; i < config_listen_address_list_len; i++)
2859       open_listen_socket (config_listen_address_list[i]);
2861     rrd_free_ptrs((void ***) &config_listen_address_list,
2862                   &config_listen_address_list_len);
2863   }
2864   else
2865   {
2866     listen_socket_t sock;
2867     memset(&sock, 0, sizeof(sock));
2868     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2869     open_listen_socket (&sock);
2870   }
2872   if (listen_fds_num < 1)
2873   {
2874     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2875     goto error;
2876   }
2878   if (!stay_foreground)
2879   {
2880     pid_t child;
2882     child = fork ();
2883     if (child < 0)
2884     {
2885       fprintf (stderr, "daemonize: fork(2) failed.\n");
2886       goto error;
2887     }
2888     else if (child > 0)
2889       exit(0);
2891     /* Become session leader */
2892     setsid ();
2894     /* Open the first three file descriptors to /dev/null */
2895     close (2);
2896     close (1);
2897     close (0);
2899     open ("/dev/null", O_RDWR);
2900     if (dup(0) == -1 || dup(0) == -1){
2901         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2902     }
2903   } /* if (!stay_foreground) */
2905   /* Change into the /tmp directory. */
2906   base_dir = (config_base_dir != NULL)
2907     ? config_base_dir
2908     : "/tmp";
2910   if (chdir (base_dir) != 0)
2911   {
2912     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2913     goto error;
2914   }
2916   install_signal_handlers();
2918   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2919   RRDD_LOG(LOG_INFO, "starting up");
2921   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2922                                 (GDestroyNotify) free_cache_item);
2923   if (cache_tree == NULL)
2924   {
2925     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2926     goto error;
2927   }
2929   return write_pidfile (pid_fd);
2931 error:
2932   remove_pidfile();
2933   return -1;
2934 } /* }}} int daemonize */
2936 static int cleanup (void) /* {{{ */
2938   pthread_cond_broadcast (&flush_cond);
2939   pthread_join (flush_thread, NULL);
2941   pthread_cond_broadcast (&queue_cond);
2942   for (int i = 0; i < config_queue_threads; i++)
2943     pthread_join (queue_threads[i], NULL);
2945   if (config_flush_at_shutdown)
2946   {
2947     assert(cache_queue_head == NULL);
2948     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2949   }
2951   free(queue_threads);
2952   free(config_base_dir);
2954   pthread_mutex_lock(&cache_lock);
2955   g_tree_destroy(cache_tree);
2957   pthread_mutex_lock(&journal_lock);
2958   journal_done();
2960   RRDD_LOG(LOG_INFO, "goodbye");
2961   closelog ();
2963   remove_pidfile ();
2964   free(config_pid_file);
2966   return (0);
2967 } /* }}} int cleanup */
2969 static int read_options (int argc, char **argv) /* {{{ */
2971   int option;
2972   int status = 0;
2974   char **permissions = NULL;
2975   size_t permissions_len = 0;
2977   gid_t  socket_group = (gid_t)-1;
2978   mode_t socket_permissions = (mode_t)-1;
2980   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2981   {
2982     switch (option)
2983     {
2984       case 'g':
2985         stay_foreground=1;
2986         break;
2988       case 'l':
2989       {
2990         listen_socket_t *new;
2992         new = malloc(sizeof(listen_socket_t));
2993         if (new == NULL)
2994         {
2995           fprintf(stderr, "read_options: malloc failed.\n");
2996           return(2);
2997         }
2998         memset(new, 0, sizeof(listen_socket_t));
3000         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3002         /* Add permissions to the socket {{{ */
3003         if (permissions_len != 0)
3004         {
3005           size_t i;
3006           for (i = 0; i < permissions_len; i++)
3007           {
3008             status = socket_permission_add (new, permissions[i]);
3009             if (status != 0)
3010             {
3011               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3012                   "socket failed. Most likely, this permission doesn't "
3013                   "exist. Check your command line.\n", permissions[i]);
3014               status = 4;
3015             }
3016           }
3017         }
3018         else /* if (permissions_len == 0) */
3019         {
3020           /* Add permission for ALL commands to the socket. */
3021           size_t i;
3022           for (i = 0; i < list_of_commands_len; i++)
3023           {
3024             status = socket_permission_add (new, list_of_commands[i].cmd);
3025             if (status != 0)
3026             {
3027               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3028                   "socket failed. This should never happen, ever! Sorry.\n",
3029                   permissions[i]);
3030               status = 4;
3031             }
3032           }
3033         }
3034         /* }}} Done adding permissions. */
3036         new->socket_group = socket_group;
3037         new->socket_permissions = socket_permissions;
3039         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3040                          &config_listen_address_list_len, new))
3041         {
3042           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3043           return (2);
3044         }
3045       }
3046       break;
3048       /* set socket group permissions */
3049       case 's':
3050       {
3051         gid_t group_gid;
3052         struct group *grp;
3054         group_gid = strtoul(optarg, NULL, 10);
3055         if (errno != EINVAL && group_gid>0)
3056         {
3057           /* we were passed a number */
3058           grp = getgrgid(group_gid);
3059         }
3060         else
3061         {
3062           grp = getgrnam(optarg);
3063         }
3065         if (grp)
3066         {
3067           socket_group = grp->gr_gid;
3068         }
3069         else
3070         {
3071           /* no idea what the user wanted... */
3072           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3073           return (5);
3074         }
3075       }
3076       break;
3078       /* set socket file permissions */
3079       case 'm':
3080       {
3081         long  tmp;
3082         char *endptr = NULL;
3084         tmp = strtol (optarg, &endptr, 8);
3085         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3086             || (tmp > 07777) || (tmp < 0)) {
3087           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3088               optarg);
3089           return (5);
3090         }
3092         socket_permissions = (mode_t)tmp;
3093       }
3094       break;
3096       case 'P':
3097       {
3098         char *optcopy;
3099         char *saveptr;
3100         char *dummy;
3101         char *ptr;
3103         rrd_free_ptrs ((void *) &permissions, &permissions_len);
3105         optcopy = strdup (optarg);
3106         dummy = optcopy;
3107         saveptr = NULL;
3108         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3109         {
3110           dummy = NULL;
3111           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3112         }
3114         free (optcopy);
3115       }
3116       break;
3118       case 'f':
3119       {
3120         int temp;
3122         temp = atoi (optarg);
3123         if (temp > 0)
3124           config_flush_interval = temp;
3125         else
3126         {
3127           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3128           status = 3;
3129         }
3130       }
3131       break;
3133       case 'w':
3134       {
3135         int temp;
3137         temp = atoi (optarg);
3138         if (temp > 0)
3139           config_write_interval = temp;
3140         else
3141         {
3142           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3143           status = 2;
3144         }
3145       }
3146       break;
3148       case 'z':
3149       {
3150         int temp;
3152         temp = atoi(optarg);
3153         if (temp > 0)
3154           config_write_jitter = temp;
3155         else
3156         {
3157           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3158           status = 2;
3159         }
3161         break;
3162       }
3164       case 't':
3165       {
3166         int threads;
3167         threads = atoi(optarg);
3168         if (threads >= 1)
3169           config_queue_threads = threads;
3170         else
3171         {
3172           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3173           return 1;
3174         }
3175       }
3176       break;
3178       case 'B':
3179         config_write_base_only = 1;
3180         break;
3182       case 'b':
3183       {
3184         size_t len;
3185         char base_realpath[PATH_MAX];
3187         if (config_base_dir != NULL)
3188           free (config_base_dir);
3189         config_base_dir = strdup (optarg);
3190         if (config_base_dir == NULL)
3191         {
3192           fprintf (stderr, "read_options: strdup failed.\n");
3193           return (3);
3194         }
3196         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3197         {
3198           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3199               config_base_dir, rrd_strerror (errno));
3200           return (3);
3201         }
3203         /* make sure that the base directory is not resolved via
3204          * symbolic links.  this makes some performance-enhancing
3205          * assumptions possible (we don't have to resolve paths
3206          * that start with a "/")
3207          */
3208         if (realpath(config_base_dir, base_realpath) == NULL)
3209         {
3210           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3211               "%s\n", config_base_dir, rrd_strerror(errno));
3212           return 5;
3213         }
3215         len = strlen (config_base_dir);
3216         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3217         {
3218           config_base_dir[len - 1] = 0;
3219           len--;
3220         }
3222         if (len < 1)
3223         {
3224           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3225           return (4);
3226         }
3228         _config_base_dir_len = len;
3230         len = strlen (base_realpath);
3231         while ((len > 0) && (base_realpath[len - 1] == '/'))
3232         {
3233           base_realpath[len - 1] = '\0';
3234           len--;
3235         }
3237         if (strncmp(config_base_dir,
3238                          base_realpath, sizeof(base_realpath)) != 0)
3239         {
3240           fprintf(stderr,
3241                   "Base directory (-b) resolved via file system links!\n"
3242                   "Please consult rrdcached '-b' documentation!\n"
3243                   "Consider specifying the real directory (%s)\n",
3244                   base_realpath);
3245           return 5;
3246         }
3247       }
3248       break;
3250       case 'p':
3251       {
3252         if (config_pid_file != NULL)
3253           free (config_pid_file);
3254         config_pid_file = strdup (optarg);
3255         if (config_pid_file == NULL)
3256         {
3257           fprintf (stderr, "read_options: strdup failed.\n");
3258           return (3);
3259         }
3260       }
3261       break;
3263       case 'F':
3264         config_flush_at_shutdown = 1;
3265         break;
3267       case 'j':
3268       {
3269         char journal_dir_actual[PATH_MAX];
3270         const char *dir;
3271         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3273         status = rrd_mkdir_p(dir, 0777);
3274         if (status != 0)
3275         {
3276           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3277               dir, rrd_strerror(errno));
3278           return 6;
3279         }
3281         if (access(dir, R_OK|W_OK|X_OK) != 0)
3282         {
3283           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3284                   errno ? rrd_strerror(errno) : "");
3285           return 6;
3286         }
3287       }
3288       break;
3290       case 'a':
3291       {
3292         int temp = atoi(optarg);
3293         if (temp > 0)
3294           config_alloc_chunk = temp;
3295         else
3296         {
3297           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3298           return 10;
3299         }
3300       }
3301       break;
3303       case 'h':
3304       case '?':
3305         printf ("RRDCacheD %s\n"
3306             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3307             "\n"
3308             "Usage: rrdcached [options]\n"
3309             "\n"
3310             "Valid options are:\n"
3311             "  -l <address>  Socket address to listen to.\n"
3312             "  -P <perms>    Sets the permissions to assign to all following "
3313                             "sockets\n"
3314             "  -w <seconds>  Interval in which to write data.\n"
3315             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3316             "  -t <threads>  Number of write threads.\n"
3317             "  -f <seconds>  Interval in which to flush dead data.\n"
3318             "  -p <file>     Location of the PID-file.\n"
3319             "  -b <dir>      Base directory to change to.\n"
3320             "  -B            Restrict file access to paths within -b <dir>\n"
3321             "  -g            Do not fork and run in the foreground.\n"
3322             "  -j <dir>      Directory in which to create the journal files.\n"
3323             "  -F            Always flush all updates at shutdown\n"
3324             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3325             "                (the socket will also have read/write permissions "
3326                             "for that group)\n"
3327             "  -m <mode>     File permissions (octal) of all following UNIX "
3328                             "sockets\n"
3329             "  -a <size>     Memory allocation chunk size. Default is 1."
3330             "\n"
3331             "For more information and a detailed description of all options "
3332             "please refer\n"
3333             "to the rrdcached(1) manual page.\n",
3334             VERSION);
3335         if (option == 'h')
3336           status = -1;
3337         else
3338           status = 1;
3339         break;
3340     } /* switch (option) */
3341   } /* while (getopt) */
3343   /* advise the user when values are not sane */
3344   if (config_flush_interval < 2 * config_write_interval)
3345     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3346             " 2x write interval (-w) !\n");
3347   if (config_write_jitter > config_write_interval)
3348     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3349             " write interval (-w) !\n");
3351   if (config_write_base_only && config_base_dir == NULL)
3352     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3353             "  Consult the rrdcached documentation\n");
3355   if (journal_dir == NULL)
3356     config_flush_at_shutdown = 1;
3358   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3360   return (status);
3361 } /* }}} int read_options */
3363 int main (int argc, char **argv)
3365   int status;
3367   status = read_options (argc, argv);
3368   if (status != 0)
3369   {
3370     if (status < 0)
3371       status = 0;
3372     return (status);
3373   }
3375   status = daemonize ();
3376   if (status != 0)
3377   {
3378     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3379     return (1);
3380   }
3382   journal_init();
3384   /* start the queue threads */
3385   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3386   if (queue_threads == NULL)
3387   {
3388     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3389     cleanup();
3390     return (1);
3391   }
3392   for (int i = 0; i < config_queue_threads; i++)
3393   {
3394     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3395     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3396     if (status != 0)
3397     {
3398       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3399       cleanup();
3400       return (1);
3401     }
3402   }
3404   /* start the flush thread */
3405   memset(&flush_thread, 0, sizeof(flush_thread));
3406   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3407   if (status != 0)
3408   {
3409     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3410     cleanup();
3411     return (1);
3412   }
3414   listen_thread_main (NULL);
3415   cleanup ();
3417   return (0);
3418 } /* int main */
3420 /*
3421  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3422  */