Code

Obviously this will only work if rrdcached is running as root which in
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
111 #include <glib-2.0/glib.h>
112 /* }}} */
114 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
116 #ifndef __GNUC__
117 # define __attribute__(x) /**/
118 #endif
120 /*
121  * Types
122  */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125 struct listen_socket_s
127   int fd;
128   char addr[PATH_MAX + 1];
129   int family;
131   /* state for BATCH processing */
132   time_t batch_start;
133   int batch_cmd;
135   /* buffered IO */
136   char *rbuf;
137   off_t next_cmd;
138   off_t next_read;
140   char *wbuf;
141   ssize_t wbuf_len;
143   uint32_t permissions;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command_s;
148 typedef struct command_s command_t;
149 /* note: guard against "unused" warnings in the handlers */
150 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
151                         time_t now              __attribute__((unused)),\
152                         char  *buffer           __attribute__((unused)),\
153                         size_t buffer_size      __attribute__((unused))
155 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
156                         DISPATCH_PROTO
158 struct command_s {
159   char   *cmd;
160   int (*handler)(HANDLER_PROTO);
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
168   char *syntax;
169   char *help;
170 };
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
189 struct callback_flush_data_s
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
198 enum queue_side_e
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
205 /* describe a set of journal files */
206 typedef struct {
207   char **files;
208   size_t files_num;
209 } journal_set;
211 /* max length of socket command or response */
212 #define CMD_MAX 4096
213 #define RBUF_SIZE (CMD_MAX*2)
215 /*
216  * Variables
217  */
218 static int stay_foreground = 0;
219 static uid_t daemon_uid;
221 static listen_socket_t *listen_fds = NULL;
222 static size_t listen_fds_num = 0;
224 static gboolean set_socket_group = FALSE;
225 static gid_t socket_group;
227 enum {
228   RUNNING,              /* normal operation */
229   FLUSHING,             /* flushing remaining values */
230   SHUTDOWN              /* shutting down */
231 } state = RUNNING;
233 static pthread_t *queue_threads;
234 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
235 static int config_queue_threads = 4;
237 static pthread_t flush_thread;
238 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
240 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
241 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
242 static int connection_threads_num = 0;
244 /* Cache stuff */
245 static GTree          *cache_tree = NULL;
246 static cache_item_t   *cache_queue_head = NULL;
247 static cache_item_t   *cache_queue_tail = NULL;
248 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
250 static int config_write_interval = 300;
251 static int config_write_jitter   = 0;
252 static int config_flush_interval = 3600;
253 static int config_flush_at_shutdown = 0;
254 static char *config_pid_file = NULL;
255 static char *config_base_dir = NULL;
256 static size_t _config_base_dir_len = 0;
257 static int config_write_base_only = 0;
259 static listen_socket_t **config_listen_address_list = NULL;
260 static size_t config_listen_address_list_len = 0;
262 static uint64_t stats_queue_length = 0;
263 static uint64_t stats_updates_received = 0;
264 static uint64_t stats_flush_received = 0;
265 static uint64_t stats_updates_written = 0;
266 static uint64_t stats_data_sets_written = 0;
267 static uint64_t stats_journal_bytes = 0;
268 static uint64_t stats_journal_rotate = 0;
269 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
271 /* Journaled updates */
272 #define JOURNAL_REPLAY(s) ((s) == NULL)
273 #define JOURNAL_BASE "rrd.journal"
274 static journal_set *journal_cur = NULL;
275 static journal_set *journal_old = NULL;
276 static char *journal_dir = NULL;
277 static FILE *journal_fh = NULL;         /* current journal file handle */
278 static long  journal_size = 0;          /* current journal size */
279 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
280 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
281 static int journal_write(char *cmd, char *args);
282 static void journal_done(void);
283 static void journal_rotate(void);
285 /* prototypes for forward refernces */
286 static int handle_request_help (HANDLER_PROTO);
288 /* 
289  * Functions
290  */
291 static void sig_common (const char *sig) /* {{{ */
293   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
294   state = FLUSHING;
295   pthread_cond_broadcast(&flush_cond);
296   pthread_cond_broadcast(&queue_cond);
297 } /* }}} void sig_common */
299 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
301   sig_common("INT");
302 } /* }}} void sig_int_handler */
304 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
306   sig_common("TERM");
307 } /* }}} void sig_term_handler */
309 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
311   config_flush_at_shutdown = 1;
312   sig_common("USR1");
313 } /* }}} void sig_usr1_handler */
315 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
317   config_flush_at_shutdown = 0;
318   sig_common("USR2");
319 } /* }}} void sig_usr2_handler */
321 static void install_signal_handlers(void) /* {{{ */
323   /* These structures are static, because `sigaction' behaves weird if the are
324    * overwritten.. */
325   static struct sigaction sa_int;
326   static struct sigaction sa_term;
327   static struct sigaction sa_pipe;
328   static struct sigaction sa_usr1;
329   static struct sigaction sa_usr2;
331   /* Install signal handlers */
332   memset (&sa_int, 0, sizeof (sa_int));
333   sa_int.sa_handler = sig_int_handler;
334   sigaction (SIGINT, &sa_int, NULL);
336   memset (&sa_term, 0, sizeof (sa_term));
337   sa_term.sa_handler = sig_term_handler;
338   sigaction (SIGTERM, &sa_term, NULL);
340   memset (&sa_pipe, 0, sizeof (sa_pipe));
341   sa_pipe.sa_handler = SIG_IGN;
342   sigaction (SIGPIPE, &sa_pipe, NULL);
344   memset (&sa_pipe, 0, sizeof (sa_usr1));
345   sa_usr1.sa_handler = sig_usr1_handler;
346   sigaction (SIGUSR1, &sa_usr1, NULL);
348   memset (&sa_usr2, 0, sizeof (sa_usr2));
349   sa_usr2.sa_handler = sig_usr2_handler;
350   sigaction (SIGUSR2, &sa_usr2, NULL);
352 } /* }}} void install_signal_handlers */
354 static int open_pidfile(char *action, int oflag) /* {{{ */
356   int fd;
357   const char *file;
358   char *file_copy, *dir;
360   file = (config_pid_file != NULL)
361     ? config_pid_file
362     : LOCALSTATEDIR "/run/rrdcached.pid";
364   /* dirname may modify its argument */
365   file_copy = strdup(file);
366   if (file_copy == NULL)
367   {
368     fprintf(stderr, "rrdcached: strdup(): %s\n",
369         rrd_strerror(errno));
370     return -1;
371   }
373   dir = dirname(file_copy);
374   if (rrd_mkdir_p(dir, 0777) != 0)
375   {
376     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
377         dir, rrd_strerror(errno));
378     return -1;
379   }
381   free(file_copy);
383   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
384   if (fd < 0)
385     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
386             action, file, rrd_strerror(errno));
388   return(fd);
389 } /* }}} static int open_pidfile */
391 /* check existing pid file to see whether a daemon is running */
392 static int check_pidfile(void)
394   int pid_fd;
395   pid_t pid;
396   char pid_str[16];
398   pid_fd = open_pidfile("open", O_RDWR);
399   if (pid_fd < 0)
400     return pid_fd;
402   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
403     return -1;
405   pid = atoi(pid_str);
406   if (pid <= 0)
407     return -1;
409   /* another running process that we can signal COULD be
410    * a competing rrdcached */
411   if (pid != getpid() && kill(pid, 0) == 0)
412   {
413     fprintf(stderr,
414             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
415     close(pid_fd);
416     return -1;
417   }
419   lseek(pid_fd, 0, SEEK_SET);
420   if (ftruncate(pid_fd, 0) == -1)
421   {
422     fprintf(stderr,
423             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
424     close(pid_fd);
425     return -1;
426   }
428   fprintf(stderr,
429           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
430           "rrdcached: starting normally.\n", pid);
432   return pid_fd;
433 } /* }}} static int check_pidfile */
435 static int write_pidfile (int fd) /* {{{ */
437   pid_t pid;
438   FILE *fh;
440   pid = getpid ();
442   fh = fdopen (fd, "w");
443   if (fh == NULL)
444   {
445     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
446     close(fd);
447     return (-1);
448   }
450   fprintf (fh, "%i\n", (int) pid);
451   fclose (fh);
453   return (0);
454 } /* }}} int write_pidfile */
456 static int remove_pidfile (void) /* {{{ */
458   char *file;
459   int status;
461   file = (config_pid_file != NULL)
462     ? config_pid_file
463     : LOCALSTATEDIR "/run/rrdcached.pid";
465   status = unlink (file);
466   if (status == 0)
467     return (0);
468   return (errno);
469 } /* }}} int remove_pidfile */
471 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
473   char *eol;
475   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
476                sock->next_read - sock->next_cmd);
478   if (eol == NULL)
479   {
480     /* no commands left, move remainder back to front of rbuf */
481     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
482             sock->next_read - sock->next_cmd);
483     sock->next_read -= sock->next_cmd;
484     sock->next_cmd = 0;
485     *len = 0;
486     return NULL;
487   }
488   else
489   {
490     char *cmd = sock->rbuf + sock->next_cmd;
491     *eol = '\0';
493     sock->next_cmd = eol - sock->rbuf + 1;
495     if (eol > sock->rbuf && *(eol-1) == '\r')
496       *(--eol) = '\0'; /* handle "\r\n" EOL */
498     *len = eol - cmd;
500     return cmd;
501   }
503   /* NOTREACHED */
504   assert(1==0);
505 } /* }}} char *next_cmd */
507 /* add the characters directly to the write buffer */
508 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
510   char *new_buf;
512   assert(sock != NULL);
514   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
515   if (new_buf == NULL)
516   {
517     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
518     return -1;
519   }
521   strncpy(new_buf + sock->wbuf_len, str, len + 1);
523   sock->wbuf = new_buf;
524   sock->wbuf_len += len;
526   return 0;
527 } /* }}} static int add_to_wbuf */
529 /* add the text to the "extra" info that's sent after the status line */
530 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
532   va_list argp;
533   char buffer[CMD_MAX];
534   int len;
536   if (JOURNAL_REPLAY(sock)) return 0;
537   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
539   va_start(argp, fmt);
540 #ifdef HAVE_VSNPRINTF
541   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
542 #else
543   len = vsprintf(buffer, fmt, argp);
544 #endif
545   va_end(argp);
546   if (len < 0)
547   {
548     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
549     return -1;
550   }
552   return add_to_wbuf(sock, buffer, len);
553 } /* }}} static int add_response_info */
555 static int count_lines(char *str) /* {{{ */
557   int lines = 0;
559   if (str != NULL)
560   {
561     while ((str = strchr(str, '\n')) != NULL)
562     {
563       ++lines;
564       ++str;
565     }
566   }
568   return lines;
569 } /* }}} static int count_lines */
571 /* send the response back to the user.
572  * returns 0 on success, -1 on error
573  * write buffer is always zeroed after this call */
574 static int send_response (listen_socket_t *sock, response_code rc,
575                           char *fmt, ...) /* {{{ */
577   va_list argp;
578   char buffer[CMD_MAX];
579   int lines;
580   ssize_t wrote;
581   int rclen, len;
583   if (JOURNAL_REPLAY(sock)) return rc;
585   if (sock->batch_start)
586   {
587     if (rc == RESP_OK)
588       return rc; /* no response on success during BATCH */
589     lines = sock->batch_cmd;
590   }
591   else if (rc == RESP_OK)
592     lines = count_lines(sock->wbuf);
593   else
594     lines = -1;
596   rclen = sprintf(buffer, "%d ", lines);
597   va_start(argp, fmt);
598 #ifdef HAVE_VSNPRINTF
599   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
600 #else
601   len = vsprintf(buffer+rclen, fmt, argp);
602 #endif
603   va_end(argp);
604   if (len < 0)
605     return -1;
607   len += rclen;
609   /* append the result to the wbuf, don't write to the user */
610   if (sock->batch_start)
611     return add_to_wbuf(sock, buffer, len);
613   /* first write must be complete */
614   if (len != write(sock->fd, buffer, len))
615   {
616     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
617     return -1;
618   }
620   if (sock->wbuf != NULL && rc == RESP_OK)
621   {
622     wrote = 0;
623     while (wrote < sock->wbuf_len)
624     {
625       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
626       if (wb <= 0)
627       {
628         RRDD_LOG(LOG_INFO, "send_response: could not write results");
629         return -1;
630       }
631       wrote += wb;
632     }
633   }
635   free(sock->wbuf); sock->wbuf = NULL;
636   sock->wbuf_len = 0;
638   return 0;
639 } /* }}} */
641 static void wipe_ci_values(cache_item_t *ci, time_t when)
643   ci->values = NULL;
644   ci->values_num = 0;
646   ci->last_flush_time = when;
647   if (config_write_jitter > 0)
648     ci->last_flush_time += (rrd_random() % config_write_jitter);
651 /* remove_from_queue
652  * remove a "cache_item_t" item from the queue.
653  * must hold 'cache_lock' when calling this
654  */
655 static void remove_from_queue(cache_item_t *ci) /* {{{ */
657   if (ci == NULL) return;
658   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
660   if (ci->prev == NULL)
661     cache_queue_head = ci->next; /* reset head */
662   else
663     ci->prev->next = ci->next;
665   if (ci->next == NULL)
666     cache_queue_tail = ci->prev; /* reset the tail */
667   else
668     ci->next->prev = ci->prev;
670   ci->next = ci->prev = NULL;
671   ci->flags &= ~CI_FLAGS_IN_QUEUE;
673   pthread_mutex_lock (&stats_lock);
674   assert (stats_queue_length > 0);
675   stats_queue_length--;
676   pthread_mutex_unlock (&stats_lock);
678 } /* }}} static void remove_from_queue */
680 /* free the resources associated with the cache_item_t
681  * must hold cache_lock when calling this function
682  */
683 static void *free_cache_item(cache_item_t *ci) /* {{{ */
685   if (ci == NULL) return NULL;
687   remove_from_queue(ci);
689   for (size_t i=0; i < ci->values_num; i++)
690     free(ci->values[i]);
692   free (ci->values);
693   free (ci->file);
695   /* in case anyone is waiting */
696   pthread_cond_broadcast(&ci->flushed);
697   pthread_cond_destroy(&ci->flushed);
699   free (ci);
701   return NULL;
702 } /* }}} static void *free_cache_item */
704 /*
705  * enqueue_cache_item:
706  * `cache_lock' must be acquired before calling this function!
707  */
708 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
709     queue_side_t side)
711   if (ci == NULL)
712     return (-1);
714   if (ci->values_num == 0)
715     return (0);
717   if (side == HEAD)
718   {
719     if (cache_queue_head == ci)
720       return 0;
722     /* remove if further down in queue */
723     remove_from_queue(ci);
725     ci->prev = NULL;
726     ci->next = cache_queue_head;
727     if (ci->next != NULL)
728       ci->next->prev = ci;
729     cache_queue_head = ci;
731     if (cache_queue_tail == NULL)
732       cache_queue_tail = cache_queue_head;
733   }
734   else /* (side == TAIL) */
735   {
736     /* We don't move values back in the list.. */
737     if (ci->flags & CI_FLAGS_IN_QUEUE)
738       return (0);
740     assert (ci->next == NULL);
741     assert (ci->prev == NULL);
743     ci->prev = cache_queue_tail;
745     if (cache_queue_tail == NULL)
746       cache_queue_head = ci;
747     else
748       cache_queue_tail->next = ci;
750     cache_queue_tail = ci;
751   }
753   ci->flags |= CI_FLAGS_IN_QUEUE;
755   pthread_cond_signal(&queue_cond);
756   pthread_mutex_lock (&stats_lock);
757   stats_queue_length++;
758   pthread_mutex_unlock (&stats_lock);
760   return (0);
761 } /* }}} int enqueue_cache_item */
763 /*
764  * tree_callback_flush:
765  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
766  * while this is in progress.
767  */
768 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
769     gpointer data)
771   cache_item_t *ci;
772   callback_flush_data_t *cfd;
774   ci = (cache_item_t *) value;
775   cfd = (callback_flush_data_t *) data;
777   if (ci->flags & CI_FLAGS_IN_QUEUE)
778     return FALSE;
780   if (ci->values_num > 0
781       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
782   {
783     enqueue_cache_item (ci, TAIL);
784   }
785   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
786       && (ci->values_num <= 0))
787   {
788     assert ((char *) key == ci->file);
789     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
790     {
791       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
792       return (FALSE);
793     }
794   }
796   return (FALSE);
797 } /* }}} gboolean tree_callback_flush */
799 static int flush_old_values (int max_age)
801   callback_flush_data_t cfd;
802   size_t k;
804   memset (&cfd, 0, sizeof (cfd));
805   /* Pass the current time as user data so that we don't need to call
806    * `time' for each node. */
807   cfd.now = time (NULL);
808   cfd.keys = NULL;
809   cfd.keys_num = 0;
811   if (max_age > 0)
812     cfd.abs_timeout = cfd.now - max_age;
813   else
814     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
816   /* `tree_callback_flush' will return the keys of all values that haven't
817    * been touched in the last `config_flush_interval' seconds in `cfd'.
818    * The char*'s in this array point to the same memory as ci->file, so we
819    * don't need to free them separately. */
820   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
822   for (k = 0; k < cfd.keys_num; k++)
823   {
824     /* should never fail, since we have held the cache_lock
825      * the entire time */
826     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
827   }
829   if (cfd.keys != NULL)
830   {
831     free (cfd.keys);
832     cfd.keys = NULL;
833   }
835   return (0);
836 } /* int flush_old_values */
838 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
840   struct timeval now;
841   struct timespec next_flush;
842   int status;
844   gettimeofday (&now, NULL);
845   next_flush.tv_sec = now.tv_sec + config_flush_interval;
846   next_flush.tv_nsec = 1000 * now.tv_usec;
848   pthread_mutex_lock(&cache_lock);
850   while (state == RUNNING)
851   {
852     gettimeofday (&now, NULL);
853     if ((now.tv_sec > next_flush.tv_sec)
854         || ((now.tv_sec == next_flush.tv_sec)
855           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
856     {
857       RRDD_LOG(LOG_DEBUG, "flushing old values");
859       /* Determine the time of the next cache flush. */
860       next_flush.tv_sec = now.tv_sec + config_flush_interval;
862       /* Flush all values that haven't been written in the last
863        * `config_write_interval' seconds. */
864       flush_old_values (config_write_interval);
866       /* unlock the cache while we rotate so we don't block incoming
867        * updates if the fsync() blocks on disk I/O */
868       pthread_mutex_unlock(&cache_lock);
869       journal_rotate();
870       pthread_mutex_lock(&cache_lock);
871     }
873     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
874     if (status != 0 && status != ETIMEDOUT)
875     {
876       RRDD_LOG (LOG_ERR, "flush_thread_main: "
877                 "pthread_cond_timedwait returned %i.", status);
878     }
879   }
881   if (config_flush_at_shutdown)
882     flush_old_values (-1); /* flush everything */
884   state = SHUTDOWN;
886   pthread_mutex_unlock(&cache_lock);
888   return NULL;
889 } /* void *flush_thread_main */
891 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
893   pthread_mutex_lock (&cache_lock);
895   while (state != SHUTDOWN
896          || (cache_queue_head != NULL && config_flush_at_shutdown))
897   {
898     cache_item_t *ci;
899     char *file;
900     char **values;
901     size_t values_num;
902     int status;
904     /* Now, check if there's something to store away. If not, wait until
905      * something comes in. */
906     if (cache_queue_head == NULL)
907     {
908       status = pthread_cond_wait (&queue_cond, &cache_lock);
909       if ((status != 0) && (status != ETIMEDOUT))
910       {
911         RRDD_LOG (LOG_ERR, "queue_thread_main: "
912             "pthread_cond_wait returned %i.", status);
913       }
914     }
916     /* Check if a value has arrived. This may be NULL if we timed out or there
917      * was an interrupt such as a signal. */
918     if (cache_queue_head == NULL)
919       continue;
921     ci = cache_queue_head;
923     /* copy the relevant parts */
924     file = strdup (ci->file);
925     if (file == NULL)
926     {
927       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
928       continue;
929     }
931     assert(ci->values != NULL);
932     assert(ci->values_num > 0);
934     values = ci->values;
935     values_num = ci->values_num;
937     wipe_ci_values(ci, time(NULL));
938     remove_from_queue(ci);
940     pthread_mutex_unlock (&cache_lock);
942     rrd_clear_error ();
943     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
944     if (status != 0)
945     {
946       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
947           "rrd_update_r (%s) failed with status %i. (%s)",
948           file, status, rrd_get_error());
949     }
951     journal_write("wrote", file);
953     /* Search again in the tree.  It's possible someone issued a "FORGET"
954      * while we were writing the update values. */
955     pthread_mutex_lock(&cache_lock);
956     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
957     if (ci)
958       pthread_cond_broadcast(&ci->flushed);
959     pthread_mutex_unlock(&cache_lock);
961     if (status == 0)
962     {
963       pthread_mutex_lock (&stats_lock);
964       stats_updates_written++;
965       stats_data_sets_written += values_num;
966       pthread_mutex_unlock (&stats_lock);
967     }
969     rrd_free_ptrs((void ***) &values, &values_num);
970     free(file);
972     pthread_mutex_lock (&cache_lock);
973   }
974   pthread_mutex_unlock (&cache_lock);
976   return (NULL);
977 } /* }}} void *queue_thread_main */
979 static int buffer_get_field (char **buffer_ret, /* {{{ */
980     size_t *buffer_size_ret, char **field_ret)
982   char *buffer;
983   size_t buffer_pos;
984   size_t buffer_size;
985   char *field;
986   size_t field_size;
987   int status;
989   buffer = *buffer_ret;
990   buffer_pos = 0;
991   buffer_size = *buffer_size_ret;
992   field = *buffer_ret;
993   field_size = 0;
995   if (buffer_size <= 0)
996     return (-1);
998   /* This is ensured by `handle_request'. */
999   assert (buffer[buffer_size - 1] == '\0');
1001   status = -1;
1002   while (buffer_pos < buffer_size)
1003   {
1004     /* Check for end-of-field or end-of-buffer */
1005     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1006     {
1007       field[field_size] = 0;
1008       field_size++;
1009       buffer_pos++;
1010       status = 0;
1011       break;
1012     }
1013     /* Handle escaped characters. */
1014     else if (buffer[buffer_pos] == '\\')
1015     {
1016       if (buffer_pos >= (buffer_size - 1))
1017         break;
1018       buffer_pos++;
1019       field[field_size] = buffer[buffer_pos];
1020       field_size++;
1021       buffer_pos++;
1022     }
1023     /* Normal operation */ 
1024     else
1025     {
1026       field[field_size] = buffer[buffer_pos];
1027       field_size++;
1028       buffer_pos++;
1029     }
1030   } /* while (buffer_pos < buffer_size) */
1032   if (status != 0)
1033     return (status);
1035   *buffer_ret = buffer + buffer_pos;
1036   *buffer_size_ret = buffer_size - buffer_pos;
1037   *field_ret = field;
1039   return (0);
1040 } /* }}} int buffer_get_field */
1042 /* if we're restricting writes to the base directory,
1043  * check whether the file falls within the dir
1044  * returns 1 if OK, otherwise 0
1045  */
1046 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1048   assert(file != NULL);
1050   if (!config_write_base_only
1051       || JOURNAL_REPLAY(sock)
1052       || config_base_dir == NULL)
1053     return 1;
1055   if (strstr(file, "../") != NULL) goto err;
1057   /* relative paths without "../" are ok */
1058   if (*file != '/') return 1;
1060   /* file must be of the format base + "/" + <1+ char filename> */
1061   if (strlen(file) < _config_base_dir_len + 2) goto err;
1062   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1063   if (*(file + _config_base_dir_len) != '/') goto err;
1065   return 1;
1067 err:
1068   if (sock != NULL && sock->fd >= 0)
1069     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1071   return 0;
1072 } /* }}} static int check_file_access */
1074 /* when using a base dir, convert relative paths to absolute paths.
1075  * if necessary, modifies the "filename" pointer to point
1076  * to the new path created in "tmp".  "tmp" is provided
1077  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1078  *
1079  * this allows us to optimize for the expected case (absolute path)
1080  * with a no-op.
1081  */
1082 static void get_abs_path(char **filename, char *tmp)
1084   assert(tmp != NULL);
1085   assert(filename != NULL && *filename != NULL);
1087   if (config_base_dir == NULL || **filename == '/')
1088     return;
1090   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1091   *filename = tmp;
1092 } /* }}} static int get_abs_path */
1094 static int flush_file (const char *filename) /* {{{ */
1096   cache_item_t *ci;
1098   pthread_mutex_lock (&cache_lock);
1100   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1101   if (ci == NULL)
1102   {
1103     pthread_mutex_unlock (&cache_lock);
1104     return (ENOENT);
1105   }
1107   if (ci->values_num > 0)
1108   {
1109     /* Enqueue at head */
1110     enqueue_cache_item (ci, HEAD);
1111     pthread_cond_wait(&ci->flushed, &cache_lock);
1112   }
1114   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1115    * may have been purged during our cond_wait() */
1117   pthread_mutex_unlock(&cache_lock);
1119   return (0);
1120 } /* }}} int flush_file */
1122 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1124   char *err = "Syntax error.\n";
1126   if (cmd && cmd->syntax)
1127     err = cmd->syntax;
1129   return send_response(sock, RESP_ERR, "Usage: %s", err);
1130 } /* }}} static int syntax_error() */
1132 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1134   uint64_t copy_queue_length;
1135   uint64_t copy_updates_received;
1136   uint64_t copy_flush_received;
1137   uint64_t copy_updates_written;
1138   uint64_t copy_data_sets_written;
1139   uint64_t copy_journal_bytes;
1140   uint64_t copy_journal_rotate;
1142   uint64_t tree_nodes_number;
1143   uint64_t tree_depth;
1145   pthread_mutex_lock (&stats_lock);
1146   copy_queue_length       = stats_queue_length;
1147   copy_updates_received   = stats_updates_received;
1148   copy_flush_received     = stats_flush_received;
1149   copy_updates_written    = stats_updates_written;
1150   copy_data_sets_written  = stats_data_sets_written;
1151   copy_journal_bytes      = stats_journal_bytes;
1152   copy_journal_rotate     = stats_journal_rotate;
1153   pthread_mutex_unlock (&stats_lock);
1155   pthread_mutex_lock (&cache_lock);
1156   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1157   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1158   pthread_mutex_unlock (&cache_lock);
1160   add_response_info(sock,
1161                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1162   add_response_info(sock,
1163                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1164   add_response_info(sock,
1165                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1166   add_response_info(sock,
1167                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1168   add_response_info(sock,
1169                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1170   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1171   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1172   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1173   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1175   send_response(sock, RESP_OK, "Statistics follow\n");
1177   return (0);
1178 } /* }}} int handle_request_stats */
1180 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1182   char *file, file_tmp[PATH_MAX];
1183   int status;
1185   status = buffer_get_field (&buffer, &buffer_size, &file);
1186   if (status != 0)
1187   {
1188     return syntax_error(sock,cmd);
1189   }
1190   else
1191   {
1192     pthread_mutex_lock(&stats_lock);
1193     stats_flush_received++;
1194     pthread_mutex_unlock(&stats_lock);
1196     get_abs_path(&file, file_tmp);
1197     if (!check_file_access(file, sock)) return 0;
1199     status = flush_file (file);
1200     if (status == 0)
1201       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1202     else if (status == ENOENT)
1203     {
1204       /* no file in our tree; see whether it exists at all */
1205       struct stat statbuf;
1207       memset(&statbuf, 0, sizeof(statbuf));
1208       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1209         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1210       else
1211         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1212     }
1213     else if (status < 0)
1214       return send_response(sock, RESP_ERR, "Internal error.\n");
1215     else
1216       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1217   }
1219   /* NOTREACHED */
1220   assert(1==0);
1221 } /* }}} int handle_request_flush */
1223 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1225   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1227   pthread_mutex_lock(&cache_lock);
1228   flush_old_values(-1);
1229   pthread_mutex_unlock(&cache_lock);
1231   return send_response(sock, RESP_OK, "Started flush.\n");
1232 } /* }}} static int handle_request_flushall */
1234 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1236   int status;
1237   char *file, file_tmp[PATH_MAX];
1238   cache_item_t *ci;
1240   status = buffer_get_field(&buffer, &buffer_size, &file);
1241   if (status != 0)
1242     return syntax_error(sock,cmd);
1244   get_abs_path(&file, file_tmp);
1246   pthread_mutex_lock(&cache_lock);
1247   ci = g_tree_lookup(cache_tree, file);
1248   if (ci == NULL)
1249   {
1250     pthread_mutex_unlock(&cache_lock);
1251     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1252   }
1254   for (size_t i=0; i < ci->values_num; i++)
1255     add_response_info(sock, "%s\n", ci->values[i]);
1257   pthread_mutex_unlock(&cache_lock);
1258   return send_response(sock, RESP_OK, "updates pending\n");
1259 } /* }}} static int handle_request_pending */
1261 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1263   int status;
1264   gboolean found;
1265   char *file, file_tmp[PATH_MAX];
1267   status = buffer_get_field(&buffer, &buffer_size, &file);
1268   if (status != 0)
1269     return syntax_error(sock,cmd);
1271   get_abs_path(&file, file_tmp);
1272   if (!check_file_access(file, sock)) return 0;
1274   pthread_mutex_lock(&cache_lock);
1275   found = g_tree_remove(cache_tree, file);
1276   pthread_mutex_unlock(&cache_lock);
1278   if (found == TRUE)
1279   {
1280     if (!JOURNAL_REPLAY(sock))
1281       journal_write("forget", file);
1283     return send_response(sock, RESP_OK, "Gone!\n");
1284   }
1285   else
1286     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1288   /* NOTREACHED */
1289   assert(1==0);
1290 } /* }}} static int handle_request_forget */
1292 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1294   cache_item_t *ci;
1296   pthread_mutex_lock(&cache_lock);
1298   ci = cache_queue_head;
1299   while (ci != NULL)
1300   {
1301     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1302     ci = ci->next;
1303   }
1305   pthread_mutex_unlock(&cache_lock);
1307   return send_response(sock, RESP_OK, "in queue.\n");
1308 } /* }}} int handle_request_queue */
1310 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1312   char *file, file_tmp[PATH_MAX];
1313   int values_num = 0;
1314   int status;
1315   char orig_buf[CMD_MAX];
1317   cache_item_t *ci;
1319   /* save it for the journal later */
1320   if (!JOURNAL_REPLAY(sock))
1321     strncpy(orig_buf, buffer, buffer_size);
1323   status = buffer_get_field (&buffer, &buffer_size, &file);
1324   if (status != 0)
1325     return syntax_error(sock,cmd);
1327   pthread_mutex_lock(&stats_lock);
1328   stats_updates_received++;
1329   pthread_mutex_unlock(&stats_lock);
1331   get_abs_path(&file, file_tmp);
1332   if (!check_file_access(file, sock)) return 0;
1334   pthread_mutex_lock (&cache_lock);
1335   ci = g_tree_lookup (cache_tree, file);
1337   if (ci == NULL) /* {{{ */
1338   {
1339     struct stat statbuf;
1340     cache_item_t *tmp;
1342     /* don't hold the lock while we setup; stat(2) might block */
1343     pthread_mutex_unlock(&cache_lock);
1345     memset (&statbuf, 0, sizeof (statbuf));
1346     status = stat (file, &statbuf);
1347     if (status != 0)
1348     {
1349       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1351       status = errno;
1352       if (status == ENOENT)
1353         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1354       else
1355         return send_response(sock, RESP_ERR,
1356                              "stat failed with error %i.\n", status);
1357     }
1358     if (!S_ISREG (statbuf.st_mode))
1359       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1361     if (access(file, R_OK|W_OK) != 0)
1362       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1363                            file, rrd_strerror(errno));
1365     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1366     if (ci == NULL)
1367     {
1368       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1370       return send_response(sock, RESP_ERR, "malloc failed.\n");
1371     }
1372     memset (ci, 0, sizeof (cache_item_t));
1374     ci->file = strdup (file);
1375     if (ci->file == NULL)
1376     {
1377       free (ci);
1378       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1380       return send_response(sock, RESP_ERR, "strdup failed.\n");
1381     }
1383     wipe_ci_values(ci, now);
1384     ci->flags = CI_FLAGS_IN_TREE;
1385     pthread_cond_init(&ci->flushed, NULL);
1387     pthread_mutex_lock(&cache_lock);
1389     /* another UPDATE might have added this entry in the meantime */
1390     tmp = g_tree_lookup (cache_tree, file);
1391     if (tmp == NULL)
1392       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1393     else
1394     {
1395       free_cache_item (ci);
1396       ci = tmp;
1397     }
1399     /* state may have changed while we were unlocked */
1400     if (state == SHUTDOWN)
1401       return -1;
1402   } /* }}} */
1403   assert (ci != NULL);
1405   /* don't re-write updates in replay mode */
1406   if (!JOURNAL_REPLAY(sock))
1407     journal_write("update", orig_buf);
1409   while (buffer_size > 0)
1410   {
1411     char *value;
1412     time_t stamp;
1413     char *eostamp;
1415     status = buffer_get_field (&buffer, &buffer_size, &value);
1416     if (status != 0)
1417     {
1418       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1419       break;
1420     }
1422     /* make sure update time is always moving forward */
1423     stamp = strtol(value, &eostamp, 10);
1424     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1425     {
1426       pthread_mutex_unlock(&cache_lock);
1427       return send_response(sock, RESP_ERR,
1428                            "Cannot find timestamp in '%s'!\n", value);
1429     }
1430     else if (stamp <= ci->last_update_stamp)
1431     {
1432       pthread_mutex_unlock(&cache_lock);
1433       return send_response(sock, RESP_ERR,
1434                            "illegal attempt to update using time %ld when last"
1435                            " update time is %ld (minimum one second step)\n",
1436                            stamp, ci->last_update_stamp);
1437     }
1438     else
1439       ci->last_update_stamp = stamp;
1441     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1442     {
1443       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1444       continue;
1445     }
1447     values_num++;
1448   }
1450   if (((now - ci->last_flush_time) >= config_write_interval)
1451       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1452       && (ci->values_num > 0))
1453   {
1454     enqueue_cache_item (ci, TAIL);
1455   }
1457   pthread_mutex_unlock (&cache_lock);
1459   if (values_num < 1)
1460     return send_response(sock, RESP_ERR, "No values updated.\n");
1461   else
1462     return send_response(sock, RESP_OK,
1463                          "errors, enqueued %i value(s).\n", values_num);
1465   /* NOTREACHED */
1466   assert(1==0);
1468 } /* }}} int handle_request_update */
1470 /* we came across a "WROTE" entry during journal replay.
1471  * throw away any values that we have accumulated for this file
1472  */
1473 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1475   cache_item_t *ci;
1476   const char *file = buffer;
1478   pthread_mutex_lock(&cache_lock);
1480   ci = g_tree_lookup(cache_tree, file);
1481   if (ci == NULL)
1482   {
1483     pthread_mutex_unlock(&cache_lock);
1484     return (0);
1485   }
1487   if (ci->values)
1488     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1490   wipe_ci_values(ci, now);
1491   remove_from_queue(ci);
1493   pthread_mutex_unlock(&cache_lock);
1494   return (0);
1495 } /* }}} int handle_request_wrote */
1497 /* start "BATCH" processing */
1498 static int batch_start (HANDLER_PROTO) /* {{{ */
1500   int status;
1501   if (sock->batch_start)
1502     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1504   status = send_response(sock, RESP_OK,
1505                          "Go ahead.  End with dot '.' on its own line.\n");
1506   sock->batch_start = time(NULL);
1507   sock->batch_cmd = 0;
1509   return status;
1510 } /* }}} static int batch_start */
1512 /* finish "BATCH" processing and return results to the client */
1513 static int batch_done (HANDLER_PROTO) /* {{{ */
1515   assert(sock->batch_start);
1516   sock->batch_start = 0;
1517   sock->batch_cmd  = 0;
1518   return send_response(sock, RESP_OK, "errors\n");
1519 } /* }}} static int batch_done */
1521 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1523   return -1;
1524 } /* }}} static int handle_request_quit */
1526 static command_t list_of_commands[] = { /* {{{ */
1527   {
1528     "UPDATE",
1529     handle_request_update,
1530     CMD_CONTEXT_ANY,
1531     "UPDATE <filename> <values> [<values> ...]\n"
1532     ,
1533     "Adds the given file to the internal cache if it is not yet known and\n"
1534     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1535     "for details.\n"
1536     "\n"
1537     "Each <values> has the following form:\n"
1538     "  <values> = <time>:<value>[:<value>[...]]\n"
1539     "See the rrdupdate(1) manpage for details.\n"
1540   },
1541   {
1542     "WROTE",
1543     handle_request_wrote,
1544     CMD_CONTEXT_JOURNAL,
1545     NULL,
1546     NULL
1547   },
1548   {
1549     "FLUSH",
1550     handle_request_flush,
1551     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1552     "FLUSH <filename>\n"
1553     ,
1554     "Adds the given filename to the head of the update queue and returns\n"
1555     "after it has been dequeued.\n"
1556   },
1557   {
1558     "FLUSHALL",
1559     handle_request_flushall,
1560     CMD_CONTEXT_CLIENT,
1561     "FLUSHALL\n"
1562     ,
1563     "Triggers writing of all pending updates.  Returns immediately.\n"
1564   },
1565   {
1566     "PENDING",
1567     handle_request_pending,
1568     CMD_CONTEXT_CLIENT,
1569     "PENDING <filename>\n"
1570     ,
1571     "Shows any 'pending' updates for a file, in order.\n"
1572     "The updates shown have not yet been written to the underlying RRD file.\n"
1573   },
1574   {
1575     "FORGET",
1576     handle_request_forget,
1577     CMD_CONTEXT_ANY,
1578     "FORGET <filename>\n"
1579     ,
1580     "Removes the file completely from the cache.\n"
1581     "Any pending updates for the file will be lost.\n"
1582   },
1583   {
1584     "QUEUE",
1585     handle_request_queue,
1586     CMD_CONTEXT_CLIENT,
1587     "QUEUE\n"
1588     ,
1589         "Shows all files in the output queue.\n"
1590     "The output is zero or more lines in the following format:\n"
1591     "(where <num_vals> is the number of values to be written)\n"
1592     "\n"
1593     "<num_vals> <filename>\n"
1594   },
1595   {
1596     "STATS",
1597     handle_request_stats,
1598     CMD_CONTEXT_CLIENT,
1599     "STATS\n"
1600     ,
1601     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1602     "a description of the values.\n"
1603   },
1604   {
1605     "HELP",
1606     handle_request_help,
1607     CMD_CONTEXT_CLIENT,
1608     "HELP [<command>]\n",
1609     NULL, /* special! */
1610   },
1611   {
1612     "BATCH",
1613     batch_start,
1614     CMD_CONTEXT_CLIENT,
1615     "BATCH\n"
1616     ,
1617     "The 'BATCH' command permits the client to initiate a bulk load\n"
1618     "   of commands to rrdcached.\n"
1619     "\n"
1620     "Usage:\n"
1621     "\n"
1622     "    client: BATCH\n"
1623     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1624     "    client: command #1\n"
1625     "    client: command #2\n"
1626     "    client: ... and so on\n"
1627     "    client: .\n"
1628     "    server: 2 errors\n"
1629     "    server: 7 message for command #7\n"
1630     "    server: 9 message for command #9\n"
1631     "\n"
1632     "For more information, consult the rrdcached(1) documentation.\n"
1633   },
1634   {
1635     ".",   /* BATCH terminator */
1636     batch_done,
1637     CMD_CONTEXT_BATCH,
1638     NULL,
1639     NULL
1640   },
1641   {
1642     "QUIT",
1643     handle_request_quit,
1644     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1645     "QUIT\n"
1646     ,
1647     "Disconnect from rrdcached.\n"
1648   }
1649 }; /* }}} command_t list_of_commands[] */
1650 static size_t list_of_commands_len = sizeof (list_of_commands)
1651   / sizeof (list_of_commands[0]);
1653 static command_t *find_command(char *cmd)
1655   size_t i;
1657   for (i = 0; i < list_of_commands_len; i++)
1658     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1659       return (&list_of_commands[i]);
1660   return NULL;
1663 /* We currently use the index in the `list_of_commands' array as a bit position
1664  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1665  * outside these functions so that switching to a more elegant storage method
1666  * is easily possible. */
1667 static ssize_t find_command_index (const char *cmd) /* {{{ */
1669   size_t i;
1671   for (i = 0; i < list_of_commands_len; i++)
1672     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1673       return ((ssize_t) i);
1674   return (-1);
1675 } /* }}} ssize_t find_command_index */
1677 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1678     const char *cmd)
1680   ssize_t i;
1682   if (JOURNAL_REPLAY(sock))
1683     return (1);
1685   if (cmd == NULL)
1686     return (-1);
1688   if ((strcasecmp ("QUIT", cmd) == 0)
1689       || (strcasecmp ("HELP", cmd) == 0))
1690     return (1);
1691   else if (strcmp (".", cmd) == 0)
1692     cmd = "BATCH";
1694   i = find_command_index (cmd);
1695   if (i < 0)
1696     return (-1);
1697   assert (i < 32);
1699   if ((sock->permissions & (1 << i)) != 0)
1700     return (1);
1701   return (0);
1702 } /* }}} int socket_permission_check */
1704 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1705     const char *cmd)
1707   ssize_t i;
1709   i = find_command_index (cmd);
1710   if (i < 0)
1711     return (-1);
1712   assert (i < 32);
1714   sock->permissions |= (1 << i);
1715   return (0);
1716 } /* }}} int socket_permission_add */
1718 /* check whether commands are received in the expected context */
1719 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1721   if (JOURNAL_REPLAY(sock))
1722     return (cmd->context & CMD_CONTEXT_JOURNAL);
1723   else if (sock->batch_start)
1724     return (cmd->context & CMD_CONTEXT_BATCH);
1725   else
1726     return (cmd->context & CMD_CONTEXT_CLIENT);
1728   /* NOTREACHED */
1729   assert(1==0);
1732 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1734   int status;
1735   char *cmd_str;
1736   char *resp_txt;
1737   command_t *help = NULL;
1739   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1740   if (status == 0)
1741     help = find_command(cmd_str);
1743   if (help && (help->syntax || help->help))
1744   {
1745     char tmp[CMD_MAX];
1747     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1748     resp_txt = tmp;
1750     if (help->syntax)
1751       add_response_info(sock, "Usage: %s\n", help->syntax);
1753     if (help->help)
1754       add_response_info(sock, "%s\n", help->help);
1755   }
1756   else
1757   {
1758     size_t i;
1760     resp_txt = "Command overview\n";
1762     for (i = 0; i < list_of_commands_len; i++)
1763     {
1764       if (list_of_commands[i].syntax == NULL)
1765         continue;
1766       add_response_info (sock, "%s", list_of_commands[i].syntax);
1767     }
1768   }
1770   return send_response(sock, RESP_OK, resp_txt);
1771 } /* }}} int handle_request_help */
1773 static int handle_request (DISPATCH_PROTO) /* {{{ */
1775   char *buffer_ptr = buffer;
1776   char *cmd_str = NULL;
1777   command_t *cmd = NULL;
1778   int status;
1780   assert (buffer[buffer_size - 1] == '\0');
1782   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1783   if (status != 0)
1784   {
1785     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1786     return (-1);
1787   }
1789   if (sock != NULL && sock->batch_start)
1790     sock->batch_cmd++;
1792   cmd = find_command(cmd_str);
1793   if (!cmd)
1794     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1796   if (!socket_permission_check (sock, cmd->cmd))
1797     return send_response(sock, RESP_ERR, "Permission denied.\n");
1799   if (!command_check_context(sock, cmd))
1800     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1802   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1803 } /* }}} int handle_request */
1805 static void journal_set_free (journal_set *js) /* {{{ */
1807   if (js == NULL)
1808     return;
1810   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1812   free(js);
1813 } /* }}} journal_set_free */
1815 static void journal_set_remove (journal_set *js) /* {{{ */
1817   if (js == NULL)
1818     return;
1820   for (uint i=0; i < js->files_num; i++)
1821   {
1822     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1823     unlink(js->files[i]);
1824   }
1825 } /* }}} journal_set_remove */
1827 /* close current journal file handle.
1828  * MUST hold journal_lock before calling */
1829 static void journal_close(void) /* {{{ */
1831   if (journal_fh != NULL)
1832   {
1833     if (fclose(journal_fh) != 0)
1834       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1835   }
1837   journal_fh = NULL;
1838   journal_size = 0;
1839 } /* }}} journal_close */
1841 /* MUST hold journal_lock before calling */
1842 static void journal_new_file(void) /* {{{ */
1844   struct timeval now;
1845   int  new_fd;
1846   char new_file[PATH_MAX + 1];
1848   assert(journal_dir != NULL);
1849   assert(journal_cur != NULL);
1851   journal_close();
1853   gettimeofday(&now, NULL);
1854   /* this format assures that the files sort in strcmp() order */
1855   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1856            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1858   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1859                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1860   if (new_fd < 0)
1861     goto error;
1863   journal_fh = fdopen(new_fd, "a");
1864   if (journal_fh == NULL)
1865     goto error;
1867   journal_size = ftell(journal_fh);
1868   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1870   /* record the file in the journal set */
1871   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1873   return;
1875 error:
1876   RRDD_LOG(LOG_CRIT,
1877            "JOURNALING DISABLED: Error while trying to create %s : %s",
1878            new_file, rrd_strerror(errno));
1879   RRDD_LOG(LOG_CRIT,
1880            "JOURNALING DISABLED: All values will be flushed at shutdown");
1882   close(new_fd);
1883   config_flush_at_shutdown = 1;
1885 } /* }}} journal_new_file */
1887 /* MUST NOT hold journal_lock before calling this */
1888 static void journal_rotate(void) /* {{{ */
1890   journal_set *old_js = NULL;
1892   if (journal_dir == NULL)
1893     return;
1895   RRDD_LOG(LOG_DEBUG, "rotating journals");
1897   pthread_mutex_lock(&stats_lock);
1898   ++stats_journal_rotate;
1899   pthread_mutex_unlock(&stats_lock);
1901   pthread_mutex_lock(&journal_lock);
1903   journal_close();
1905   /* rotate the journal sets */
1906   old_js = journal_old;
1907   journal_old = journal_cur;
1908   journal_cur = calloc(1, sizeof(journal_set));
1910   if (journal_cur != NULL)
1911     journal_new_file();
1912   else
1913     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1915   pthread_mutex_unlock(&journal_lock);
1917   journal_set_remove(old_js);
1918   journal_set_free  (old_js);
1920 } /* }}} static void journal_rotate */
1922 /* MUST hold journal_lock when calling */
1923 static void journal_done(void) /* {{{ */
1925   if (journal_cur == NULL)
1926     return;
1928   journal_close();
1930   if (config_flush_at_shutdown)
1931   {
1932     RRDD_LOG(LOG_INFO, "removing journals");
1933     journal_set_remove(journal_old);
1934     journal_set_remove(journal_cur);
1935   }
1936   else
1937   {
1938     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1939              "journals will be used at next startup");
1940   }
1942   journal_set_free(journal_cur);
1943   journal_set_free(journal_old);
1944   free(journal_dir);
1946 } /* }}} static void journal_done */
1948 static int journal_write(char *cmd, char *args) /* {{{ */
1950   int chars;
1952   if (journal_fh == NULL)
1953     return 0;
1955   pthread_mutex_lock(&journal_lock);
1956   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1957   journal_size += chars;
1959   if (journal_size > JOURNAL_MAX)
1960     journal_new_file();
1962   pthread_mutex_unlock(&journal_lock);
1964   if (chars > 0)
1965   {
1966     pthread_mutex_lock(&stats_lock);
1967     stats_journal_bytes += chars;
1968     pthread_mutex_unlock(&stats_lock);
1969   }
1971   return chars;
1972 } /* }}} static int journal_write */
1974 static int journal_replay (const char *file) /* {{{ */
1976   FILE *fh;
1977   int entry_cnt = 0;
1978   int fail_cnt = 0;
1979   uint64_t line = 0;
1980   char entry[CMD_MAX];
1981   time_t now;
1983   if (file == NULL) return 0;
1985   {
1986     char *reason = "unknown error";
1987     int status = 0;
1988     struct stat statbuf;
1990     memset(&statbuf, 0, sizeof(statbuf));
1991     if (stat(file, &statbuf) != 0)
1992     {
1993       reason = "stat error";
1994       status = errno;
1995     }
1996     else if (!S_ISREG(statbuf.st_mode))
1997     {
1998       reason = "not a regular file";
1999       status = EPERM;
2000     }
2001     if (statbuf.st_uid != daemon_uid)
2002     {
2003       reason = "not owned by daemon user";
2004       status = EACCES;
2005     }
2006     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2007     {
2008       reason = "must not be user/group writable";
2009       status = EACCES;
2010     }
2012     if (status != 0)
2013     {
2014       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2015                file, rrd_strerror(status), reason);
2016       return 0;
2017     }
2018   }
2020   fh = fopen(file, "r");
2021   if (fh == NULL)
2022   {
2023     if (errno != ENOENT)
2024       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2025                file, rrd_strerror(errno));
2026     return 0;
2027   }
2028   else
2029     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2031   now = time(NULL);
2033   while(!feof(fh))
2034   {
2035     size_t entry_len;
2037     ++line;
2038     if (fgets(entry, sizeof(entry), fh) == NULL)
2039       break;
2040     entry_len = strlen(entry);
2042     /* check \n termination in case journal writing crashed mid-line */
2043     if (entry_len == 0)
2044       continue;
2045     else if (entry[entry_len - 1] != '\n')
2046     {
2047       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2048       ++fail_cnt;
2049       continue;
2050     }
2052     entry[entry_len - 1] = '\0';
2054     if (handle_request(NULL, now, entry, entry_len) == 0)
2055       ++entry_cnt;
2056     else
2057       ++fail_cnt;
2058   }
2060   fclose(fh);
2062   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2063            entry_cnt, fail_cnt);
2065   return entry_cnt > 0 ? 1 : 0;
2066 } /* }}} static int journal_replay */
2068 static int journal_sort(const void *v1, const void *v2)
2070   char **jn1 = (char **) v1;
2071   char **jn2 = (char **) v2;
2073   return strcmp(*jn1,*jn2);
2076 static void journal_init(void) /* {{{ */
2078   int had_journal = 0;
2079   DIR *dir;
2080   struct dirent *dent;
2081   char path[PATH_MAX+1];
2083   if (journal_dir == NULL) return;
2085   pthread_mutex_lock(&journal_lock);
2087   journal_cur = calloc(1, sizeof(journal_set));
2088   if (journal_cur == NULL)
2089   {
2090     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2091     return;
2092   }
2094   RRDD_LOG(LOG_INFO, "checking for journal files");
2096   /* Handle old journal files during transition.  This gives them the
2097    * correct sort order.  TODO: remove after first release
2098    */
2099   {
2100     char old_path[PATH_MAX+1];
2101     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2102     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2103     rename(old_path, path);
2105     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2106     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2107     rename(old_path, path);
2108   }
2110   dir = opendir(journal_dir);
2111   while ((dent = readdir(dir)) != NULL)
2112   {
2113     /* looks like a journal file? */
2114     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2115       continue;
2117     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2119     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2120     {
2121       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2122                dent->d_name);
2123       break;
2124     }
2125   }
2126   closedir(dir);
2128   qsort(journal_cur->files, journal_cur->files_num,
2129         sizeof(journal_cur->files[0]), journal_sort);
2131   for (uint i=0; i < journal_cur->files_num; i++)
2132     had_journal += journal_replay(journal_cur->files[i]);
2134   journal_new_file();
2136   /* it must have been a crash.  start a flush */
2137   if (had_journal && config_flush_at_shutdown)
2138     flush_old_values(-1);
2140   pthread_mutex_unlock(&journal_lock);
2142   RRDD_LOG(LOG_INFO, "journal processing complete");
2144 } /* }}} static void journal_init */
2146 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2148   assert(sock != NULL);
2150   free(sock->rbuf);  sock->rbuf = NULL;
2151   free(sock->wbuf);  sock->wbuf = NULL;
2152   free(sock);
2153 } /* }}} void free_listen_socket */
2155 static void close_connection(listen_socket_t *sock) /* {{{ */
2157   if (sock->fd >= 0)
2158   {
2159     close(sock->fd);
2160     sock->fd = -1;
2161   }
2163   free_listen_socket(sock);
2165 } /* }}} void close_connection */
2167 static void *connection_thread_main (void *args) /* {{{ */
2169   listen_socket_t *sock;
2170   int fd;
2172   sock = (listen_socket_t *) args;
2173   fd = sock->fd;
2175   /* init read buffers */
2176   sock->next_read = sock->next_cmd = 0;
2177   sock->rbuf = malloc(RBUF_SIZE);
2178   if (sock->rbuf == NULL)
2179   {
2180     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2181     close_connection(sock);
2182     return NULL;
2183   }
2185   pthread_mutex_lock (&connection_threads_lock);
2186   connection_threads_num++;
2187   pthread_mutex_unlock (&connection_threads_lock);
2189   while (state == RUNNING)
2190   {
2191     char *cmd;
2192     ssize_t cmd_len;
2193     ssize_t rbytes;
2194     time_t now;
2196     struct pollfd pollfd;
2197     int status;
2199     pollfd.fd = fd;
2200     pollfd.events = POLLIN | POLLPRI;
2201     pollfd.revents = 0;
2203     status = poll (&pollfd, 1, /* timeout = */ 500);
2204     if (state != RUNNING)
2205       break;
2206     else if (status == 0) /* timeout */
2207       continue;
2208     else if (status < 0) /* error */
2209     {
2210       status = errno;
2211       if (status != EINTR)
2212         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2213       continue;
2214     }
2216     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2217       break;
2218     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2219     {
2220       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2221           "poll(2) returned something unexpected: %#04hx",
2222           pollfd.revents);
2223       break;
2224     }
2226     rbytes = read(fd, sock->rbuf + sock->next_read,
2227                   RBUF_SIZE - sock->next_read);
2228     if (rbytes < 0)
2229     {
2230       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2231       break;
2232     }
2233     else if (rbytes == 0)
2234       break; /* eof */
2236     sock->next_read += rbytes;
2238     if (sock->batch_start)
2239       now = sock->batch_start;
2240     else
2241       now = time(NULL);
2243     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2244     {
2245       status = handle_request (sock, now, cmd, cmd_len+1);
2246       if (status != 0)
2247         goto out_close;
2248     }
2249   }
2251 out_close:
2252   close_connection(sock);
2254   /* Remove this thread from the connection threads list */
2255   pthread_mutex_lock (&connection_threads_lock);
2256   connection_threads_num--;
2257   if (connection_threads_num <= 0)
2258     pthread_cond_broadcast(&connection_threads_done);
2259   pthread_mutex_unlock (&connection_threads_lock);
2261   return (NULL);
2262 } /* }}} void *connection_thread_main */
2264 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2266   int fd;
2267   struct sockaddr_un sa;
2268   listen_socket_t *temp;
2269   int status;
2270   const char *path;
2271   char *path_copy, *dir;
2273   path = sock->addr;
2274   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2275     path += strlen("unix:");
2277   /* dirname may modify its argument */
2278   path_copy = strdup(path);
2279   if (path_copy == NULL)
2280   {
2281     fprintf(stderr, "rrdcached: strdup(): %s\n",
2282         rrd_strerror(errno));
2283     return (-1);
2284   }
2286   dir = dirname(path_copy);
2287   if (rrd_mkdir_p(dir, 0777) != 0)
2288   {
2289     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2290         dir, rrd_strerror(errno));
2291     return (-1);
2292   }
2294   free(path_copy);
2296   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2297       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2298   if (temp == NULL)
2299   {
2300     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2301     return (-1);
2302   }
2303   listen_fds = temp;
2304   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2306   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2307   if (fd < 0)
2308   {
2309     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2310              rrd_strerror(errno));
2311     return (-1);
2312   }
2314   memset (&sa, 0, sizeof (sa));
2315   sa.sun_family = AF_UNIX;
2316   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2318   /* if we've gotten this far, we own the pid file.  any daemon started
2319    * with the same args must not be alive.  therefore, ensure that we can
2320    * create the socket...
2321    */
2322   unlink(path);
2324   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2325   if (status != 0)
2326   {
2327     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2328              path, rrd_strerror(errno));
2329     close (fd);
2330     return (-1);
2331   }
2333   /* tweak the sockets group ownership */
2334   if (set_socket_group)
2335   {
2336     if ( (chown(path, getuid(), socket_group) != 0) ||
2337          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2338     {
2339       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2340     }
2341   }
2343   status = listen (fd, /* backlog = */ 10);
2344   if (status != 0)
2345   {
2346     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2347              path, rrd_strerror(errno));
2348     close (fd);
2349     unlink (path);
2350     return (-1);
2351   }
2353   listen_fds[listen_fds_num].fd = fd;
2354   listen_fds[listen_fds_num].family = PF_UNIX;
2355   strncpy(listen_fds[listen_fds_num].addr, path,
2356           sizeof (listen_fds[listen_fds_num].addr) - 1);
2357   listen_fds_num++;
2359   return (0);
2360 } /* }}} int open_listen_socket_unix */
2362 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2364   struct addrinfo ai_hints;
2365   struct addrinfo *ai_res;
2366   struct addrinfo *ai_ptr;
2367   char addr_copy[NI_MAXHOST];
2368   char *addr;
2369   char *port;
2370   int status;
2372   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2373   addr_copy[sizeof (addr_copy) - 1] = 0;
2374   addr = addr_copy;
2376   memset (&ai_hints, 0, sizeof (ai_hints));
2377   ai_hints.ai_flags = 0;
2378 #ifdef AI_ADDRCONFIG
2379   ai_hints.ai_flags |= AI_ADDRCONFIG;
2380 #endif
2381   ai_hints.ai_family = AF_UNSPEC;
2382   ai_hints.ai_socktype = SOCK_STREAM;
2384   port = NULL;
2385   if (*addr == '[') /* IPv6+port format */
2386   {
2387     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2388     addr++;
2390     port = strchr (addr, ']');
2391     if (port == NULL)
2392     {
2393       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2394       return (-1);
2395     }
2396     *port = 0;
2397     port++;
2399     if (*port == ':')
2400       port++;
2401     else if (*port == 0)
2402       port = NULL;
2403     else
2404     {
2405       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2406       return (-1);
2407     }
2408   } /* if (*addr == '[') */
2409   else
2410   {
2411     port = rindex(addr, ':');
2412     if (port != NULL)
2413     {
2414       *port = 0;
2415       port++;
2416     }
2417   }
2418   ai_res = NULL;
2419   status = getaddrinfo (addr,
2420                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2421                         &ai_hints, &ai_res);
2422   if (status != 0)
2423   {
2424     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2425              addr, gai_strerror (status));
2426     return (-1);
2427   }
2429   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2430   {
2431     int fd;
2432     listen_socket_t *temp;
2433     int one = 1;
2435     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2436         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2437     if (temp == NULL)
2438     {
2439       fprintf (stderr,
2440                "rrdcached: open_listen_socket_network: realloc failed.\n");
2441       continue;
2442     }
2443     listen_fds = temp;
2444     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2446     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2447     if (fd < 0)
2448     {
2449       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2450                rrd_strerror(errno));
2451       continue;
2452     }
2454     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2456     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2457     if (status != 0)
2458     {
2459       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2460                sock->addr, rrd_strerror(errno));
2461       close (fd);
2462       continue;
2463     }
2465     status = listen (fd, /* backlog = */ 10);
2466     if (status != 0)
2467     {
2468       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2469                sock->addr, rrd_strerror(errno));
2470       close (fd);
2471       freeaddrinfo(ai_res);
2472       return (-1);
2473     }
2475     listen_fds[listen_fds_num].fd = fd;
2476     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2477     listen_fds_num++;
2478   } /* for (ai_ptr) */
2480   freeaddrinfo(ai_res);
2481   return (0);
2482 } /* }}} static int open_listen_socket_network */
2484 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2486   assert(sock != NULL);
2487   assert(sock->addr != NULL);
2489   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2490       || sock->addr[0] == '/')
2491     return (open_listen_socket_unix(sock));
2492   else
2493     return (open_listen_socket_network(sock));
2494 } /* }}} int open_listen_socket */
2496 static int close_listen_sockets (void) /* {{{ */
2498   size_t i;
2500   for (i = 0; i < listen_fds_num; i++)
2501   {
2502     close (listen_fds[i].fd);
2504     if (listen_fds[i].family == PF_UNIX)
2505       unlink(listen_fds[i].addr);
2506   }
2508   free (listen_fds);
2509   listen_fds = NULL;
2510   listen_fds_num = 0;
2512   return (0);
2513 } /* }}} int close_listen_sockets */
2515 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2517   struct pollfd *pollfds;
2518   int pollfds_num;
2519   int status;
2520   int i;
2522   if (listen_fds_num < 1)
2523   {
2524     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2525     return (NULL);
2526   }
2528   pollfds_num = listen_fds_num;
2529   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2530   if (pollfds == NULL)
2531   {
2532     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2533     return (NULL);
2534   }
2535   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2537   RRDD_LOG(LOG_INFO, "listening for connections");
2539   while (state == RUNNING)
2540   {
2541     for (i = 0; i < pollfds_num; i++)
2542     {
2543       pollfds[i].fd = listen_fds[i].fd;
2544       pollfds[i].events = POLLIN | POLLPRI;
2545       pollfds[i].revents = 0;
2546     }
2548     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2549     if (state != RUNNING)
2550       break;
2551     else if (status == 0) /* timeout */
2552       continue;
2553     else if (status < 0) /* error */
2554     {
2555       status = errno;
2556       if (status != EINTR)
2557       {
2558         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2559       }
2560       continue;
2561     }
2563     for (i = 0; i < pollfds_num; i++)
2564     {
2565       listen_socket_t *client_sock;
2566       struct sockaddr_storage client_sa;
2567       socklen_t client_sa_size;
2568       pthread_t tid;
2569       pthread_attr_t attr;
2571       if (pollfds[i].revents == 0)
2572         continue;
2574       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2575       {
2576         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2577             "poll(2) returned something unexpected for listen FD #%i.",
2578             pollfds[i].fd);
2579         continue;
2580       }
2582       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2583       if (client_sock == NULL)
2584       {
2585         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2586         continue;
2587       }
2588       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2590       client_sa_size = sizeof (client_sa);
2591       client_sock->fd = accept (pollfds[i].fd,
2592           (struct sockaddr *) &client_sa, &client_sa_size);
2593       if (client_sock->fd < 0)
2594       {
2595         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2596         free(client_sock);
2597         continue;
2598       }
2600       pthread_attr_init (&attr);
2601       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2603       status = pthread_create (&tid, &attr, connection_thread_main,
2604                                client_sock);
2605       if (status != 0)
2606       {
2607         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2608         close_connection(client_sock);
2609         continue;
2610       }
2611     } /* for (pollfds_num) */
2612   } /* while (state == RUNNING) */
2614   RRDD_LOG(LOG_INFO, "starting shutdown");
2616   close_listen_sockets ();
2618   pthread_mutex_lock (&connection_threads_lock);
2619   while (connection_threads_num > 0)
2620     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2621   pthread_mutex_unlock (&connection_threads_lock);
2623   free(pollfds);
2625   return (NULL);
2626 } /* }}} void *listen_thread_main */
2628 static int daemonize (void) /* {{{ */
2630   int pid_fd;
2631   char *base_dir;
2633   daemon_uid = geteuid();
2635   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2636   if (pid_fd < 0)
2637     pid_fd = check_pidfile();
2638   if (pid_fd < 0)
2639     return pid_fd;
2641   /* open all the listen sockets */
2642   if (config_listen_address_list_len > 0)
2643   {
2644     for (size_t i = 0; i < config_listen_address_list_len; i++)
2645       open_listen_socket (config_listen_address_list[i]);
2647     rrd_free_ptrs((void ***) &config_listen_address_list,
2648                   &config_listen_address_list_len);
2649   }
2650   else
2651   {
2652     listen_socket_t sock;
2653     memset(&sock, 0, sizeof(sock));
2654     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2655     open_listen_socket (&sock);
2656   }
2658   if (listen_fds_num < 1)
2659   {
2660     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2661     goto error;
2662   }
2664   if (!stay_foreground)
2665   {
2666     pid_t child;
2668     child = fork ();
2669     if (child < 0)
2670     {
2671       fprintf (stderr, "daemonize: fork(2) failed.\n");
2672       goto error;
2673     }
2674     else if (child > 0)
2675       exit(0);
2677     /* Become session leader */
2678     setsid ();
2680     /* Open the first three file descriptors to /dev/null */
2681     close (2);
2682     close (1);
2683     close (0);
2685     open ("/dev/null", O_RDWR);
2686     if (dup(0) == -1 || dup(0) == -1){
2687         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2688     }
2689   } /* if (!stay_foreground) */
2691   /* Change into the /tmp directory. */
2692   base_dir = (config_base_dir != NULL)
2693     ? config_base_dir
2694     : "/tmp";
2696   if (chdir (base_dir) != 0)
2697   {
2698     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2699     goto error;
2700   }
2702   install_signal_handlers();
2704   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2705   RRDD_LOG(LOG_INFO, "starting up");
2707   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2708                                 (GDestroyNotify) free_cache_item);
2709   if (cache_tree == NULL)
2710   {
2711     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2712     goto error;
2713   }
2715   return write_pidfile (pid_fd);
2717 error:
2718   remove_pidfile();
2719   return -1;
2720 } /* }}} int daemonize */
2722 static int cleanup (void) /* {{{ */
2724   pthread_cond_broadcast (&flush_cond);
2725   pthread_join (flush_thread, NULL);
2727   pthread_cond_broadcast (&queue_cond);
2728   for (int i = 0; i < config_queue_threads; i++)
2729     pthread_join (queue_threads[i], NULL);
2731   if (config_flush_at_shutdown)
2732   {
2733     assert(cache_queue_head == NULL);
2734     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2735   }
2737   free(queue_threads);
2738   free(config_base_dir);
2740   pthread_mutex_lock(&cache_lock);
2741   g_tree_destroy(cache_tree);
2743   pthread_mutex_lock(&journal_lock);
2744   journal_done();
2746   RRDD_LOG(LOG_INFO, "goodbye");
2747   closelog ();
2749   remove_pidfile ();
2750   free(config_pid_file);
2752   return (0);
2753 } /* }}} int cleanup */
2755 static int read_options (int argc, char **argv) /* {{{ */
2757   int option;
2758   int status = 0;
2760   char **permissions = NULL;
2761   size_t permissions_len = 0;
2763   while ((option = getopt(argc, argv, "gl:s:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2764   {
2765     switch (option)
2766     {
2767       case 'g':
2768         stay_foreground=1;
2769         break;
2771       case 'l':
2772       {
2773         listen_socket_t *new;
2775         new = malloc(sizeof(listen_socket_t));
2776         if (new == NULL)
2777         {
2778           fprintf(stderr, "read_options: malloc failed.\n");
2779           return(2);
2780         }
2781         memset(new, 0, sizeof(listen_socket_t));
2783         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2785         /* Add permissions to the socket {{{ */
2786         if (permissions_len != 0)
2787         {
2788           size_t i;
2789           for (i = 0; i < permissions_len; i++)
2790           {
2791             status = socket_permission_add (new, permissions[i]);
2792             if (status != 0)
2793             {
2794               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2795                   "socket failed. Most likely, this permission doesn't "
2796                   "exist. Check your command line.\n", permissions[i]);
2797               status = 4;
2798             }
2799           }
2800         }
2801         else /* if (permissions_len == 0) */
2802         {
2803           /* Add permission for ALL commands to the socket. */
2804           size_t i;
2805           for (i = 0; i < list_of_commands_len; i++)
2806           {
2807             status = socket_permission_add (new, list_of_commands[i].cmd);
2808             if (status != 0)
2809             {
2810               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2811                   "socket failed. This should never happen, ever! Sorry.\n",
2812                   permissions[i]);
2813               status = 4;
2814             }
2815           }
2816         }
2817         /* }}} Done adding permissions. */
2819         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2820                          &config_listen_address_list_len, new))
2821         {
2822           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2823           return (2);
2824         }
2825       }
2826       break;
2828       /* set socket group permissions */
2829       case 's':
2830       {
2831         gid_t group_gid;
2832         struct group *grp;
2834         group_gid = strtoul(optarg, NULL, 10);
2835         if (errno != EINVAL && group_gid>0)
2836         {
2837           /* we were passed a number */
2838           grp = getgrgid(group_gid);
2839         }
2840         else
2841         {
2842           grp = getgrnam(optarg);
2843         }
2845         if (grp)
2846         {
2847           socket_group = grp->gr_gid;
2848           set_socket_group = TRUE;
2849         }
2850         else
2851         {
2852           /* no idea what the user wanted... */
2853           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2854           return (5);
2855         }
2856       }
2857       break;
2859       case 'P':
2860       {
2861         char *optcopy;
2862         char *saveptr;
2863         char *dummy;
2864         char *ptr;
2866         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2868         optcopy = strdup (optarg);
2869         dummy = optcopy;
2870         saveptr = NULL;
2871         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2872         {
2873           dummy = NULL;
2874           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2875         }
2877         free (optcopy);
2878       }
2879       break;
2881       case 'f':
2882       {
2883         int temp;
2885         temp = atoi (optarg);
2886         if (temp > 0)
2887           config_flush_interval = temp;
2888         else
2889         {
2890           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2891           status = 3;
2892         }
2893       }
2894       break;
2896       case 'w':
2897       {
2898         int temp;
2900         temp = atoi (optarg);
2901         if (temp > 0)
2902           config_write_interval = temp;
2903         else
2904         {
2905           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2906           status = 2;
2907         }
2908       }
2909       break;
2911       case 'z':
2912       {
2913         int temp;
2915         temp = atoi(optarg);
2916         if (temp > 0)
2917           config_write_jitter = temp;
2918         else
2919         {
2920           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2921           status = 2;
2922         }
2924         break;
2925       }
2927       case 't':
2928       {
2929         int threads;
2930         threads = atoi(optarg);
2931         if (threads >= 1)
2932           config_queue_threads = threads;
2933         else
2934         {
2935           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2936           return 1;
2937         }
2938       }
2939       break;
2941       case 'B':
2942         config_write_base_only = 1;
2943         break;
2945       case 'b':
2946       {
2947         size_t len;
2948         char base_realpath[PATH_MAX];
2950         if (config_base_dir != NULL)
2951           free (config_base_dir);
2952         config_base_dir = strdup (optarg);
2953         if (config_base_dir == NULL)
2954         {
2955           fprintf (stderr, "read_options: strdup failed.\n");
2956           return (3);
2957         }
2959         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2960         {
2961           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2962               config_base_dir, rrd_strerror (errno));
2963           return (3);
2964         }
2966         /* make sure that the base directory is not resolved via
2967          * symbolic links.  this makes some performance-enhancing
2968          * assumptions possible (we don't have to resolve paths
2969          * that start with a "/")
2970          */
2971         if (realpath(config_base_dir, base_realpath) == NULL)
2972         {
2973           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2974               "%s\n", config_base_dir, rrd_strerror(errno));
2975           return 5;
2976         }
2978         len = strlen (config_base_dir);
2979         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2980         {
2981           config_base_dir[len - 1] = 0;
2982           len--;
2983         }
2985         if (len < 1)
2986         {
2987           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2988           return (4);
2989         }
2991         _config_base_dir_len = len;
2993         len = strlen (base_realpath);
2994         while ((len > 0) && (base_realpath[len - 1] == '/'))
2995         {
2996           base_realpath[len - 1] = '\0';
2997           len--;
2998         }
3000         if (strncmp(config_base_dir,
3001                          base_realpath, sizeof(base_realpath)) != 0)
3002         {
3003           fprintf(stderr,
3004                   "Base directory (-b) resolved via file system links!\n"
3005                   "Please consult rrdcached '-b' documentation!\n"
3006                   "Consider specifying the real directory (%s)\n",
3007                   base_realpath);
3008           return 5;
3009         }
3010       }
3011       break;
3013       case 'p':
3014       {
3015         if (config_pid_file != NULL)
3016           free (config_pid_file);
3017         config_pid_file = strdup (optarg);
3018         if (config_pid_file == NULL)
3019         {
3020           fprintf (stderr, "read_options: strdup failed.\n");
3021           return (3);
3022         }
3023       }
3024       break;
3026       case 'F':
3027         config_flush_at_shutdown = 1;
3028         break;
3030       case 'j':
3031       {
3032         const char *dir = journal_dir = strdup(optarg);
3034         status = rrd_mkdir_p(dir, 0777);
3035         if (status != 0)
3036         {
3037           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3038               dir, rrd_strerror(errno));
3039           return 6;
3040         }
3042         if (access(dir, R_OK|W_OK|X_OK) != 0)
3043         {
3044           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3045                   errno ? rrd_strerror(errno) : "");
3046           return 6;
3047         }
3048       }
3049       break;
3051       case 'h':
3052       case '?':
3053         printf ("RRDCacheD %s\n"
3054             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3055             "\n"
3056             "Usage: rrdcached [options]\n"
3057             "\n"
3058             "Valid options are:\n"
3059             "  -l <address>  Socket address to listen to.\n"
3060             "  -P <perms>    Sets the permissions to assign to all following "
3061                             "sockets\n"
3062             "  -w <seconds>  Interval in which to write data.\n"
3063             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3064             "  -t <threads>  Number of write threads.\n"
3065             "  -f <seconds>  Interval in which to flush dead data.\n"
3066             "  -p <file>     Location of the PID-file.\n"
3067             "  -b <dir>      Base directory to change to.\n"
3068             "  -B            Restrict file access to paths within -b <dir>\n"
3069             "  -g            Do not fork and run in the foreground.\n"
3070             "  -j <dir>      Directory in which to create the journal files.\n"
3071             "  -F            Always flush all updates at shutdown\n"
3072             "  -s <id|name>  Make socket g+rw to named group\n"
3073             "\n"
3074             "For more information and a detailed description of all options "
3075             "please refer\n"
3076             "to the rrdcached(1) manual page.\n",
3077             VERSION);
3078         status = -1;
3079         break;
3080     } /* switch (option) */
3081   } /* while (getopt) */
3083   /* advise the user when values are not sane */
3084   if (config_flush_interval < 2 * config_write_interval)
3085     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3086             " 2x write interval (-w) !\n");
3087   if (config_write_jitter > config_write_interval)
3088     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3089             " write interval (-w) !\n");
3091   if (config_write_base_only && config_base_dir == NULL)
3092     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3093             "  Consult the rrdcached documentation\n");
3095   if (journal_dir == NULL)
3096     config_flush_at_shutdown = 1;
3098   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3100   return (status);
3101 } /* }}} int read_options */
3103 int main (int argc, char **argv)
3105   int status;
3107   status = read_options (argc, argv);
3108   if (status != 0)
3109   {
3110     if (status < 0)
3111       status = 0;
3112     return (status);
3113   }
3115   status = daemonize ();
3116   if (status != 0)
3117   {
3118     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3119     return (1);
3120   }
3122   journal_init();
3124   /* start the queue threads */
3125   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3126   if (queue_threads == NULL)
3127   {
3128     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3129     cleanup();
3130     return (1);
3131   }
3132   for (int i = 0; i < config_queue_threads; i++)
3133   {
3134     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3135     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3136     if (status != 0)
3137     {
3138       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3139       cleanup();
3140       return (1);
3141     }
3142   }
3144   /* start the flush thread */
3145   memset(&flush_thread, 0, sizeof(flush_thread));
3146   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3147   if (status != 0)
3148   {
3149     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3150     cleanup();
3151     return (1);
3152   }
3154   listen_thread_main (NULL);
3155   cleanup ();
3157   return (0);
3158 } /* int main */
3160 /*
3161  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3162  */