Code

rrdcached: Let the -s, -m and -P options affect the default socket as well -- Sebasti...
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) { \
118       fprintf(stderr, __VA_ARGS__); \
119       fprintf(stderr, "\n"); } \
120     syslog ((severity), __VA_ARGS__); \
121   } while (0)
123 /*
124  * Types
125  */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
128 struct listen_socket_s
130   int fd;
131   char addr[PATH_MAX + 1];
132   int family;
134   /* state for BATCH processing */
135   time_t batch_start;
136   int batch_cmd;
138   /* buffered IO */
139   char *rbuf;
140   off_t next_cmd;
141   off_t next_read;
143   char *wbuf;
144   ssize_t wbuf_len;
146   uint32_t permissions;
148   gid_t  socket_group;
149   mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
157                         time_t UNUSED(now),\
158                         char  UNUSED(*buffer),\
159                         size_t UNUSED(buffer_size)
161 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
162                         DISPATCH_PROTO
164 struct command_s {
165   char   *cmd;
166   int (*handler)(HANDLER_PROTO);
168   char  context;                /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT      (1<<0)
170 #define CMD_CONTEXT_BATCH       (1<<1)
171 #define CMD_CONTEXT_JOURNAL     (1<<2)
172 #define CMD_CONTEXT_ANY         (0x7f)
174   char *syntax;
175   char *help;
176 };
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
182   char *file;
183   char **values;
184   size_t values_num;
185   time_t last_flush_time;
186   time_t last_update_stamp;
187 #define CI_FLAGS_IN_TREE  (1<<0)
188 #define CI_FLAGS_IN_QUEUE (1<<1)
189   int flags;
190   pthread_cond_t  flushed;
191   cache_item_t *prev;
192   cache_item_t *next;
193 };
195 struct callback_flush_data_s
197   time_t now;
198   time_t abs_timeout;
199   char **keys;
200   size_t keys_num;
201 };
202 typedef struct callback_flush_data_s callback_flush_data_t;
204 enum queue_side_e
206   HEAD,
207   TAIL
208 };
209 typedef enum queue_side_e queue_side_t;
211 /* describe a set of journal files */
212 typedef struct {
213   char **files;
214   size_t files_num;
215 } journal_set;
217 /* max length of socket command or response */
218 #define CMD_MAX 4096
219 #define RBUF_SIZE (CMD_MAX*2)
221 /*
222  * Variables
223  */
224 static int stay_foreground = 0;
225 static uid_t daemon_uid;
227 static listen_socket_t *listen_fds = NULL;
228 static size_t listen_fds_num = 0;
230 static listen_socket_t default_socket;
232 enum {
233   RUNNING,              /* normal operation */
234   FLUSHING,             /* flushing remaining values */
235   SHUTDOWN              /* shutting down */
236 } state = RUNNING;
238 static pthread_t *queue_threads;
239 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
240 static int config_queue_threads = 4;
242 static pthread_t flush_thread;
243 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
245 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
246 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
247 static int connection_threads_num = 0;
249 /* Cache stuff */
250 static GTree          *cache_tree = NULL;
251 static cache_item_t   *cache_queue_head = NULL;
252 static cache_item_t   *cache_queue_tail = NULL;
253 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
255 static int config_write_interval = 300;
256 static int config_write_jitter   = 0;
257 static int config_flush_interval = 3600;
258 static int config_flush_at_shutdown = 0;
259 static char *config_pid_file = NULL;
260 static char *config_base_dir = NULL;
261 static size_t _config_base_dir_len = 0;
262 static int config_write_base_only = 0;
264 static listen_socket_t **config_listen_address_list = NULL;
265 static size_t config_listen_address_list_len = 0;
267 static uint64_t stats_queue_length = 0;
268 static uint64_t stats_updates_received = 0;
269 static uint64_t stats_flush_received = 0;
270 static uint64_t stats_updates_written = 0;
271 static uint64_t stats_data_sets_written = 0;
272 static uint64_t stats_journal_bytes = 0;
273 static uint64_t stats_journal_rotate = 0;
274 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
276 /* Journaled updates */
277 #define JOURNAL_REPLAY(s) ((s) == NULL)
278 #define JOURNAL_BASE "rrd.journal"
279 static journal_set *journal_cur = NULL;
280 static journal_set *journal_old = NULL;
281 static char *journal_dir = NULL;
282 static FILE *journal_fh = NULL;         /* current journal file handle */
283 static long  journal_size = 0;          /* current journal size */
284 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
285 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
286 static int journal_write(char *cmd, char *args);
287 static void journal_done(void);
288 static void journal_rotate(void);
290 /* prototypes for forward refernces */
291 static int handle_request_help (HANDLER_PROTO);
293 /* 
294  * Functions
295  */
296 static void sig_common (const char *sig) /* {{{ */
298   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
299   state = FLUSHING;
300   pthread_cond_broadcast(&flush_cond);
301   pthread_cond_broadcast(&queue_cond);
302 } /* }}} void sig_common */
304 static void sig_int_handler (int UNUSED(s)) /* {{{ */
306   sig_common("INT");
307 } /* }}} void sig_int_handler */
309 static void sig_term_handler (int UNUSED(s)) /* {{{ */
311   sig_common("TERM");
312 } /* }}} void sig_term_handler */
314 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
316   config_flush_at_shutdown = 1;
317   sig_common("USR1");
318 } /* }}} void sig_usr1_handler */
320 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
322   config_flush_at_shutdown = 0;
323   sig_common("USR2");
324 } /* }}} void sig_usr2_handler */
326 static void install_signal_handlers(void) /* {{{ */
328   /* These structures are static, because `sigaction' behaves weird if the are
329    * overwritten.. */
330   static struct sigaction sa_int;
331   static struct sigaction sa_term;
332   static struct sigaction sa_pipe;
333   static struct sigaction sa_usr1;
334   static struct sigaction sa_usr2;
336   /* Install signal handlers */
337   memset (&sa_int, 0, sizeof (sa_int));
338   sa_int.sa_handler = sig_int_handler;
339   sigaction (SIGINT, &sa_int, NULL);
341   memset (&sa_term, 0, sizeof (sa_term));
342   sa_term.sa_handler = sig_term_handler;
343   sigaction (SIGTERM, &sa_term, NULL);
345   memset (&sa_pipe, 0, sizeof (sa_pipe));
346   sa_pipe.sa_handler = SIG_IGN;
347   sigaction (SIGPIPE, &sa_pipe, NULL);
349   memset (&sa_pipe, 0, sizeof (sa_usr1));
350   sa_usr1.sa_handler = sig_usr1_handler;
351   sigaction (SIGUSR1, &sa_usr1, NULL);
353   memset (&sa_usr2, 0, sizeof (sa_usr2));
354   sa_usr2.sa_handler = sig_usr2_handler;
355   sigaction (SIGUSR2, &sa_usr2, NULL);
357 } /* }}} void install_signal_handlers */
359 static int open_pidfile(char *action, int oflag) /* {{{ */
361   int fd;
362   const char *file;
363   char *file_copy, *dir;
365   file = (config_pid_file != NULL)
366     ? config_pid_file
367     : LOCALSTATEDIR "/run/rrdcached.pid";
369   /* dirname may modify its argument */
370   file_copy = strdup(file);
371   if (file_copy == NULL)
372   {
373     fprintf(stderr, "rrdcached: strdup(): %s\n",
374         rrd_strerror(errno));
375     return -1;
376   }
378   dir = dirname(file_copy);
379   if (rrd_mkdir_p(dir, 0777) != 0)
380   {
381     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
382         dir, rrd_strerror(errno));
383     return -1;
384   }
386   free(file_copy);
388   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
389   if (fd < 0)
390     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
391             action, file, rrd_strerror(errno));
393   return(fd);
394 } /* }}} static int open_pidfile */
396 /* check existing pid file to see whether a daemon is running */
397 static int check_pidfile(void)
399   int pid_fd;
400   pid_t pid;
401   char pid_str[16];
403   pid_fd = open_pidfile("open", O_RDWR);
404   if (pid_fd < 0)
405     return pid_fd;
407   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
408     return -1;
410   pid = atoi(pid_str);
411   if (pid <= 0)
412     return -1;
414   /* another running process that we can signal COULD be
415    * a competing rrdcached */
416   if (pid != getpid() && kill(pid, 0) == 0)
417   {
418     fprintf(stderr,
419             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
420     close(pid_fd);
421     return -1;
422   }
424   lseek(pid_fd, 0, SEEK_SET);
425   if (ftruncate(pid_fd, 0) == -1)
426   {
427     fprintf(stderr,
428             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
429     close(pid_fd);
430     return -1;
431   }
433   fprintf(stderr,
434           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
435           "rrdcached: starting normally.\n", pid);
437   return pid_fd;
438 } /* }}} static int check_pidfile */
440 static int write_pidfile (int fd) /* {{{ */
442   pid_t pid;
443   FILE *fh;
445   pid = getpid ();
447   fh = fdopen (fd, "w");
448   if (fh == NULL)
449   {
450     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
451     close(fd);
452     return (-1);
453   }
455   fprintf (fh, "%i\n", (int) pid);
456   fclose (fh);
458   return (0);
459 } /* }}} int write_pidfile */
461 static int remove_pidfile (void) /* {{{ */
463   char *file;
464   int status;
466   file = (config_pid_file != NULL)
467     ? config_pid_file
468     : LOCALSTATEDIR "/run/rrdcached.pid";
470   status = unlink (file);
471   if (status == 0)
472     return (0);
473   return (errno);
474 } /* }}} int remove_pidfile */
476 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
478   char *eol;
480   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
481                sock->next_read - sock->next_cmd);
483   if (eol == NULL)
484   {
485     /* no commands left, move remainder back to front of rbuf */
486     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
487             sock->next_read - sock->next_cmd);
488     sock->next_read -= sock->next_cmd;
489     sock->next_cmd = 0;
490     *len = 0;
491     return NULL;
492   }
493   else
494   {
495     char *cmd = sock->rbuf + sock->next_cmd;
496     *eol = '\0';
498     sock->next_cmd = eol - sock->rbuf + 1;
500     if (eol > sock->rbuf && *(eol-1) == '\r')
501       *(--eol) = '\0'; /* handle "\r\n" EOL */
503     *len = eol - cmd;
505     return cmd;
506   }
508   /* NOTREACHED */
509   assert(1==0);
510 } /* }}} char *next_cmd */
512 /* add the characters directly to the write buffer */
513 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
515   char *new_buf;
517   assert(sock != NULL);
519   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
520   if (new_buf == NULL)
521   {
522     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
523     return -1;
524   }
526   strncpy(new_buf + sock->wbuf_len, str, len + 1);
528   sock->wbuf = new_buf;
529   sock->wbuf_len += len;
531   return 0;
532 } /* }}} static int add_to_wbuf */
534 /* add the text to the "extra" info that's sent after the status line */
535 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
537   va_list argp;
538   char buffer[CMD_MAX];
539   int len;
541   if (JOURNAL_REPLAY(sock)) return 0;
542   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
544   va_start(argp, fmt);
545 #ifdef HAVE_VSNPRINTF
546   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
547 #else
548   len = vsprintf(buffer, fmt, argp);
549 #endif
550   va_end(argp);
551   if (len < 0)
552   {
553     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
554     return -1;
555   }
557   return add_to_wbuf(sock, buffer, len);
558 } /* }}} static int add_response_info */
560 static int count_lines(char *str) /* {{{ */
562   int lines = 0;
564   if (str != NULL)
565   {
566     while ((str = strchr(str, '\n')) != NULL)
567     {
568       ++lines;
569       ++str;
570     }
571   }
573   return lines;
574 } /* }}} static int count_lines */
576 /* send the response back to the user.
577  * returns 0 on success, -1 on error
578  * write buffer is always zeroed after this call */
579 static int send_response (listen_socket_t *sock, response_code rc,
580                           char *fmt, ...) /* {{{ */
582   va_list argp;
583   char buffer[CMD_MAX];
584   int lines;
585   ssize_t wrote;
586   int rclen, len;
588   if (JOURNAL_REPLAY(sock)) return rc;
590   if (sock->batch_start)
591   {
592     if (rc == RESP_OK)
593       return rc; /* no response on success during BATCH */
594     lines = sock->batch_cmd;
595   }
596   else if (rc == RESP_OK)
597     lines = count_lines(sock->wbuf);
598   else
599     lines = -1;
601   rclen = sprintf(buffer, "%d ", lines);
602   va_start(argp, fmt);
603 #ifdef HAVE_VSNPRINTF
604   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
605 #else
606   len = vsprintf(buffer+rclen, fmt, argp);
607 #endif
608   va_end(argp);
609   if (len < 0)
610     return -1;
612   len += rclen;
614   /* append the result to the wbuf, don't write to the user */
615   if (sock->batch_start)
616     return add_to_wbuf(sock, buffer, len);
618   /* first write must be complete */
619   if (len != write(sock->fd, buffer, len))
620   {
621     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
622     return -1;
623   }
625   if (sock->wbuf != NULL && rc == RESP_OK)
626   {
627     wrote = 0;
628     while (wrote < sock->wbuf_len)
629     {
630       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
631       if (wb <= 0)
632       {
633         RRDD_LOG(LOG_INFO, "send_response: could not write results");
634         return -1;
635       }
636       wrote += wb;
637     }
638   }
640   free(sock->wbuf); sock->wbuf = NULL;
641   sock->wbuf_len = 0;
643   return 0;
644 } /* }}} */
646 static void wipe_ci_values(cache_item_t *ci, time_t when)
648   ci->values = NULL;
649   ci->values_num = 0;
651   ci->last_flush_time = when;
652   if (config_write_jitter > 0)
653     ci->last_flush_time += (rrd_random() % config_write_jitter);
656 /* remove_from_queue
657  * remove a "cache_item_t" item from the queue.
658  * must hold 'cache_lock' when calling this
659  */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
662   if (ci == NULL) return;
663   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
665   if (ci->prev == NULL)
666     cache_queue_head = ci->next; /* reset head */
667   else
668     ci->prev->next = ci->next;
670   if (ci->next == NULL)
671     cache_queue_tail = ci->prev; /* reset the tail */
672   else
673     ci->next->prev = ci->prev;
675   ci->next = ci->prev = NULL;
676   ci->flags &= ~CI_FLAGS_IN_QUEUE;
678   pthread_mutex_lock (&stats_lock);
679   assert (stats_queue_length > 0);
680   stats_queue_length--;
681   pthread_mutex_unlock (&stats_lock);
683 } /* }}} static void remove_from_queue */
685 /* free the resources associated with the cache_item_t
686  * must hold cache_lock when calling this function
687  */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
690   if (ci == NULL) return NULL;
692   remove_from_queue(ci);
694   for (size_t i=0; i < ci->values_num; i++)
695     free(ci->values[i]);
697   free (ci->values);
698   free (ci->file);
700   /* in case anyone is waiting */
701   pthread_cond_broadcast(&ci->flushed);
702   pthread_cond_destroy(&ci->flushed);
704   free (ci);
706   return NULL;
707 } /* }}} static void *free_cache_item */
709 /*
710  * enqueue_cache_item:
711  * `cache_lock' must be acquired before calling this function!
712  */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714     queue_side_t side)
716   if (ci == NULL)
717     return (-1);
719   if (ci->values_num == 0)
720     return (0);
722   if (side == HEAD)
723   {
724     if (cache_queue_head == ci)
725       return 0;
727     /* remove if further down in queue */
728     remove_from_queue(ci);
730     ci->prev = NULL;
731     ci->next = cache_queue_head;
732     if (ci->next != NULL)
733       ci->next->prev = ci;
734     cache_queue_head = ci;
736     if (cache_queue_tail == NULL)
737       cache_queue_tail = cache_queue_head;
738   }
739   else /* (side == TAIL) */
740   {
741     /* We don't move values back in the list.. */
742     if (ci->flags & CI_FLAGS_IN_QUEUE)
743       return (0);
745     assert (ci->next == NULL);
746     assert (ci->prev == NULL);
748     ci->prev = cache_queue_tail;
750     if (cache_queue_tail == NULL)
751       cache_queue_head = ci;
752     else
753       cache_queue_tail->next = ci;
755     cache_queue_tail = ci;
756   }
758   ci->flags |= CI_FLAGS_IN_QUEUE;
760   pthread_cond_signal(&queue_cond);
761   pthread_mutex_lock (&stats_lock);
762   stats_queue_length++;
763   pthread_mutex_unlock (&stats_lock);
765   return (0);
766 } /* }}} int enqueue_cache_item */
768 /*
769  * tree_callback_flush:
770  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771  * while this is in progress.
772  */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774     gpointer data)
776   cache_item_t *ci;
777   callback_flush_data_t *cfd;
779   ci = (cache_item_t *) value;
780   cfd = (callback_flush_data_t *) data;
782   if (ci->flags & CI_FLAGS_IN_QUEUE)
783     return FALSE;
785   if (ci->values_num > 0
786       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787   {
788     enqueue_cache_item (ci, TAIL);
789   }
790   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791       && (ci->values_num <= 0))
792   {
793     assert ((char *) key == ci->file);
794     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795     {
796       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797       return (FALSE);
798     }
799   }
801   return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
804 static int flush_old_values (int max_age)
806   callback_flush_data_t cfd;
807   size_t k;
809   memset (&cfd, 0, sizeof (cfd));
810   /* Pass the current time as user data so that we don't need to call
811    * `time' for each node. */
812   cfd.now = time (NULL);
813   cfd.keys = NULL;
814   cfd.keys_num = 0;
816   if (max_age > 0)
817     cfd.abs_timeout = cfd.now - max_age;
818   else
819     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
821   /* `tree_callback_flush' will return the keys of all values that haven't
822    * been touched in the last `config_flush_interval' seconds in `cfd'.
823    * The char*'s in this array point to the same memory as ci->file, so we
824    * don't need to free them separately. */
825   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
827   for (k = 0; k < cfd.keys_num; k++)
828   {
829     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830     /* should never fail, since we have held the cache_lock
831      * the entire time */
832     assert(status == TRUE);
833   }
835   if (cfd.keys != NULL)
836   {
837     free (cfd.keys);
838     cfd.keys = NULL;
839   }
841   return (0);
842 } /* int flush_old_values */
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
846   struct timeval now;
847   struct timespec next_flush;
848   int status;
850   gettimeofday (&now, NULL);
851   next_flush.tv_sec = now.tv_sec + config_flush_interval;
852   next_flush.tv_nsec = 1000 * now.tv_usec;
854   pthread_mutex_lock(&cache_lock);
856   while (state == RUNNING)
857   {
858     gettimeofday (&now, NULL);
859     if ((now.tv_sec > next_flush.tv_sec)
860         || ((now.tv_sec == next_flush.tv_sec)
861           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862     {
863       RRDD_LOG(LOG_DEBUG, "flushing old values");
865       /* Determine the time of the next cache flush. */
866       next_flush.tv_sec = now.tv_sec + config_flush_interval;
868       /* Flush all values that haven't been written in the last
869        * `config_write_interval' seconds. */
870       flush_old_values (config_write_interval);
872       /* unlock the cache while we rotate so we don't block incoming
873        * updates if the fsync() blocks on disk I/O */
874       pthread_mutex_unlock(&cache_lock);
875       journal_rotate();
876       pthread_mutex_lock(&cache_lock);
877     }
879     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880     if (status != 0 && status != ETIMEDOUT)
881     {
882       RRDD_LOG (LOG_ERR, "flush_thread_main: "
883                 "pthread_cond_timedwait returned %i.", status);
884     }
885   }
887   if (config_flush_at_shutdown)
888     flush_old_values (-1); /* flush everything */
890   state = SHUTDOWN;
892   pthread_mutex_unlock(&cache_lock);
894   return NULL;
895 } /* void *flush_thread_main */
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
899   pthread_mutex_lock (&cache_lock);
901   while (state != SHUTDOWN
902          || (cache_queue_head != NULL && config_flush_at_shutdown))
903   {
904     cache_item_t *ci;
905     char *file;
906     char **values;
907     size_t values_num;
908     int status;
910     /* Now, check if there's something to store away. If not, wait until
911      * something comes in. */
912     if (cache_queue_head == NULL)
913     {
914       status = pthread_cond_wait (&queue_cond, &cache_lock);
915       if ((status != 0) && (status != ETIMEDOUT))
916       {
917         RRDD_LOG (LOG_ERR, "queue_thread_main: "
918             "pthread_cond_wait returned %i.", status);
919       }
920     }
922     /* Check if a value has arrived. This may be NULL if we timed out or there
923      * was an interrupt such as a signal. */
924     if (cache_queue_head == NULL)
925       continue;
927     ci = cache_queue_head;
929     /* copy the relevant parts */
930     file = strdup (ci->file);
931     if (file == NULL)
932     {
933       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934       continue;
935     }
937     assert(ci->values != NULL);
938     assert(ci->values_num > 0);
940     values = ci->values;
941     values_num = ci->values_num;
943     wipe_ci_values(ci, time(NULL));
944     remove_from_queue(ci);
946     pthread_mutex_unlock (&cache_lock);
948     rrd_clear_error ();
949     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950     if (status != 0)
951     {
952       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953           "rrd_update_r (%s) failed with status %i. (%s)",
954           file, status, rrd_get_error());
955     }
957     journal_write("wrote", file);
959     /* Search again in the tree.  It's possible someone issued a "FORGET"
960      * while we were writing the update values. */
961     pthread_mutex_lock(&cache_lock);
962     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963     if (ci)
964       pthread_cond_broadcast(&ci->flushed);
965     pthread_mutex_unlock(&cache_lock);
967     if (status == 0)
968     {
969       pthread_mutex_lock (&stats_lock);
970       stats_updates_written++;
971       stats_data_sets_written += values_num;
972       pthread_mutex_unlock (&stats_lock);
973     }
975     rrd_free_ptrs((void ***) &values, &values_num);
976     free(file);
978     pthread_mutex_lock (&cache_lock);
979   }
980   pthread_mutex_unlock (&cache_lock);
982   return (NULL);
983 } /* }}} void *queue_thread_main */
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986     size_t *buffer_size_ret, char **field_ret)
988   char *buffer;
989   size_t buffer_pos;
990   size_t buffer_size;
991   char *field;
992   size_t field_size;
993   int status;
995   buffer = *buffer_ret;
996   buffer_pos = 0;
997   buffer_size = *buffer_size_ret;
998   field = *buffer_ret;
999   field_size = 0;
1001   if (buffer_size <= 0)
1002     return (-1);
1004   /* This is ensured by `handle_request'. */
1005   assert (buffer[buffer_size - 1] == '\0');
1007   status = -1;
1008   while (buffer_pos < buffer_size)
1009   {
1010     /* Check for end-of-field or end-of-buffer */
1011     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012     {
1013       field[field_size] = 0;
1014       field_size++;
1015       buffer_pos++;
1016       status = 0;
1017       break;
1018     }
1019     /* Handle escaped characters. */
1020     else if (buffer[buffer_pos] == '\\')
1021     {
1022       if (buffer_pos >= (buffer_size - 1))
1023         break;
1024       buffer_pos++;
1025       field[field_size] = buffer[buffer_pos];
1026       field_size++;
1027       buffer_pos++;
1028     }
1029     /* Normal operation */ 
1030     else
1031     {
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036   } /* while (buffer_pos < buffer_size) */
1038   if (status != 0)
1039     return (status);
1041   *buffer_ret = buffer + buffer_pos;
1042   *buffer_size_ret = buffer_size - buffer_pos;
1043   *field_ret = field;
1045   return (0);
1046 } /* }}} int buffer_get_field */
1048 /* if we're restricting writes to the base directory,
1049  * check whether the file falls within the dir
1050  * returns 1 if OK, otherwise 0
1051  */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1054   assert(file != NULL);
1056   if (!config_write_base_only
1057       || JOURNAL_REPLAY(sock)
1058       || config_base_dir == NULL)
1059     return 1;
1061   if (strstr(file, "../") != NULL) goto err;
1063   /* relative paths without "../" are ok */
1064   if (*file != '/') return 1;
1066   /* file must be of the format base + "/" + <1+ char filename> */
1067   if (strlen(file) < _config_base_dir_len + 2) goto err;
1068   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069   if (*(file + _config_base_dir_len) != '/') goto err;
1071   return 1;
1073 err:
1074   if (sock != NULL && sock->fd >= 0)
1075     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1077   return 0;
1078 } /* }}} static int check_file_access */
1080 /* when using a base dir, convert relative paths to absolute paths.
1081  * if necessary, modifies the "filename" pointer to point
1082  * to the new path created in "tmp".  "tmp" is provided
1083  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084  *
1085  * this allows us to optimize for the expected case (absolute path)
1086  * with a no-op.
1087  */
1088 static void get_abs_path(char **filename, char *tmp)
1090   assert(tmp != NULL);
1091   assert(filename != NULL && *filename != NULL);
1093   if (config_base_dir == NULL || **filename == '/')
1094     return;
1096   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097   *filename = tmp;
1098 } /* }}} static int get_abs_path */
1100 static int flush_file (const char *filename) /* {{{ */
1102   cache_item_t *ci;
1104   pthread_mutex_lock (&cache_lock);
1106   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107   if (ci == NULL)
1108   {
1109     pthread_mutex_unlock (&cache_lock);
1110     return (ENOENT);
1111   }
1113   if (ci->values_num > 0)
1114   {
1115     /* Enqueue at head */
1116     enqueue_cache_item (ci, HEAD);
1117     pthread_cond_wait(&ci->flushed, &cache_lock);
1118   }
1120   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1121    * may have been purged during our cond_wait() */
1123   pthread_mutex_unlock(&cache_lock);
1125   return (0);
1126 } /* }}} int flush_file */
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1130   char *err = "Syntax error.\n";
1132   if (cmd && cmd->syntax)
1133     err = cmd->syntax;
1135   return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1140   uint64_t copy_queue_length;
1141   uint64_t copy_updates_received;
1142   uint64_t copy_flush_received;
1143   uint64_t copy_updates_written;
1144   uint64_t copy_data_sets_written;
1145   uint64_t copy_journal_bytes;
1146   uint64_t copy_journal_rotate;
1148   uint64_t tree_nodes_number;
1149   uint64_t tree_depth;
1151   pthread_mutex_lock (&stats_lock);
1152   copy_queue_length       = stats_queue_length;
1153   copy_updates_received   = stats_updates_received;
1154   copy_flush_received     = stats_flush_received;
1155   copy_updates_written    = stats_updates_written;
1156   copy_data_sets_written  = stats_data_sets_written;
1157   copy_journal_bytes      = stats_journal_bytes;
1158   copy_journal_rotate     = stats_journal_rotate;
1159   pthread_mutex_unlock (&stats_lock);
1161   pthread_mutex_lock (&cache_lock);
1162   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1164   pthread_mutex_unlock (&cache_lock);
1166   add_response_info(sock,
1167                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1168   add_response_info(sock,
1169                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170   add_response_info(sock,
1171                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172   add_response_info(sock,
1173                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174   add_response_info(sock,
1175                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1181   send_response(sock, RESP_OK, "Statistics follow\n");
1183   return (0);
1184 } /* }}} int handle_request_stats */
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1188   char *file, file_tmp[PATH_MAX];
1189   int status;
1191   status = buffer_get_field (&buffer, &buffer_size, &file);
1192   if (status != 0)
1193   {
1194     return syntax_error(sock,cmd);
1195   }
1196   else
1197   {
1198     pthread_mutex_lock(&stats_lock);
1199     stats_flush_received++;
1200     pthread_mutex_unlock(&stats_lock);
1202     get_abs_path(&file, file_tmp);
1203     if (!check_file_access(file, sock)) return 0;
1205     status = flush_file (file);
1206     if (status == 0)
1207       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208     else if (status == ENOENT)
1209     {
1210       /* no file in our tree; see whether it exists at all */
1211       struct stat statbuf;
1213       memset(&statbuf, 0, sizeof(statbuf));
1214       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216       else
1217         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218     }
1219     else if (status < 0)
1220       return send_response(sock, RESP_ERR, "Internal error.\n");
1221     else
1222       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223   }
1225   /* NOTREACHED */
1226   assert(1==0);
1227 } /* }}} int handle_request_flush */
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1231   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1233   pthread_mutex_lock(&cache_lock);
1234   flush_old_values(-1);
1235   pthread_mutex_unlock(&cache_lock);
1237   return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1242   int status;
1243   char *file, file_tmp[PATH_MAX];
1244   cache_item_t *ci;
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1250   get_abs_path(&file, file_tmp);
1252   pthread_mutex_lock(&cache_lock);
1253   ci = g_tree_lookup(cache_tree, file);
1254   if (ci == NULL)
1255   {
1256     pthread_mutex_unlock(&cache_lock);
1257     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258   }
1260   for (size_t i=0; i < ci->values_num; i++)
1261     add_response_info(sock, "%s\n", ci->values[i]);
1263   pthread_mutex_unlock(&cache_lock);
1264   return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1269   int status;
1270   gboolean found;
1271   char *file, file_tmp[PATH_MAX];
1273   status = buffer_get_field(&buffer, &buffer_size, &file);
1274   if (status != 0)
1275     return syntax_error(sock,cmd);
1277   get_abs_path(&file, file_tmp);
1278   if (!check_file_access(file, sock)) return 0;
1280   pthread_mutex_lock(&cache_lock);
1281   found = g_tree_remove(cache_tree, file);
1282   pthread_mutex_unlock(&cache_lock);
1284   if (found == TRUE)
1285   {
1286     if (!JOURNAL_REPLAY(sock))
1287       journal_write("forget", file);
1289     return send_response(sock, RESP_OK, "Gone!\n");
1290   }
1291   else
1292     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1294   /* NOTREACHED */
1295   assert(1==0);
1296 } /* }}} static int handle_request_forget */
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1300   cache_item_t *ci;
1302   pthread_mutex_lock(&cache_lock);
1304   ci = cache_queue_head;
1305   while (ci != NULL)
1306   {
1307     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308     ci = ci->next;
1309   }
1311   pthread_mutex_unlock(&cache_lock);
1313   return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1318   char *file, file_tmp[PATH_MAX];
1319   int values_num = 0;
1320   int status;
1321   char orig_buf[CMD_MAX];
1323   cache_item_t *ci;
1325   /* save it for the journal later */
1326   if (!JOURNAL_REPLAY(sock))
1327     strncpy(orig_buf, buffer, buffer_size);
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return syntax_error(sock,cmd);
1333   pthread_mutex_lock(&stats_lock);
1334   stats_updates_received++;
1335   pthread_mutex_unlock(&stats_lock);
1337   get_abs_path(&file, file_tmp);
1338   if (!check_file_access(file, sock)) return 0;
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346     cache_item_t *tmp;
1348     /* don't hold the lock while we setup; stat(2) might block */
1349     pthread_mutex_unlock(&cache_lock);
1351     memset (&statbuf, 0, sizeof (statbuf));
1352     status = stat (file, &statbuf);
1353     if (status != 0)
1354     {
1355       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357       status = errno;
1358       if (status == ENOENT)
1359         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360       else
1361         return send_response(sock, RESP_ERR,
1362                              "stat failed with error %i.\n", status);
1363     }
1364     if (!S_ISREG (statbuf.st_mode))
1365       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367     if (access(file, R_OK|W_OK) != 0)
1368       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369                            file, rrd_strerror(errno));
1371     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372     if (ci == NULL)
1373     {
1374       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376       return send_response(sock, RESP_ERR, "malloc failed.\n");
1377     }
1378     memset (ci, 0, sizeof (cache_item_t));
1380     ci->file = strdup (file);
1381     if (ci->file == NULL)
1382     {
1383       free (ci);
1384       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386       return send_response(sock, RESP_ERR, "strdup failed.\n");
1387     }
1389     wipe_ci_values(ci, now);
1390     ci->flags = CI_FLAGS_IN_TREE;
1391     pthread_cond_init(&ci->flushed, NULL);
1393     pthread_mutex_lock(&cache_lock);
1395     /* another UPDATE might have added this entry in the meantime */
1396     tmp = g_tree_lookup (cache_tree, file);
1397     if (tmp == NULL)
1398       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399     else
1400     {
1401       free_cache_item (ci);
1402       ci = tmp;
1403     }
1405     /* state may have changed while we were unlocked */
1406     if (state == SHUTDOWN)
1407       return -1;
1408   } /* }}} */
1409   assert (ci != NULL);
1411   /* don't re-write updates in replay mode */
1412   if (!JOURNAL_REPLAY(sock))
1413     journal_write("update", orig_buf);
1415   while (buffer_size > 0)
1416   {
1417     char *value;
1418     time_t stamp;
1419     char *eostamp;
1421     status = buffer_get_field (&buffer, &buffer_size, &value);
1422     if (status != 0)
1423     {
1424       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425       break;
1426     }
1428     /* make sure update time is always moving forward */
1429     stamp = strtol(value, &eostamp, 10);
1430     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1431     {
1432       pthread_mutex_unlock(&cache_lock);
1433       return send_response(sock, RESP_ERR,
1434                            "Cannot find timestamp in '%s'!\n", value);
1435     }
1436     else if (stamp <= ci->last_update_stamp)
1437     {
1438       pthread_mutex_unlock(&cache_lock);
1439       return send_response(sock, RESP_ERR,
1440                            "illegal attempt to update using time %ld when last"
1441                            " update time is %ld (minimum one second step)\n",
1442                            stamp, ci->last_update_stamp);
1443     }
1444     else
1445       ci->last_update_stamp = stamp;
1447     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1448     {
1449       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1450       continue;
1451     }
1453     values_num++;
1454   }
1456   if (((now - ci->last_flush_time) >= config_write_interval)
1457       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1458       && (ci->values_num > 0))
1459   {
1460     enqueue_cache_item (ci, TAIL);
1461   }
1463   pthread_mutex_unlock (&cache_lock);
1465   if (values_num < 1)
1466     return send_response(sock, RESP_ERR, "No values updated.\n");
1467   else
1468     return send_response(sock, RESP_OK,
1469                          "errors, enqueued %i value(s).\n", values_num);
1471   /* NOTREACHED */
1472   assert(1==0);
1474 } /* }}} int handle_request_update */
1476 /* we came across a "WROTE" entry during journal replay.
1477  * throw away any values that we have accumulated for this file
1478  */
1479 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1481   cache_item_t *ci;
1482   const char *file = buffer;
1484   pthread_mutex_lock(&cache_lock);
1486   ci = g_tree_lookup(cache_tree, file);
1487   if (ci == NULL)
1488   {
1489     pthread_mutex_unlock(&cache_lock);
1490     return (0);
1491   }
1493   if (ci->values)
1494     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1496   wipe_ci_values(ci, now);
1497   remove_from_queue(ci);
1499   pthread_mutex_unlock(&cache_lock);
1500   return (0);
1501 } /* }}} int handle_request_wrote */
1503 /* start "BATCH" processing */
1504 static int batch_start (HANDLER_PROTO) /* {{{ */
1506   int status;
1507   if (sock->batch_start)
1508     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1510   status = send_response(sock, RESP_OK,
1511                          "Go ahead.  End with dot '.' on its own line.\n");
1512   sock->batch_start = time(NULL);
1513   sock->batch_cmd = 0;
1515   return status;
1516 } /* }}} static int batch_start */
1518 /* finish "BATCH" processing and return results to the client */
1519 static int batch_done (HANDLER_PROTO) /* {{{ */
1521   assert(sock->batch_start);
1522   sock->batch_start = 0;
1523   sock->batch_cmd  = 0;
1524   return send_response(sock, RESP_OK, "errors\n");
1525 } /* }}} static int batch_done */
1527 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1529   return -1;
1530 } /* }}} static int handle_request_quit */
1532 static command_t list_of_commands[] = { /* {{{ */
1533   {
1534     "UPDATE",
1535     handle_request_update,
1536     CMD_CONTEXT_ANY,
1537     "UPDATE <filename> <values> [<values> ...]\n"
1538     ,
1539     "Adds the given file to the internal cache if it is not yet known and\n"
1540     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1541     "for details.\n"
1542     "\n"
1543     "Each <values> has the following form:\n"
1544     "  <values> = <time>:<value>[:<value>[...]]\n"
1545     "See the rrdupdate(1) manpage for details.\n"
1546   },
1547   {
1548     "WROTE",
1549     handle_request_wrote,
1550     CMD_CONTEXT_JOURNAL,
1551     NULL,
1552     NULL
1553   },
1554   {
1555     "FLUSH",
1556     handle_request_flush,
1557     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1558     "FLUSH <filename>\n"
1559     ,
1560     "Adds the given filename to the head of the update queue and returns\n"
1561     "after it has been dequeued.\n"
1562   },
1563   {
1564     "FLUSHALL",
1565     handle_request_flushall,
1566     CMD_CONTEXT_CLIENT,
1567     "FLUSHALL\n"
1568     ,
1569     "Triggers writing of all pending updates.  Returns immediately.\n"
1570   },
1571   {
1572     "PENDING",
1573     handle_request_pending,
1574     CMD_CONTEXT_CLIENT,
1575     "PENDING <filename>\n"
1576     ,
1577     "Shows any 'pending' updates for a file, in order.\n"
1578     "The updates shown have not yet been written to the underlying RRD file.\n"
1579   },
1580   {
1581     "FORGET",
1582     handle_request_forget,
1583     CMD_CONTEXT_ANY,
1584     "FORGET <filename>\n"
1585     ,
1586     "Removes the file completely from the cache.\n"
1587     "Any pending updates for the file will be lost.\n"
1588   },
1589   {
1590     "QUEUE",
1591     handle_request_queue,
1592     CMD_CONTEXT_CLIENT,
1593     "QUEUE\n"
1594     ,
1595         "Shows all files in the output queue.\n"
1596     "The output is zero or more lines in the following format:\n"
1597     "(where <num_vals> is the number of values to be written)\n"
1598     "\n"
1599     "<num_vals> <filename>\n"
1600   },
1601   {
1602     "STATS",
1603     handle_request_stats,
1604     CMD_CONTEXT_CLIENT,
1605     "STATS\n"
1606     ,
1607     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1608     "a description of the values.\n"
1609   },
1610   {
1611     "HELP",
1612     handle_request_help,
1613     CMD_CONTEXT_CLIENT,
1614     "HELP [<command>]\n",
1615     NULL, /* special! */
1616   },
1617   {
1618     "BATCH",
1619     batch_start,
1620     CMD_CONTEXT_CLIENT,
1621     "BATCH\n"
1622     ,
1623     "The 'BATCH' command permits the client to initiate a bulk load\n"
1624     "   of commands to rrdcached.\n"
1625     "\n"
1626     "Usage:\n"
1627     "\n"
1628     "    client: BATCH\n"
1629     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1630     "    client: command #1\n"
1631     "    client: command #2\n"
1632     "    client: ... and so on\n"
1633     "    client: .\n"
1634     "    server: 2 errors\n"
1635     "    server: 7 message for command #7\n"
1636     "    server: 9 message for command #9\n"
1637     "\n"
1638     "For more information, consult the rrdcached(1) documentation.\n"
1639   },
1640   {
1641     ".",   /* BATCH terminator */
1642     batch_done,
1643     CMD_CONTEXT_BATCH,
1644     NULL,
1645     NULL
1646   },
1647   {
1648     "QUIT",
1649     handle_request_quit,
1650     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1651     "QUIT\n"
1652     ,
1653     "Disconnect from rrdcached.\n"
1654   }
1655 }; /* }}} command_t list_of_commands[] */
1656 static size_t list_of_commands_len = sizeof (list_of_commands)
1657   / sizeof (list_of_commands[0]);
1659 static command_t *find_command(char *cmd)
1661   size_t i;
1663   for (i = 0; i < list_of_commands_len; i++)
1664     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1665       return (&list_of_commands[i]);
1666   return NULL;
1669 /* We currently use the index in the `list_of_commands' array as a bit position
1670  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1671  * outside these functions so that switching to a more elegant storage method
1672  * is easily possible. */
1673 static ssize_t find_command_index (const char *cmd) /* {{{ */
1675   size_t i;
1677   for (i = 0; i < list_of_commands_len; i++)
1678     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1679       return ((ssize_t) i);
1680   return (-1);
1681 } /* }}} ssize_t find_command_index */
1683 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1684     const char *cmd)
1686   ssize_t i;
1688   if (JOURNAL_REPLAY(sock))
1689     return (1);
1691   if (cmd == NULL)
1692     return (-1);
1694   if ((strcasecmp ("QUIT", cmd) == 0)
1695       || (strcasecmp ("HELP", cmd) == 0))
1696     return (1);
1697   else if (strcmp (".", cmd) == 0)
1698     cmd = "BATCH";
1700   i = find_command_index (cmd);
1701   if (i < 0)
1702     return (-1);
1703   assert (i < 32);
1705   if ((sock->permissions & (1 << i)) != 0)
1706     return (1);
1707   return (0);
1708 } /* }}} int socket_permission_check */
1710 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1711     const char *cmd)
1713   ssize_t i;
1715   i = find_command_index (cmd);
1716   if (i < 0)
1717     return (-1);
1718   assert (i < 32);
1720   sock->permissions |= (1 << i);
1721   return (0);
1722 } /* }}} int socket_permission_add */
1724 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1726   sock->permissions = 0;
1727 } /* }}} socket_permission_clear */
1729 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1730     listen_socket_t *src)
1732   dest->permissions = src->permissions;
1733 } /* }}} socket_permission_copy */
1735 /* check whether commands are received in the expected context */
1736 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1738   if (JOURNAL_REPLAY(sock))
1739     return (cmd->context & CMD_CONTEXT_JOURNAL);
1740   else if (sock->batch_start)
1741     return (cmd->context & CMD_CONTEXT_BATCH);
1742   else
1743     return (cmd->context & CMD_CONTEXT_CLIENT);
1745   /* NOTREACHED */
1746   assert(1==0);
1749 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1751   int status;
1752   char *cmd_str;
1753   char *resp_txt;
1754   command_t *help = NULL;
1756   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1757   if (status == 0)
1758     help = find_command(cmd_str);
1760   if (help && (help->syntax || help->help))
1761   {
1762     char tmp[CMD_MAX];
1764     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1765     resp_txt = tmp;
1767     if (help->syntax)
1768       add_response_info(sock, "Usage: %s\n", help->syntax);
1770     if (help->help)
1771       add_response_info(sock, "%s\n", help->help);
1772   }
1773   else
1774   {
1775     size_t i;
1777     resp_txt = "Command overview\n";
1779     for (i = 0; i < list_of_commands_len; i++)
1780     {
1781       if (list_of_commands[i].syntax == NULL)
1782         continue;
1783       add_response_info (sock, "%s", list_of_commands[i].syntax);
1784     }
1785   }
1787   return send_response(sock, RESP_OK, resp_txt);
1788 } /* }}} int handle_request_help */
1790 static int handle_request (DISPATCH_PROTO) /* {{{ */
1792   char *buffer_ptr = buffer;
1793   char *cmd_str = NULL;
1794   command_t *cmd = NULL;
1795   int status;
1797   assert (buffer[buffer_size - 1] == '\0');
1799   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1800   if (status != 0)
1801   {
1802     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1803     return (-1);
1804   }
1806   if (sock != NULL && sock->batch_start)
1807     sock->batch_cmd++;
1809   cmd = find_command(cmd_str);
1810   if (!cmd)
1811     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1813   if (!socket_permission_check (sock, cmd->cmd))
1814     return send_response(sock, RESP_ERR, "Permission denied.\n");
1816   if (!command_check_context(sock, cmd))
1817     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1819   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1820 } /* }}} int handle_request */
1822 static void journal_set_free (journal_set *js) /* {{{ */
1824   if (js == NULL)
1825     return;
1827   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1829   free(js);
1830 } /* }}} journal_set_free */
1832 static void journal_set_remove (journal_set *js) /* {{{ */
1834   if (js == NULL)
1835     return;
1837   for (uint i=0; i < js->files_num; i++)
1838   {
1839     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1840     unlink(js->files[i]);
1841   }
1842 } /* }}} journal_set_remove */
1844 /* close current journal file handle.
1845  * MUST hold journal_lock before calling */
1846 static void journal_close(void) /* {{{ */
1848   if (journal_fh != NULL)
1849   {
1850     if (fclose(journal_fh) != 0)
1851       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1852   }
1854   journal_fh = NULL;
1855   journal_size = 0;
1856 } /* }}} journal_close */
1858 /* MUST hold journal_lock before calling */
1859 static void journal_new_file(void) /* {{{ */
1861   struct timeval now;
1862   int  new_fd;
1863   char new_file[PATH_MAX + 1];
1865   assert(journal_dir != NULL);
1866   assert(journal_cur != NULL);
1868   journal_close();
1870   gettimeofday(&now, NULL);
1871   /* this format assures that the files sort in strcmp() order */
1872   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1873            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1875   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1876                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1877   if (new_fd < 0)
1878     goto error;
1880   journal_fh = fdopen(new_fd, "a");
1881   if (journal_fh == NULL)
1882     goto error;
1884   journal_size = ftell(journal_fh);
1885   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1887   /* record the file in the journal set */
1888   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1890   return;
1892 error:
1893   RRDD_LOG(LOG_CRIT,
1894            "JOURNALING DISABLED: Error while trying to create %s : %s",
1895            new_file, rrd_strerror(errno));
1896   RRDD_LOG(LOG_CRIT,
1897            "JOURNALING DISABLED: All values will be flushed at shutdown");
1899   close(new_fd);
1900   config_flush_at_shutdown = 1;
1902 } /* }}} journal_new_file */
1904 /* MUST NOT hold journal_lock before calling this */
1905 static void journal_rotate(void) /* {{{ */
1907   journal_set *old_js = NULL;
1909   if (journal_dir == NULL)
1910     return;
1912   RRDD_LOG(LOG_DEBUG, "rotating journals");
1914   pthread_mutex_lock(&stats_lock);
1915   ++stats_journal_rotate;
1916   pthread_mutex_unlock(&stats_lock);
1918   pthread_mutex_lock(&journal_lock);
1920   journal_close();
1922   /* rotate the journal sets */
1923   old_js = journal_old;
1924   journal_old = journal_cur;
1925   journal_cur = calloc(1, sizeof(journal_set));
1927   if (journal_cur != NULL)
1928     journal_new_file();
1929   else
1930     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1932   pthread_mutex_unlock(&journal_lock);
1934   journal_set_remove(old_js);
1935   journal_set_free  (old_js);
1937 } /* }}} static void journal_rotate */
1939 /* MUST hold journal_lock when calling */
1940 static void journal_done(void) /* {{{ */
1942   if (journal_cur == NULL)
1943     return;
1945   journal_close();
1947   if (config_flush_at_shutdown)
1948   {
1949     RRDD_LOG(LOG_INFO, "removing journals");
1950     journal_set_remove(journal_old);
1951     journal_set_remove(journal_cur);
1952   }
1953   else
1954   {
1955     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1956              "journals will be used at next startup");
1957   }
1959   journal_set_free(journal_cur);
1960   journal_set_free(journal_old);
1961   free(journal_dir);
1963 } /* }}} static void journal_done */
1965 static int journal_write(char *cmd, char *args) /* {{{ */
1967   int chars;
1969   if (journal_fh == NULL)
1970     return 0;
1972   pthread_mutex_lock(&journal_lock);
1973   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1974   journal_size += chars;
1976   if (journal_size > JOURNAL_MAX)
1977     journal_new_file();
1979   pthread_mutex_unlock(&journal_lock);
1981   if (chars > 0)
1982   {
1983     pthread_mutex_lock(&stats_lock);
1984     stats_journal_bytes += chars;
1985     pthread_mutex_unlock(&stats_lock);
1986   }
1988   return chars;
1989 } /* }}} static int journal_write */
1991 static int journal_replay (const char *file) /* {{{ */
1993   FILE *fh;
1994   int entry_cnt = 0;
1995   int fail_cnt = 0;
1996   uint64_t line = 0;
1997   char entry[CMD_MAX];
1998   time_t now;
2000   if (file == NULL) return 0;
2002   {
2003     char *reason = "unknown error";
2004     int status = 0;
2005     struct stat statbuf;
2007     memset(&statbuf, 0, sizeof(statbuf));
2008     if (stat(file, &statbuf) != 0)
2009     {
2010       reason = "stat error";
2011       status = errno;
2012     }
2013     else if (!S_ISREG(statbuf.st_mode))
2014     {
2015       reason = "not a regular file";
2016       status = EPERM;
2017     }
2018     if (statbuf.st_uid != daemon_uid)
2019     {
2020       reason = "not owned by daemon user";
2021       status = EACCES;
2022     }
2023     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2024     {
2025       reason = "must not be user/group writable";
2026       status = EACCES;
2027     }
2029     if (status != 0)
2030     {
2031       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2032                file, rrd_strerror(status), reason);
2033       return 0;
2034     }
2035   }
2037   fh = fopen(file, "r");
2038   if (fh == NULL)
2039   {
2040     if (errno != ENOENT)
2041       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2042                file, rrd_strerror(errno));
2043     return 0;
2044   }
2045   else
2046     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2048   now = time(NULL);
2050   while(!feof(fh))
2051   {
2052     size_t entry_len;
2054     ++line;
2055     if (fgets(entry, sizeof(entry), fh) == NULL)
2056       break;
2057     entry_len = strlen(entry);
2059     /* check \n termination in case journal writing crashed mid-line */
2060     if (entry_len == 0)
2061       continue;
2062     else if (entry[entry_len - 1] != '\n')
2063     {
2064       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2065       ++fail_cnt;
2066       continue;
2067     }
2069     entry[entry_len - 1] = '\0';
2071     if (handle_request(NULL, now, entry, entry_len) == 0)
2072       ++entry_cnt;
2073     else
2074       ++fail_cnt;
2075   }
2077   fclose(fh);
2079   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2080            entry_cnt, fail_cnt);
2082   return entry_cnt > 0 ? 1 : 0;
2083 } /* }}} static int journal_replay */
2085 static int journal_sort(const void *v1, const void *v2)
2087   char **jn1 = (char **) v1;
2088   char **jn2 = (char **) v2;
2090   return strcmp(*jn1,*jn2);
2093 static void journal_init(void) /* {{{ */
2095   int had_journal = 0;
2096   DIR *dir;
2097   struct dirent *dent;
2098   char path[PATH_MAX+1];
2100   if (journal_dir == NULL) return;
2102   pthread_mutex_lock(&journal_lock);
2104   journal_cur = calloc(1, sizeof(journal_set));
2105   if (journal_cur == NULL)
2106   {
2107     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2108     return;
2109   }
2111   RRDD_LOG(LOG_INFO, "checking for journal files");
2113   /* Handle old journal files during transition.  This gives them the
2114    * correct sort order.  TODO: remove after first release
2115    */
2116   {
2117     char old_path[PATH_MAX+1];
2118     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2119     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2120     rename(old_path, path);
2122     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2123     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2124     rename(old_path, path);
2125   }
2127   dir = opendir(journal_dir);
2128   if (!dir) {
2129     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2130     return;
2131   }
2132   while ((dent = readdir(dir)) != NULL)
2133   {
2134     /* looks like a journal file? */
2135     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2136       continue;
2138     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2140     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2141     {
2142       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2143                dent->d_name);
2144       break;
2145     }
2146   }
2147   closedir(dir);
2149   qsort(journal_cur->files, journal_cur->files_num,
2150         sizeof(journal_cur->files[0]), journal_sort);
2152   for (uint i=0; i < journal_cur->files_num; i++)
2153     had_journal += journal_replay(journal_cur->files[i]);
2155   journal_new_file();
2157   /* it must have been a crash.  start a flush */
2158   if (had_journal && config_flush_at_shutdown)
2159     flush_old_values(-1);
2161   pthread_mutex_unlock(&journal_lock);
2163   RRDD_LOG(LOG_INFO, "journal processing complete");
2165 } /* }}} static void journal_init */
2167 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2169   assert(sock != NULL);
2171   free(sock->rbuf);  sock->rbuf = NULL;
2172   free(sock->wbuf);  sock->wbuf = NULL;
2173   free(sock);
2174 } /* }}} void free_listen_socket */
2176 static void close_connection(listen_socket_t *sock) /* {{{ */
2178   if (sock->fd >= 0)
2179   {
2180     close(sock->fd);
2181     sock->fd = -1;
2182   }
2184   free_listen_socket(sock);
2186 } /* }}} void close_connection */
2188 static void *connection_thread_main (void *args) /* {{{ */
2190   listen_socket_t *sock;
2191   int fd;
2193   sock = (listen_socket_t *) args;
2194   fd = sock->fd;
2196   /* init read buffers */
2197   sock->next_read = sock->next_cmd = 0;
2198   sock->rbuf = malloc(RBUF_SIZE);
2199   if (sock->rbuf == NULL)
2200   {
2201     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2202     close_connection(sock);
2203     return NULL;
2204   }
2206   pthread_mutex_lock (&connection_threads_lock);
2207   connection_threads_num++;
2208   pthread_mutex_unlock (&connection_threads_lock);
2210   while (state == RUNNING)
2211   {
2212     char *cmd;
2213     ssize_t cmd_len;
2214     ssize_t rbytes;
2215     time_t now;
2217     struct pollfd pollfd;
2218     int status;
2220     pollfd.fd = fd;
2221     pollfd.events = POLLIN | POLLPRI;
2222     pollfd.revents = 0;
2224     status = poll (&pollfd, 1, /* timeout = */ 500);
2225     if (state != RUNNING)
2226       break;
2227     else if (status == 0) /* timeout */
2228       continue;
2229     else if (status < 0) /* error */
2230     {
2231       status = errno;
2232       if (status != EINTR)
2233         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2234       continue;
2235     }
2237     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2238       break;
2239     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2240     {
2241       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2242           "poll(2) returned something unexpected: %#04hx",
2243           pollfd.revents);
2244       break;
2245     }
2247     rbytes = read(fd, sock->rbuf + sock->next_read,
2248                   RBUF_SIZE - sock->next_read);
2249     if (rbytes < 0)
2250     {
2251       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2252       break;
2253     }
2254     else if (rbytes == 0)
2255       break; /* eof */
2257     sock->next_read += rbytes;
2259     if (sock->batch_start)
2260       now = sock->batch_start;
2261     else
2262       now = time(NULL);
2264     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2265     {
2266       status = handle_request (sock, now, cmd, cmd_len+1);
2267       if (status != 0)
2268         goto out_close;
2269     }
2270   }
2272 out_close:
2273   close_connection(sock);
2275   /* Remove this thread from the connection threads list */
2276   pthread_mutex_lock (&connection_threads_lock);
2277   connection_threads_num--;
2278   if (connection_threads_num <= 0)
2279     pthread_cond_broadcast(&connection_threads_done);
2280   pthread_mutex_unlock (&connection_threads_lock);
2282   return (NULL);
2283 } /* }}} void *connection_thread_main */
2285 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2287   int fd;
2288   struct sockaddr_un sa;
2289   listen_socket_t *temp;
2290   int status;
2291   const char *path;
2292   char *path_copy, *dir;
2294   path = sock->addr;
2295   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2296     path += strlen("unix:");
2298   /* dirname may modify its argument */
2299   path_copy = strdup(path);
2300   if (path_copy == NULL)
2301   {
2302     fprintf(stderr, "rrdcached: strdup(): %s\n",
2303         rrd_strerror(errno));
2304     return (-1);
2305   }
2307   dir = dirname(path_copy);
2308   if (rrd_mkdir_p(dir, 0777) != 0)
2309   {
2310     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2311         dir, rrd_strerror(errno));
2312     return (-1);
2313   }
2315   free(path_copy);
2317   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2318       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2319   if (temp == NULL)
2320   {
2321     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2322     return (-1);
2323   }
2324   listen_fds = temp;
2325   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2327   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2328   if (fd < 0)
2329   {
2330     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2331              rrd_strerror(errno));
2332     return (-1);
2333   }
2335   memset (&sa, 0, sizeof (sa));
2336   sa.sun_family = AF_UNIX;
2337   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2339   /* if we've gotten this far, we own the pid file.  any daemon started
2340    * with the same args must not be alive.  therefore, ensure that we can
2341    * create the socket...
2342    */
2343   unlink(path);
2345   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2346   if (status != 0)
2347   {
2348     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2349              path, rrd_strerror(errno));
2350     close (fd);
2351     return (-1);
2352   }
2354   /* tweak the sockets group ownership */
2355   if (sock->socket_group != (gid_t)-1)
2356   {
2357     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2358          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2359     {
2360       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2361     }
2362   }
2364   if (sock->socket_permissions != (mode_t)-1)
2365   {
2366     if (chmod(path, sock->socket_permissions) != 0)
2367       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2368           (unsigned int)sock->socket_permissions, strerror(errno));
2369   }
2371   status = listen (fd, /* backlog = */ 10);
2372   if (status != 0)
2373   {
2374     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2375              path, rrd_strerror(errno));
2376     close (fd);
2377     unlink (path);
2378     return (-1);
2379   }
2381   listen_fds[listen_fds_num].fd = fd;
2382   listen_fds[listen_fds_num].family = PF_UNIX;
2383   strncpy(listen_fds[listen_fds_num].addr, path,
2384           sizeof (listen_fds[listen_fds_num].addr) - 1);
2385   listen_fds_num++;
2387   return (0);
2388 } /* }}} int open_listen_socket_unix */
2390 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2392   struct addrinfo ai_hints;
2393   struct addrinfo *ai_res;
2394   struct addrinfo *ai_ptr;
2395   char addr_copy[NI_MAXHOST];
2396   char *addr;
2397   char *port;
2398   int status;
2400   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2401   addr_copy[sizeof (addr_copy) - 1] = 0;
2402   addr = addr_copy;
2404   memset (&ai_hints, 0, sizeof (ai_hints));
2405   ai_hints.ai_flags = 0;
2406 #ifdef AI_ADDRCONFIG
2407   ai_hints.ai_flags |= AI_ADDRCONFIG;
2408 #endif
2409   ai_hints.ai_family = AF_UNSPEC;
2410   ai_hints.ai_socktype = SOCK_STREAM;
2412   port = NULL;
2413   if (*addr == '[') /* IPv6+port format */
2414   {
2415     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2416     addr++;
2418     port = strchr (addr, ']');
2419     if (port == NULL)
2420     {
2421       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2422       return (-1);
2423     }
2424     *port = 0;
2425     port++;
2427     if (*port == ':')
2428       port++;
2429     else if (*port == 0)
2430       port = NULL;
2431     else
2432     {
2433       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2434       return (-1);
2435     }
2436   } /* if (*addr == '[') */
2437   else
2438   {
2439     port = rindex(addr, ':');
2440     if (port != NULL)
2441     {
2442       *port = 0;
2443       port++;
2444     }
2445   }
2446   ai_res = NULL;
2447   status = getaddrinfo (addr,
2448                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2449                         &ai_hints, &ai_res);
2450   if (status != 0)
2451   {
2452     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2453              addr, gai_strerror (status));
2454     return (-1);
2455   }
2457   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2458   {
2459     int fd;
2460     listen_socket_t *temp;
2461     int one = 1;
2463     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2464         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2465     if (temp == NULL)
2466     {
2467       fprintf (stderr,
2468                "rrdcached: open_listen_socket_network: realloc failed.\n");
2469       continue;
2470     }
2471     listen_fds = temp;
2472     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2474     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2475     if (fd < 0)
2476     {
2477       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2478                rrd_strerror(errno));
2479       continue;
2480     }
2482     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2484     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2485     if (status != 0)
2486     {
2487       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2488                sock->addr, rrd_strerror(errno));
2489       close (fd);
2490       continue;
2491     }
2493     status = listen (fd, /* backlog = */ 10);
2494     if (status != 0)
2495     {
2496       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2497                sock->addr, rrd_strerror(errno));
2498       close (fd);
2499       freeaddrinfo(ai_res);
2500       return (-1);
2501     }
2503     listen_fds[listen_fds_num].fd = fd;
2504     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2505     listen_fds_num++;
2506   } /* for (ai_ptr) */
2508   freeaddrinfo(ai_res);
2509   return (0);
2510 } /* }}} static int open_listen_socket_network */
2512 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2514   assert(sock != NULL);
2515   assert(sock->addr != NULL);
2517   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2518       || sock->addr[0] == '/')
2519     return (open_listen_socket_unix(sock));
2520   else
2521     return (open_listen_socket_network(sock));
2522 } /* }}} int open_listen_socket */
2524 static int close_listen_sockets (void) /* {{{ */
2526   size_t i;
2528   for (i = 0; i < listen_fds_num; i++)
2529   {
2530     close (listen_fds[i].fd);
2532     if (listen_fds[i].family == PF_UNIX)
2533       unlink(listen_fds[i].addr);
2534   }
2536   free (listen_fds);
2537   listen_fds = NULL;
2538   listen_fds_num = 0;
2540   return (0);
2541 } /* }}} int close_listen_sockets */
2543 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2545   struct pollfd *pollfds;
2546   int pollfds_num;
2547   int status;
2548   int i;
2550   if (listen_fds_num < 1)
2551   {
2552     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2553     return (NULL);
2554   }
2556   pollfds_num = listen_fds_num;
2557   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2558   if (pollfds == NULL)
2559   {
2560     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2561     return (NULL);
2562   }
2563   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2565   RRDD_LOG(LOG_INFO, "listening for connections");
2567   while (state == RUNNING)
2568   {
2569     for (i = 0; i < pollfds_num; i++)
2570     {
2571       pollfds[i].fd = listen_fds[i].fd;
2572       pollfds[i].events = POLLIN | POLLPRI;
2573       pollfds[i].revents = 0;
2574     }
2576     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2577     if (state != RUNNING)
2578       break;
2579     else if (status == 0) /* timeout */
2580       continue;
2581     else if (status < 0) /* error */
2582     {
2583       status = errno;
2584       if (status != EINTR)
2585       {
2586         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2587       }
2588       continue;
2589     }
2591     for (i = 0; i < pollfds_num; i++)
2592     {
2593       listen_socket_t *client_sock;
2594       struct sockaddr_storage client_sa;
2595       socklen_t client_sa_size;
2596       pthread_t tid;
2597       pthread_attr_t attr;
2599       if (pollfds[i].revents == 0)
2600         continue;
2602       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2603       {
2604         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2605             "poll(2) returned something unexpected for listen FD #%i.",
2606             pollfds[i].fd);
2607         continue;
2608       }
2610       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2611       if (client_sock == NULL)
2612       {
2613         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2614         continue;
2615       }
2616       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2618       client_sa_size = sizeof (client_sa);
2619       client_sock->fd = accept (pollfds[i].fd,
2620           (struct sockaddr *) &client_sa, &client_sa_size);
2621       if (client_sock->fd < 0)
2622       {
2623         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2624         free(client_sock);
2625         continue;
2626       }
2628       pthread_attr_init (&attr);
2629       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2631       status = pthread_create (&tid, &attr, connection_thread_main,
2632                                client_sock);
2633       if (status != 0)
2634       {
2635         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2636         close_connection(client_sock);
2637         continue;
2638       }
2639     } /* for (pollfds_num) */
2640   } /* while (state == RUNNING) */
2642   RRDD_LOG(LOG_INFO, "starting shutdown");
2644   close_listen_sockets ();
2646   pthread_mutex_lock (&connection_threads_lock);
2647   while (connection_threads_num > 0)
2648     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2649   pthread_mutex_unlock (&connection_threads_lock);
2651   free(pollfds);
2653   return (NULL);
2654 } /* }}} void *listen_thread_main */
2656 static int daemonize (void) /* {{{ */
2658   int pid_fd;
2659   char *base_dir;
2661   daemon_uid = geteuid();
2663   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2664   if (pid_fd < 0)
2665     pid_fd = check_pidfile();
2666   if (pid_fd < 0)
2667     return pid_fd;
2669   /* open all the listen sockets */
2670   if (config_listen_address_list_len > 0)
2671   {
2672     for (size_t i = 0; i < config_listen_address_list_len; i++)
2673       open_listen_socket (config_listen_address_list[i]);
2675     rrd_free_ptrs((void ***) &config_listen_address_list,
2676                   &config_listen_address_list_len);
2677   }
2678   else
2679   {
2680     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2681         sizeof(default_socket.addr) - 1);
2682     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2683     open_listen_socket (&default_socket);
2684   }
2686   if (listen_fds_num < 1)
2687   {
2688     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2689     goto error;
2690   }
2692   if (!stay_foreground)
2693   {
2694     pid_t child;
2696     child = fork ();
2697     if (child < 0)
2698     {
2699       fprintf (stderr, "daemonize: fork(2) failed.\n");
2700       goto error;
2701     }
2702     else if (child > 0)
2703       exit(0);
2705     /* Become session leader */
2706     setsid ();
2708     /* Open the first three file descriptors to /dev/null */
2709     close (2);
2710     close (1);
2711     close (0);
2713     open ("/dev/null", O_RDWR);
2714     if (dup(0) == -1 || dup(0) == -1){
2715         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2716     }
2717   } /* if (!stay_foreground) */
2719   /* Change into the /tmp directory. */
2720   base_dir = (config_base_dir != NULL)
2721     ? config_base_dir
2722     : "/tmp";
2724   if (chdir (base_dir) != 0)
2725   {
2726     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2727     goto error;
2728   }
2730   install_signal_handlers();
2732   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2733   RRDD_LOG(LOG_INFO, "starting up");
2735   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2736                                 (GDestroyNotify) free_cache_item);
2737   if (cache_tree == NULL)
2738   {
2739     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2740     goto error;
2741   }
2743   return write_pidfile (pid_fd);
2745 error:
2746   remove_pidfile();
2747   return -1;
2748 } /* }}} int daemonize */
2750 static int cleanup (void) /* {{{ */
2752   pthread_cond_broadcast (&flush_cond);
2753   pthread_join (flush_thread, NULL);
2755   pthread_cond_broadcast (&queue_cond);
2756   for (int i = 0; i < config_queue_threads; i++)
2757     pthread_join (queue_threads[i], NULL);
2759   if (config_flush_at_shutdown)
2760   {
2761     assert(cache_queue_head == NULL);
2762     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2763   }
2765   free(queue_threads);
2766   free(config_base_dir);
2768   pthread_mutex_lock(&cache_lock);
2769   g_tree_destroy(cache_tree);
2771   pthread_mutex_lock(&journal_lock);
2772   journal_done();
2774   RRDD_LOG(LOG_INFO, "goodbye");
2775   closelog ();
2777   remove_pidfile ();
2778   free(config_pid_file);
2780   return (0);
2781 } /* }}} int cleanup */
2783 static int read_options (int argc, char **argv) /* {{{ */
2785   int option;
2786   int status = 0;
2788   socket_permission_clear (&default_socket);
2790   default_socket.socket_group = (gid_t)-1;
2791   default_socket.socket_permissions = (mode_t)-1;
2793   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2794   {
2795     switch (option)
2796     {
2797       case 'g':
2798         stay_foreground=1;
2799         break;
2801       case 'l':
2802       {
2803         listen_socket_t *new;
2805         new = malloc(sizeof(listen_socket_t));
2806         if (new == NULL)
2807         {
2808           fprintf(stderr, "read_options: malloc failed.\n");
2809           return(2);
2810         }
2811         memset(new, 0, sizeof(listen_socket_t));
2813         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2815         /* Add permissions to the socket {{{ */
2816         if (default_socket.permissions != 0)
2817         {
2818           socket_permission_copy (new, &default_socket);
2819         }
2820         else /* if (default_socket.permissions == 0) */
2821         {
2822           /* Add permission for ALL commands to the socket. */
2823           size_t i;
2824           for (i = 0; i < list_of_commands_len; i++)
2825           {
2826             status = socket_permission_add (new, list_of_commands[i].cmd);
2827             if (status != 0)
2828             {
2829               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2830                   "socket failed. This should never happen, ever! Sorry.\n",
2831                   list_of_commands[i].cmd);
2832               status = 4;
2833             }
2834           }
2835         }
2836         /* }}} Done adding permissions. */
2838         new->socket_group = default_socket.socket_group;
2839         new->socket_permissions = default_socket.socket_permissions;
2841         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2842                          &config_listen_address_list_len, new))
2843         {
2844           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2845           return (2);
2846         }
2847       }
2848       break;
2850       /* set socket group permissions */
2851       case 's':
2852       {
2853         gid_t group_gid;
2854         struct group *grp;
2856         group_gid = strtoul(optarg, NULL, 10);
2857         if (errno != EINVAL && group_gid>0)
2858         {
2859           /* we were passed a number */
2860           grp = getgrgid(group_gid);
2861         }
2862         else
2863         {
2864           grp = getgrnam(optarg);
2865         }
2867         if (grp)
2868         {
2869           default_socket.socket_group = grp->gr_gid;
2870         }
2871         else
2872         {
2873           /* no idea what the user wanted... */
2874           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2875           return (5);
2876         }
2877       }
2878       break;
2880       /* set socket file permissions */
2881       case 'm':
2882       {
2883         long  tmp;
2884         char *endptr = NULL;
2886         tmp = strtol (optarg, &endptr, 8);
2887         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2888             || (tmp > 07777) || (tmp < 0)) {
2889           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2890               optarg);
2891           return (5);
2892         }
2894         default_socket.socket_permissions = (mode_t)tmp;
2895       }
2896       break;
2898       case 'P':
2899       {
2900         char *optcopy;
2901         char *saveptr;
2902         char *dummy;
2903         char *ptr;
2905         socket_permission_clear (&default_socket);
2907         optcopy = strdup (optarg);
2908         dummy = optcopy;
2909         saveptr = NULL;
2910         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2911         {
2912           dummy = NULL;
2913           status = socket_permission_add (&default_socket, ptr);
2914           if (status != 0)
2915           {
2916             fprintf (stderr, "read_options: Adding permission \"%s\" to "
2917                 "socket failed. Most likely, this permission doesn't "
2918                 "exist. Check your command line.\n", ptr);
2919             status = 4;
2920           }
2921         }
2923         free (optcopy);
2924       }
2925       break;
2927       case 'f':
2928       {
2929         int temp;
2931         temp = atoi (optarg);
2932         if (temp > 0)
2933           config_flush_interval = temp;
2934         else
2935         {
2936           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2937           status = 3;
2938         }
2939       }
2940       break;
2942       case 'w':
2943       {
2944         int temp;
2946         temp = atoi (optarg);
2947         if (temp > 0)
2948           config_write_interval = temp;
2949         else
2950         {
2951           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2952           status = 2;
2953         }
2954       }
2955       break;
2957       case 'z':
2958       {
2959         int temp;
2961         temp = atoi(optarg);
2962         if (temp > 0)
2963           config_write_jitter = temp;
2964         else
2965         {
2966           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2967           status = 2;
2968         }
2970         break;
2971       }
2973       case 't':
2974       {
2975         int threads;
2976         threads = atoi(optarg);
2977         if (threads >= 1)
2978           config_queue_threads = threads;
2979         else
2980         {
2981           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2982           return 1;
2983         }
2984       }
2985       break;
2987       case 'B':
2988         config_write_base_only = 1;
2989         break;
2991       case 'b':
2992       {
2993         size_t len;
2994         char base_realpath[PATH_MAX];
2996         if (config_base_dir != NULL)
2997           free (config_base_dir);
2998         config_base_dir = strdup (optarg);
2999         if (config_base_dir == NULL)
3000         {
3001           fprintf (stderr, "read_options: strdup failed.\n");
3002           return (3);
3003         }
3005         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3006         {
3007           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3008               config_base_dir, rrd_strerror (errno));
3009           return (3);
3010         }
3012         /* make sure that the base directory is not resolved via
3013          * symbolic links.  this makes some performance-enhancing
3014          * assumptions possible (we don't have to resolve paths
3015          * that start with a "/")
3016          */
3017         if (realpath(config_base_dir, base_realpath) == NULL)
3018         {
3019           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3020               "%s\n", config_base_dir, rrd_strerror(errno));
3021           return 5;
3022         }
3024         len = strlen (config_base_dir);
3025         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3026         {
3027           config_base_dir[len - 1] = 0;
3028           len--;
3029         }
3031         if (len < 1)
3032         {
3033           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3034           return (4);
3035         }
3037         _config_base_dir_len = len;
3039         len = strlen (base_realpath);
3040         while ((len > 0) && (base_realpath[len - 1] == '/'))
3041         {
3042           base_realpath[len - 1] = '\0';
3043           len--;
3044         }
3046         if (strncmp(config_base_dir,
3047                          base_realpath, sizeof(base_realpath)) != 0)
3048         {
3049           fprintf(stderr,
3050                   "Base directory (-b) resolved via file system links!\n"
3051                   "Please consult rrdcached '-b' documentation!\n"
3052                   "Consider specifying the real directory (%s)\n",
3053                   base_realpath);
3054           return 5;
3055         }
3056       }
3057       break;
3059       case 'p':
3060       {
3061         if (config_pid_file != NULL)
3062           free (config_pid_file);
3063         config_pid_file = strdup (optarg);
3064         if (config_pid_file == NULL)
3065         {
3066           fprintf (stderr, "read_options: strdup failed.\n");
3067           return (3);
3068         }
3069       }
3070       break;
3072       case 'F':
3073         config_flush_at_shutdown = 1;
3074         break;
3076       case 'j':
3077       {
3078         char journal_dir_actual[PATH_MAX];
3079         const char *dir;
3080         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3082         status = rrd_mkdir_p(dir, 0777);
3083         if (status != 0)
3084         {
3085           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3086               dir, rrd_strerror(errno));
3087           return 6;
3088         }
3090         if (access(dir, R_OK|W_OK|X_OK) != 0)
3091         {
3092           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3093                   errno ? rrd_strerror(errno) : "");
3094           return 6;
3095         }
3096       }
3097       break;
3099       case 'h':
3100       case '?':
3101         printf ("RRDCacheD %s\n"
3102             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3103             "\n"
3104             "Usage: rrdcached [options]\n"
3105             "\n"
3106             "Valid options are:\n"
3107             "  -l <address>  Socket address to listen to.\n"
3108             "  -P <perms>    Sets the permissions to assign to all following "
3109                             "sockets\n"
3110             "  -w <seconds>  Interval in which to write data.\n"
3111             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3112             "  -t <threads>  Number of write threads.\n"
3113             "  -f <seconds>  Interval in which to flush dead data.\n"
3114             "  -p <file>     Location of the PID-file.\n"
3115             "  -b <dir>      Base directory to change to.\n"
3116             "  -B            Restrict file access to paths within -b <dir>\n"
3117             "  -g            Do not fork and run in the foreground.\n"
3118             "  -j <dir>      Directory in which to create the journal files.\n"
3119             "  -F            Always flush all updates at shutdown\n"
3120             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3121             "                (the socket will also have read/write permissions "
3122                             "for that group)\n"
3123             "  -m <mode>     File permissions (octal) of all following UNIX "
3124                             "sockets\n"
3125             "\n"
3126             "For more information and a detailed description of all options "
3127             "please refer\n"
3128             "to the rrdcached(1) manual page.\n",
3129             VERSION);
3130         if (option == 'h')
3131           status = -1;
3132         else
3133           status = 1;
3134         break;
3135     } /* switch (option) */
3136   } /* while (getopt) */
3138   /* advise the user when values are not sane */
3139   if (config_flush_interval < 2 * config_write_interval)
3140     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3141             " 2x write interval (-w) !\n");
3142   if (config_write_jitter > config_write_interval)
3143     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3144             " write interval (-w) !\n");
3146   if (config_write_base_only && config_base_dir == NULL)
3147     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3148             "  Consult the rrdcached documentation\n");
3150   if (journal_dir == NULL)
3151     config_flush_at_shutdown = 1;
3153   return (status);
3154 } /* }}} int read_options */
3156 int main (int argc, char **argv)
3158   int status;
3160   status = read_options (argc, argv);
3161   if (status != 0)
3162   {
3163     if (status < 0)
3164       status = 0;
3165     return (status);
3166   }
3168   status = daemonize ();
3169   if (status != 0)
3170   {
3171     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3172     return (1);
3173   }
3175   journal_init();
3177   /* start the queue threads */
3178   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3179   if (queue_threads == NULL)
3180   {
3181     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3182     cleanup();
3183     return (1);
3184   }
3185   for (int i = 0; i < config_queue_threads; i++)
3186   {
3187     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3188     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3189     if (status != 0)
3190     {
3191       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3192       cleanup();
3193       return (1);
3194     }
3195   }
3197   /* start the flush thread */
3198   memset(&flush_thread, 0, sizeof(flush_thread));
3199   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3200   if (status != 0)
3201   {
3202     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3203     cleanup();
3204     return (1);
3205   }
3207   listen_thread_main (NULL);
3208   cleanup ();
3210   return (0);
3211 } /* int main */
3213 /*
3214  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3215  */