Code

Imported upstream version 1.4.3.
[pkg-rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
111 #include <glib-2.0/glib.h>
112 /* }}} */
114 #define RRDD_LOG(severity, ...) \
115   do { \
116     if (stay_foreground) \
117       fprintf(stderr, __VA_ARGS__); \
118     syslog ((severity), __VA_ARGS__); \
119   } while (0)
121 #ifndef __GNUC__
122 # define __attribute__(x) /**/
123 #endif
125 /*
126  * Types
127  */
128 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
130 struct listen_socket_s
132   int fd;
133   char addr[PATH_MAX + 1];
134   int family;
136   /* state for BATCH processing */
137   time_t batch_start;
138   int batch_cmd;
140   /* buffered IO */
141   char *rbuf;
142   off_t next_cmd;
143   off_t next_read;
145   char *wbuf;
146   ssize_t wbuf_len;
148   uint32_t permissions;
150   gid_t  socket_group;
151   mode_t socket_permissions;
152 };
153 typedef struct listen_socket_s listen_socket_t;
155 struct command_s;
156 typedef struct command_s command_t;
157 /* note: guard against "unused" warnings in the handlers */
158 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
159                         time_t now              __attribute__((unused)),\
160                         char  *buffer           __attribute__((unused)),\
161                         size_t buffer_size      __attribute__((unused))
163 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
164                         DISPATCH_PROTO
166 struct command_s {
167   char   *cmd;
168   int (*handler)(HANDLER_PROTO);
170   char  context;                /* where we expect to see it */
171 #define CMD_CONTEXT_CLIENT      (1<<0)
172 #define CMD_CONTEXT_BATCH       (1<<1)
173 #define CMD_CONTEXT_JOURNAL     (1<<2)
174 #define CMD_CONTEXT_ANY         (0x7f)
176   char *syntax;
177   char *help;
178 };
180 struct cache_item_s;
181 typedef struct cache_item_s cache_item_t;
182 struct cache_item_s
184   char *file;
185   char **values;
186   size_t values_num;
187   time_t last_flush_time;
188   time_t last_update_stamp;
189 #define CI_FLAGS_IN_TREE  (1<<0)
190 #define CI_FLAGS_IN_QUEUE (1<<1)
191   int flags;
192   pthread_cond_t  flushed;
193   cache_item_t *prev;
194   cache_item_t *next;
195 };
197 struct callback_flush_data_s
199   time_t now;
200   time_t abs_timeout;
201   char **keys;
202   size_t keys_num;
203 };
204 typedef struct callback_flush_data_s callback_flush_data_t;
206 enum queue_side_e
208   HEAD,
209   TAIL
210 };
211 typedef enum queue_side_e queue_side_t;
213 /* describe a set of journal files */
214 typedef struct {
215   char **files;
216   size_t files_num;
217 } journal_set;
219 /* max length of socket command or response */
220 #define CMD_MAX 4096
221 #define RBUF_SIZE (CMD_MAX*2)
223 /*
224  * Variables
225  */
226 static int stay_foreground = 0;
227 static uid_t daemon_uid;
229 static listen_socket_t *listen_fds = NULL;
230 static size_t listen_fds_num = 0;
232 enum {
233   RUNNING,              /* normal operation */
234   FLUSHING,             /* flushing remaining values */
235   SHUTDOWN              /* shutting down */
236 } state = RUNNING;
238 static pthread_t *queue_threads;
239 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
240 static int config_queue_threads = 4;
242 static pthread_t flush_thread;
243 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
245 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
246 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
247 static int connection_threads_num = 0;
249 /* Cache stuff */
250 static GTree          *cache_tree = NULL;
251 static cache_item_t   *cache_queue_head = NULL;
252 static cache_item_t   *cache_queue_tail = NULL;
253 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
255 static int config_write_interval = 300;
256 static int config_write_jitter   = 0;
257 static int config_flush_interval = 3600;
258 static int config_flush_at_shutdown = 0;
259 static char *config_pid_file = NULL;
260 static char *config_base_dir = NULL;
261 static size_t _config_base_dir_len = 0;
262 static int config_write_base_only = 0;
264 static listen_socket_t **config_listen_address_list = NULL;
265 static size_t config_listen_address_list_len = 0;
267 static uint64_t stats_queue_length = 0;
268 static uint64_t stats_updates_received = 0;
269 static uint64_t stats_flush_received = 0;
270 static uint64_t stats_updates_written = 0;
271 static uint64_t stats_data_sets_written = 0;
272 static uint64_t stats_journal_bytes = 0;
273 static uint64_t stats_journal_rotate = 0;
274 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
276 /* Journaled updates */
277 #define JOURNAL_REPLAY(s) ((s) == NULL)
278 #define JOURNAL_BASE "rrd.journal"
279 static journal_set *journal_cur = NULL;
280 static journal_set *journal_old = NULL;
281 static char *journal_dir = NULL;
282 static FILE *journal_fh = NULL;         /* current journal file handle */
283 static long  journal_size = 0;          /* current journal size */
284 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
285 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
286 static int journal_write(char *cmd, char *args);
287 static void journal_done(void);
288 static void journal_rotate(void);
290 /* prototypes for forward refernces */
291 static int handle_request_help (HANDLER_PROTO);
293 /* 
294  * Functions
295  */
296 static void sig_common (const char *sig) /* {{{ */
298   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
299   state = FLUSHING;
300   pthread_cond_broadcast(&flush_cond);
301   pthread_cond_broadcast(&queue_cond);
302 } /* }}} void sig_common */
304 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
306   sig_common("INT");
307 } /* }}} void sig_int_handler */
309 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
311   sig_common("TERM");
312 } /* }}} void sig_term_handler */
314 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
316   config_flush_at_shutdown = 1;
317   sig_common("USR1");
318 } /* }}} void sig_usr1_handler */
320 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
322   config_flush_at_shutdown = 0;
323   sig_common("USR2");
324 } /* }}} void sig_usr2_handler */
326 static void install_signal_handlers(void) /* {{{ */
328   /* These structures are static, because `sigaction' behaves weird if the are
329    * overwritten.. */
330   static struct sigaction sa_int;
331   static struct sigaction sa_term;
332   static struct sigaction sa_pipe;
333   static struct sigaction sa_usr1;
334   static struct sigaction sa_usr2;
336   /* Install signal handlers */
337   memset (&sa_int, 0, sizeof (sa_int));
338   sa_int.sa_handler = sig_int_handler;
339   sigaction (SIGINT, &sa_int, NULL);
341   memset (&sa_term, 0, sizeof (sa_term));
342   sa_term.sa_handler = sig_term_handler;
343   sigaction (SIGTERM, &sa_term, NULL);
345   memset (&sa_pipe, 0, sizeof (sa_pipe));
346   sa_pipe.sa_handler = SIG_IGN;
347   sigaction (SIGPIPE, &sa_pipe, NULL);
349   memset (&sa_pipe, 0, sizeof (sa_usr1));
350   sa_usr1.sa_handler = sig_usr1_handler;
351   sigaction (SIGUSR1, &sa_usr1, NULL);
353   memset (&sa_usr2, 0, sizeof (sa_usr2));
354   sa_usr2.sa_handler = sig_usr2_handler;
355   sigaction (SIGUSR2, &sa_usr2, NULL);
357 } /* }}} void install_signal_handlers */
359 static int open_pidfile(char *action, int oflag) /* {{{ */
361   int fd;
362   const char *file;
363   char *file_copy, *dir;
365   file = (config_pid_file != NULL)
366     ? config_pid_file
367     : LOCALSTATEDIR "/run/rrdcached.pid";
369   /* dirname may modify its argument */
370   file_copy = strdup(file);
371   if (file_copy == NULL)
372   {
373     fprintf(stderr, "rrdcached: strdup(): %s\n",
374         rrd_strerror(errno));
375     return -1;
376   }
378   dir = dirname(file_copy);
379   if (rrd_mkdir_p(dir, 0777) != 0)
380   {
381     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
382         dir, rrd_strerror(errno));
383     return -1;
384   }
386   free(file_copy);
388   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
389   if (fd < 0)
390     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
391             action, file, rrd_strerror(errno));
393   return(fd);
394 } /* }}} static int open_pidfile */
396 /* check existing pid file to see whether a daemon is running */
397 static int check_pidfile(void)
399   int pid_fd;
400   pid_t pid;
401   char pid_str[16];
403   pid_fd = open_pidfile("open", O_RDWR);
404   if (pid_fd < 0)
405     return pid_fd;
407   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
408     return -1;
410   pid = atoi(pid_str);
411   if (pid <= 0)
412     return -1;
414   /* another running process that we can signal COULD be
415    * a competing rrdcached */
416   if (pid != getpid() && kill(pid, 0) == 0)
417   {
418     fprintf(stderr,
419             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
420     close(pid_fd);
421     return -1;
422   }
424   lseek(pid_fd, 0, SEEK_SET);
425   if (ftruncate(pid_fd, 0) == -1)
426   {
427     fprintf(stderr,
428             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
429     close(pid_fd);
430     return -1;
431   }
433   fprintf(stderr,
434           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
435           "rrdcached: starting normally.\n", pid);
437   return pid_fd;
438 } /* }}} static int check_pidfile */
440 static int write_pidfile (int fd) /* {{{ */
442   pid_t pid;
443   FILE *fh;
445   pid = getpid ();
447   fh = fdopen (fd, "w");
448   if (fh == NULL)
449   {
450     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
451     close(fd);
452     return (-1);
453   }
455   fprintf (fh, "%i\n", (int) pid);
456   fclose (fh);
458   return (0);
459 } /* }}} int write_pidfile */
461 static int remove_pidfile (void) /* {{{ */
463   char *file;
464   int status;
466   file = (config_pid_file != NULL)
467     ? config_pid_file
468     : LOCALSTATEDIR "/run/rrdcached.pid";
470   status = unlink (file);
471   if (status == 0)
472     return (0);
473   return (errno);
474 } /* }}} int remove_pidfile */
476 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
478   char *eol;
480   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
481                sock->next_read - sock->next_cmd);
483   if (eol == NULL)
484   {
485     /* no commands left, move remainder back to front of rbuf */
486     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
487             sock->next_read - sock->next_cmd);
488     sock->next_read -= sock->next_cmd;
489     sock->next_cmd = 0;
490     *len = 0;
491     return NULL;
492   }
493   else
494   {
495     char *cmd = sock->rbuf + sock->next_cmd;
496     *eol = '\0';
498     sock->next_cmd = eol - sock->rbuf + 1;
500     if (eol > sock->rbuf && *(eol-1) == '\r')
501       *(--eol) = '\0'; /* handle "\r\n" EOL */
503     *len = eol - cmd;
505     return cmd;
506   }
508   /* NOTREACHED */
509   assert(1==0);
510 } /* }}} char *next_cmd */
512 /* add the characters directly to the write buffer */
513 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
515   char *new_buf;
517   assert(sock != NULL);
519   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
520   if (new_buf == NULL)
521   {
522     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
523     return -1;
524   }
526   strncpy(new_buf + sock->wbuf_len, str, len + 1);
528   sock->wbuf = new_buf;
529   sock->wbuf_len += len;
531   return 0;
532 } /* }}} static int add_to_wbuf */
534 /* add the text to the "extra" info that's sent after the status line */
535 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
537   va_list argp;
538   char buffer[CMD_MAX];
539   int len;
541   if (JOURNAL_REPLAY(sock)) return 0;
542   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
544   va_start(argp, fmt);
545 #ifdef HAVE_VSNPRINTF
546   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
547 #else
548   len = vsprintf(buffer, fmt, argp);
549 #endif
550   va_end(argp);
551   if (len < 0)
552   {
553     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
554     return -1;
555   }
557   return add_to_wbuf(sock, buffer, len);
558 } /* }}} static int add_response_info */
560 static int count_lines(char *str) /* {{{ */
562   int lines = 0;
564   if (str != NULL)
565   {
566     while ((str = strchr(str, '\n')) != NULL)
567     {
568       ++lines;
569       ++str;
570     }
571   }
573   return lines;
574 } /* }}} static int count_lines */
576 /* send the response back to the user.
577  * returns 0 on success, -1 on error
578  * write buffer is always zeroed after this call */
579 static int send_response (listen_socket_t *sock, response_code rc,
580                           char *fmt, ...) /* {{{ */
582   va_list argp;
583   char buffer[CMD_MAX];
584   int lines;
585   ssize_t wrote;
586   int rclen, len;
588   if (JOURNAL_REPLAY(sock)) return rc;
590   if (sock->batch_start)
591   {
592     if (rc == RESP_OK)
593       return rc; /* no response on success during BATCH */
594     lines = sock->batch_cmd;
595   }
596   else if (rc == RESP_OK)
597     lines = count_lines(sock->wbuf);
598   else
599     lines = -1;
601   rclen = sprintf(buffer, "%d ", lines);
602   va_start(argp, fmt);
603 #ifdef HAVE_VSNPRINTF
604   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
605 #else
606   len = vsprintf(buffer+rclen, fmt, argp);
607 #endif
608   va_end(argp);
609   if (len < 0)
610     return -1;
612   len += rclen;
614   /* append the result to the wbuf, don't write to the user */
615   if (sock->batch_start)
616     return add_to_wbuf(sock, buffer, len);
618   /* first write must be complete */
619   if (len != write(sock->fd, buffer, len))
620   {
621     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
622     return -1;
623   }
625   if (sock->wbuf != NULL && rc == RESP_OK)
626   {
627     wrote = 0;
628     while (wrote < sock->wbuf_len)
629     {
630       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
631       if (wb <= 0)
632       {
633         RRDD_LOG(LOG_INFO, "send_response: could not write results");
634         return -1;
635       }
636       wrote += wb;
637     }
638   }
640   free(sock->wbuf); sock->wbuf = NULL;
641   sock->wbuf_len = 0;
643   return 0;
644 } /* }}} */
646 static void wipe_ci_values(cache_item_t *ci, time_t when)
648   ci->values = NULL;
649   ci->values_num = 0;
651   ci->last_flush_time = when;
652   if (config_write_jitter > 0)
653     ci->last_flush_time += (rrd_random() % config_write_jitter);
656 /* remove_from_queue
657  * remove a "cache_item_t" item from the queue.
658  * must hold 'cache_lock' when calling this
659  */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
662   if (ci == NULL) return;
663   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
665   if (ci->prev == NULL)
666     cache_queue_head = ci->next; /* reset head */
667   else
668     ci->prev->next = ci->next;
670   if (ci->next == NULL)
671     cache_queue_tail = ci->prev; /* reset the tail */
672   else
673     ci->next->prev = ci->prev;
675   ci->next = ci->prev = NULL;
676   ci->flags &= ~CI_FLAGS_IN_QUEUE;
678   pthread_mutex_lock (&stats_lock);
679   assert (stats_queue_length > 0);
680   stats_queue_length--;
681   pthread_mutex_unlock (&stats_lock);
683 } /* }}} static void remove_from_queue */
685 /* free the resources associated with the cache_item_t
686  * must hold cache_lock when calling this function
687  */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
690   if (ci == NULL) return NULL;
692   remove_from_queue(ci);
694   for (size_t i=0; i < ci->values_num; i++)
695     free(ci->values[i]);
697   free (ci->values);
698   free (ci->file);
700   /* in case anyone is waiting */
701   pthread_cond_broadcast(&ci->flushed);
702   pthread_cond_destroy(&ci->flushed);
704   free (ci);
706   return NULL;
707 } /* }}} static void *free_cache_item */
709 /*
710  * enqueue_cache_item:
711  * `cache_lock' must be acquired before calling this function!
712  */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714     queue_side_t side)
716   if (ci == NULL)
717     return (-1);
719   if (ci->values_num == 0)
720     return (0);
722   if (side == HEAD)
723   {
724     if (cache_queue_head == ci)
725       return 0;
727     /* remove if further down in queue */
728     remove_from_queue(ci);
730     ci->prev = NULL;
731     ci->next = cache_queue_head;
732     if (ci->next != NULL)
733       ci->next->prev = ci;
734     cache_queue_head = ci;
736     if (cache_queue_tail == NULL)
737       cache_queue_tail = cache_queue_head;
738   }
739   else /* (side == TAIL) */
740   {
741     /* We don't move values back in the list.. */
742     if (ci->flags & CI_FLAGS_IN_QUEUE)
743       return (0);
745     assert (ci->next == NULL);
746     assert (ci->prev == NULL);
748     ci->prev = cache_queue_tail;
750     if (cache_queue_tail == NULL)
751       cache_queue_head = ci;
752     else
753       cache_queue_tail->next = ci;
755     cache_queue_tail = ci;
756   }
758   ci->flags |= CI_FLAGS_IN_QUEUE;
760   pthread_cond_signal(&queue_cond);
761   pthread_mutex_lock (&stats_lock);
762   stats_queue_length++;
763   pthread_mutex_unlock (&stats_lock);
765   return (0);
766 } /* }}} int enqueue_cache_item */
768 /*
769  * tree_callback_flush:
770  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771  * while this is in progress.
772  */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774     gpointer data)
776   cache_item_t *ci;
777   callback_flush_data_t *cfd;
779   ci = (cache_item_t *) value;
780   cfd = (callback_flush_data_t *) data;
782   if (ci->flags & CI_FLAGS_IN_QUEUE)
783     return FALSE;
785   if (ci->values_num > 0
786       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787   {
788     enqueue_cache_item (ci, TAIL);
789   }
790   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791       && (ci->values_num <= 0))
792   {
793     assert ((char *) key == ci->file);
794     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795     {
796       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797       return (FALSE);
798     }
799   }
801   return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
804 static int flush_old_values (int max_age)
806   callback_flush_data_t cfd;
807   size_t k;
809   memset (&cfd, 0, sizeof (cfd));
810   /* Pass the current time as user data so that we don't need to call
811    * `time' for each node. */
812   cfd.now = time (NULL);
813   cfd.keys = NULL;
814   cfd.keys_num = 0;
816   if (max_age > 0)
817     cfd.abs_timeout = cfd.now - max_age;
818   else
819     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
821   /* `tree_callback_flush' will return the keys of all values that haven't
822    * been touched in the last `config_flush_interval' seconds in `cfd'.
823    * The char*'s in this array point to the same memory as ci->file, so we
824    * don't need to free them separately. */
825   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
827   for (k = 0; k < cfd.keys_num; k++)
828   {
829     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830     /* should never fail, since we have held the cache_lock
831      * the entire time */
832     assert(status == TRUE);
833   }
835   if (cfd.keys != NULL)
836   {
837     free (cfd.keys);
838     cfd.keys = NULL;
839   }
841   return (0);
842 } /* int flush_old_values */
844 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
846   struct timeval now;
847   struct timespec next_flush;
848   int status;
850   gettimeofday (&now, NULL);
851   next_flush.tv_sec = now.tv_sec + config_flush_interval;
852   next_flush.tv_nsec = 1000 * now.tv_usec;
854   pthread_mutex_lock(&cache_lock);
856   while (state == RUNNING)
857   {
858     gettimeofday (&now, NULL);
859     if ((now.tv_sec > next_flush.tv_sec)
860         || ((now.tv_sec == next_flush.tv_sec)
861           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862     {
863       RRDD_LOG(LOG_DEBUG, "flushing old values");
865       /* Determine the time of the next cache flush. */
866       next_flush.tv_sec = now.tv_sec + config_flush_interval;
868       /* Flush all values that haven't been written in the last
869        * `config_write_interval' seconds. */
870       flush_old_values (config_write_interval);
872       /* unlock the cache while we rotate so we don't block incoming
873        * updates if the fsync() blocks on disk I/O */
874       pthread_mutex_unlock(&cache_lock);
875       journal_rotate();
876       pthread_mutex_lock(&cache_lock);
877     }
879     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880     if (status != 0 && status != ETIMEDOUT)
881     {
882       RRDD_LOG (LOG_ERR, "flush_thread_main: "
883                 "pthread_cond_timedwait returned %i.", status);
884     }
885   }
887   if (config_flush_at_shutdown)
888     flush_old_values (-1); /* flush everything */
890   state = SHUTDOWN;
892   pthread_mutex_unlock(&cache_lock);
894   return NULL;
895 } /* void *flush_thread_main */
897 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
899   pthread_mutex_lock (&cache_lock);
901   while (state != SHUTDOWN
902          || (cache_queue_head != NULL && config_flush_at_shutdown))
903   {
904     cache_item_t *ci;
905     char *file;
906     char **values;
907     size_t values_num;
908     int status;
910     /* Now, check if there's something to store away. If not, wait until
911      * something comes in. */
912     if (cache_queue_head == NULL)
913     {
914       status = pthread_cond_wait (&queue_cond, &cache_lock);
915       if ((status != 0) && (status != ETIMEDOUT))
916       {
917         RRDD_LOG (LOG_ERR, "queue_thread_main: "
918             "pthread_cond_wait returned %i.", status);
919       }
920     }
922     /* Check if a value has arrived. This may be NULL if we timed out or there
923      * was an interrupt such as a signal. */
924     if (cache_queue_head == NULL)
925       continue;
927     ci = cache_queue_head;
929     /* copy the relevant parts */
930     file = strdup (ci->file);
931     if (file == NULL)
932     {
933       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934       continue;
935     }
937     assert(ci->values != NULL);
938     assert(ci->values_num > 0);
940     values = ci->values;
941     values_num = ci->values_num;
943     wipe_ci_values(ci, time(NULL));
944     remove_from_queue(ci);
946     pthread_mutex_unlock (&cache_lock);
948     rrd_clear_error ();
949     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950     if (status != 0)
951     {
952       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953           "rrd_update_r (%s) failed with status %i. (%s)",
954           file, status, rrd_get_error());
955     }
957     journal_write("wrote", file);
959     /* Search again in the tree.  It's possible someone issued a "FORGET"
960      * while we were writing the update values. */
961     pthread_mutex_lock(&cache_lock);
962     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963     if (ci)
964       pthread_cond_broadcast(&ci->flushed);
965     pthread_mutex_unlock(&cache_lock);
967     if (status == 0)
968     {
969       pthread_mutex_lock (&stats_lock);
970       stats_updates_written++;
971       stats_data_sets_written += values_num;
972       pthread_mutex_unlock (&stats_lock);
973     }
975     rrd_free_ptrs((void ***) &values, &values_num);
976     free(file);
978     pthread_mutex_lock (&cache_lock);
979   }
980   pthread_mutex_unlock (&cache_lock);
982   return (NULL);
983 } /* }}} void *queue_thread_main */
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986     size_t *buffer_size_ret, char **field_ret)
988   char *buffer;
989   size_t buffer_pos;
990   size_t buffer_size;
991   char *field;
992   size_t field_size;
993   int status;
995   buffer = *buffer_ret;
996   buffer_pos = 0;
997   buffer_size = *buffer_size_ret;
998   field = *buffer_ret;
999   field_size = 0;
1001   if (buffer_size <= 0)
1002     return (-1);
1004   /* This is ensured by `handle_request'. */
1005   assert (buffer[buffer_size - 1] == '\0');
1007   status = -1;
1008   while (buffer_pos < buffer_size)
1009   {
1010     /* Check for end-of-field or end-of-buffer */
1011     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012     {
1013       field[field_size] = 0;
1014       field_size++;
1015       buffer_pos++;
1016       status = 0;
1017       break;
1018     }
1019     /* Handle escaped characters. */
1020     else if (buffer[buffer_pos] == '\\')
1021     {
1022       if (buffer_pos >= (buffer_size - 1))
1023         break;
1024       buffer_pos++;
1025       field[field_size] = buffer[buffer_pos];
1026       field_size++;
1027       buffer_pos++;
1028     }
1029     /* Normal operation */ 
1030     else
1031     {
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036   } /* while (buffer_pos < buffer_size) */
1038   if (status != 0)
1039     return (status);
1041   *buffer_ret = buffer + buffer_pos;
1042   *buffer_size_ret = buffer_size - buffer_pos;
1043   *field_ret = field;
1045   return (0);
1046 } /* }}} int buffer_get_field */
1048 /* if we're restricting writes to the base directory,
1049  * check whether the file falls within the dir
1050  * returns 1 if OK, otherwise 0
1051  */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1054   assert(file != NULL);
1056   if (!config_write_base_only
1057       || JOURNAL_REPLAY(sock)
1058       || config_base_dir == NULL)
1059     return 1;
1061   if (strstr(file, "../") != NULL) goto err;
1063   /* relative paths without "../" are ok */
1064   if (*file != '/') return 1;
1066   /* file must be of the format base + "/" + <1+ char filename> */
1067   if (strlen(file) < _config_base_dir_len + 2) goto err;
1068   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069   if (*(file + _config_base_dir_len) != '/') goto err;
1071   return 1;
1073 err:
1074   if (sock != NULL && sock->fd >= 0)
1075     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1077   return 0;
1078 } /* }}} static int check_file_access */
1080 /* when using a base dir, convert relative paths to absolute paths.
1081  * if necessary, modifies the "filename" pointer to point
1082  * to the new path created in "tmp".  "tmp" is provided
1083  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084  *
1085  * this allows us to optimize for the expected case (absolute path)
1086  * with a no-op.
1087  */
1088 static void get_abs_path(char **filename, char *tmp)
1090   assert(tmp != NULL);
1091   assert(filename != NULL && *filename != NULL);
1093   if (config_base_dir == NULL || **filename == '/')
1094     return;
1096   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097   *filename = tmp;
1098 } /* }}} static int get_abs_path */
1100 static int flush_file (const char *filename) /* {{{ */
1102   cache_item_t *ci;
1104   pthread_mutex_lock (&cache_lock);
1106   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107   if (ci == NULL)
1108   {
1109     pthread_mutex_unlock (&cache_lock);
1110     return (ENOENT);
1111   }
1113   if (ci->values_num > 0)
1114   {
1115     /* Enqueue at head */
1116     enqueue_cache_item (ci, HEAD);
1117     pthread_cond_wait(&ci->flushed, &cache_lock);
1118   }
1120   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1121    * may have been purged during our cond_wait() */
1123   pthread_mutex_unlock(&cache_lock);
1125   return (0);
1126 } /* }}} int flush_file */
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1130   char *err = "Syntax error.\n";
1132   if (cmd && cmd->syntax)
1133     err = cmd->syntax;
1135   return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1140   uint64_t copy_queue_length;
1141   uint64_t copy_updates_received;
1142   uint64_t copy_flush_received;
1143   uint64_t copy_updates_written;
1144   uint64_t copy_data_sets_written;
1145   uint64_t copy_journal_bytes;
1146   uint64_t copy_journal_rotate;
1148   uint64_t tree_nodes_number;
1149   uint64_t tree_depth;
1151   pthread_mutex_lock (&stats_lock);
1152   copy_queue_length       = stats_queue_length;
1153   copy_updates_received   = stats_updates_received;
1154   copy_flush_received     = stats_flush_received;
1155   copy_updates_written    = stats_updates_written;
1156   copy_data_sets_written  = stats_data_sets_written;
1157   copy_journal_bytes      = stats_journal_bytes;
1158   copy_journal_rotate     = stats_journal_rotate;
1159   pthread_mutex_unlock (&stats_lock);
1161   pthread_mutex_lock (&cache_lock);
1162   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1164   pthread_mutex_unlock (&cache_lock);
1166   add_response_info(sock,
1167                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1168   add_response_info(sock,
1169                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170   add_response_info(sock,
1171                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172   add_response_info(sock,
1173                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174   add_response_info(sock,
1175                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1181   send_response(sock, RESP_OK, "Statistics follow\n");
1183   return (0);
1184 } /* }}} int handle_request_stats */
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1188   char *file, file_tmp[PATH_MAX];
1189   int status;
1191   status = buffer_get_field (&buffer, &buffer_size, &file);
1192   if (status != 0)
1193   {
1194     return syntax_error(sock,cmd);
1195   }
1196   else
1197   {
1198     pthread_mutex_lock(&stats_lock);
1199     stats_flush_received++;
1200     pthread_mutex_unlock(&stats_lock);
1202     get_abs_path(&file, file_tmp);
1203     if (!check_file_access(file, sock)) return 0;
1205     status = flush_file (file);
1206     if (status == 0)
1207       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208     else if (status == ENOENT)
1209     {
1210       /* no file in our tree; see whether it exists at all */
1211       struct stat statbuf;
1213       memset(&statbuf, 0, sizeof(statbuf));
1214       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216       else
1217         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218     }
1219     else if (status < 0)
1220       return send_response(sock, RESP_ERR, "Internal error.\n");
1221     else
1222       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223   }
1225   /* NOTREACHED */
1226   assert(1==0);
1227 } /* }}} int handle_request_flush */
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1231   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1233   pthread_mutex_lock(&cache_lock);
1234   flush_old_values(-1);
1235   pthread_mutex_unlock(&cache_lock);
1237   return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1242   int status;
1243   char *file, file_tmp[PATH_MAX];
1244   cache_item_t *ci;
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1250   get_abs_path(&file, file_tmp);
1252   pthread_mutex_lock(&cache_lock);
1253   ci = g_tree_lookup(cache_tree, file);
1254   if (ci == NULL)
1255   {
1256     pthread_mutex_unlock(&cache_lock);
1257     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258   }
1260   for (size_t i=0; i < ci->values_num; i++)
1261     add_response_info(sock, "%s\n", ci->values[i]);
1263   pthread_mutex_unlock(&cache_lock);
1264   return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1269   int status;
1270   gboolean found;
1271   char *file, file_tmp[PATH_MAX];
1273   status = buffer_get_field(&buffer, &buffer_size, &file);
1274   if (status != 0)
1275     return syntax_error(sock,cmd);
1277   get_abs_path(&file, file_tmp);
1278   if (!check_file_access(file, sock)) return 0;
1280   pthread_mutex_lock(&cache_lock);
1281   found = g_tree_remove(cache_tree, file);
1282   pthread_mutex_unlock(&cache_lock);
1284   if (found == TRUE)
1285   {
1286     if (!JOURNAL_REPLAY(sock))
1287       journal_write("forget", file);
1289     return send_response(sock, RESP_OK, "Gone!\n");
1290   }
1291   else
1292     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1294   /* NOTREACHED */
1295   assert(1==0);
1296 } /* }}} static int handle_request_forget */
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1300   cache_item_t *ci;
1302   pthread_mutex_lock(&cache_lock);
1304   ci = cache_queue_head;
1305   while (ci != NULL)
1306   {
1307     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308     ci = ci->next;
1309   }
1311   pthread_mutex_unlock(&cache_lock);
1313   return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1318   char *file, file_tmp[PATH_MAX];
1319   int values_num = 0;
1320   int status;
1321   char orig_buf[CMD_MAX];
1323   cache_item_t *ci;
1325   /* save it for the journal later */
1326   if (!JOURNAL_REPLAY(sock))
1327     strncpy(orig_buf, buffer, buffer_size);
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return syntax_error(sock,cmd);
1333   pthread_mutex_lock(&stats_lock);
1334   stats_updates_received++;
1335   pthread_mutex_unlock(&stats_lock);
1337   get_abs_path(&file, file_tmp);
1338   if (!check_file_access(file, sock)) return 0;
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346     cache_item_t *tmp;
1348     /* don't hold the lock while we setup; stat(2) might block */
1349     pthread_mutex_unlock(&cache_lock);
1351     memset (&statbuf, 0, sizeof (statbuf));
1352     status = stat (file, &statbuf);
1353     if (status != 0)
1354     {
1355       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357       status = errno;
1358       if (status == ENOENT)
1359         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360       else
1361         return send_response(sock, RESP_ERR,
1362                              "stat failed with error %i.\n", status);
1363     }
1364     if (!S_ISREG (statbuf.st_mode))
1365       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367     if (access(file, R_OK|W_OK) != 0)
1368       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369                            file, rrd_strerror(errno));
1371     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372     if (ci == NULL)
1373     {
1374       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376       return send_response(sock, RESP_ERR, "malloc failed.\n");
1377     }
1378     memset (ci, 0, sizeof (cache_item_t));
1380     ci->file = strdup (file);
1381     if (ci->file == NULL)
1382     {
1383       free (ci);
1384       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386       return send_response(sock, RESP_ERR, "strdup failed.\n");
1387     }
1389     wipe_ci_values(ci, now);
1390     ci->flags = CI_FLAGS_IN_TREE;
1391     pthread_cond_init(&ci->flushed, NULL);
1393     pthread_mutex_lock(&cache_lock);
1395     /* another UPDATE might have added this entry in the meantime */
1396     tmp = g_tree_lookup (cache_tree, file);
1397     if (tmp == NULL)
1398       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399     else
1400     {
1401       free_cache_item (ci);
1402       ci = tmp;
1403     }
1405     /* state may have changed while we were unlocked */
1406     if (state == SHUTDOWN)
1407       return -1;
1408   } /* }}} */
1409   assert (ci != NULL);
1411   /* don't re-write updates in replay mode */
1412   if (!JOURNAL_REPLAY(sock))
1413     journal_write("update", orig_buf);
1415   while (buffer_size > 0)
1416   {
1417     char *value;
1418     time_t stamp;
1419     char *eostamp;
1421     status = buffer_get_field (&buffer, &buffer_size, &value);
1422     if (status != 0)
1423     {
1424       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425       break;
1426     }
1428     /* make sure update time is always moving forward */
1429     stamp = strtol(value, &eostamp, 10);
1430     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1431     {
1432       pthread_mutex_unlock(&cache_lock);
1433       return send_response(sock, RESP_ERR,
1434                            "Cannot find timestamp in '%s'!\n", value);
1435     }
1436     else if (stamp <= ci->last_update_stamp)
1437     {
1438       pthread_mutex_unlock(&cache_lock);
1439       return send_response(sock, RESP_ERR,
1440                            "illegal attempt to update using time %ld when last"
1441                            " update time is %ld (minimum one second step)\n",
1442                            stamp, ci->last_update_stamp);
1443     }
1444     else
1445       ci->last_update_stamp = stamp;
1447     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1448     {
1449       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1450       continue;
1451     }
1453     values_num++;
1454   }
1456   if (((now - ci->last_flush_time) >= config_write_interval)
1457       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1458       && (ci->values_num > 0))
1459   {
1460     enqueue_cache_item (ci, TAIL);
1461   }
1463   pthread_mutex_unlock (&cache_lock);
1465   if (values_num < 1)
1466     return send_response(sock, RESP_ERR, "No values updated.\n");
1467   else
1468     return send_response(sock, RESP_OK,
1469                          "errors, enqueued %i value(s).\n", values_num);
1471   /* NOTREACHED */
1472   assert(1==0);
1474 } /* }}} int handle_request_update */
1476 /* we came across a "WROTE" entry during journal replay.
1477  * throw away any values that we have accumulated for this file
1478  */
1479 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1481   cache_item_t *ci;
1482   const char *file = buffer;
1484   pthread_mutex_lock(&cache_lock);
1486   ci = g_tree_lookup(cache_tree, file);
1487   if (ci == NULL)
1488   {
1489     pthread_mutex_unlock(&cache_lock);
1490     return (0);
1491   }
1493   if (ci->values)
1494     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1496   wipe_ci_values(ci, now);
1497   remove_from_queue(ci);
1499   pthread_mutex_unlock(&cache_lock);
1500   return (0);
1501 } /* }}} int handle_request_wrote */
1503 /* start "BATCH" processing */
1504 static int batch_start (HANDLER_PROTO) /* {{{ */
1506   int status;
1507   if (sock->batch_start)
1508     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1510   status = send_response(sock, RESP_OK,
1511                          "Go ahead.  End with dot '.' on its own line.\n");
1512   sock->batch_start = time(NULL);
1513   sock->batch_cmd = 0;
1515   return status;
1516 } /* }}} static int batch_start */
1518 /* finish "BATCH" processing and return results to the client */
1519 static int batch_done (HANDLER_PROTO) /* {{{ */
1521   assert(sock->batch_start);
1522   sock->batch_start = 0;
1523   sock->batch_cmd  = 0;
1524   return send_response(sock, RESP_OK, "errors\n");
1525 } /* }}} static int batch_done */
1527 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1529   return -1;
1530 } /* }}} static int handle_request_quit */
1532 static command_t list_of_commands[] = { /* {{{ */
1533   {
1534     "UPDATE",
1535     handle_request_update,
1536     CMD_CONTEXT_ANY,
1537     "UPDATE <filename> <values> [<values> ...]\n"
1538     ,
1539     "Adds the given file to the internal cache if it is not yet known and\n"
1540     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1541     "for details.\n"
1542     "\n"
1543     "Each <values> has the following form:\n"
1544     "  <values> = <time>:<value>[:<value>[...]]\n"
1545     "See the rrdupdate(1) manpage for details.\n"
1546   },
1547   {
1548     "WROTE",
1549     handle_request_wrote,
1550     CMD_CONTEXT_JOURNAL,
1551     NULL,
1552     NULL
1553   },
1554   {
1555     "FLUSH",
1556     handle_request_flush,
1557     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1558     "FLUSH <filename>\n"
1559     ,
1560     "Adds the given filename to the head of the update queue and returns\n"
1561     "after it has been dequeued.\n"
1562   },
1563   {
1564     "FLUSHALL",
1565     handle_request_flushall,
1566     CMD_CONTEXT_CLIENT,
1567     "FLUSHALL\n"
1568     ,
1569     "Triggers writing of all pending updates.  Returns immediately.\n"
1570   },
1571   {
1572     "PENDING",
1573     handle_request_pending,
1574     CMD_CONTEXT_CLIENT,
1575     "PENDING <filename>\n"
1576     ,
1577     "Shows any 'pending' updates for a file, in order.\n"
1578     "The updates shown have not yet been written to the underlying RRD file.\n"
1579   },
1580   {
1581     "FORGET",
1582     handle_request_forget,
1583     CMD_CONTEXT_ANY,
1584     "FORGET <filename>\n"
1585     ,
1586     "Removes the file completely from the cache.\n"
1587     "Any pending updates for the file will be lost.\n"
1588   },
1589   {
1590     "QUEUE",
1591     handle_request_queue,
1592     CMD_CONTEXT_CLIENT,
1593     "QUEUE\n"
1594     ,
1595         "Shows all files in the output queue.\n"
1596     "The output is zero or more lines in the following format:\n"
1597     "(where <num_vals> is the number of values to be written)\n"
1598     "\n"
1599     "<num_vals> <filename>\n"
1600   },
1601   {
1602     "STATS",
1603     handle_request_stats,
1604     CMD_CONTEXT_CLIENT,
1605     "STATS\n"
1606     ,
1607     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1608     "a description of the values.\n"
1609   },
1610   {
1611     "HELP",
1612     handle_request_help,
1613     CMD_CONTEXT_CLIENT,
1614     "HELP [<command>]\n",
1615     NULL, /* special! */
1616   },
1617   {
1618     "BATCH",
1619     batch_start,
1620     CMD_CONTEXT_CLIENT,
1621     "BATCH\n"
1622     ,
1623     "The 'BATCH' command permits the client to initiate a bulk load\n"
1624     "   of commands to rrdcached.\n"
1625     "\n"
1626     "Usage:\n"
1627     "\n"
1628     "    client: BATCH\n"
1629     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1630     "    client: command #1\n"
1631     "    client: command #2\n"
1632     "    client: ... and so on\n"
1633     "    client: .\n"
1634     "    server: 2 errors\n"
1635     "    server: 7 message for command #7\n"
1636     "    server: 9 message for command #9\n"
1637     "\n"
1638     "For more information, consult the rrdcached(1) documentation.\n"
1639   },
1640   {
1641     ".",   /* BATCH terminator */
1642     batch_done,
1643     CMD_CONTEXT_BATCH,
1644     NULL,
1645     NULL
1646   },
1647   {
1648     "QUIT",
1649     handle_request_quit,
1650     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1651     "QUIT\n"
1652     ,
1653     "Disconnect from rrdcached.\n"
1654   }
1655 }; /* }}} command_t list_of_commands[] */
1656 static size_t list_of_commands_len = sizeof (list_of_commands)
1657   / sizeof (list_of_commands[0]);
1659 static command_t *find_command(char *cmd)
1661   size_t i;
1663   for (i = 0; i < list_of_commands_len; i++)
1664     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1665       return (&list_of_commands[i]);
1666   return NULL;
1669 /* We currently use the index in the `list_of_commands' array as a bit position
1670  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1671  * outside these functions so that switching to a more elegant storage method
1672  * is easily possible. */
1673 static ssize_t find_command_index (const char *cmd) /* {{{ */
1675   size_t i;
1677   for (i = 0; i < list_of_commands_len; i++)
1678     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1679       return ((ssize_t) i);
1680   return (-1);
1681 } /* }}} ssize_t find_command_index */
1683 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1684     const char *cmd)
1686   ssize_t i;
1688   if (JOURNAL_REPLAY(sock))
1689     return (1);
1691   if (cmd == NULL)
1692     return (-1);
1694   if ((strcasecmp ("QUIT", cmd) == 0)
1695       || (strcasecmp ("HELP", cmd) == 0))
1696     return (1);
1697   else if (strcmp (".", cmd) == 0)
1698     cmd = "BATCH";
1700   i = find_command_index (cmd);
1701   if (i < 0)
1702     return (-1);
1703   assert (i < 32);
1705   if ((sock->permissions & (1 << i)) != 0)
1706     return (1);
1707   return (0);
1708 } /* }}} int socket_permission_check */
1710 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1711     const char *cmd)
1713   ssize_t i;
1715   i = find_command_index (cmd);
1716   if (i < 0)
1717     return (-1);
1718   assert (i < 32);
1720   sock->permissions |= (1 << i);
1721   return (0);
1722 } /* }}} int socket_permission_add */
1724 /* check whether commands are received in the expected context */
1725 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1727   if (JOURNAL_REPLAY(sock))
1728     return (cmd->context & CMD_CONTEXT_JOURNAL);
1729   else if (sock->batch_start)
1730     return (cmd->context & CMD_CONTEXT_BATCH);
1731   else
1732     return (cmd->context & CMD_CONTEXT_CLIENT);
1734   /* NOTREACHED */
1735   assert(1==0);
1738 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1740   int status;
1741   char *cmd_str;
1742   char *resp_txt;
1743   command_t *help = NULL;
1745   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1746   if (status == 0)
1747     help = find_command(cmd_str);
1749   if (help && (help->syntax || help->help))
1750   {
1751     char tmp[CMD_MAX];
1753     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1754     resp_txt = tmp;
1756     if (help->syntax)
1757       add_response_info(sock, "Usage: %s\n", help->syntax);
1759     if (help->help)
1760       add_response_info(sock, "%s\n", help->help);
1761   }
1762   else
1763   {
1764     size_t i;
1766     resp_txt = "Command overview\n";
1768     for (i = 0; i < list_of_commands_len; i++)
1769     {
1770       if (list_of_commands[i].syntax == NULL)
1771         continue;
1772       add_response_info (sock, "%s", list_of_commands[i].syntax);
1773     }
1774   }
1776   return send_response(sock, RESP_OK, resp_txt);
1777 } /* }}} int handle_request_help */
1779 static int handle_request (DISPATCH_PROTO) /* {{{ */
1781   char *buffer_ptr = buffer;
1782   char *cmd_str = NULL;
1783   command_t *cmd = NULL;
1784   int status;
1786   assert (buffer[buffer_size - 1] == '\0');
1788   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1789   if (status != 0)
1790   {
1791     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1792     return (-1);
1793   }
1795   if (sock != NULL && sock->batch_start)
1796     sock->batch_cmd++;
1798   cmd = find_command(cmd_str);
1799   if (!cmd)
1800     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1802   if (!socket_permission_check (sock, cmd->cmd))
1803     return send_response(sock, RESP_ERR, "Permission denied.\n");
1805   if (!command_check_context(sock, cmd))
1806     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1808   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1809 } /* }}} int handle_request */
1811 static void journal_set_free (journal_set *js) /* {{{ */
1813   if (js == NULL)
1814     return;
1816   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1818   free(js);
1819 } /* }}} journal_set_free */
1821 static void journal_set_remove (journal_set *js) /* {{{ */
1823   if (js == NULL)
1824     return;
1826   for (uint i=0; i < js->files_num; i++)
1827   {
1828     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1829     unlink(js->files[i]);
1830   }
1831 } /* }}} journal_set_remove */
1833 /* close current journal file handle.
1834  * MUST hold journal_lock before calling */
1835 static void journal_close(void) /* {{{ */
1837   if (journal_fh != NULL)
1838   {
1839     if (fclose(journal_fh) != 0)
1840       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1841   }
1843   journal_fh = NULL;
1844   journal_size = 0;
1845 } /* }}} journal_close */
1847 /* MUST hold journal_lock before calling */
1848 static void journal_new_file(void) /* {{{ */
1850   struct timeval now;
1851   int  new_fd;
1852   char new_file[PATH_MAX + 1];
1854   assert(journal_dir != NULL);
1855   assert(journal_cur != NULL);
1857   journal_close();
1859   gettimeofday(&now, NULL);
1860   /* this format assures that the files sort in strcmp() order */
1861   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1862            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1864   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1865                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1866   if (new_fd < 0)
1867     goto error;
1869   journal_fh = fdopen(new_fd, "a");
1870   if (journal_fh == NULL)
1871     goto error;
1873   journal_size = ftell(journal_fh);
1874   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1876   /* record the file in the journal set */
1877   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1879   return;
1881 error:
1882   RRDD_LOG(LOG_CRIT,
1883            "JOURNALING DISABLED: Error while trying to create %s : %s",
1884            new_file, rrd_strerror(errno));
1885   RRDD_LOG(LOG_CRIT,
1886            "JOURNALING DISABLED: All values will be flushed at shutdown");
1888   close(new_fd);
1889   config_flush_at_shutdown = 1;
1891 } /* }}} journal_new_file */
1893 /* MUST NOT hold journal_lock before calling this */
1894 static void journal_rotate(void) /* {{{ */
1896   journal_set *old_js = NULL;
1898   if (journal_dir == NULL)
1899     return;
1901   RRDD_LOG(LOG_DEBUG, "rotating journals");
1903   pthread_mutex_lock(&stats_lock);
1904   ++stats_journal_rotate;
1905   pthread_mutex_unlock(&stats_lock);
1907   pthread_mutex_lock(&journal_lock);
1909   journal_close();
1911   /* rotate the journal sets */
1912   old_js = journal_old;
1913   journal_old = journal_cur;
1914   journal_cur = calloc(1, sizeof(journal_set));
1916   if (journal_cur != NULL)
1917     journal_new_file();
1918   else
1919     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1921   pthread_mutex_unlock(&journal_lock);
1923   journal_set_remove(old_js);
1924   journal_set_free  (old_js);
1926 } /* }}} static void journal_rotate */
1928 /* MUST hold journal_lock when calling */
1929 static void journal_done(void) /* {{{ */
1931   if (journal_cur == NULL)
1932     return;
1934   journal_close();
1936   if (config_flush_at_shutdown)
1937   {
1938     RRDD_LOG(LOG_INFO, "removing journals");
1939     journal_set_remove(journal_old);
1940     journal_set_remove(journal_cur);
1941   }
1942   else
1943   {
1944     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1945              "journals will be used at next startup");
1946   }
1948   journal_set_free(journal_cur);
1949   journal_set_free(journal_old);
1950   free(journal_dir);
1952 } /* }}} static void journal_done */
1954 static int journal_write(char *cmd, char *args) /* {{{ */
1956   int chars;
1958   if (journal_fh == NULL)
1959     return 0;
1961   pthread_mutex_lock(&journal_lock);
1962   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1963   journal_size += chars;
1965   if (journal_size > JOURNAL_MAX)
1966     journal_new_file();
1968   pthread_mutex_unlock(&journal_lock);
1970   if (chars > 0)
1971   {
1972     pthread_mutex_lock(&stats_lock);
1973     stats_journal_bytes += chars;
1974     pthread_mutex_unlock(&stats_lock);
1975   }
1977   return chars;
1978 } /* }}} static int journal_write */
1980 static int journal_replay (const char *file) /* {{{ */
1982   FILE *fh;
1983   int entry_cnt = 0;
1984   int fail_cnt = 0;
1985   uint64_t line = 0;
1986   char entry[CMD_MAX];
1987   time_t now;
1989   if (file == NULL) return 0;
1991   {
1992     char *reason = "unknown error";
1993     int status = 0;
1994     struct stat statbuf;
1996     memset(&statbuf, 0, sizeof(statbuf));
1997     if (stat(file, &statbuf) != 0)
1998     {
1999       reason = "stat error";
2000       status = errno;
2001     }
2002     else if (!S_ISREG(statbuf.st_mode))
2003     {
2004       reason = "not a regular file";
2005       status = EPERM;
2006     }
2007     if (statbuf.st_uid != daemon_uid)
2008     {
2009       reason = "not owned by daemon user";
2010       status = EACCES;
2011     }
2012     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2013     {
2014       reason = "must not be user/group writable";
2015       status = EACCES;
2016     }
2018     if (status != 0)
2019     {
2020       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2021                file, rrd_strerror(status), reason);
2022       return 0;
2023     }
2024   }
2026   fh = fopen(file, "r");
2027   if (fh == NULL)
2028   {
2029     if (errno != ENOENT)
2030       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2031                file, rrd_strerror(errno));
2032     return 0;
2033   }
2034   else
2035     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2037   now = time(NULL);
2039   while(!feof(fh))
2040   {
2041     size_t entry_len;
2043     ++line;
2044     if (fgets(entry, sizeof(entry), fh) == NULL)
2045       break;
2046     entry_len = strlen(entry);
2048     /* check \n termination in case journal writing crashed mid-line */
2049     if (entry_len == 0)
2050       continue;
2051     else if (entry[entry_len - 1] != '\n')
2052     {
2053       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2054       ++fail_cnt;
2055       continue;
2056     }
2058     entry[entry_len - 1] = '\0';
2060     if (handle_request(NULL, now, entry, entry_len) == 0)
2061       ++entry_cnt;
2062     else
2063       ++fail_cnt;
2064   }
2066   fclose(fh);
2068   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2069            entry_cnt, fail_cnt);
2071   return entry_cnt > 0 ? 1 : 0;
2072 } /* }}} static int journal_replay */
2074 static int journal_sort(const void *v1, const void *v2)
2076   char **jn1 = (char **) v1;
2077   char **jn2 = (char **) v2;
2079   return strcmp(*jn1,*jn2);
2082 static void journal_init(void) /* {{{ */
2084   int had_journal = 0;
2085   DIR *dir;
2086   struct dirent *dent;
2087   char path[PATH_MAX+1];
2089   if (journal_dir == NULL) return;
2091   pthread_mutex_lock(&journal_lock);
2093   journal_cur = calloc(1, sizeof(journal_set));
2094   if (journal_cur == NULL)
2095   {
2096     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2097     return;
2098   }
2100   RRDD_LOG(LOG_INFO, "checking for journal files");
2102   /* Handle old journal files during transition.  This gives them the
2103    * correct sort order.  TODO: remove after first release
2104    */
2105   {
2106     char old_path[PATH_MAX+1];
2107     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2108     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2109     rename(old_path, path);
2111     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2112     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2113     rename(old_path, path);
2114   }
2116   dir = opendir(journal_dir);
2117   while ((dent = readdir(dir)) != NULL)
2118   {
2119     /* looks like a journal file? */
2120     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2121       continue;
2123     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2125     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2126     {
2127       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2128                dent->d_name);
2129       break;
2130     }
2131   }
2132   closedir(dir);
2134   qsort(journal_cur->files, journal_cur->files_num,
2135         sizeof(journal_cur->files[0]), journal_sort);
2137   for (uint i=0; i < journal_cur->files_num; i++)
2138     had_journal += journal_replay(journal_cur->files[i]);
2140   journal_new_file();
2142   /* it must have been a crash.  start a flush */
2143   if (had_journal && config_flush_at_shutdown)
2144     flush_old_values(-1);
2146   pthread_mutex_unlock(&journal_lock);
2148   RRDD_LOG(LOG_INFO, "journal processing complete");
2150 } /* }}} static void journal_init */
2152 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2154   assert(sock != NULL);
2156   free(sock->rbuf);  sock->rbuf = NULL;
2157   free(sock->wbuf);  sock->wbuf = NULL;
2158   free(sock);
2159 } /* }}} void free_listen_socket */
2161 static void close_connection(listen_socket_t *sock) /* {{{ */
2163   if (sock->fd >= 0)
2164   {
2165     close(sock->fd);
2166     sock->fd = -1;
2167   }
2169   free_listen_socket(sock);
2171 } /* }}} void close_connection */
2173 static void *connection_thread_main (void *args) /* {{{ */
2175   listen_socket_t *sock;
2176   int fd;
2178   sock = (listen_socket_t *) args;
2179   fd = sock->fd;
2181   /* init read buffers */
2182   sock->next_read = sock->next_cmd = 0;
2183   sock->rbuf = malloc(RBUF_SIZE);
2184   if (sock->rbuf == NULL)
2185   {
2186     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2187     close_connection(sock);
2188     return NULL;
2189   }
2191   pthread_mutex_lock (&connection_threads_lock);
2192   connection_threads_num++;
2193   pthread_mutex_unlock (&connection_threads_lock);
2195   while (state == RUNNING)
2196   {
2197     char *cmd;
2198     ssize_t cmd_len;
2199     ssize_t rbytes;
2200     time_t now;
2202     struct pollfd pollfd;
2203     int status;
2205     pollfd.fd = fd;
2206     pollfd.events = POLLIN | POLLPRI;
2207     pollfd.revents = 0;
2209     status = poll (&pollfd, 1, /* timeout = */ 500);
2210     if (state != RUNNING)
2211       break;
2212     else if (status == 0) /* timeout */
2213       continue;
2214     else if (status < 0) /* error */
2215     {
2216       status = errno;
2217       if (status != EINTR)
2218         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2219       continue;
2220     }
2222     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2223       break;
2224     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2225     {
2226       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2227           "poll(2) returned something unexpected: %#04hx",
2228           pollfd.revents);
2229       break;
2230     }
2232     rbytes = read(fd, sock->rbuf + sock->next_read,
2233                   RBUF_SIZE - sock->next_read);
2234     if (rbytes < 0)
2235     {
2236       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2237       break;
2238     }
2239     else if (rbytes == 0)
2240       break; /* eof */
2242     sock->next_read += rbytes;
2244     if (sock->batch_start)
2245       now = sock->batch_start;
2246     else
2247       now = time(NULL);
2249     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2250     {
2251       status = handle_request (sock, now, cmd, cmd_len+1);
2252       if (status != 0)
2253         goto out_close;
2254     }
2255   }
2257 out_close:
2258   close_connection(sock);
2260   /* Remove this thread from the connection threads list */
2261   pthread_mutex_lock (&connection_threads_lock);
2262   connection_threads_num--;
2263   if (connection_threads_num <= 0)
2264     pthread_cond_broadcast(&connection_threads_done);
2265   pthread_mutex_unlock (&connection_threads_lock);
2267   return (NULL);
2268 } /* }}} void *connection_thread_main */
2270 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2272   int fd;
2273   struct sockaddr_un sa;
2274   listen_socket_t *temp;
2275   int status;
2276   const char *path;
2277   char *path_copy, *dir;
2279   path = sock->addr;
2280   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2281     path += strlen("unix:");
2283   /* dirname may modify its argument */
2284   path_copy = strdup(path);
2285   if (path_copy == NULL)
2286   {
2287     fprintf(stderr, "rrdcached: strdup(): %s\n",
2288         rrd_strerror(errno));
2289     return (-1);
2290   }
2292   dir = dirname(path_copy);
2293   if (rrd_mkdir_p(dir, 0777) != 0)
2294   {
2295     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2296         dir, rrd_strerror(errno));
2297     return (-1);
2298   }
2300   free(path_copy);
2302   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2303       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2304   if (temp == NULL)
2305   {
2306     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2307     return (-1);
2308   }
2309   listen_fds = temp;
2310   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2312   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2313   if (fd < 0)
2314   {
2315     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2316              rrd_strerror(errno));
2317     return (-1);
2318   }
2320   memset (&sa, 0, sizeof (sa));
2321   sa.sun_family = AF_UNIX;
2322   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2324   /* if we've gotten this far, we own the pid file.  any daemon started
2325    * with the same args must not be alive.  therefore, ensure that we can
2326    * create the socket...
2327    */
2328   unlink(path);
2330   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2331   if (status != 0)
2332   {
2333     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2334              path, rrd_strerror(errno));
2335     close (fd);
2336     return (-1);
2337   }
2339   /* tweak the sockets group ownership */
2340   if (sock->socket_group != (gid_t)-1)
2341   {
2342     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2343          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2344     {
2345       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2346     }
2347   }
2349   if (sock->socket_permissions != (mode_t)-1)
2350   {
2351     if (chmod(path, sock->socket_permissions) != 0)
2352       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2353           (unsigned int)sock->socket_permissions, strerror(errno));
2354   }
2356   status = listen (fd, /* backlog = */ 10);
2357   if (status != 0)
2358   {
2359     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2360              path, rrd_strerror(errno));
2361     close (fd);
2362     unlink (path);
2363     return (-1);
2364   }
2366   listen_fds[listen_fds_num].fd = fd;
2367   listen_fds[listen_fds_num].family = PF_UNIX;
2368   strncpy(listen_fds[listen_fds_num].addr, path,
2369           sizeof (listen_fds[listen_fds_num].addr) - 1);
2370   listen_fds_num++;
2372   return (0);
2373 } /* }}} int open_listen_socket_unix */
2375 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2377   struct addrinfo ai_hints;
2378   struct addrinfo *ai_res;
2379   struct addrinfo *ai_ptr;
2380   char addr_copy[NI_MAXHOST];
2381   char *addr;
2382   char *port;
2383   int status;
2385   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2386   addr_copy[sizeof (addr_copy) - 1] = 0;
2387   addr = addr_copy;
2389   memset (&ai_hints, 0, sizeof (ai_hints));
2390   ai_hints.ai_flags = 0;
2391 #ifdef AI_ADDRCONFIG
2392   ai_hints.ai_flags |= AI_ADDRCONFIG;
2393 #endif
2394   ai_hints.ai_family = AF_UNSPEC;
2395   ai_hints.ai_socktype = SOCK_STREAM;
2397   port = NULL;
2398   if (*addr == '[') /* IPv6+port format */
2399   {
2400     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2401     addr++;
2403     port = strchr (addr, ']');
2404     if (port == NULL)
2405     {
2406       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2407       return (-1);
2408     }
2409     *port = 0;
2410     port++;
2412     if (*port == ':')
2413       port++;
2414     else if (*port == 0)
2415       port = NULL;
2416     else
2417     {
2418       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2419       return (-1);
2420     }
2421   } /* if (*addr == '[') */
2422   else
2423   {
2424     port = rindex(addr, ':');
2425     if (port != NULL)
2426     {
2427       *port = 0;
2428       port++;
2429     }
2430   }
2431   ai_res = NULL;
2432   status = getaddrinfo (addr,
2433                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2434                         &ai_hints, &ai_res);
2435   if (status != 0)
2436   {
2437     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2438              addr, gai_strerror (status));
2439     return (-1);
2440   }
2442   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2443   {
2444     int fd;
2445     listen_socket_t *temp;
2446     int one = 1;
2448     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2449         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2450     if (temp == NULL)
2451     {
2452       fprintf (stderr,
2453                "rrdcached: open_listen_socket_network: realloc failed.\n");
2454       continue;
2455     }
2456     listen_fds = temp;
2457     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2459     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2460     if (fd < 0)
2461     {
2462       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2463                rrd_strerror(errno));
2464       continue;
2465     }
2467     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2469     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2470     if (status != 0)
2471     {
2472       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2473                sock->addr, rrd_strerror(errno));
2474       close (fd);
2475       continue;
2476     }
2478     status = listen (fd, /* backlog = */ 10);
2479     if (status != 0)
2480     {
2481       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2482                sock->addr, rrd_strerror(errno));
2483       close (fd);
2484       freeaddrinfo(ai_res);
2485       return (-1);
2486     }
2488     listen_fds[listen_fds_num].fd = fd;
2489     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2490     listen_fds_num++;
2491   } /* for (ai_ptr) */
2493   freeaddrinfo(ai_res);
2494   return (0);
2495 } /* }}} static int open_listen_socket_network */
2497 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2499   assert(sock != NULL);
2500   assert(sock->addr != NULL);
2502   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2503       || sock->addr[0] == '/')
2504     return (open_listen_socket_unix(sock));
2505   else
2506     return (open_listen_socket_network(sock));
2507 } /* }}} int open_listen_socket */
2509 static int close_listen_sockets (void) /* {{{ */
2511   size_t i;
2513   for (i = 0; i < listen_fds_num; i++)
2514   {
2515     close (listen_fds[i].fd);
2517     if (listen_fds[i].family == PF_UNIX)
2518       unlink(listen_fds[i].addr);
2519   }
2521   free (listen_fds);
2522   listen_fds = NULL;
2523   listen_fds_num = 0;
2525   return (0);
2526 } /* }}} int close_listen_sockets */
2528 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2530   struct pollfd *pollfds;
2531   int pollfds_num;
2532   int status;
2533   int i;
2535   if (listen_fds_num < 1)
2536   {
2537     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2538     return (NULL);
2539   }
2541   pollfds_num = listen_fds_num;
2542   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2543   if (pollfds == NULL)
2544   {
2545     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2546     return (NULL);
2547   }
2548   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2550   RRDD_LOG(LOG_INFO, "listening for connections");
2552   while (state == RUNNING)
2553   {
2554     for (i = 0; i < pollfds_num; i++)
2555     {
2556       pollfds[i].fd = listen_fds[i].fd;
2557       pollfds[i].events = POLLIN | POLLPRI;
2558       pollfds[i].revents = 0;
2559     }
2561     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2562     if (state != RUNNING)
2563       break;
2564     else if (status == 0) /* timeout */
2565       continue;
2566     else if (status < 0) /* error */
2567     {
2568       status = errno;
2569       if (status != EINTR)
2570       {
2571         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2572       }
2573       continue;
2574     }
2576     for (i = 0; i < pollfds_num; i++)
2577     {
2578       listen_socket_t *client_sock;
2579       struct sockaddr_storage client_sa;
2580       socklen_t client_sa_size;
2581       pthread_t tid;
2582       pthread_attr_t attr;
2584       if (pollfds[i].revents == 0)
2585         continue;
2587       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2588       {
2589         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2590             "poll(2) returned something unexpected for listen FD #%i.",
2591             pollfds[i].fd);
2592         continue;
2593       }
2595       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2596       if (client_sock == NULL)
2597       {
2598         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2599         continue;
2600       }
2601       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2603       client_sa_size = sizeof (client_sa);
2604       client_sock->fd = accept (pollfds[i].fd,
2605           (struct sockaddr *) &client_sa, &client_sa_size);
2606       if (client_sock->fd < 0)
2607       {
2608         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2609         free(client_sock);
2610         continue;
2611       }
2613       pthread_attr_init (&attr);
2614       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2616       status = pthread_create (&tid, &attr, connection_thread_main,
2617                                client_sock);
2618       if (status != 0)
2619       {
2620         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2621         close_connection(client_sock);
2622         continue;
2623       }
2624     } /* for (pollfds_num) */
2625   } /* while (state == RUNNING) */
2627   RRDD_LOG(LOG_INFO, "starting shutdown");
2629   close_listen_sockets ();
2631   pthread_mutex_lock (&connection_threads_lock);
2632   while (connection_threads_num > 0)
2633     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2634   pthread_mutex_unlock (&connection_threads_lock);
2636   free(pollfds);
2638   return (NULL);
2639 } /* }}} void *listen_thread_main */
2641 static int daemonize (void) /* {{{ */
2643   int pid_fd;
2644   char *base_dir;
2646   daemon_uid = geteuid();
2648   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2649   if (pid_fd < 0)
2650     pid_fd = check_pidfile();
2651   if (pid_fd < 0)
2652     return pid_fd;
2654   /* open all the listen sockets */
2655   if (config_listen_address_list_len > 0)
2656   {
2657     for (size_t i = 0; i < config_listen_address_list_len; i++)
2658       open_listen_socket (config_listen_address_list[i]);
2660     rrd_free_ptrs((void ***) &config_listen_address_list,
2661                   &config_listen_address_list_len);
2662   }
2663   else
2664   {
2665     listen_socket_t sock;
2666     memset(&sock, 0, sizeof(sock));
2667     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2668     open_listen_socket (&sock);
2669   }
2671   if (listen_fds_num < 1)
2672   {
2673     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2674     goto error;
2675   }
2677   if (!stay_foreground)
2678   {
2679     pid_t child;
2681     child = fork ();
2682     if (child < 0)
2683     {
2684       fprintf (stderr, "daemonize: fork(2) failed.\n");
2685       goto error;
2686     }
2687     else if (child > 0)
2688       exit(0);
2690     /* Become session leader */
2691     setsid ();
2693     /* Open the first three file descriptors to /dev/null */
2694     close (2);
2695     close (1);
2696     close (0);
2698     open ("/dev/null", O_RDWR);
2699     if (dup(0) == -1 || dup(0) == -1){
2700         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2701     }
2702   } /* if (!stay_foreground) */
2704   /* Change into the /tmp directory. */
2705   base_dir = (config_base_dir != NULL)
2706     ? config_base_dir
2707     : "/tmp";
2709   if (chdir (base_dir) != 0)
2710   {
2711     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2712     goto error;
2713   }
2715   install_signal_handlers();
2717   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2718   RRDD_LOG(LOG_INFO, "starting up");
2720   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2721                                 (GDestroyNotify) free_cache_item);
2722   if (cache_tree == NULL)
2723   {
2724     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2725     goto error;
2726   }
2728   return write_pidfile (pid_fd);
2730 error:
2731   remove_pidfile();
2732   return -1;
2733 } /* }}} int daemonize */
2735 static int cleanup (void) /* {{{ */
2737   pthread_cond_broadcast (&flush_cond);
2738   pthread_join (flush_thread, NULL);
2740   pthread_cond_broadcast (&queue_cond);
2741   for (int i = 0; i < config_queue_threads; i++)
2742     pthread_join (queue_threads[i], NULL);
2744   if (config_flush_at_shutdown)
2745   {
2746     assert(cache_queue_head == NULL);
2747     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2748   }
2750   free(queue_threads);
2751   free(config_base_dir);
2753   pthread_mutex_lock(&cache_lock);
2754   g_tree_destroy(cache_tree);
2756   pthread_mutex_lock(&journal_lock);
2757   journal_done();
2759   RRDD_LOG(LOG_INFO, "goodbye");
2760   closelog ();
2762   remove_pidfile ();
2763   free(config_pid_file);
2765   return (0);
2766 } /* }}} int cleanup */
2768 static int read_options (int argc, char **argv) /* {{{ */
2770   int option;
2771   int status = 0;
2773   char **permissions = NULL;
2774   size_t permissions_len = 0;
2776   gid_t  socket_group = (gid_t)-1;
2777   mode_t socket_permissions = (mode_t)-1;
2779   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2780   {
2781     switch (option)
2782     {
2783       case 'g':
2784         stay_foreground=1;
2785         break;
2787       case 'l':
2788       {
2789         listen_socket_t *new;
2791         new = malloc(sizeof(listen_socket_t));
2792         if (new == NULL)
2793         {
2794           fprintf(stderr, "read_options: malloc failed.\n");
2795           return(2);
2796         }
2797         memset(new, 0, sizeof(listen_socket_t));
2799         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2801         /* Add permissions to the socket {{{ */
2802         if (permissions_len != 0)
2803         {
2804           size_t i;
2805           for (i = 0; i < permissions_len; i++)
2806           {
2807             status = socket_permission_add (new, permissions[i]);
2808             if (status != 0)
2809             {
2810               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2811                   "socket failed. Most likely, this permission doesn't "
2812                   "exist. Check your command line.\n", permissions[i]);
2813               status = 4;
2814             }
2815           }
2816         }
2817         else /* if (permissions_len == 0) */
2818         {
2819           /* Add permission for ALL commands to the socket. */
2820           size_t i;
2821           for (i = 0; i < list_of_commands_len; i++)
2822           {
2823             status = socket_permission_add (new, list_of_commands[i].cmd);
2824             if (status != 0)
2825             {
2826               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2827                   "socket failed. This should never happen, ever! Sorry.\n",
2828                   permissions[i]);
2829               status = 4;
2830             }
2831           }
2832         }
2833         /* }}} Done adding permissions. */
2835         new->socket_group = socket_group;
2836         new->socket_permissions = socket_permissions;
2838         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2839                          &config_listen_address_list_len, new))
2840         {
2841           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2842           return (2);
2843         }
2844       }
2845       break;
2847       /* set socket group permissions */
2848       case 's':
2849       {
2850         gid_t group_gid;
2851         struct group *grp;
2853         group_gid = strtoul(optarg, NULL, 10);
2854         if (errno != EINVAL && group_gid>0)
2855         {
2856           /* we were passed a number */
2857           grp = getgrgid(group_gid);
2858         }
2859         else
2860         {
2861           grp = getgrnam(optarg);
2862         }
2864         if (grp)
2865         {
2866           socket_group = grp->gr_gid;
2867         }
2868         else
2869         {
2870           /* no idea what the user wanted... */
2871           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2872           return (5);
2873         }
2874       }
2875       break;
2877       /* set socket file permissions */
2878       case 'm':
2879       {
2880         long  tmp;
2881         char *endptr = NULL;
2883         tmp = strtol (optarg, &endptr, 8);
2884         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2885             || (tmp > 07777) || (tmp < 0)) {
2886           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2887               optarg);
2888           return (5);
2889         }
2891         socket_permissions = (mode_t)tmp;
2892       }
2893       break;
2895       case 'P':
2896       {
2897         char *optcopy;
2898         char *saveptr;
2899         char *dummy;
2900         char *ptr;
2902         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2904         optcopy = strdup (optarg);
2905         dummy = optcopy;
2906         saveptr = NULL;
2907         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2908         {
2909           dummy = NULL;
2910           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2911         }
2913         free (optcopy);
2914       }
2915       break;
2917       case 'f':
2918       {
2919         int temp;
2921         temp = atoi (optarg);
2922         if (temp > 0)
2923           config_flush_interval = temp;
2924         else
2925         {
2926           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2927           status = 3;
2928         }
2929       }
2930       break;
2932       case 'w':
2933       {
2934         int temp;
2936         temp = atoi (optarg);
2937         if (temp > 0)
2938           config_write_interval = temp;
2939         else
2940         {
2941           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2942           status = 2;
2943         }
2944       }
2945       break;
2947       case 'z':
2948       {
2949         int temp;
2951         temp = atoi(optarg);
2952         if (temp > 0)
2953           config_write_jitter = temp;
2954         else
2955         {
2956           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2957           status = 2;
2958         }
2960         break;
2961       }
2963       case 't':
2964       {
2965         int threads;
2966         threads = atoi(optarg);
2967         if (threads >= 1)
2968           config_queue_threads = threads;
2969         else
2970         {
2971           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2972           return 1;
2973         }
2974       }
2975       break;
2977       case 'B':
2978         config_write_base_only = 1;
2979         break;
2981       case 'b':
2982       {
2983         size_t len;
2984         char base_realpath[PATH_MAX];
2986         if (config_base_dir != NULL)
2987           free (config_base_dir);
2988         config_base_dir = strdup (optarg);
2989         if (config_base_dir == NULL)
2990         {
2991           fprintf (stderr, "read_options: strdup failed.\n");
2992           return (3);
2993         }
2995         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2996         {
2997           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2998               config_base_dir, rrd_strerror (errno));
2999           return (3);
3000         }
3002         /* make sure that the base directory is not resolved via
3003          * symbolic links.  this makes some performance-enhancing
3004          * assumptions possible (we don't have to resolve paths
3005          * that start with a "/")
3006          */
3007         if (realpath(config_base_dir, base_realpath) == NULL)
3008         {
3009           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3010               "%s\n", config_base_dir, rrd_strerror(errno));
3011           return 5;
3012         }
3014         len = strlen (config_base_dir);
3015         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3016         {
3017           config_base_dir[len - 1] = 0;
3018           len--;
3019         }
3021         if (len < 1)
3022         {
3023           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3024           return (4);
3025         }
3027         _config_base_dir_len = len;
3029         len = strlen (base_realpath);
3030         while ((len > 0) && (base_realpath[len - 1] == '/'))
3031         {
3032           base_realpath[len - 1] = '\0';
3033           len--;
3034         }
3036         if (strncmp(config_base_dir,
3037                          base_realpath, sizeof(base_realpath)) != 0)
3038         {
3039           fprintf(stderr,
3040                   "Base directory (-b) resolved via file system links!\n"
3041                   "Please consult rrdcached '-b' documentation!\n"
3042                   "Consider specifying the real directory (%s)\n",
3043                   base_realpath);
3044           return 5;
3045         }
3046       }
3047       break;
3049       case 'p':
3050       {
3051         if (config_pid_file != NULL)
3052           free (config_pid_file);
3053         config_pid_file = strdup (optarg);
3054         if (config_pid_file == NULL)
3055         {
3056           fprintf (stderr, "read_options: strdup failed.\n");
3057           return (3);
3058         }
3059       }
3060       break;
3062       case 'F':
3063         config_flush_at_shutdown = 1;
3064         break;
3066       case 'j':
3067       {
3068         const char *dir = journal_dir = strdup(optarg);
3070         status = rrd_mkdir_p(dir, 0777);
3071         if (status != 0)
3072         {
3073           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3074               dir, rrd_strerror(errno));
3075           return 6;
3076         }
3078         if (access(dir, R_OK|W_OK|X_OK) != 0)
3079         {
3080           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3081                   errno ? rrd_strerror(errno) : "");
3082           return 6;
3083         }
3084       }
3085       break;
3087       case 'h':
3088       case '?':
3089         printf ("RRDCacheD %s\n"
3090             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3091             "\n"
3092             "Usage: rrdcached [options]\n"
3093             "\n"
3094             "Valid options are:\n"
3095             "  -l <address>  Socket address to listen to.\n"
3096             "  -P <perms>    Sets the permissions to assign to all following "
3097                             "sockets\n"
3098             "  -w <seconds>  Interval in which to write data.\n"
3099             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3100             "  -t <threads>  Number of write threads.\n"
3101             "  -f <seconds>  Interval in which to flush dead data.\n"
3102             "  -p <file>     Location of the PID-file.\n"
3103             "  -b <dir>      Base directory to change to.\n"
3104             "  -B            Restrict file access to paths within -b <dir>\n"
3105             "  -g            Do not fork and run in the foreground.\n"
3106             "  -j <dir>      Directory in which to create the journal files.\n"
3107             "  -F            Always flush all updates at shutdown\n"
3108             "  -s <id|name>  Make socket g+rw to named group\n"
3109             "\n"
3110             "For more information and a detailed description of all options "
3111             "please refer\n"
3112             "to the rrdcached(1) manual page.\n",
3113             VERSION);
3114         status = -1;
3115         break;
3116     } /* switch (option) */
3117   } /* while (getopt) */
3119   /* advise the user when values are not sane */
3120   if (config_flush_interval < 2 * config_write_interval)
3121     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3122             " 2x write interval (-w) !\n");
3123   if (config_write_jitter > config_write_interval)
3124     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3125             " write interval (-w) !\n");
3127   if (config_write_base_only && config_base_dir == NULL)
3128     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3129             "  Consult the rrdcached documentation\n");
3131   if (journal_dir == NULL)
3132     config_flush_at_shutdown = 1;
3134   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3136   return (status);
3137 } /* }}} int read_options */
3139 int main (int argc, char **argv)
3141   int status;
3143   status = read_options (argc, argv);
3144   if (status != 0)
3145   {
3146     if (status < 0)
3147       status = 0;
3148     return (status);
3149   }
3151   status = daemonize ();
3152   if (status != 0)
3153   {
3154     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3155     return (1);
3156   }
3158   journal_init();
3160   /* start the queue threads */
3161   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3162   if (queue_threads == NULL)
3163   {
3164     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3165     cleanup();
3166     return (1);
3167   }
3168   for (int i = 0; i < config_queue_threads; i++)
3169   {
3170     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3171     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3172     if (status != 0)
3173     {
3174       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3175       cleanup();
3176       return (1);
3177     }
3178   }
3180   /* start the flush thread */
3181   memset(&flush_thread, 0, sizeof(flush_thread));
3182   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3183   if (status != 0)
3184   {
3185     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3186     cleanup();
3187     return (1);
3188   }
3190   listen_thread_main (NULL);
3191   cleanup ();
3193   return (0);
3194 } /* int main */
3196 /*
3197  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3198  */