Code

print \n for log messages while runing rrdcached in the foreground ... suggested...
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) { \
118       fprintf(stderr, __VA_ARGS__); \
119       fprintf(stderr, "\n"); } \
120     syslog ((severity), __VA_ARGS__); \
121   } while (0)
123 /*
124  * Types
125  */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
128 struct listen_socket_s
130   int fd;
131   char addr[PATH_MAX + 1];
132   int family;
134   /* state for BATCH processing */
135   time_t batch_start;
136   int batch_cmd;
138   /* buffered IO */
139   char *rbuf;
140   off_t next_cmd;
141   off_t next_read;
143   char *wbuf;
144   ssize_t wbuf_len;
146   uint32_t permissions;
148   gid_t  socket_group;
149   mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
157                         time_t UNUSED(now),\
158                         char  UNUSED(*buffer),\
159                         size_t UNUSED(buffer_size)
161 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
162                         DISPATCH_PROTO
164 struct command_s {
165   char   *cmd;
166   int (*handler)(HANDLER_PROTO);
168   char  context;                /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT      (1<<0)
170 #define CMD_CONTEXT_BATCH       (1<<1)
171 #define CMD_CONTEXT_JOURNAL     (1<<2)
172 #define CMD_CONTEXT_ANY         (0x7f)
174   char *syntax;
175   char *help;
176 };
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
182   char *file;
183   char **values;
184   size_t values_num;            /* number of valid pointers */
185   size_t values_alloc;          /* number of allocated pointers */
186   time_t last_flush_time;
187   time_t last_update_stamp;
188 #define CI_FLAGS_IN_TREE  (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
190   int flags;
191   pthread_cond_t  flushed;
192   cache_item_t *prev;
193   cache_item_t *next;
194 };
196 struct callback_flush_data_s
198   time_t now;
199   time_t abs_timeout;
200   char **keys;
201   size_t keys_num;
202 };
203 typedef struct callback_flush_data_s callback_flush_data_t;
205 enum queue_side_e
207   HEAD,
208   TAIL
209 };
210 typedef enum queue_side_e queue_side_t;
212 /* describe a set of journal files */
213 typedef struct {
214   char **files;
215   size_t files_num;
216 } journal_set;
218 /* max length of socket command or response */
219 #define CMD_MAX 4096
220 #define RBUF_SIZE (CMD_MAX*2)
222 /*
223  * Variables
224  */
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
231 enum {
232   RUNNING,              /* normal operation */
233   FLUSHING,             /* flushing remaining values */
234   SHUTDOWN              /* shutting down */
235 } state = RUNNING;
237 static pthread_t *queue_threads;
238 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
239 static int config_queue_threads = 4;
241 static pthread_t flush_thread;
242 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
244 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
245 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
246 static int connection_threads_num = 0;
248 /* Cache stuff */
249 static GTree          *cache_tree = NULL;
250 static cache_item_t   *cache_queue_head = NULL;
251 static cache_item_t   *cache_queue_tail = NULL;
252 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
254 static int config_write_interval = 300;
255 static int config_write_jitter   = 0;
256 static int config_flush_interval = 3600;
257 static int config_flush_at_shutdown = 0;
258 static char *config_pid_file = NULL;
259 static char *config_base_dir = NULL;
260 static size_t _config_base_dir_len = 0;
261 static int config_write_base_only = 0;
262 static size_t config_alloc_chunk = 1;
264 static listen_socket_t **config_listen_address_list = NULL;
265 static size_t config_listen_address_list_len = 0;
267 static uint64_t stats_queue_length = 0;
268 static uint64_t stats_updates_received = 0;
269 static uint64_t stats_flush_received = 0;
270 static uint64_t stats_updates_written = 0;
271 static uint64_t stats_data_sets_written = 0;
272 static uint64_t stats_journal_bytes = 0;
273 static uint64_t stats_journal_rotate = 0;
274 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
276 /* Journaled updates */
277 #define JOURNAL_REPLAY(s) ((s) == NULL)
278 #define JOURNAL_BASE "rrd.journal"
279 static journal_set *journal_cur = NULL;
280 static journal_set *journal_old = NULL;
281 static char *journal_dir = NULL;
282 static FILE *journal_fh = NULL;         /* current journal file handle */
283 static long  journal_size = 0;          /* current journal size */
284 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
285 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
286 static int journal_write(char *cmd, char *args);
287 static void journal_done(void);
288 static void journal_rotate(void);
290 /* prototypes for forward refernces */
291 static int handle_request_help (HANDLER_PROTO);
293 /* 
294  * Functions
295  */
296 static void sig_common (const char *sig) /* {{{ */
298   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
299   state = FLUSHING;
300   pthread_cond_broadcast(&flush_cond);
301   pthread_cond_broadcast(&queue_cond);
302 } /* }}} void sig_common */
304 static void sig_int_handler (int UNUSED(s)) /* {{{ */
306   sig_common("INT");
307 } /* }}} void sig_int_handler */
309 static void sig_term_handler (int UNUSED(s)) /* {{{ */
311   sig_common("TERM");
312 } /* }}} void sig_term_handler */
314 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
316   config_flush_at_shutdown = 1;
317   sig_common("USR1");
318 } /* }}} void sig_usr1_handler */
320 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
322   config_flush_at_shutdown = 0;
323   sig_common("USR2");
324 } /* }}} void sig_usr2_handler */
326 static void install_signal_handlers(void) /* {{{ */
328   /* These structures are static, because `sigaction' behaves weird if the are
329    * overwritten.. */
330   static struct sigaction sa_int;
331   static struct sigaction sa_term;
332   static struct sigaction sa_pipe;
333   static struct sigaction sa_usr1;
334   static struct sigaction sa_usr2;
336   /* Install signal handlers */
337   memset (&sa_int, 0, sizeof (sa_int));
338   sa_int.sa_handler = sig_int_handler;
339   sigaction (SIGINT, &sa_int, NULL);
341   memset (&sa_term, 0, sizeof (sa_term));
342   sa_term.sa_handler = sig_term_handler;
343   sigaction (SIGTERM, &sa_term, NULL);
345   memset (&sa_pipe, 0, sizeof (sa_pipe));
346   sa_pipe.sa_handler = SIG_IGN;
347   sigaction (SIGPIPE, &sa_pipe, NULL);
349   memset (&sa_pipe, 0, sizeof (sa_usr1));
350   sa_usr1.sa_handler = sig_usr1_handler;
351   sigaction (SIGUSR1, &sa_usr1, NULL);
353   memset (&sa_usr2, 0, sizeof (sa_usr2));
354   sa_usr2.sa_handler = sig_usr2_handler;
355   sigaction (SIGUSR2, &sa_usr2, NULL);
357 } /* }}} void install_signal_handlers */
359 static int open_pidfile(char *action, int oflag) /* {{{ */
361   int fd;
362   const char *file;
363   char *file_copy, *dir;
365   file = (config_pid_file != NULL)
366     ? config_pid_file
367     : LOCALSTATEDIR "/run/rrdcached.pid";
369   /* dirname may modify its argument */
370   file_copy = strdup(file);
371   if (file_copy == NULL)
372   {
373     fprintf(stderr, "rrdcached: strdup(): %s\n",
374         rrd_strerror(errno));
375     return -1;
376   }
378   dir = dirname(file_copy);
379   if (rrd_mkdir_p(dir, 0777) != 0)
380   {
381     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
382         dir, rrd_strerror(errno));
383     return -1;
384   }
386   free(file_copy);
388   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
389   if (fd < 0)
390     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
391             action, file, rrd_strerror(errno));
393   return(fd);
394 } /* }}} static int open_pidfile */
396 /* check existing pid file to see whether a daemon is running */
397 static int check_pidfile(void)
399   int pid_fd;
400   pid_t pid;
401   char pid_str[16];
403   pid_fd = open_pidfile("open", O_RDWR);
404   if (pid_fd < 0)
405     return pid_fd;
407   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
408     return -1;
410   pid = atoi(pid_str);
411   if (pid <= 0)
412     return -1;
414   /* another running process that we can signal COULD be
415    * a competing rrdcached */
416   if (pid != getpid() && kill(pid, 0) == 0)
417   {
418     fprintf(stderr,
419             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
420     close(pid_fd);
421     return -1;
422   }
424   lseek(pid_fd, 0, SEEK_SET);
425   if (ftruncate(pid_fd, 0) == -1)
426   {
427     fprintf(stderr,
428             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
429     close(pid_fd);
430     return -1;
431   }
433   fprintf(stderr,
434           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
435           "rrdcached: starting normally.\n", pid);
437   return pid_fd;
438 } /* }}} static int check_pidfile */
440 static int write_pidfile (int fd) /* {{{ */
442   pid_t pid;
443   FILE *fh;
445   pid = getpid ();
447   fh = fdopen (fd, "w");
448   if (fh == NULL)
449   {
450     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
451     close(fd);
452     return (-1);
453   }
455   fprintf (fh, "%i\n", (int) pid);
456   fclose (fh);
458   return (0);
459 } /* }}} int write_pidfile */
461 static int remove_pidfile (void) /* {{{ */
463   char *file;
464   int status;
466   file = (config_pid_file != NULL)
467     ? config_pid_file
468     : LOCALSTATEDIR "/run/rrdcached.pid";
470   status = unlink (file);
471   if (status == 0)
472     return (0);
473   return (errno);
474 } /* }}} int remove_pidfile */
476 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
478   char *eol;
480   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
481                sock->next_read - sock->next_cmd);
483   if (eol == NULL)
484   {
485     /* no commands left, move remainder back to front of rbuf */
486     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
487             sock->next_read - sock->next_cmd);
488     sock->next_read -= sock->next_cmd;
489     sock->next_cmd = 0;
490     *len = 0;
491     return NULL;
492   }
493   else
494   {
495     char *cmd = sock->rbuf + sock->next_cmd;
496     *eol = '\0';
498     sock->next_cmd = eol - sock->rbuf + 1;
500     if (eol > sock->rbuf && *(eol-1) == '\r')
501       *(--eol) = '\0'; /* handle "\r\n" EOL */
503     *len = eol - cmd;
505     return cmd;
506   }
508   /* NOTREACHED */
509   assert(1==0);
510 } /* }}} char *next_cmd */
512 /* add the characters directly to the write buffer */
513 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
515   char *new_buf;
517   assert(sock != NULL);
519   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
520   if (new_buf == NULL)
521   {
522     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
523     return -1;
524   }
526   strncpy(new_buf + sock->wbuf_len, str, len + 1);
528   sock->wbuf = new_buf;
529   sock->wbuf_len += len;
531   return 0;
532 } /* }}} static int add_to_wbuf */
534 /* add the text to the "extra" info that's sent after the status line */
535 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
537   va_list argp;
538   char buffer[CMD_MAX];
539   int len;
541   if (JOURNAL_REPLAY(sock)) return 0;
542   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
544   va_start(argp, fmt);
545 #ifdef HAVE_VSNPRINTF
546   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
547 #else
548   len = vsprintf(buffer, fmt, argp);
549 #endif
550   va_end(argp);
551   if (len < 0)
552   {
553     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
554     return -1;
555   }
557   return add_to_wbuf(sock, buffer, len);
558 } /* }}} static int add_response_info */
560 static int count_lines(char *str) /* {{{ */
562   int lines = 0;
564   if (str != NULL)
565   {
566     while ((str = strchr(str, '\n')) != NULL)
567     {
568       ++lines;
569       ++str;
570     }
571   }
573   return lines;
574 } /* }}} static int count_lines */
576 /* send the response back to the user.
577  * returns 0 on success, -1 on error
578  * write buffer is always zeroed after this call */
579 static int send_response (listen_socket_t *sock, response_code rc,
580                           char *fmt, ...) /* {{{ */
582   va_list argp;
583   char buffer[CMD_MAX];
584   int lines;
585   ssize_t wrote;
586   int rclen, len;
588   if (JOURNAL_REPLAY(sock)) return rc;
590   if (sock->batch_start)
591   {
592     if (rc == RESP_OK)
593       return rc; /* no response on success during BATCH */
594     lines = sock->batch_cmd;
595   }
596   else if (rc == RESP_OK)
597     lines = count_lines(sock->wbuf);
598   else
599     lines = -1;
601   rclen = sprintf(buffer, "%d ", lines);
602   va_start(argp, fmt);
603 #ifdef HAVE_VSNPRINTF
604   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
605 #else
606   len = vsprintf(buffer+rclen, fmt, argp);
607 #endif
608   va_end(argp);
609   if (len < 0)
610     return -1;
612   len += rclen;
614   /* append the result to the wbuf, don't write to the user */
615   if (sock->batch_start)
616     return add_to_wbuf(sock, buffer, len);
618   /* first write must be complete */
619   if (len != write(sock->fd, buffer, len))
620   {
621     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
622     return -1;
623   }
625   if (sock->wbuf != NULL && rc == RESP_OK)
626   {
627     wrote = 0;
628     while (wrote < sock->wbuf_len)
629     {
630       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
631       if (wb <= 0)
632       {
633         RRDD_LOG(LOG_INFO, "send_response: could not write results");
634         return -1;
635       }
636       wrote += wb;
637     }
638   }
640   free(sock->wbuf); sock->wbuf = NULL;
641   sock->wbuf_len = 0;
643   return 0;
644 } /* }}} */
646 static void wipe_ci_values(cache_item_t *ci, time_t when)
648   ci->values = NULL;
649   ci->values_num = 0;
650   ci->values_alloc = 0;
652   ci->last_flush_time = when;
653   if (config_write_jitter > 0)
654     ci->last_flush_time += (rrd_random() % config_write_jitter);
657 /* remove_from_queue
658  * remove a "cache_item_t" item from the queue.
659  * must hold 'cache_lock' when calling this
660  */
661 static void remove_from_queue(cache_item_t *ci) /* {{{ */
663   if (ci == NULL) return;
664   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
666   if (ci->prev == NULL)
667     cache_queue_head = ci->next; /* reset head */
668   else
669     ci->prev->next = ci->next;
671   if (ci->next == NULL)
672     cache_queue_tail = ci->prev; /* reset the tail */
673   else
674     ci->next->prev = ci->prev;
676   ci->next = ci->prev = NULL;
677   ci->flags &= ~CI_FLAGS_IN_QUEUE;
679   pthread_mutex_lock (&stats_lock);
680   assert (stats_queue_length > 0);
681   stats_queue_length--;
682   pthread_mutex_unlock (&stats_lock);
684 } /* }}} static void remove_from_queue */
686 /* free the resources associated with the cache_item_t
687  * must hold cache_lock when calling this function
688  */
689 static void *free_cache_item(cache_item_t *ci) /* {{{ */
691   if (ci == NULL) return NULL;
693   remove_from_queue(ci);
695   for (size_t i=0; i < ci->values_num; i++)
696     free(ci->values[i]);
698   free (ci->values);
699   free (ci->file);
701   /* in case anyone is waiting */
702   pthread_cond_broadcast(&ci->flushed);
703   pthread_cond_destroy(&ci->flushed);
705   free (ci);
707   return NULL;
708 } /* }}} static void *free_cache_item */
710 /*
711  * enqueue_cache_item:
712  * `cache_lock' must be acquired before calling this function!
713  */
714 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
715     queue_side_t side)
717   if (ci == NULL)
718     return (-1);
720   if (ci->values_num == 0)
721     return (0);
723   if (side == HEAD)
724   {
725     if (cache_queue_head == ci)
726       return 0;
728     /* remove if further down in queue */
729     remove_from_queue(ci);
731     ci->prev = NULL;
732     ci->next = cache_queue_head;
733     if (ci->next != NULL)
734       ci->next->prev = ci;
735     cache_queue_head = ci;
737     if (cache_queue_tail == NULL)
738       cache_queue_tail = cache_queue_head;
739   }
740   else /* (side == TAIL) */
741   {
742     /* We don't move values back in the list.. */
743     if (ci->flags & CI_FLAGS_IN_QUEUE)
744       return (0);
746     assert (ci->next == NULL);
747     assert (ci->prev == NULL);
749     ci->prev = cache_queue_tail;
751     if (cache_queue_tail == NULL)
752       cache_queue_head = ci;
753     else
754       cache_queue_tail->next = ci;
756     cache_queue_tail = ci;
757   }
759   ci->flags |= CI_FLAGS_IN_QUEUE;
761   pthread_cond_signal(&queue_cond);
762   pthread_mutex_lock (&stats_lock);
763   stats_queue_length++;
764   pthread_mutex_unlock (&stats_lock);
766   return (0);
767 } /* }}} int enqueue_cache_item */
769 /*
770  * tree_callback_flush:
771  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
772  * while this is in progress.
773  */
774 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
775     gpointer data)
777   cache_item_t *ci;
778   callback_flush_data_t *cfd;
780   ci = (cache_item_t *) value;
781   cfd = (callback_flush_data_t *) data;
783   if (ci->flags & CI_FLAGS_IN_QUEUE)
784     return FALSE;
786   if (ci->values_num > 0
787       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
788   {
789     enqueue_cache_item (ci, TAIL);
790   }
791   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
792       && (ci->values_num <= 0))
793   {
794     assert ((char *) key == ci->file);
795     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
796     {
797       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
798       return (FALSE);
799     }
800   }
802   return (FALSE);
803 } /* }}} gboolean tree_callback_flush */
805 static int flush_old_values (int max_age)
807   callback_flush_data_t cfd;
808   size_t k;
810   memset (&cfd, 0, sizeof (cfd));
811   /* Pass the current time as user data so that we don't need to call
812    * `time' for each node. */
813   cfd.now = time (NULL);
814   cfd.keys = NULL;
815   cfd.keys_num = 0;
817   if (max_age > 0)
818     cfd.abs_timeout = cfd.now - max_age;
819   else
820     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
822   /* `tree_callback_flush' will return the keys of all values that haven't
823    * been touched in the last `config_flush_interval' seconds in `cfd'.
824    * The char*'s in this array point to the same memory as ci->file, so we
825    * don't need to free them separately. */
826   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
828   for (k = 0; k < cfd.keys_num; k++)
829   {
830     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
831     /* should never fail, since we have held the cache_lock
832      * the entire time */
833     assert(status == TRUE);
834   }
836   if (cfd.keys != NULL)
837   {
838     free (cfd.keys);
839     cfd.keys = NULL;
840   }
842   return (0);
843 } /* int flush_old_values */
845 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
847   struct timeval now;
848   struct timespec next_flush;
849   int status;
851   gettimeofday (&now, NULL);
852   next_flush.tv_sec = now.tv_sec + config_flush_interval;
853   next_flush.tv_nsec = 1000 * now.tv_usec;
855   pthread_mutex_lock(&cache_lock);
857   while (state == RUNNING)
858   {
859     gettimeofday (&now, NULL);
860     if ((now.tv_sec > next_flush.tv_sec)
861         || ((now.tv_sec == next_flush.tv_sec)
862           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
863     {
864       RRDD_LOG(LOG_DEBUG, "flushing old values");
866       /* Determine the time of the next cache flush. */
867       next_flush.tv_sec = now.tv_sec + config_flush_interval;
869       /* Flush all values that haven't been written in the last
870        * `config_write_interval' seconds. */
871       flush_old_values (config_write_interval);
873       /* unlock the cache while we rotate so we don't block incoming
874        * updates if the fsync() blocks on disk I/O */
875       pthread_mutex_unlock(&cache_lock);
876       journal_rotate();
877       pthread_mutex_lock(&cache_lock);
878     }
880     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
881     if (status != 0 && status != ETIMEDOUT)
882     {
883       RRDD_LOG (LOG_ERR, "flush_thread_main: "
884                 "pthread_cond_timedwait returned %i.", status);
885     }
886   }
888   if (config_flush_at_shutdown)
889     flush_old_values (-1); /* flush everything */
891   state = SHUTDOWN;
893   pthread_mutex_unlock(&cache_lock);
895   return NULL;
896 } /* void *flush_thread_main */
898 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
900   pthread_mutex_lock (&cache_lock);
902   while (state != SHUTDOWN
903          || (cache_queue_head != NULL && config_flush_at_shutdown))
904   {
905     cache_item_t *ci;
906     char *file;
907     char **values;
908     size_t values_num;
909     int status;
911     /* Now, check if there's something to store away. If not, wait until
912      * something comes in. */
913     if (cache_queue_head == NULL)
914     {
915       status = pthread_cond_wait (&queue_cond, &cache_lock);
916       if ((status != 0) && (status != ETIMEDOUT))
917       {
918         RRDD_LOG (LOG_ERR, "queue_thread_main: "
919             "pthread_cond_wait returned %i.", status);
920       }
921     }
923     /* Check if a value has arrived. This may be NULL if we timed out or there
924      * was an interrupt such as a signal. */
925     if (cache_queue_head == NULL)
926       continue;
928     ci = cache_queue_head;
930     /* copy the relevant parts */
931     file = strdup (ci->file);
932     if (file == NULL)
933     {
934       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
935       continue;
936     }
938     assert(ci->values != NULL);
939     assert(ci->values_num > 0);
941     values = ci->values;
942     values_num = ci->values_num;
944     wipe_ci_values(ci, time(NULL));
945     remove_from_queue(ci);
947     pthread_mutex_unlock (&cache_lock);
949     rrd_clear_error ();
950     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
951     if (status != 0)
952     {
953       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
954           "rrd_update_r (%s) failed with status %i. (%s)",
955           file, status, rrd_get_error());
956     }
958     journal_write("wrote", file);
960     /* Search again in the tree.  It's possible someone issued a "FORGET"
961      * while we were writing the update values. */
962     pthread_mutex_lock(&cache_lock);
963     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
964     if (ci)
965       pthread_cond_broadcast(&ci->flushed);
966     pthread_mutex_unlock(&cache_lock);
968     if (status == 0)
969     {
970       pthread_mutex_lock (&stats_lock);
971       stats_updates_written++;
972       stats_data_sets_written += values_num;
973       pthread_mutex_unlock (&stats_lock);
974     }
976     rrd_free_ptrs((void ***) &values, &values_num);
977     free(file);
979     pthread_mutex_lock (&cache_lock);
980   }
981   pthread_mutex_unlock (&cache_lock);
983   return (NULL);
984 } /* }}} void *queue_thread_main */
986 static int buffer_get_field (char **buffer_ret, /* {{{ */
987     size_t *buffer_size_ret, char **field_ret)
989   char *buffer;
990   size_t buffer_pos;
991   size_t buffer_size;
992   char *field;
993   size_t field_size;
994   int status;
996   buffer = *buffer_ret;
997   buffer_pos = 0;
998   buffer_size = *buffer_size_ret;
999   field = *buffer_ret;
1000   field_size = 0;
1002   if (buffer_size <= 0)
1003     return (-1);
1005   /* This is ensured by `handle_request'. */
1006   assert (buffer[buffer_size - 1] == '\0');
1008   status = -1;
1009   while (buffer_pos < buffer_size)
1010   {
1011     /* Check for end-of-field or end-of-buffer */
1012     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1013     {
1014       field[field_size] = 0;
1015       field_size++;
1016       buffer_pos++;
1017       status = 0;
1018       break;
1019     }
1020     /* Handle escaped characters. */
1021     else if (buffer[buffer_pos] == '\\')
1022     {
1023       if (buffer_pos >= (buffer_size - 1))
1024         break;
1025       buffer_pos++;
1026       field[field_size] = buffer[buffer_pos];
1027       field_size++;
1028       buffer_pos++;
1029     }
1030     /* Normal operation */ 
1031     else
1032     {
1033       field[field_size] = buffer[buffer_pos];
1034       field_size++;
1035       buffer_pos++;
1036     }
1037   } /* while (buffer_pos < buffer_size) */
1039   if (status != 0)
1040     return (status);
1042   *buffer_ret = buffer + buffer_pos;
1043   *buffer_size_ret = buffer_size - buffer_pos;
1044   *field_ret = field;
1046   return (0);
1047 } /* }}} int buffer_get_field */
1049 /* if we're restricting writes to the base directory,
1050  * check whether the file falls within the dir
1051  * returns 1 if OK, otherwise 0
1052  */
1053 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1055   assert(file != NULL);
1057   if (!config_write_base_only
1058       || JOURNAL_REPLAY(sock)
1059       || config_base_dir == NULL)
1060     return 1;
1062   if (strstr(file, "../") != NULL) goto err;
1064   /* relative paths without "../" are ok */
1065   if (*file != '/') return 1;
1067   /* file must be of the format base + "/" + <1+ char filename> */
1068   if (strlen(file) < _config_base_dir_len + 2) goto err;
1069   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1070   if (*(file + _config_base_dir_len) != '/') goto err;
1072   return 1;
1074 err:
1075   if (sock != NULL && sock->fd >= 0)
1076     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1078   return 0;
1079 } /* }}} static int check_file_access */
1081 /* when using a base dir, convert relative paths to absolute paths.
1082  * if necessary, modifies the "filename" pointer to point
1083  * to the new path created in "tmp".  "tmp" is provided
1084  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1085  *
1086  * this allows us to optimize for the expected case (absolute path)
1087  * with a no-op.
1088  */
1089 static void get_abs_path(char **filename, char *tmp)
1091   assert(tmp != NULL);
1092   assert(filename != NULL && *filename != NULL);
1094   if (config_base_dir == NULL || **filename == '/')
1095     return;
1097   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1098   *filename = tmp;
1099 } /* }}} static int get_abs_path */
1101 static int flush_file (const char *filename) /* {{{ */
1103   cache_item_t *ci;
1105   pthread_mutex_lock (&cache_lock);
1107   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1108   if (ci == NULL)
1109   {
1110     pthread_mutex_unlock (&cache_lock);
1111     return (ENOENT);
1112   }
1114   if (ci->values_num > 0)
1115   {
1116     /* Enqueue at head */
1117     enqueue_cache_item (ci, HEAD);
1118     pthread_cond_wait(&ci->flushed, &cache_lock);
1119   }
1121   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1122    * may have been purged during our cond_wait() */
1124   pthread_mutex_unlock(&cache_lock);
1126   return (0);
1127 } /* }}} int flush_file */
1129 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1131   char *err = "Syntax error.\n";
1133   if (cmd && cmd->syntax)
1134     err = cmd->syntax;
1136   return send_response(sock, RESP_ERR, "Usage: %s", err);
1137 } /* }}} static int syntax_error() */
1139 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1141   uint64_t copy_queue_length;
1142   uint64_t copy_updates_received;
1143   uint64_t copy_flush_received;
1144   uint64_t copy_updates_written;
1145   uint64_t copy_data_sets_written;
1146   uint64_t copy_journal_bytes;
1147   uint64_t copy_journal_rotate;
1149   uint64_t tree_nodes_number;
1150   uint64_t tree_depth;
1152   pthread_mutex_lock (&stats_lock);
1153   copy_queue_length       = stats_queue_length;
1154   copy_updates_received   = stats_updates_received;
1155   copy_flush_received     = stats_flush_received;
1156   copy_updates_written    = stats_updates_written;
1157   copy_data_sets_written  = stats_data_sets_written;
1158   copy_journal_bytes      = stats_journal_bytes;
1159   copy_journal_rotate     = stats_journal_rotate;
1160   pthread_mutex_unlock (&stats_lock);
1162   pthread_mutex_lock (&cache_lock);
1163   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1164   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1165   pthread_mutex_unlock (&cache_lock);
1167   add_response_info(sock,
1168                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1169   add_response_info(sock,
1170                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1171   add_response_info(sock,
1172                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1173   add_response_info(sock,
1174                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1175   add_response_info(sock,
1176                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1177   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1178   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1179   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1180   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1182   send_response(sock, RESP_OK, "Statistics follow\n");
1184   return (0);
1185 } /* }}} int handle_request_stats */
1187 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1189   char *file, file_tmp[PATH_MAX];
1190   int status;
1192   status = buffer_get_field (&buffer, &buffer_size, &file);
1193   if (status != 0)
1194   {
1195     return syntax_error(sock,cmd);
1196   }
1197   else
1198   {
1199     pthread_mutex_lock(&stats_lock);
1200     stats_flush_received++;
1201     pthread_mutex_unlock(&stats_lock);
1203     get_abs_path(&file, file_tmp);
1204     if (!check_file_access(file, sock)) return 0;
1206     status = flush_file (file);
1207     if (status == 0)
1208       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1209     else if (status == ENOENT)
1210     {
1211       /* no file in our tree; see whether it exists at all */
1212       struct stat statbuf;
1214       memset(&statbuf, 0, sizeof(statbuf));
1215       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1216         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1217       else
1218         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1219     }
1220     else if (status < 0)
1221       return send_response(sock, RESP_ERR, "Internal error.\n");
1222     else
1223       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1224   }
1226   /* NOTREACHED */
1227   assert(1==0);
1228 } /* }}} int handle_request_flush */
1230 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1232   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1234   pthread_mutex_lock(&cache_lock);
1235   flush_old_values(-1);
1236   pthread_mutex_unlock(&cache_lock);
1238   return send_response(sock, RESP_OK, "Started flush.\n");
1239 } /* }}} static int handle_request_flushall */
1241 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1243   int status;
1244   char *file, file_tmp[PATH_MAX];
1245   cache_item_t *ci;
1247   status = buffer_get_field(&buffer, &buffer_size, &file);
1248   if (status != 0)
1249     return syntax_error(sock,cmd);
1251   get_abs_path(&file, file_tmp);
1253   pthread_mutex_lock(&cache_lock);
1254   ci = g_tree_lookup(cache_tree, file);
1255   if (ci == NULL)
1256   {
1257     pthread_mutex_unlock(&cache_lock);
1258     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1259   }
1261   for (size_t i=0; i < ci->values_num; i++)
1262     add_response_info(sock, "%s\n", ci->values[i]);
1264   pthread_mutex_unlock(&cache_lock);
1265   return send_response(sock, RESP_OK, "updates pending\n");
1266 } /* }}} static int handle_request_pending */
1268 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1270   int status;
1271   gboolean found;
1272   char *file, file_tmp[PATH_MAX];
1274   status = buffer_get_field(&buffer, &buffer_size, &file);
1275   if (status != 0)
1276     return syntax_error(sock,cmd);
1278   get_abs_path(&file, file_tmp);
1279   if (!check_file_access(file, sock)) return 0;
1281   pthread_mutex_lock(&cache_lock);
1282   found = g_tree_remove(cache_tree, file);
1283   pthread_mutex_unlock(&cache_lock);
1285   if (found == TRUE)
1286   {
1287     if (!JOURNAL_REPLAY(sock))
1288       journal_write("forget", file);
1290     return send_response(sock, RESP_OK, "Gone!\n");
1291   }
1292   else
1293     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1295   /* NOTREACHED */
1296   assert(1==0);
1297 } /* }}} static int handle_request_forget */
1299 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1301   cache_item_t *ci;
1303   pthread_mutex_lock(&cache_lock);
1305   ci = cache_queue_head;
1306   while (ci != NULL)
1307   {
1308     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1309     ci = ci->next;
1310   }
1312   pthread_mutex_unlock(&cache_lock);
1314   return send_response(sock, RESP_OK, "in queue.\n");
1315 } /* }}} int handle_request_queue */
1317 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1319   char *file, file_tmp[PATH_MAX];
1320   int values_num = 0;
1321   int status;
1322   char orig_buf[CMD_MAX];
1324   cache_item_t *ci;
1326   /* save it for the journal later */
1327   if (!JOURNAL_REPLAY(sock))
1328     strncpy(orig_buf, buffer, buffer_size);
1330   status = buffer_get_field (&buffer, &buffer_size, &file);
1331   if (status != 0)
1332     return syntax_error(sock,cmd);
1334   pthread_mutex_lock(&stats_lock);
1335   stats_updates_received++;
1336   pthread_mutex_unlock(&stats_lock);
1338   get_abs_path(&file, file_tmp);
1339   if (!check_file_access(file, sock)) return 0;
1341   pthread_mutex_lock (&cache_lock);
1342   ci = g_tree_lookup (cache_tree, file);
1344   if (ci == NULL) /* {{{ */
1345   {
1346     struct stat statbuf;
1347     cache_item_t *tmp;
1349     /* don't hold the lock while we setup; stat(2) might block */
1350     pthread_mutex_unlock(&cache_lock);
1352     memset (&statbuf, 0, sizeof (statbuf));
1353     status = stat (file, &statbuf);
1354     if (status != 0)
1355     {
1356       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1358       status = errno;
1359       if (status == ENOENT)
1360         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1361       else
1362         return send_response(sock, RESP_ERR,
1363                              "stat failed with error %i.\n", status);
1364     }
1365     if (!S_ISREG (statbuf.st_mode))
1366       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1368     if (access(file, R_OK|W_OK) != 0)
1369       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1370                            file, rrd_strerror(errno));
1372     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1373     if (ci == NULL)
1374     {
1375       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1377       return send_response(sock, RESP_ERR, "malloc failed.\n");
1378     }
1379     memset (ci, 0, sizeof (cache_item_t));
1381     ci->file = strdup (file);
1382     if (ci->file == NULL)
1383     {
1384       free (ci);
1385       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1387       return send_response(sock, RESP_ERR, "strdup failed.\n");
1388     }
1390     wipe_ci_values(ci, now);
1391     ci->flags = CI_FLAGS_IN_TREE;
1392     pthread_cond_init(&ci->flushed, NULL);
1394     pthread_mutex_lock(&cache_lock);
1396     /* another UPDATE might have added this entry in the meantime */
1397     tmp = g_tree_lookup (cache_tree, file);
1398     if (tmp == NULL)
1399       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1400     else
1401     {
1402       free_cache_item (ci);
1403       ci = tmp;
1404     }
1406     /* state may have changed while we were unlocked */
1407     if (state == SHUTDOWN)
1408       return -1;
1409   } /* }}} */
1410   assert (ci != NULL);
1412   /* don't re-write updates in replay mode */
1413   if (!JOURNAL_REPLAY(sock))
1414     journal_write("update", orig_buf);
1416   while (buffer_size > 0)
1417   {
1418     char *value;
1419     time_t stamp;
1420     char *eostamp;
1422     status = buffer_get_field (&buffer, &buffer_size, &value);
1423     if (status != 0)
1424     {
1425       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1426       break;
1427     }
1429     /* make sure update time is always moving forward */
1430     stamp = strtol(value, &eostamp, 10);
1431     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1432     {
1433       pthread_mutex_unlock(&cache_lock);
1434       return send_response(sock, RESP_ERR,
1435                            "Cannot find timestamp in '%s'!\n", value);
1436     }
1437     else if (stamp <= ci->last_update_stamp)
1438     {
1439       pthread_mutex_unlock(&cache_lock);
1440       return send_response(sock, RESP_ERR,
1441                            "illegal attempt to update using time %ld when last"
1442                            " update time is %ld (minimum one second step)\n",
1443                            stamp, ci->last_update_stamp);
1444     }
1445     else
1446       ci->last_update_stamp = stamp;
1448     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1449                               &ci->values_alloc, config_alloc_chunk))
1450     {
1451       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1452       continue;
1453     }
1455     values_num++;
1456   }
1458   if (((now - ci->last_flush_time) >= config_write_interval)
1459       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1460       && (ci->values_num > 0))
1461   {
1462     enqueue_cache_item (ci, TAIL);
1463   }
1465   pthread_mutex_unlock (&cache_lock);
1467   if (values_num < 1)
1468     return send_response(sock, RESP_ERR, "No values updated.\n");
1469   else
1470     return send_response(sock, RESP_OK,
1471                          "errors, enqueued %i value(s).\n", values_num);
1473   /* NOTREACHED */
1474   assert(1==0);
1476 } /* }}} int handle_request_update */
1478 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1480   char *file, file_tmp[PATH_MAX];
1481   char *cf;
1483   char *start_str;
1484   char *end_str;
1485   time_t start_tm;
1486   time_t end_tm;
1488   unsigned long step;
1489   unsigned long ds_cnt;
1490   char **ds_namv;
1491   rrd_value_t *data;
1493   int status;
1494   unsigned long i;
1495   time_t t;
1496   rrd_value_t *data_ptr;
1498   file = NULL;
1499   cf = NULL;
1500   start_str = NULL;
1501   end_str = NULL;
1503   /* Read the arguments */
1504   do /* while (0) */
1505   {
1506     status = buffer_get_field (&buffer, &buffer_size, &file);
1507     if (status != 0)
1508       break;
1510     status = buffer_get_field (&buffer, &buffer_size, &cf);
1511     if (status != 0)
1512       break;
1514     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1515     if (status != 0)
1516     {
1517       start_str = NULL;
1518       status = 0;
1519       break;
1520     }
1522     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1523     if (status != 0)
1524     {
1525       end_str = NULL;
1526       status = 0;
1527       break;
1528     }
1529   } while (0);
1531   if (status != 0)
1532     return (syntax_error(sock,cmd));
1534   get_abs_path(&file, file_tmp);
1535   if (!check_file_access(file, sock)) return 0;
1537   status = flush_file (file);
1538   if ((status != 0) && (status != ENOENT))
1539     return (send_response (sock, RESP_ERR,
1540           "flush_file (%s) failed with status %i.\n", file, status));
1542   t = time (NULL); /* "now" */
1544   /* Parse start time */
1545   if (start_str != NULL)
1546   {
1547     char *endptr;
1548     long value;
1550     endptr = NULL;
1551     errno = 0;
1552     value = strtol (start_str, &endptr, /* base = */ 0);
1553     if ((endptr == start_str) || (errno != 0))
1554       return (send_response(sock, RESP_ERR,
1555             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1556             start_str));
1558     if (value > 0)
1559       start_tm = (time_t) value;
1560     else
1561       start_tm = (time_t) (t + value);
1562   }
1563   else
1564   {
1565     start_tm = t - 86400;
1566   }
1568   /* Parse end time */
1569   if (end_str != NULL)
1570   {
1571     char *endptr;
1572     long value;
1574     endptr = NULL;
1575     errno = 0;
1576     value = strtol (end_str, &endptr, /* base = */ 0);
1577     if ((endptr == end_str) || (errno != 0))
1578       return (send_response(sock, RESP_ERR,
1579             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1580             end_str));
1582     if (value > 0)
1583       end_tm = (time_t) value;
1584     else
1585       end_tm = (time_t) (t + value);
1586   }
1587   else
1588   {
1589     end_tm = t;
1590   }
1592   step = -1;
1593   ds_cnt = 0;
1594   ds_namv = NULL;
1595   data = NULL;
1597   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1598       &ds_cnt, &ds_namv, &data);
1599   if (status != 0)
1600     return (send_response(sock, RESP_ERR,
1601           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1603   add_response_info (sock, "FlushVersion: %lu\n", 1);
1604   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1605   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1606   add_response_info (sock, "Step: %lu\n", step);
1607   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1609 #define SSTRCAT(buffer,str,buffer_fill) do { \
1610     size_t str_len = strlen (str); \
1611     if ((buffer_fill + str_len) > sizeof (buffer)) \
1612       str_len = sizeof (buffer) - buffer_fill; \
1613     if (str_len > 0) { \
1614       strncpy (buffer + buffer_fill, str, str_len); \
1615       buffer_fill += str_len; \
1616       assert (buffer_fill <= sizeof (buffer)); \
1617       if (buffer_fill == sizeof (buffer)) \
1618         buffer[buffer_fill - 1] = 0; \
1619       else \
1620         buffer[buffer_fill] = 0; \
1621     } \
1622   } while (0)
1624   { /* Add list of DS names */
1625     char linebuf[1024];
1626     size_t linebuf_fill;
1628     memset (linebuf, 0, sizeof (linebuf));
1629     linebuf_fill = 0;
1630     for (i = 0; i < ds_cnt; i++)
1631     {
1632       if (i > 0)
1633         SSTRCAT (linebuf, " ", linebuf_fill);
1634       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1635       rrd_freemem(ds_namv[i]);
1636     }
1637     rrd_freemem(ds_namv);
1638     add_response_info (sock, "DSName: %s\n", linebuf);
1639   }
1641   /* Add the actual data */
1642   assert (step > 0);
1643   data_ptr = data;
1644   for (t = start_tm + step; t <= end_tm; t += step)
1645   {
1646     char linebuf[1024];
1647     size_t linebuf_fill;
1648     char tmp[128];
1650     memset (linebuf, 0, sizeof (linebuf));
1651     linebuf_fill = 0;
1652     for (i = 0; i < ds_cnt; i++)
1653     {
1654       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1655       tmp[sizeof (tmp) - 1] = 0;
1656       SSTRCAT (linebuf, tmp, linebuf_fill);
1658       data_ptr++;
1659     }
1661     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1662   } /* for (t) */
1663   rrd_freemem(data);
1665   return (send_response (sock, RESP_OK, "Success\n"));
1666 #undef SSTRCAT
1667 } /* }}} int handle_request_fetch */
1669 /* we came across a "WROTE" entry during journal replay.
1670  * throw away any values that we have accumulated for this file
1671  */
1672 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1674   cache_item_t *ci;
1675   const char *file = buffer;
1677   pthread_mutex_lock(&cache_lock);
1679   ci = g_tree_lookup(cache_tree, file);
1680   if (ci == NULL)
1681   {
1682     pthread_mutex_unlock(&cache_lock);
1683     return (0);
1684   }
1686   if (ci->values)
1687     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1689   wipe_ci_values(ci, now);
1690   remove_from_queue(ci);
1692   pthread_mutex_unlock(&cache_lock);
1693   return (0);
1694 } /* }}} int handle_request_wrote */
1696 /* start "BATCH" processing */
1697 static int batch_start (HANDLER_PROTO) /* {{{ */
1699   int status;
1700   if (sock->batch_start)
1701     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1703   status = send_response(sock, RESP_OK,
1704                          "Go ahead.  End with dot '.' on its own line.\n");
1705   sock->batch_start = time(NULL);
1706   sock->batch_cmd = 0;
1708   return status;
1709 } /* }}} static int batch_start */
1711 /* finish "BATCH" processing and return results to the client */
1712 static int batch_done (HANDLER_PROTO) /* {{{ */
1714   assert(sock->batch_start);
1715   sock->batch_start = 0;
1716   sock->batch_cmd  = 0;
1717   return send_response(sock, RESP_OK, "errors\n");
1718 } /* }}} static int batch_done */
1720 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1722   return -1;
1723 } /* }}} static int handle_request_quit */
1725 static command_t list_of_commands[] = { /* {{{ */
1726   {
1727     "UPDATE",
1728     handle_request_update,
1729     CMD_CONTEXT_ANY,
1730     "UPDATE <filename> <values> [<values> ...]\n"
1731     ,
1732     "Adds the given file to the internal cache if it is not yet known and\n"
1733     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1734     "for details.\n"
1735     "\n"
1736     "Each <values> has the following form:\n"
1737     "  <values> = <time>:<value>[:<value>[...]]\n"
1738     "See the rrdupdate(1) manpage for details.\n"
1739   },
1740   {
1741     "WROTE",
1742     handle_request_wrote,
1743     CMD_CONTEXT_JOURNAL,
1744     NULL,
1745     NULL
1746   },
1747   {
1748     "FLUSH",
1749     handle_request_flush,
1750     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1751     "FLUSH <filename>\n"
1752     ,
1753     "Adds the given filename to the head of the update queue and returns\n"
1754     "after it has been dequeued.\n"
1755   },
1756   {
1757     "FLUSHALL",
1758     handle_request_flushall,
1759     CMD_CONTEXT_CLIENT,
1760     "FLUSHALL\n"
1761     ,
1762     "Triggers writing of all pending updates.  Returns immediately.\n"
1763   },
1764   {
1765     "PENDING",
1766     handle_request_pending,
1767     CMD_CONTEXT_CLIENT,
1768     "PENDING <filename>\n"
1769     ,
1770     "Shows any 'pending' updates for a file, in order.\n"
1771     "The updates shown have not yet been written to the underlying RRD file.\n"
1772   },
1773   {
1774     "FORGET",
1775     handle_request_forget,
1776     CMD_CONTEXT_ANY,
1777     "FORGET <filename>\n"
1778     ,
1779     "Removes the file completely from the cache.\n"
1780     "Any pending updates for the file will be lost.\n"
1781   },
1782   {
1783     "QUEUE",
1784     handle_request_queue,
1785     CMD_CONTEXT_CLIENT,
1786     "QUEUE\n"
1787     ,
1788         "Shows all files in the output queue.\n"
1789     "The output is zero or more lines in the following format:\n"
1790     "(where <num_vals> is the number of values to be written)\n"
1791     "\n"
1792     "<num_vals> <filename>\n"
1793   },
1794   {
1795     "STATS",
1796     handle_request_stats,
1797     CMD_CONTEXT_CLIENT,
1798     "STATS\n"
1799     ,
1800     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1801     "a description of the values.\n"
1802   },
1803   {
1804     "HELP",
1805     handle_request_help,
1806     CMD_CONTEXT_CLIENT,
1807     "HELP [<command>]\n",
1808     NULL, /* special! */
1809   },
1810   {
1811     "BATCH",
1812     batch_start,
1813     CMD_CONTEXT_CLIENT,
1814     "BATCH\n"
1815     ,
1816     "The 'BATCH' command permits the client to initiate a bulk load\n"
1817     "   of commands to rrdcached.\n"
1818     "\n"
1819     "Usage:\n"
1820     "\n"
1821     "    client: BATCH\n"
1822     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1823     "    client: command #1\n"
1824     "    client: command #2\n"
1825     "    client: ... and so on\n"
1826     "    client: .\n"
1827     "    server: 2 errors\n"
1828     "    server: 7 message for command #7\n"
1829     "    server: 9 message for command #9\n"
1830     "\n"
1831     "For more information, consult the rrdcached(1) documentation.\n"
1832   },
1833   {
1834     ".",   /* BATCH terminator */
1835     batch_done,
1836     CMD_CONTEXT_BATCH,
1837     NULL,
1838     NULL
1839   },
1840   {
1841     "FETCH",
1842     handle_request_fetch,
1843     CMD_CONTEXT_CLIENT,
1844     "FETCH <file> <CF> [<start> [<end>]]\n"
1845     ,
1846     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1847   },
1848   {
1849     "QUIT",
1850     handle_request_quit,
1851     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1852     "QUIT\n"
1853     ,
1854     "Disconnect from rrdcached.\n"
1855   }
1856 }; /* }}} command_t list_of_commands[] */
1857 static size_t list_of_commands_len = sizeof (list_of_commands)
1858   / sizeof (list_of_commands[0]);
1860 static command_t *find_command(char *cmd)
1862   size_t i;
1864   for (i = 0; i < list_of_commands_len; i++)
1865     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1866       return (&list_of_commands[i]);
1867   return NULL;
1870 /* We currently use the index in the `list_of_commands' array as a bit position
1871  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1872  * outside these functions so that switching to a more elegant storage method
1873  * is easily possible. */
1874 static ssize_t find_command_index (const char *cmd) /* {{{ */
1876   size_t i;
1878   for (i = 0; i < list_of_commands_len; i++)
1879     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1880       return ((ssize_t) i);
1881   return (-1);
1882 } /* }}} ssize_t find_command_index */
1884 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1885     const char *cmd)
1887   ssize_t i;
1889   if (JOURNAL_REPLAY(sock))
1890     return (1);
1892   if (cmd == NULL)
1893     return (-1);
1895   if ((strcasecmp ("QUIT", cmd) == 0)
1896       || (strcasecmp ("HELP", cmd) == 0))
1897     return (1);
1898   else if (strcmp (".", cmd) == 0)
1899     cmd = "BATCH";
1901   i = find_command_index (cmd);
1902   if (i < 0)
1903     return (-1);
1904   assert (i < 32);
1906   if ((sock->permissions & (1 << i)) != 0)
1907     return (1);
1908   return (0);
1909 } /* }}} int socket_permission_check */
1911 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1912     const char *cmd)
1914   ssize_t i;
1916   i = find_command_index (cmd);
1917   if (i < 0)
1918     return (-1);
1919   assert (i < 32);
1921   sock->permissions |= (1 << i);
1922   return (0);
1923 } /* }}} int socket_permission_add */
1925 /* check whether commands are received in the expected context */
1926 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1928   if (JOURNAL_REPLAY(sock))
1929     return (cmd->context & CMD_CONTEXT_JOURNAL);
1930   else if (sock->batch_start)
1931     return (cmd->context & CMD_CONTEXT_BATCH);
1932   else
1933     return (cmd->context & CMD_CONTEXT_CLIENT);
1935   /* NOTREACHED */
1936   assert(1==0);
1939 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1941   int status;
1942   char *cmd_str;
1943   char *resp_txt;
1944   command_t *help = NULL;
1946   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1947   if (status == 0)
1948     help = find_command(cmd_str);
1950   if (help && (help->syntax || help->help))
1951   {
1952     char tmp[CMD_MAX];
1954     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1955     resp_txt = tmp;
1957     if (help->syntax)
1958       add_response_info(sock, "Usage: %s\n", help->syntax);
1960     if (help->help)
1961       add_response_info(sock, "%s\n", help->help);
1962   }
1963   else
1964   {
1965     size_t i;
1967     resp_txt = "Command overview\n";
1969     for (i = 0; i < list_of_commands_len; i++)
1970     {
1971       if (list_of_commands[i].syntax == NULL)
1972         continue;
1973       add_response_info (sock, "%s", list_of_commands[i].syntax);
1974     }
1975   }
1977   return send_response(sock, RESP_OK, resp_txt);
1978 } /* }}} int handle_request_help */
1980 static int handle_request (DISPATCH_PROTO) /* {{{ */
1982   char *buffer_ptr = buffer;
1983   char *cmd_str = NULL;
1984   command_t *cmd = NULL;
1985   int status;
1987   assert (buffer[buffer_size - 1] == '\0');
1989   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1990   if (status != 0)
1991   {
1992     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1993     return (-1);
1994   }
1996   if (sock != NULL && sock->batch_start)
1997     sock->batch_cmd++;
1999   cmd = find_command(cmd_str);
2000   if (!cmd)
2001     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2003   if (!socket_permission_check (sock, cmd->cmd))
2004     return send_response(sock, RESP_ERR, "Permission denied.\n");
2006   if (!command_check_context(sock, cmd))
2007     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2009   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2010 } /* }}} int handle_request */
2012 static void journal_set_free (journal_set *js) /* {{{ */
2014   if (js == NULL)
2015     return;
2017   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2019   free(js);
2020 } /* }}} journal_set_free */
2022 static void journal_set_remove (journal_set *js) /* {{{ */
2024   if (js == NULL)
2025     return;
2027   for (uint i=0; i < js->files_num; i++)
2028   {
2029     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2030     unlink(js->files[i]);
2031   }
2032 } /* }}} journal_set_remove */
2034 /* close current journal file handle.
2035  * MUST hold journal_lock before calling */
2036 static void journal_close(void) /* {{{ */
2038   if (journal_fh != NULL)
2039   {
2040     if (fclose(journal_fh) != 0)
2041       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2042   }
2044   journal_fh = NULL;
2045   journal_size = 0;
2046 } /* }}} journal_close */
2048 /* MUST hold journal_lock before calling */
2049 static void journal_new_file(void) /* {{{ */
2051   struct timeval now;
2052   int  new_fd;
2053   char new_file[PATH_MAX + 1];
2055   assert(journal_dir != NULL);
2056   assert(journal_cur != NULL);
2058   journal_close();
2060   gettimeofday(&now, NULL);
2061   /* this format assures that the files sort in strcmp() order */
2062   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2063            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2065   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2066                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2067   if (new_fd < 0)
2068     goto error;
2070   journal_fh = fdopen(new_fd, "a");
2071   if (journal_fh == NULL)
2072     goto error;
2074   journal_size = ftell(journal_fh);
2075   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2077   /* record the file in the journal set */
2078   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2080   return;
2082 error:
2083   RRDD_LOG(LOG_CRIT,
2084            "JOURNALING DISABLED: Error while trying to create %s : %s",
2085            new_file, rrd_strerror(errno));
2086   RRDD_LOG(LOG_CRIT,
2087            "JOURNALING DISABLED: All values will be flushed at shutdown");
2089   close(new_fd);
2090   config_flush_at_shutdown = 1;
2092 } /* }}} journal_new_file */
2094 /* MUST NOT hold journal_lock before calling this */
2095 static void journal_rotate(void) /* {{{ */
2097   journal_set *old_js = NULL;
2099   if (journal_dir == NULL)
2100     return;
2102   RRDD_LOG(LOG_DEBUG, "rotating journals");
2104   pthread_mutex_lock(&stats_lock);
2105   ++stats_journal_rotate;
2106   pthread_mutex_unlock(&stats_lock);
2108   pthread_mutex_lock(&journal_lock);
2110   journal_close();
2112   /* rotate the journal sets */
2113   old_js = journal_old;
2114   journal_old = journal_cur;
2115   journal_cur = calloc(1, sizeof(journal_set));
2117   if (journal_cur != NULL)
2118     journal_new_file();
2119   else
2120     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2122   pthread_mutex_unlock(&journal_lock);
2124   journal_set_remove(old_js);
2125   journal_set_free  (old_js);
2127 } /* }}} static void journal_rotate */
2129 /* MUST hold journal_lock when calling */
2130 static void journal_done(void) /* {{{ */
2132   if (journal_cur == NULL)
2133     return;
2135   journal_close();
2137   if (config_flush_at_shutdown)
2138   {
2139     RRDD_LOG(LOG_INFO, "removing journals");
2140     journal_set_remove(journal_old);
2141     journal_set_remove(journal_cur);
2142   }
2143   else
2144   {
2145     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2146              "journals will be used at next startup");
2147   }
2149   journal_set_free(journal_cur);
2150   journal_set_free(journal_old);
2151   free(journal_dir);
2153 } /* }}} static void journal_done */
2155 static int journal_write(char *cmd, char *args) /* {{{ */
2157   int chars;
2159   if (journal_fh == NULL)
2160     return 0;
2162   pthread_mutex_lock(&journal_lock);
2163   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2164   journal_size += chars;
2166   if (journal_size > JOURNAL_MAX)
2167     journal_new_file();
2169   pthread_mutex_unlock(&journal_lock);
2171   if (chars > 0)
2172   {
2173     pthread_mutex_lock(&stats_lock);
2174     stats_journal_bytes += chars;
2175     pthread_mutex_unlock(&stats_lock);
2176   }
2178   return chars;
2179 } /* }}} static int journal_write */
2181 static int journal_replay (const char *file) /* {{{ */
2183   FILE *fh;
2184   int entry_cnt = 0;
2185   int fail_cnt = 0;
2186   uint64_t line = 0;
2187   char entry[CMD_MAX];
2188   time_t now;
2190   if (file == NULL) return 0;
2192   {
2193     char *reason = "unknown error";
2194     int status = 0;
2195     struct stat statbuf;
2197     memset(&statbuf, 0, sizeof(statbuf));
2198     if (stat(file, &statbuf) != 0)
2199     {
2200       reason = "stat error";
2201       status = errno;
2202     }
2203     else if (!S_ISREG(statbuf.st_mode))
2204     {
2205       reason = "not a regular file";
2206       status = EPERM;
2207     }
2208     if (statbuf.st_uid != daemon_uid)
2209     {
2210       reason = "not owned by daemon user";
2211       status = EACCES;
2212     }
2213     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2214     {
2215       reason = "must not be user/group writable";
2216       status = EACCES;
2217     }
2219     if (status != 0)
2220     {
2221       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2222                file, rrd_strerror(status), reason);
2223       return 0;
2224     }
2225   }
2227   fh = fopen(file, "r");
2228   if (fh == NULL)
2229   {
2230     if (errno != ENOENT)
2231       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2232                file, rrd_strerror(errno));
2233     return 0;
2234   }
2235   else
2236     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2238   now = time(NULL);
2240   while(!feof(fh))
2241   {
2242     size_t entry_len;
2244     ++line;
2245     if (fgets(entry, sizeof(entry), fh) == NULL)
2246       break;
2247     entry_len = strlen(entry);
2249     /* check \n termination in case journal writing crashed mid-line */
2250     if (entry_len == 0)
2251       continue;
2252     else if (entry[entry_len - 1] != '\n')
2253     {
2254       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2255       ++fail_cnt;
2256       continue;
2257     }
2259     entry[entry_len - 1] = '\0';
2261     if (handle_request(NULL, now, entry, entry_len) == 0)
2262       ++entry_cnt;
2263     else
2264       ++fail_cnt;
2265   }
2267   fclose(fh);
2269   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2270            entry_cnt, fail_cnt);
2272   return entry_cnt > 0 ? 1 : 0;
2273 } /* }}} static int journal_replay */
2275 static int journal_sort(const void *v1, const void *v2)
2277   char **jn1 = (char **) v1;
2278   char **jn2 = (char **) v2;
2280   return strcmp(*jn1,*jn2);
2283 static void journal_init(void) /* {{{ */
2285   int had_journal = 0;
2286   DIR *dir;
2287   struct dirent *dent;
2288   char path[PATH_MAX+1];
2290   if (journal_dir == NULL) return;
2292   pthread_mutex_lock(&journal_lock);
2294   journal_cur = calloc(1, sizeof(journal_set));
2295   if (journal_cur == NULL)
2296   {
2297     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2298     return;
2299   }
2301   RRDD_LOG(LOG_INFO, "checking for journal files");
2303   /* Handle old journal files during transition.  This gives them the
2304    * correct sort order.  TODO: remove after first release
2305    */
2306   {
2307     char old_path[PATH_MAX+1];
2308     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2309     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2310     rename(old_path, path);
2312     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2313     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2314     rename(old_path, path);
2315   }
2317   dir = opendir(journal_dir);
2318   if (!dir) {
2319     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2320     return;
2321   }
2322   while ((dent = readdir(dir)) != NULL)
2323   {
2324     /* looks like a journal file? */
2325     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2326       continue;
2328     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2330     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2331     {
2332       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2333                dent->d_name);
2334       break;
2335     }
2336   }
2337   closedir(dir);
2339   qsort(journal_cur->files, journal_cur->files_num,
2340         sizeof(journal_cur->files[0]), journal_sort);
2342   for (uint i=0; i < journal_cur->files_num; i++)
2343     had_journal += journal_replay(journal_cur->files[i]);
2345   journal_new_file();
2347   /* it must have been a crash.  start a flush */
2348   if (had_journal && config_flush_at_shutdown)
2349     flush_old_values(-1);
2351   pthread_mutex_unlock(&journal_lock);
2353   RRDD_LOG(LOG_INFO, "journal processing complete");
2355 } /* }}} static void journal_init */
2357 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2359   assert(sock != NULL);
2361   free(sock->rbuf);  sock->rbuf = NULL;
2362   free(sock->wbuf);  sock->wbuf = NULL;
2363   free(sock);
2364 } /* }}} void free_listen_socket */
2366 static void close_connection(listen_socket_t *sock) /* {{{ */
2368   if (sock->fd >= 0)
2369   {
2370     close(sock->fd);
2371     sock->fd = -1;
2372   }
2374   free_listen_socket(sock);
2376 } /* }}} void close_connection */
2378 static void *connection_thread_main (void *args) /* {{{ */
2380   listen_socket_t *sock;
2381   int fd;
2383   sock = (listen_socket_t *) args;
2384   fd = sock->fd;
2386   /* init read buffers */
2387   sock->next_read = sock->next_cmd = 0;
2388   sock->rbuf = malloc(RBUF_SIZE);
2389   if (sock->rbuf == NULL)
2390   {
2391     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2392     close_connection(sock);
2393     return NULL;
2394   }
2396   pthread_mutex_lock (&connection_threads_lock);
2397   connection_threads_num++;
2398   pthread_mutex_unlock (&connection_threads_lock);
2400   while (state == RUNNING)
2401   {
2402     char *cmd;
2403     ssize_t cmd_len;
2404     ssize_t rbytes;
2405     time_t now;
2407     struct pollfd pollfd;
2408     int status;
2410     pollfd.fd = fd;
2411     pollfd.events = POLLIN | POLLPRI;
2412     pollfd.revents = 0;
2414     status = poll (&pollfd, 1, /* timeout = */ 500);
2415     if (state != RUNNING)
2416       break;
2417     else if (status == 0) /* timeout */
2418       continue;
2419     else if (status < 0) /* error */
2420     {
2421       status = errno;
2422       if (status != EINTR)
2423         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2424       continue;
2425     }
2427     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2428       break;
2429     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2430     {
2431       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2432           "poll(2) returned something unexpected: %#04hx",
2433           pollfd.revents);
2434       break;
2435     }
2437     rbytes = read(fd, sock->rbuf + sock->next_read,
2438                   RBUF_SIZE - sock->next_read);
2439     if (rbytes < 0)
2440     {
2441       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2442       break;
2443     }
2444     else if (rbytes == 0)
2445       break; /* eof */
2447     sock->next_read += rbytes;
2449     if (sock->batch_start)
2450       now = sock->batch_start;
2451     else
2452       now = time(NULL);
2454     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2455     {
2456       status = handle_request (sock, now, cmd, cmd_len+1);
2457       if (status != 0)
2458         goto out_close;
2459     }
2460   }
2462 out_close:
2463   close_connection(sock);
2465   /* Remove this thread from the connection threads list */
2466   pthread_mutex_lock (&connection_threads_lock);
2467   connection_threads_num--;
2468   if (connection_threads_num <= 0)
2469     pthread_cond_broadcast(&connection_threads_done);
2470   pthread_mutex_unlock (&connection_threads_lock);
2472   return (NULL);
2473 } /* }}} void *connection_thread_main */
2475 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2477   int fd;
2478   struct sockaddr_un sa;
2479   listen_socket_t *temp;
2480   int status;
2481   const char *path;
2482   char *path_copy, *dir;
2484   path = sock->addr;
2485   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2486     path += strlen("unix:");
2488   /* dirname may modify its argument */
2489   path_copy = strdup(path);
2490   if (path_copy == NULL)
2491   {
2492     fprintf(stderr, "rrdcached: strdup(): %s\n",
2493         rrd_strerror(errno));
2494     return (-1);
2495   }
2497   dir = dirname(path_copy);
2498   if (rrd_mkdir_p(dir, 0777) != 0)
2499   {
2500     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2501         dir, rrd_strerror(errno));
2502     return (-1);
2503   }
2505   free(path_copy);
2507   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2508       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2509   if (temp == NULL)
2510   {
2511     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2512     return (-1);
2513   }
2514   listen_fds = temp;
2515   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2517   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2518   if (fd < 0)
2519   {
2520     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2521              rrd_strerror(errno));
2522     return (-1);
2523   }
2525   memset (&sa, 0, sizeof (sa));
2526   sa.sun_family = AF_UNIX;
2527   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2529   /* if we've gotten this far, we own the pid file.  any daemon started
2530    * with the same args must not be alive.  therefore, ensure that we can
2531    * create the socket...
2532    */
2533   unlink(path);
2535   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2536   if (status != 0)
2537   {
2538     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2539              path, rrd_strerror(errno));
2540     close (fd);
2541     return (-1);
2542   }
2544   /* tweak the sockets group ownership */
2545   if (sock->socket_group != (gid_t)-1)
2546   {
2547     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2548          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2549     {
2550       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2551     }
2552   }
2554   if (sock->socket_permissions != (mode_t)-1)
2555   {
2556     if (chmod(path, sock->socket_permissions) != 0)
2557       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2558           (unsigned int)sock->socket_permissions, strerror(errno));
2559   }
2561   status = listen (fd, /* backlog = */ 10);
2562   if (status != 0)
2563   {
2564     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2565              path, rrd_strerror(errno));
2566     close (fd);
2567     unlink (path);
2568     return (-1);
2569   }
2571   listen_fds[listen_fds_num].fd = fd;
2572   listen_fds[listen_fds_num].family = PF_UNIX;
2573   strncpy(listen_fds[listen_fds_num].addr, path,
2574           sizeof (listen_fds[listen_fds_num].addr) - 1);
2575   listen_fds_num++;
2577   return (0);
2578 } /* }}} int open_listen_socket_unix */
2580 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2582   struct addrinfo ai_hints;
2583   struct addrinfo *ai_res;
2584   struct addrinfo *ai_ptr;
2585   char addr_copy[NI_MAXHOST];
2586   char *addr;
2587   char *port;
2588   int status;
2590   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2591   addr_copy[sizeof (addr_copy) - 1] = 0;
2592   addr = addr_copy;
2594   memset (&ai_hints, 0, sizeof (ai_hints));
2595   ai_hints.ai_flags = 0;
2596 #ifdef AI_ADDRCONFIG
2597   ai_hints.ai_flags |= AI_ADDRCONFIG;
2598 #endif
2599   ai_hints.ai_family = AF_UNSPEC;
2600   ai_hints.ai_socktype = SOCK_STREAM;
2602   port = NULL;
2603   if (*addr == '[') /* IPv6+port format */
2604   {
2605     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2606     addr++;
2608     port = strchr (addr, ']');
2609     if (port == NULL)
2610     {
2611       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2612       return (-1);
2613     }
2614     *port = 0;
2615     port++;
2617     if (*port == ':')
2618       port++;
2619     else if (*port == 0)
2620       port = NULL;
2621     else
2622     {
2623       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2624       return (-1);
2625     }
2626   } /* if (*addr == '[') */
2627   else
2628   {
2629     port = rindex(addr, ':');
2630     if (port != NULL)
2631     {
2632       *port = 0;
2633       port++;
2634     }
2635   }
2636   ai_res = NULL;
2637   status = getaddrinfo (addr,
2638                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2639                         &ai_hints, &ai_res);
2640   if (status != 0)
2641   {
2642     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2643              addr, gai_strerror (status));
2644     return (-1);
2645   }
2647   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2648   {
2649     int fd;
2650     listen_socket_t *temp;
2651     int one = 1;
2653     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2654         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2655     if (temp == NULL)
2656     {
2657       fprintf (stderr,
2658                "rrdcached: open_listen_socket_network: realloc failed.\n");
2659       continue;
2660     }
2661     listen_fds = temp;
2662     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2664     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2665     if (fd < 0)
2666     {
2667       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2668                rrd_strerror(errno));
2669       continue;
2670     }
2672     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2674     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2675     if (status != 0)
2676     {
2677       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2678                sock->addr, rrd_strerror(errno));
2679       close (fd);
2680       continue;
2681     }
2683     status = listen (fd, /* backlog = */ 10);
2684     if (status != 0)
2685     {
2686       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2687                sock->addr, rrd_strerror(errno));
2688       close (fd);
2689       freeaddrinfo(ai_res);
2690       return (-1);
2691     }
2693     listen_fds[listen_fds_num].fd = fd;
2694     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2695     listen_fds_num++;
2696   } /* for (ai_ptr) */
2698   freeaddrinfo(ai_res);
2699   return (0);
2700 } /* }}} static int open_listen_socket_network */
2702 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2704   assert(sock != NULL);
2705   assert(sock->addr != NULL);
2707   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2708       || sock->addr[0] == '/')
2709     return (open_listen_socket_unix(sock));
2710   else
2711     return (open_listen_socket_network(sock));
2712 } /* }}} int open_listen_socket */
2714 static int close_listen_sockets (void) /* {{{ */
2716   size_t i;
2718   for (i = 0; i < listen_fds_num; i++)
2719   {
2720     close (listen_fds[i].fd);
2722     if (listen_fds[i].family == PF_UNIX)
2723       unlink(listen_fds[i].addr);
2724   }
2726   free (listen_fds);
2727   listen_fds = NULL;
2728   listen_fds_num = 0;
2730   return (0);
2731 } /* }}} int close_listen_sockets */
2733 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2735   struct pollfd *pollfds;
2736   int pollfds_num;
2737   int status;
2738   int i;
2740   if (listen_fds_num < 1)
2741   {
2742     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2743     return (NULL);
2744   }
2746   pollfds_num = listen_fds_num;
2747   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2748   if (pollfds == NULL)
2749   {
2750     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2751     return (NULL);
2752   }
2753   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2755   RRDD_LOG(LOG_INFO, "listening for connections");
2757   while (state == RUNNING)
2758   {
2759     for (i = 0; i < pollfds_num; i++)
2760     {
2761       pollfds[i].fd = listen_fds[i].fd;
2762       pollfds[i].events = POLLIN | POLLPRI;
2763       pollfds[i].revents = 0;
2764     }
2766     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2767     if (state != RUNNING)
2768       break;
2769     else if (status == 0) /* timeout */
2770       continue;
2771     else if (status < 0) /* error */
2772     {
2773       status = errno;
2774       if (status != EINTR)
2775       {
2776         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2777       }
2778       continue;
2779     }
2781     for (i = 0; i < pollfds_num; i++)
2782     {
2783       listen_socket_t *client_sock;
2784       struct sockaddr_storage client_sa;
2785       socklen_t client_sa_size;
2786       pthread_t tid;
2787       pthread_attr_t attr;
2789       if (pollfds[i].revents == 0)
2790         continue;
2792       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2793       {
2794         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2795             "poll(2) returned something unexpected for listen FD #%i.",
2796             pollfds[i].fd);
2797         continue;
2798       }
2800       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2801       if (client_sock == NULL)
2802       {
2803         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2804         continue;
2805       }
2806       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2808       client_sa_size = sizeof (client_sa);
2809       client_sock->fd = accept (pollfds[i].fd,
2810           (struct sockaddr *) &client_sa, &client_sa_size);
2811       if (client_sock->fd < 0)
2812       {
2813         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2814         free(client_sock);
2815         continue;
2816       }
2818       pthread_attr_init (&attr);
2819       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2821       status = pthread_create (&tid, &attr, connection_thread_main,
2822                                client_sock);
2823       if (status != 0)
2824       {
2825         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2826         close_connection(client_sock);
2827         continue;
2828       }
2829     } /* for (pollfds_num) */
2830   } /* while (state == RUNNING) */
2832   RRDD_LOG(LOG_INFO, "starting shutdown");
2834   close_listen_sockets ();
2836   pthread_mutex_lock (&connection_threads_lock);
2837   while (connection_threads_num > 0)
2838     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2839   pthread_mutex_unlock (&connection_threads_lock);
2841   free(pollfds);
2843   return (NULL);
2844 } /* }}} void *listen_thread_main */
2846 static int daemonize (void) /* {{{ */
2848   int pid_fd;
2849   char *base_dir;
2851   daemon_uid = geteuid();
2853   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2854   if (pid_fd < 0)
2855     pid_fd = check_pidfile();
2856   if (pid_fd < 0)
2857     return pid_fd;
2859   /* open all the listen sockets */
2860   if (config_listen_address_list_len > 0)
2861   {
2862     for (size_t i = 0; i < config_listen_address_list_len; i++)
2863       open_listen_socket (config_listen_address_list[i]);
2865     rrd_free_ptrs((void ***) &config_listen_address_list,
2866                   &config_listen_address_list_len);
2867   }
2868   else
2869   {
2870     listen_socket_t sock;
2871     memset(&sock, 0, sizeof(sock));
2872     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2873     open_listen_socket (&sock);
2874   }
2876   if (listen_fds_num < 1)
2877   {
2878     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2879     goto error;
2880   }
2882   if (!stay_foreground)
2883   {
2884     pid_t child;
2886     child = fork ();
2887     if (child < 0)
2888     {
2889       fprintf (stderr, "daemonize: fork(2) failed.\n");
2890       goto error;
2891     }
2892     else if (child > 0)
2893       exit(0);
2895     /* Become session leader */
2896     setsid ();
2898     /* Open the first three file descriptors to /dev/null */
2899     close (2);
2900     close (1);
2901     close (0);
2903     open ("/dev/null", O_RDWR);
2904     if (dup(0) == -1 || dup(0) == -1){
2905         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2906     }
2907   } /* if (!stay_foreground) */
2909   /* Change into the /tmp directory. */
2910   base_dir = (config_base_dir != NULL)
2911     ? config_base_dir
2912     : "/tmp";
2914   if (chdir (base_dir) != 0)
2915   {
2916     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2917     goto error;
2918   }
2920   install_signal_handlers();
2922   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2923   RRDD_LOG(LOG_INFO, "starting up");
2925   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2926                                 (GDestroyNotify) free_cache_item);
2927   if (cache_tree == NULL)
2928   {
2929     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2930     goto error;
2931   }
2933   return write_pidfile (pid_fd);
2935 error:
2936   remove_pidfile();
2937   return -1;
2938 } /* }}} int daemonize */
2940 static int cleanup (void) /* {{{ */
2942   pthread_cond_broadcast (&flush_cond);
2943   pthread_join (flush_thread, NULL);
2945   pthread_cond_broadcast (&queue_cond);
2946   for (int i = 0; i < config_queue_threads; i++)
2947     pthread_join (queue_threads[i], NULL);
2949   if (config_flush_at_shutdown)
2950   {
2951     assert(cache_queue_head == NULL);
2952     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2953   }
2955   free(queue_threads);
2956   free(config_base_dir);
2958   pthread_mutex_lock(&cache_lock);
2959   g_tree_destroy(cache_tree);
2961   pthread_mutex_lock(&journal_lock);
2962   journal_done();
2964   RRDD_LOG(LOG_INFO, "goodbye");
2965   closelog ();
2967   remove_pidfile ();
2968   free(config_pid_file);
2970   return (0);
2971 } /* }}} int cleanup */
2973 static int read_options (int argc, char **argv) /* {{{ */
2975   int option;
2976   int status = 0;
2978   char **permissions = NULL;
2979   size_t permissions_len = 0;
2981   gid_t  socket_group = (gid_t)-1;
2982   mode_t socket_permissions = (mode_t)-1;
2984   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2985   {
2986     switch (option)
2987     {
2988       case 'g':
2989         stay_foreground=1;
2990         break;
2992       case 'l':
2993       {
2994         listen_socket_t *new;
2996         new = malloc(sizeof(listen_socket_t));
2997         if (new == NULL)
2998         {
2999           fprintf(stderr, "read_options: malloc failed.\n");
3000           return(2);
3001         }
3002         memset(new, 0, sizeof(listen_socket_t));
3004         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3006         /* Add permissions to the socket {{{ */
3007         if (permissions_len != 0)
3008         {
3009           size_t i;
3010           for (i = 0; i < permissions_len; i++)
3011           {
3012             status = socket_permission_add (new, permissions[i]);
3013             if (status != 0)
3014             {
3015               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3016                   "socket failed. Most likely, this permission doesn't "
3017                   "exist. Check your command line.\n", permissions[i]);
3018               status = 4;
3019             }
3020           }
3021         }
3022         else /* if (permissions_len == 0) */
3023         {
3024           /* Add permission for ALL commands to the socket. */
3025           size_t i;
3026           for (i = 0; i < list_of_commands_len; i++)
3027           {
3028             status = socket_permission_add (new, list_of_commands[i].cmd);
3029             if (status != 0)
3030             {
3031               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3032                   "socket failed. This should never happen, ever! Sorry.\n",
3033                   permissions[i]);
3034               status = 4;
3035             }
3036           }
3037         }
3038         /* }}} Done adding permissions. */
3040         new->socket_group = socket_group;
3041         new->socket_permissions = socket_permissions;
3043         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3044                          &config_listen_address_list_len, new))
3045         {
3046           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3047           return (2);
3048         }
3049       }
3050       break;
3052       /* set socket group permissions */
3053       case 's':
3054       {
3055         gid_t group_gid;
3056         struct group *grp;
3058         group_gid = strtoul(optarg, NULL, 10);
3059         if (errno != EINVAL && group_gid>0)
3060         {
3061           /* we were passed a number */
3062           grp = getgrgid(group_gid);
3063         }
3064         else
3065         {
3066           grp = getgrnam(optarg);
3067         }
3069         if (grp)
3070         {
3071           socket_group = grp->gr_gid;
3072         }
3073         else
3074         {
3075           /* no idea what the user wanted... */
3076           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3077           return (5);
3078         }
3079       }
3080       break;
3082       /* set socket file permissions */
3083       case 'm':
3084       {
3085         long  tmp;
3086         char *endptr = NULL;
3088         tmp = strtol (optarg, &endptr, 8);
3089         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3090             || (tmp > 07777) || (tmp < 0)) {
3091           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3092               optarg);
3093           return (5);
3094         }
3096         socket_permissions = (mode_t)tmp;
3097       }
3098       break;
3100       case 'P':
3101       {
3102         char *optcopy;
3103         char *saveptr;
3104         char *dummy;
3105         char *ptr;
3107         rrd_free_ptrs ((void *) &permissions, &permissions_len);
3109         optcopy = strdup (optarg);
3110         dummy = optcopy;
3111         saveptr = NULL;
3112         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3113         {
3114           dummy = NULL;
3115           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3116         }
3118         free (optcopy);
3119       }
3120       break;
3122       case 'f':
3123       {
3124         int temp;
3126         temp = atoi (optarg);
3127         if (temp > 0)
3128           config_flush_interval = temp;
3129         else
3130         {
3131           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3132           status = 3;
3133         }
3134       }
3135       break;
3137       case 'w':
3138       {
3139         int temp;
3141         temp = atoi (optarg);
3142         if (temp > 0)
3143           config_write_interval = temp;
3144         else
3145         {
3146           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3147           status = 2;
3148         }
3149       }
3150       break;
3152       case 'z':
3153       {
3154         int temp;
3156         temp = atoi(optarg);
3157         if (temp > 0)
3158           config_write_jitter = temp;
3159         else
3160         {
3161           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3162           status = 2;
3163         }
3165         break;
3166       }
3168       case 't':
3169       {
3170         int threads;
3171         threads = atoi(optarg);
3172         if (threads >= 1)
3173           config_queue_threads = threads;
3174         else
3175         {
3176           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3177           return 1;
3178         }
3179       }
3180       break;
3182       case 'B':
3183         config_write_base_only = 1;
3184         break;
3186       case 'b':
3187       {
3188         size_t len;
3189         char base_realpath[PATH_MAX];
3191         if (config_base_dir != NULL)
3192           free (config_base_dir);
3193         config_base_dir = strdup (optarg);
3194         if (config_base_dir == NULL)
3195         {
3196           fprintf (stderr, "read_options: strdup failed.\n");
3197           return (3);
3198         }
3200         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3201         {
3202           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3203               config_base_dir, rrd_strerror (errno));
3204           return (3);
3205         }
3207         /* make sure that the base directory is not resolved via
3208          * symbolic links.  this makes some performance-enhancing
3209          * assumptions possible (we don't have to resolve paths
3210          * that start with a "/")
3211          */
3212         if (realpath(config_base_dir, base_realpath) == NULL)
3213         {
3214           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3215               "%s\n", config_base_dir, rrd_strerror(errno));
3216           return 5;
3217         }
3219         len = strlen (config_base_dir);
3220         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3221         {
3222           config_base_dir[len - 1] = 0;
3223           len--;
3224         }
3226         if (len < 1)
3227         {
3228           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3229           return (4);
3230         }
3232         _config_base_dir_len = len;
3234         len = strlen (base_realpath);
3235         while ((len > 0) && (base_realpath[len - 1] == '/'))
3236         {
3237           base_realpath[len - 1] = '\0';
3238           len--;
3239         }
3241         if (strncmp(config_base_dir,
3242                          base_realpath, sizeof(base_realpath)) != 0)
3243         {
3244           fprintf(stderr,
3245                   "Base directory (-b) resolved via file system links!\n"
3246                   "Please consult rrdcached '-b' documentation!\n"
3247                   "Consider specifying the real directory (%s)\n",
3248                   base_realpath);
3249           return 5;
3250         }
3251       }
3252       break;
3254       case 'p':
3255       {
3256         if (config_pid_file != NULL)
3257           free (config_pid_file);
3258         config_pid_file = strdup (optarg);
3259         if (config_pid_file == NULL)
3260         {
3261           fprintf (stderr, "read_options: strdup failed.\n");
3262           return (3);
3263         }
3264       }
3265       break;
3267       case 'F':
3268         config_flush_at_shutdown = 1;
3269         break;
3271       case 'j':
3272       {
3273         char journal_dir_actual[PATH_MAX];
3274         const char *dir;
3275         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3277         status = rrd_mkdir_p(dir, 0777);
3278         if (status != 0)
3279         {
3280           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3281               dir, rrd_strerror(errno));
3282           return 6;
3283         }
3285         if (access(dir, R_OK|W_OK|X_OK) != 0)
3286         {
3287           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3288                   errno ? rrd_strerror(errno) : "");
3289           return 6;
3290         }
3291       }
3292       break;
3294       case 'a':
3295       {
3296         int temp = atoi(optarg);
3297         if (temp > 0)
3298           config_alloc_chunk = temp;
3299         else
3300         {
3301           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3302           return 10;
3303         }
3304       }
3305       break;
3307       case 'h':
3308       case '?':
3309         printf ("RRDCacheD %s\n"
3310             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3311             "\n"
3312             "Usage: rrdcached [options]\n"
3313             "\n"
3314             "Valid options are:\n"
3315             "  -l <address>  Socket address to listen to.\n"
3316             "  -P <perms>    Sets the permissions to assign to all following "
3317                             "sockets\n"
3318             "  -w <seconds>  Interval in which to write data.\n"
3319             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3320             "  -t <threads>  Number of write threads.\n"
3321             "  -f <seconds>  Interval in which to flush dead data.\n"
3322             "  -p <file>     Location of the PID-file.\n"
3323             "  -b <dir>      Base directory to change to.\n"
3324             "  -B            Restrict file access to paths within -b <dir>\n"
3325             "  -g            Do not fork and run in the foreground.\n"
3326             "  -j <dir>      Directory in which to create the journal files.\n"
3327             "  -F            Always flush all updates at shutdown\n"
3328             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3329             "                (the socket will also have read/write permissions "
3330                             "for that group)\n"
3331             "  -m <mode>     File permissions (octal) of all following UNIX "
3332                             "sockets\n"
3333             "  -a <size>     Memory allocation chunk size. Default is 1."
3334             "\n"
3335             "For more information and a detailed description of all options "
3336             "please refer\n"
3337             "to the rrdcached(1) manual page.\n",
3338             VERSION);
3339         if (option == 'h')
3340           status = -1;
3341         else
3342           status = 1;
3343         break;
3344     } /* switch (option) */
3345   } /* while (getopt) */
3347   /* advise the user when values are not sane */
3348   if (config_flush_interval < 2 * config_write_interval)
3349     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3350             " 2x write interval (-w) !\n");
3351   if (config_write_jitter > config_write_interval)
3352     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3353             " write interval (-w) !\n");
3355   if (config_write_base_only && config_base_dir == NULL)
3356     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3357             "  Consult the rrdcached documentation\n");
3359   if (journal_dir == NULL)
3360     config_flush_at_shutdown = 1;
3362   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3364   return (status);
3365 } /* }}} int read_options */
3367 int main (int argc, char **argv)
3369   int status;
3371   status = read_options (argc, argv);
3372   if (status != 0)
3373   {
3374     if (status < 0)
3375       status = 0;
3376     return (status);
3377   }
3379   status = daemonize ();
3380   if (status != 0)
3381   {
3382     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3383     return (1);
3384   }
3386   journal_init();
3388   /* start the queue threads */
3389   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3390   if (queue_threads == NULL)
3391   {
3392     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3393     cleanup();
3394     return (1);
3395   }
3396   for (int i = 0; i < config_queue_threads; i++)
3397   {
3398     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3399     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3400     if (status != 0)
3401     {
3402       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3403       cleanup();
3404       return (1);
3405     }
3406   }
3408   /* start the flush thread */
3409   memset(&flush_thread, 0, sizeof(flush_thread));
3410   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3411   if (status != 0)
3412   {
3413     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3414     cleanup();
3415     return (1);
3416   }
3418   listen_thread_main (NULL);
3419   cleanup ();
3421   return (0);
3422 } /* int main */
3424 /*
3425  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3426  */