Code

make rrdtool compile with gcc 4.5 if -std=c99 is set -- anicka@suse.cz
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #include <glib-2.0/glib.h>
113 /* }}} */
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) \
118       fprintf(stderr, __VA_ARGS__); \
119     syslog ((severity), __VA_ARGS__); \
120   } while (0)
122 /*
123  * Types
124  */
125 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127 struct listen_socket_s
129   int fd;
130   char addr[PATH_MAX + 1];
131   int family;
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
142   char *wbuf;
143   ssize_t wbuf_len;
145   uint32_t permissions;
147   gid_t  socket_group;
148   mode_t socket_permissions;
149 };
150 typedef struct listen_socket_s listen_socket_t;
152 struct command_s;
153 typedef struct command_s command_t;
154 /* note: guard against "unused" warnings in the handlers */
155 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
156                         time_t UNUSED(now),\
157                         char  UNUSED(*buffer),\
158                         size_t UNUSED(buffer_size)
160 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
161                         DISPATCH_PROTO
163 struct command_s {
164   char   *cmd;
165   int (*handler)(HANDLER_PROTO);
167   char  context;                /* where we expect to see it */
168 #define CMD_CONTEXT_CLIENT      (1<<0)
169 #define CMD_CONTEXT_BATCH       (1<<1)
170 #define CMD_CONTEXT_JOURNAL     (1<<2)
171 #define CMD_CONTEXT_ANY         (0x7f)
173   char *syntax;
174   char *help;
175 };
177 struct cache_item_s;
178 typedef struct cache_item_s cache_item_t;
179 struct cache_item_s
181   char *file;
182   char **values;
183   size_t values_num;
184   time_t last_flush_time;
185   time_t last_update_stamp;
186 #define CI_FLAGS_IN_TREE  (1<<0)
187 #define CI_FLAGS_IN_QUEUE (1<<1)
188   int flags;
189   pthread_cond_t  flushed;
190   cache_item_t *prev;
191   cache_item_t *next;
192 };
194 struct callback_flush_data_s
196   time_t now;
197   time_t abs_timeout;
198   char **keys;
199   size_t keys_num;
200 };
201 typedef struct callback_flush_data_s callback_flush_data_t;
203 enum queue_side_e
205   HEAD,
206   TAIL
207 };
208 typedef enum queue_side_e queue_side_t;
210 /* describe a set of journal files */
211 typedef struct {
212   char **files;
213   size_t files_num;
214 } journal_set;
216 /* max length of socket command or response */
217 #define CMD_MAX 4096
218 #define RBUF_SIZE (CMD_MAX*2)
220 /*
221  * Variables
222  */
223 static int stay_foreground = 0;
224 static uid_t daemon_uid;
226 static listen_socket_t *listen_fds = NULL;
227 static size_t listen_fds_num = 0;
229 enum {
230   RUNNING,              /* normal operation */
231   FLUSHING,             /* flushing remaining values */
232   SHUTDOWN              /* shutting down */
233 } state = RUNNING;
235 static pthread_t *queue_threads;
236 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
237 static int config_queue_threads = 4;
239 static pthread_t flush_thread;
240 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
242 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
243 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
244 static int connection_threads_num = 0;
246 /* Cache stuff */
247 static GTree          *cache_tree = NULL;
248 static cache_item_t   *cache_queue_head = NULL;
249 static cache_item_t   *cache_queue_tail = NULL;
250 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
252 static int config_write_interval = 300;
253 static int config_write_jitter   = 0;
254 static int config_flush_interval = 3600;
255 static int config_flush_at_shutdown = 0;
256 static char *config_pid_file = NULL;
257 static char *config_base_dir = NULL;
258 static size_t _config_base_dir_len = 0;
259 static int config_write_base_only = 0;
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
273 /* Journaled updates */
274 #define JOURNAL_REPLAY(s) ((s) == NULL)
275 #define JOURNAL_BASE "rrd.journal"
276 static journal_set *journal_cur = NULL;
277 static journal_set *journal_old = NULL;
278 static char *journal_dir = NULL;
279 static FILE *journal_fh = NULL;         /* current journal file handle */
280 static long  journal_size = 0;          /* current journal size */
281 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
282 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
283 static int journal_write(char *cmd, char *args);
284 static void journal_done(void);
285 static void journal_rotate(void);
287 /* prototypes for forward refernces */
288 static int handle_request_help (HANDLER_PROTO);
290 /* 
291  * Functions
292  */
293 static void sig_common (const char *sig) /* {{{ */
295   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
296   state = FLUSHING;
297   pthread_cond_broadcast(&flush_cond);
298   pthread_cond_broadcast(&queue_cond);
299 } /* }}} void sig_common */
301 static void sig_int_handler (int UNUSED(s)) /* {{{ */
303   sig_common("INT");
304 } /* }}} void sig_int_handler */
306 static void sig_term_handler (int UNUSED(s)) /* {{{ */
308   sig_common("TERM");
309 } /* }}} void sig_term_handler */
311 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
313   config_flush_at_shutdown = 1;
314   sig_common("USR1");
315 } /* }}} void sig_usr1_handler */
317 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
319   config_flush_at_shutdown = 0;
320   sig_common("USR2");
321 } /* }}} void sig_usr2_handler */
323 static void install_signal_handlers(void) /* {{{ */
325   /* These structures are static, because `sigaction' behaves weird if the are
326    * overwritten.. */
327   static struct sigaction sa_int;
328   static struct sigaction sa_term;
329   static struct sigaction sa_pipe;
330   static struct sigaction sa_usr1;
331   static struct sigaction sa_usr2;
333   /* Install signal handlers */
334   memset (&sa_int, 0, sizeof (sa_int));
335   sa_int.sa_handler = sig_int_handler;
336   sigaction (SIGINT, &sa_int, NULL);
338   memset (&sa_term, 0, sizeof (sa_term));
339   sa_term.sa_handler = sig_term_handler;
340   sigaction (SIGTERM, &sa_term, NULL);
342   memset (&sa_pipe, 0, sizeof (sa_pipe));
343   sa_pipe.sa_handler = SIG_IGN;
344   sigaction (SIGPIPE, &sa_pipe, NULL);
346   memset (&sa_pipe, 0, sizeof (sa_usr1));
347   sa_usr1.sa_handler = sig_usr1_handler;
348   sigaction (SIGUSR1, &sa_usr1, NULL);
350   memset (&sa_usr2, 0, sizeof (sa_usr2));
351   sa_usr2.sa_handler = sig_usr2_handler;
352   sigaction (SIGUSR2, &sa_usr2, NULL);
354 } /* }}} void install_signal_handlers */
356 static int open_pidfile(char *action, int oflag) /* {{{ */
358   int fd;
359   const char *file;
360   char *file_copy, *dir;
362   file = (config_pid_file != NULL)
363     ? config_pid_file
364     : LOCALSTATEDIR "/run/rrdcached.pid";
366   /* dirname may modify its argument */
367   file_copy = strdup(file);
368   if (file_copy == NULL)
369   {
370     fprintf(stderr, "rrdcached: strdup(): %s\n",
371         rrd_strerror(errno));
372     return -1;
373   }
375   dir = dirname(file_copy);
376   if (rrd_mkdir_p(dir, 0777) != 0)
377   {
378     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
379         dir, rrd_strerror(errno));
380     return -1;
381   }
383   free(file_copy);
385   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
386   if (fd < 0)
387     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
388             action, file, rrd_strerror(errno));
390   return(fd);
391 } /* }}} static int open_pidfile */
393 /* check existing pid file to see whether a daemon is running */
394 static int check_pidfile(void)
396   int pid_fd;
397   pid_t pid;
398   char pid_str[16];
400   pid_fd = open_pidfile("open", O_RDWR);
401   if (pid_fd < 0)
402     return pid_fd;
404   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
405     return -1;
407   pid = atoi(pid_str);
408   if (pid <= 0)
409     return -1;
411   /* another running process that we can signal COULD be
412    * a competing rrdcached */
413   if (pid != getpid() && kill(pid, 0) == 0)
414   {
415     fprintf(stderr,
416             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
417     close(pid_fd);
418     return -1;
419   }
421   lseek(pid_fd, 0, SEEK_SET);
422   if (ftruncate(pid_fd, 0) == -1)
423   {
424     fprintf(stderr,
425             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
426     close(pid_fd);
427     return -1;
428   }
430   fprintf(stderr,
431           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
432           "rrdcached: starting normally.\n", pid);
434   return pid_fd;
435 } /* }}} static int check_pidfile */
437 static int write_pidfile (int fd) /* {{{ */
439   pid_t pid;
440   FILE *fh;
442   pid = getpid ();
444   fh = fdopen (fd, "w");
445   if (fh == NULL)
446   {
447     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
448     close(fd);
449     return (-1);
450   }
452   fprintf (fh, "%i\n", (int) pid);
453   fclose (fh);
455   return (0);
456 } /* }}} int write_pidfile */
458 static int remove_pidfile (void) /* {{{ */
460   char *file;
461   int status;
463   file = (config_pid_file != NULL)
464     ? config_pid_file
465     : LOCALSTATEDIR "/run/rrdcached.pid";
467   status = unlink (file);
468   if (status == 0)
469     return (0);
470   return (errno);
471 } /* }}} int remove_pidfile */
473 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
475   char *eol;
477   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
478                sock->next_read - sock->next_cmd);
480   if (eol == NULL)
481   {
482     /* no commands left, move remainder back to front of rbuf */
483     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
484             sock->next_read - sock->next_cmd);
485     sock->next_read -= sock->next_cmd;
486     sock->next_cmd = 0;
487     *len = 0;
488     return NULL;
489   }
490   else
491   {
492     char *cmd = sock->rbuf + sock->next_cmd;
493     *eol = '\0';
495     sock->next_cmd = eol - sock->rbuf + 1;
497     if (eol > sock->rbuf && *(eol-1) == '\r')
498       *(--eol) = '\0'; /* handle "\r\n" EOL */
500     *len = eol - cmd;
502     return cmd;
503   }
505   /* NOTREACHED */
506   assert(1==0);
507 } /* }}} char *next_cmd */
509 /* add the characters directly to the write buffer */
510 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
512   char *new_buf;
514   assert(sock != NULL);
516   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
517   if (new_buf == NULL)
518   {
519     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
520     return -1;
521   }
523   strncpy(new_buf + sock->wbuf_len, str, len + 1);
525   sock->wbuf = new_buf;
526   sock->wbuf_len += len;
528   return 0;
529 } /* }}} static int add_to_wbuf */
531 /* add the text to the "extra" info that's sent after the status line */
532 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
534   va_list argp;
535   char buffer[CMD_MAX];
536   int len;
538   if (JOURNAL_REPLAY(sock)) return 0;
539   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
541   va_start(argp, fmt);
542 #ifdef HAVE_VSNPRINTF
543   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
544 #else
545   len = vsprintf(buffer, fmt, argp);
546 #endif
547   va_end(argp);
548   if (len < 0)
549   {
550     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
551     return -1;
552   }
554   return add_to_wbuf(sock, buffer, len);
555 } /* }}} static int add_response_info */
557 static int count_lines(char *str) /* {{{ */
559   int lines = 0;
561   if (str != NULL)
562   {
563     while ((str = strchr(str, '\n')) != NULL)
564     {
565       ++lines;
566       ++str;
567     }
568   }
570   return lines;
571 } /* }}} static int count_lines */
573 /* send the response back to the user.
574  * returns 0 on success, -1 on error
575  * write buffer is always zeroed after this call */
576 static int send_response (listen_socket_t *sock, response_code rc,
577                           char *fmt, ...) /* {{{ */
579   va_list argp;
580   char buffer[CMD_MAX];
581   int lines;
582   ssize_t wrote;
583   int rclen, len;
585   if (JOURNAL_REPLAY(sock)) return rc;
587   if (sock->batch_start)
588   {
589     if (rc == RESP_OK)
590       return rc; /* no response on success during BATCH */
591     lines = sock->batch_cmd;
592   }
593   else if (rc == RESP_OK)
594     lines = count_lines(sock->wbuf);
595   else
596     lines = -1;
598   rclen = sprintf(buffer, "%d ", lines);
599   va_start(argp, fmt);
600 #ifdef HAVE_VSNPRINTF
601   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
602 #else
603   len = vsprintf(buffer+rclen, fmt, argp);
604 #endif
605   va_end(argp);
606   if (len < 0)
607     return -1;
609   len += rclen;
611   /* append the result to the wbuf, don't write to the user */
612   if (sock->batch_start)
613     return add_to_wbuf(sock, buffer, len);
615   /* first write must be complete */
616   if (len != write(sock->fd, buffer, len))
617   {
618     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
619     return -1;
620   }
622   if (sock->wbuf != NULL && rc == RESP_OK)
623   {
624     wrote = 0;
625     while (wrote < sock->wbuf_len)
626     {
627       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
628       if (wb <= 0)
629       {
630         RRDD_LOG(LOG_INFO, "send_response: could not write results");
631         return -1;
632       }
633       wrote += wb;
634     }
635   }
637   free(sock->wbuf); sock->wbuf = NULL;
638   sock->wbuf_len = 0;
640   return 0;
641 } /* }}} */
643 static void wipe_ci_values(cache_item_t *ci, time_t when)
645   ci->values = NULL;
646   ci->values_num = 0;
648   ci->last_flush_time = when;
649   if (config_write_jitter > 0)
650     ci->last_flush_time += (rrd_random() % config_write_jitter);
653 /* remove_from_queue
654  * remove a "cache_item_t" item from the queue.
655  * must hold 'cache_lock' when calling this
656  */
657 static void remove_from_queue(cache_item_t *ci) /* {{{ */
659   if (ci == NULL) return;
660   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
662   if (ci->prev == NULL)
663     cache_queue_head = ci->next; /* reset head */
664   else
665     ci->prev->next = ci->next;
667   if (ci->next == NULL)
668     cache_queue_tail = ci->prev; /* reset the tail */
669   else
670     ci->next->prev = ci->prev;
672   ci->next = ci->prev = NULL;
673   ci->flags &= ~CI_FLAGS_IN_QUEUE;
675   pthread_mutex_lock (&stats_lock);
676   assert (stats_queue_length > 0);
677   stats_queue_length--;
678   pthread_mutex_unlock (&stats_lock);
680 } /* }}} static void remove_from_queue */
682 /* free the resources associated with the cache_item_t
683  * must hold cache_lock when calling this function
684  */
685 static void *free_cache_item(cache_item_t *ci) /* {{{ */
687   if (ci == NULL) return NULL;
689   remove_from_queue(ci);
691   for (size_t i=0; i < ci->values_num; i++)
692     free(ci->values[i]);
694   free (ci->values);
695   free (ci->file);
697   /* in case anyone is waiting */
698   pthread_cond_broadcast(&ci->flushed);
699   pthread_cond_destroy(&ci->flushed);
701   free (ci);
703   return NULL;
704 } /* }}} static void *free_cache_item */
706 /*
707  * enqueue_cache_item:
708  * `cache_lock' must be acquired before calling this function!
709  */
710 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
711     queue_side_t side)
713   if (ci == NULL)
714     return (-1);
716   if (ci->values_num == 0)
717     return (0);
719   if (side == HEAD)
720   {
721     if (cache_queue_head == ci)
722       return 0;
724     /* remove if further down in queue */
725     remove_from_queue(ci);
727     ci->prev = NULL;
728     ci->next = cache_queue_head;
729     if (ci->next != NULL)
730       ci->next->prev = ci;
731     cache_queue_head = ci;
733     if (cache_queue_tail == NULL)
734       cache_queue_tail = cache_queue_head;
735   }
736   else /* (side == TAIL) */
737   {
738     /* We don't move values back in the list.. */
739     if (ci->flags & CI_FLAGS_IN_QUEUE)
740       return (0);
742     assert (ci->next == NULL);
743     assert (ci->prev == NULL);
745     ci->prev = cache_queue_tail;
747     if (cache_queue_tail == NULL)
748       cache_queue_head = ci;
749     else
750       cache_queue_tail->next = ci;
752     cache_queue_tail = ci;
753   }
755   ci->flags |= CI_FLAGS_IN_QUEUE;
757   pthread_cond_signal(&queue_cond);
758   pthread_mutex_lock (&stats_lock);
759   stats_queue_length++;
760   pthread_mutex_unlock (&stats_lock);
762   return (0);
763 } /* }}} int enqueue_cache_item */
765 /*
766  * tree_callback_flush:
767  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
768  * while this is in progress.
769  */
770 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
771     gpointer data)
773   cache_item_t *ci;
774   callback_flush_data_t *cfd;
776   ci = (cache_item_t *) value;
777   cfd = (callback_flush_data_t *) data;
779   if (ci->flags & CI_FLAGS_IN_QUEUE)
780     return FALSE;
782   if (ci->values_num > 0
783       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
784   {
785     enqueue_cache_item (ci, TAIL);
786   }
787   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
788       && (ci->values_num <= 0))
789   {
790     assert ((char *) key == ci->file);
791     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
792     {
793       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
794       return (FALSE);
795     }
796   }
798   return (FALSE);
799 } /* }}} gboolean tree_callback_flush */
801 static int flush_old_values (int max_age)
803   callback_flush_data_t cfd;
804   size_t k;
806   memset (&cfd, 0, sizeof (cfd));
807   /* Pass the current time as user data so that we don't need to call
808    * `time' for each node. */
809   cfd.now = time (NULL);
810   cfd.keys = NULL;
811   cfd.keys_num = 0;
813   if (max_age > 0)
814     cfd.abs_timeout = cfd.now - max_age;
815   else
816     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
818   /* `tree_callback_flush' will return the keys of all values that haven't
819    * been touched in the last `config_flush_interval' seconds in `cfd'.
820    * The char*'s in this array point to the same memory as ci->file, so we
821    * don't need to free them separately. */
822   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
824   for (k = 0; k < cfd.keys_num; k++)
825   {
826     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
827     /* should never fail, since we have held the cache_lock
828      * the entire time */
829     assert(status == TRUE);
830   }
832   if (cfd.keys != NULL)
833   {
834     free (cfd.keys);
835     cfd.keys = NULL;
836   }
838   return (0);
839 } /* int flush_old_values */
841 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
843   struct timeval now;
844   struct timespec next_flush;
845   int status;
847   gettimeofday (&now, NULL);
848   next_flush.tv_sec = now.tv_sec + config_flush_interval;
849   next_flush.tv_nsec = 1000 * now.tv_usec;
851   pthread_mutex_lock(&cache_lock);
853   while (state == RUNNING)
854   {
855     gettimeofday (&now, NULL);
856     if ((now.tv_sec > next_flush.tv_sec)
857         || ((now.tv_sec == next_flush.tv_sec)
858           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
859     {
860       RRDD_LOG(LOG_DEBUG, "flushing old values");
862       /* Determine the time of the next cache flush. */
863       next_flush.tv_sec = now.tv_sec + config_flush_interval;
865       /* Flush all values that haven't been written in the last
866        * `config_write_interval' seconds. */
867       flush_old_values (config_write_interval);
869       /* unlock the cache while we rotate so we don't block incoming
870        * updates if the fsync() blocks on disk I/O */
871       pthread_mutex_unlock(&cache_lock);
872       journal_rotate();
873       pthread_mutex_lock(&cache_lock);
874     }
876     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
877     if (status != 0 && status != ETIMEDOUT)
878     {
879       RRDD_LOG (LOG_ERR, "flush_thread_main: "
880                 "pthread_cond_timedwait returned %i.", status);
881     }
882   }
884   if (config_flush_at_shutdown)
885     flush_old_values (-1); /* flush everything */
887   state = SHUTDOWN;
889   pthread_mutex_unlock(&cache_lock);
891   return NULL;
892 } /* void *flush_thread_main */
894 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
896   pthread_mutex_lock (&cache_lock);
898   while (state != SHUTDOWN
899          || (cache_queue_head != NULL && config_flush_at_shutdown))
900   {
901     cache_item_t *ci;
902     char *file;
903     char **values;
904     size_t values_num;
905     int status;
907     /* Now, check if there's something to store away. If not, wait until
908      * something comes in. */
909     if (cache_queue_head == NULL)
910     {
911       status = pthread_cond_wait (&queue_cond, &cache_lock);
912       if ((status != 0) && (status != ETIMEDOUT))
913       {
914         RRDD_LOG (LOG_ERR, "queue_thread_main: "
915             "pthread_cond_wait returned %i.", status);
916       }
917     }
919     /* Check if a value has arrived. This may be NULL if we timed out or there
920      * was an interrupt such as a signal. */
921     if (cache_queue_head == NULL)
922       continue;
924     ci = cache_queue_head;
926     /* copy the relevant parts */
927     file = strdup (ci->file);
928     if (file == NULL)
929     {
930       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
931       continue;
932     }
934     assert(ci->values != NULL);
935     assert(ci->values_num > 0);
937     values = ci->values;
938     values_num = ci->values_num;
940     wipe_ci_values(ci, time(NULL));
941     remove_from_queue(ci);
943     pthread_mutex_unlock (&cache_lock);
945     rrd_clear_error ();
946     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
947     if (status != 0)
948     {
949       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
950           "rrd_update_r (%s) failed with status %i. (%s)",
951           file, status, rrd_get_error());
952     }
954     journal_write("wrote", file);
956     /* Search again in the tree.  It's possible someone issued a "FORGET"
957      * while we were writing the update values. */
958     pthread_mutex_lock(&cache_lock);
959     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
960     if (ci)
961       pthread_cond_broadcast(&ci->flushed);
962     pthread_mutex_unlock(&cache_lock);
964     if (status == 0)
965     {
966       pthread_mutex_lock (&stats_lock);
967       stats_updates_written++;
968       stats_data_sets_written += values_num;
969       pthread_mutex_unlock (&stats_lock);
970     }
972     rrd_free_ptrs((void ***) &values, &values_num);
973     free(file);
975     pthread_mutex_lock (&cache_lock);
976   }
977   pthread_mutex_unlock (&cache_lock);
979   return (NULL);
980 } /* }}} void *queue_thread_main */
982 static int buffer_get_field (char **buffer_ret, /* {{{ */
983     size_t *buffer_size_ret, char **field_ret)
985   char *buffer;
986   size_t buffer_pos;
987   size_t buffer_size;
988   char *field;
989   size_t field_size;
990   int status;
992   buffer = *buffer_ret;
993   buffer_pos = 0;
994   buffer_size = *buffer_size_ret;
995   field = *buffer_ret;
996   field_size = 0;
998   if (buffer_size <= 0)
999     return (-1);
1001   /* This is ensured by `handle_request'. */
1002   assert (buffer[buffer_size - 1] == '\0');
1004   status = -1;
1005   while (buffer_pos < buffer_size)
1006   {
1007     /* Check for end-of-field or end-of-buffer */
1008     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1009     {
1010       field[field_size] = 0;
1011       field_size++;
1012       buffer_pos++;
1013       status = 0;
1014       break;
1015     }
1016     /* Handle escaped characters. */
1017     else if (buffer[buffer_pos] == '\\')
1018     {
1019       if (buffer_pos >= (buffer_size - 1))
1020         break;
1021       buffer_pos++;
1022       field[field_size] = buffer[buffer_pos];
1023       field_size++;
1024       buffer_pos++;
1025     }
1026     /* Normal operation */ 
1027     else
1028     {
1029       field[field_size] = buffer[buffer_pos];
1030       field_size++;
1031       buffer_pos++;
1032     }
1033   } /* while (buffer_pos < buffer_size) */
1035   if (status != 0)
1036     return (status);
1038   *buffer_ret = buffer + buffer_pos;
1039   *buffer_size_ret = buffer_size - buffer_pos;
1040   *field_ret = field;
1042   return (0);
1043 } /* }}} int buffer_get_field */
1045 /* if we're restricting writes to the base directory,
1046  * check whether the file falls within the dir
1047  * returns 1 if OK, otherwise 0
1048  */
1049 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1051   assert(file != NULL);
1053   if (!config_write_base_only
1054       || JOURNAL_REPLAY(sock)
1055       || config_base_dir == NULL)
1056     return 1;
1058   if (strstr(file, "../") != NULL) goto err;
1060   /* relative paths without "../" are ok */
1061   if (*file != '/') return 1;
1063   /* file must be of the format base + "/" + <1+ char filename> */
1064   if (strlen(file) < _config_base_dir_len + 2) goto err;
1065   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1066   if (*(file + _config_base_dir_len) != '/') goto err;
1068   return 1;
1070 err:
1071   if (sock != NULL && sock->fd >= 0)
1072     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1074   return 0;
1075 } /* }}} static int check_file_access */
1077 /* when using a base dir, convert relative paths to absolute paths.
1078  * if necessary, modifies the "filename" pointer to point
1079  * to the new path created in "tmp".  "tmp" is provided
1080  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1081  *
1082  * this allows us to optimize for the expected case (absolute path)
1083  * with a no-op.
1084  */
1085 static void get_abs_path(char **filename, char *tmp)
1087   assert(tmp != NULL);
1088   assert(filename != NULL && *filename != NULL);
1090   if (config_base_dir == NULL || **filename == '/')
1091     return;
1093   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1094   *filename = tmp;
1095 } /* }}} static int get_abs_path */
1097 static int flush_file (const char *filename) /* {{{ */
1099   cache_item_t *ci;
1101   pthread_mutex_lock (&cache_lock);
1103   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1104   if (ci == NULL)
1105   {
1106     pthread_mutex_unlock (&cache_lock);
1107     return (ENOENT);
1108   }
1110   if (ci->values_num > 0)
1111   {
1112     /* Enqueue at head */
1113     enqueue_cache_item (ci, HEAD);
1114     pthread_cond_wait(&ci->flushed, &cache_lock);
1115   }
1117   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1118    * may have been purged during our cond_wait() */
1120   pthread_mutex_unlock(&cache_lock);
1122   return (0);
1123 } /* }}} int flush_file */
1125 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1127   char *err = "Syntax error.\n";
1129   if (cmd && cmd->syntax)
1130     err = cmd->syntax;
1132   return send_response(sock, RESP_ERR, "Usage: %s", err);
1133 } /* }}} static int syntax_error() */
1135 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1137   uint64_t copy_queue_length;
1138   uint64_t copy_updates_received;
1139   uint64_t copy_flush_received;
1140   uint64_t copy_updates_written;
1141   uint64_t copy_data_sets_written;
1142   uint64_t copy_journal_bytes;
1143   uint64_t copy_journal_rotate;
1145   uint64_t tree_nodes_number;
1146   uint64_t tree_depth;
1148   pthread_mutex_lock (&stats_lock);
1149   copy_queue_length       = stats_queue_length;
1150   copy_updates_received   = stats_updates_received;
1151   copy_flush_received     = stats_flush_received;
1152   copy_updates_written    = stats_updates_written;
1153   copy_data_sets_written  = stats_data_sets_written;
1154   copy_journal_bytes      = stats_journal_bytes;
1155   copy_journal_rotate     = stats_journal_rotate;
1156   pthread_mutex_unlock (&stats_lock);
1158   pthread_mutex_lock (&cache_lock);
1159   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1161   pthread_mutex_unlock (&cache_lock);
1163   add_response_info(sock,
1164                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1165   add_response_info(sock,
1166                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167   add_response_info(sock,
1168                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169   add_response_info(sock,
1170                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171   add_response_info(sock,
1172                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1178   send_response(sock, RESP_OK, "Statistics follow\n");
1180   return (0);
1181 } /* }}} int handle_request_stats */
1183 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1185   char *file, file_tmp[PATH_MAX];
1186   int status;
1188   status = buffer_get_field (&buffer, &buffer_size, &file);
1189   if (status != 0)
1190   {
1191     return syntax_error(sock,cmd);
1192   }
1193   else
1194   {
1195     pthread_mutex_lock(&stats_lock);
1196     stats_flush_received++;
1197     pthread_mutex_unlock(&stats_lock);
1199     get_abs_path(&file, file_tmp);
1200     if (!check_file_access(file, sock)) return 0;
1202     status = flush_file (file);
1203     if (status == 0)
1204       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205     else if (status == ENOENT)
1206     {
1207       /* no file in our tree; see whether it exists at all */
1208       struct stat statbuf;
1210       memset(&statbuf, 0, sizeof(statbuf));
1211       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1213       else
1214         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1215     }
1216     else if (status < 0)
1217       return send_response(sock, RESP_ERR, "Internal error.\n");
1218     else
1219       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1220   }
1222   /* NOTREACHED */
1223   assert(1==0);
1224 } /* }}} int handle_request_flush */
1226 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1228   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1230   pthread_mutex_lock(&cache_lock);
1231   flush_old_values(-1);
1232   pthread_mutex_unlock(&cache_lock);
1234   return send_response(sock, RESP_OK, "Started flush.\n");
1235 } /* }}} static int handle_request_flushall */
1237 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1239   int status;
1240   char *file, file_tmp[PATH_MAX];
1241   cache_item_t *ci;
1243   status = buffer_get_field(&buffer, &buffer_size, &file);
1244   if (status != 0)
1245     return syntax_error(sock,cmd);
1247   get_abs_path(&file, file_tmp);
1249   pthread_mutex_lock(&cache_lock);
1250   ci = g_tree_lookup(cache_tree, file);
1251   if (ci == NULL)
1252   {
1253     pthread_mutex_unlock(&cache_lock);
1254     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1255   }
1257   for (size_t i=0; i < ci->values_num; i++)
1258     add_response_info(sock, "%s\n", ci->values[i]);
1260   pthread_mutex_unlock(&cache_lock);
1261   return send_response(sock, RESP_OK, "updates pending\n");
1262 } /* }}} static int handle_request_pending */
1264 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1266   int status;
1267   gboolean found;
1268   char *file, file_tmp[PATH_MAX];
1270   status = buffer_get_field(&buffer, &buffer_size, &file);
1271   if (status != 0)
1272     return syntax_error(sock,cmd);
1274   get_abs_path(&file, file_tmp);
1275   if (!check_file_access(file, sock)) return 0;
1277   pthread_mutex_lock(&cache_lock);
1278   found = g_tree_remove(cache_tree, file);
1279   pthread_mutex_unlock(&cache_lock);
1281   if (found == TRUE)
1282   {
1283     if (!JOURNAL_REPLAY(sock))
1284       journal_write("forget", file);
1286     return send_response(sock, RESP_OK, "Gone!\n");
1287   }
1288   else
1289     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1291   /* NOTREACHED */
1292   assert(1==0);
1293 } /* }}} static int handle_request_forget */
1295 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1297   cache_item_t *ci;
1299   pthread_mutex_lock(&cache_lock);
1301   ci = cache_queue_head;
1302   while (ci != NULL)
1303   {
1304     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1305     ci = ci->next;
1306   }
1308   pthread_mutex_unlock(&cache_lock);
1310   return send_response(sock, RESP_OK, "in queue.\n");
1311 } /* }}} int handle_request_queue */
1313 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1315   char *file, file_tmp[PATH_MAX];
1316   int values_num = 0;
1317   int status;
1318   char orig_buf[CMD_MAX];
1320   cache_item_t *ci;
1322   /* save it for the journal later */
1323   if (!JOURNAL_REPLAY(sock))
1324     strncpy(orig_buf, buffer, buffer_size);
1326   status = buffer_get_field (&buffer, &buffer_size, &file);
1327   if (status != 0)
1328     return syntax_error(sock,cmd);
1330   pthread_mutex_lock(&stats_lock);
1331   stats_updates_received++;
1332   pthread_mutex_unlock(&stats_lock);
1334   get_abs_path(&file, file_tmp);
1335   if (!check_file_access(file, sock)) return 0;
1337   pthread_mutex_lock (&cache_lock);
1338   ci = g_tree_lookup (cache_tree, file);
1340   if (ci == NULL) /* {{{ */
1341   {
1342     struct stat statbuf;
1343     cache_item_t *tmp;
1345     /* don't hold the lock while we setup; stat(2) might block */
1346     pthread_mutex_unlock(&cache_lock);
1348     memset (&statbuf, 0, sizeof (statbuf));
1349     status = stat (file, &statbuf);
1350     if (status != 0)
1351     {
1352       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1354       status = errno;
1355       if (status == ENOENT)
1356         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1357       else
1358         return send_response(sock, RESP_ERR,
1359                              "stat failed with error %i.\n", status);
1360     }
1361     if (!S_ISREG (statbuf.st_mode))
1362       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1364     if (access(file, R_OK|W_OK) != 0)
1365       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1366                            file, rrd_strerror(errno));
1368     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1369     if (ci == NULL)
1370     {
1371       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1373       return send_response(sock, RESP_ERR, "malloc failed.\n");
1374     }
1375     memset (ci, 0, sizeof (cache_item_t));
1377     ci->file = strdup (file);
1378     if (ci->file == NULL)
1379     {
1380       free (ci);
1381       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1383       return send_response(sock, RESP_ERR, "strdup failed.\n");
1384     }
1386     wipe_ci_values(ci, now);
1387     ci->flags = CI_FLAGS_IN_TREE;
1388     pthread_cond_init(&ci->flushed, NULL);
1390     pthread_mutex_lock(&cache_lock);
1392     /* another UPDATE might have added this entry in the meantime */
1393     tmp = g_tree_lookup (cache_tree, file);
1394     if (tmp == NULL)
1395       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1396     else
1397     {
1398       free_cache_item (ci);
1399       ci = tmp;
1400     }
1402     /* state may have changed while we were unlocked */
1403     if (state == SHUTDOWN)
1404       return -1;
1405   } /* }}} */
1406   assert (ci != NULL);
1408   /* don't re-write updates in replay mode */
1409   if (!JOURNAL_REPLAY(sock))
1410     journal_write("update", orig_buf);
1412   while (buffer_size > 0)
1413   {
1414     char *value;
1415     time_t stamp;
1416     char *eostamp;
1418     status = buffer_get_field (&buffer, &buffer_size, &value);
1419     if (status != 0)
1420     {
1421       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1422       break;
1423     }
1425     /* make sure update time is always moving forward */
1426     stamp = strtol(value, &eostamp, 10);
1427     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1428     {
1429       pthread_mutex_unlock(&cache_lock);
1430       return send_response(sock, RESP_ERR,
1431                            "Cannot find timestamp in '%s'!\n", value);
1432     }
1433     else if (stamp <= ci->last_update_stamp)
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "illegal attempt to update using time %ld when last"
1438                            " update time is %ld (minimum one second step)\n",
1439                            stamp, ci->last_update_stamp);
1440     }
1441     else
1442       ci->last_update_stamp = stamp;
1444     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1445     {
1446       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1447       continue;
1448     }
1450     values_num++;
1451   }
1453   if (((now - ci->last_flush_time) >= config_write_interval)
1454       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1455       && (ci->values_num > 0))
1456   {
1457     enqueue_cache_item (ci, TAIL);
1458   }
1460   pthread_mutex_unlock (&cache_lock);
1462   if (values_num < 1)
1463     return send_response(sock, RESP_ERR, "No values updated.\n");
1464   else
1465     return send_response(sock, RESP_OK,
1466                          "errors, enqueued %i value(s).\n", values_num);
1468   /* NOTREACHED */
1469   assert(1==0);
1471 } /* }}} int handle_request_update */
1473 /* we came across a "WROTE" entry during journal replay.
1474  * throw away any values that we have accumulated for this file
1475  */
1476 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1478   cache_item_t *ci;
1479   const char *file = buffer;
1481   pthread_mutex_lock(&cache_lock);
1483   ci = g_tree_lookup(cache_tree, file);
1484   if (ci == NULL)
1485   {
1486     pthread_mutex_unlock(&cache_lock);
1487     return (0);
1488   }
1490   if (ci->values)
1491     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1493   wipe_ci_values(ci, now);
1494   remove_from_queue(ci);
1496   pthread_mutex_unlock(&cache_lock);
1497   return (0);
1498 } /* }}} int handle_request_wrote */
1500 /* start "BATCH" processing */
1501 static int batch_start (HANDLER_PROTO) /* {{{ */
1503   int status;
1504   if (sock->batch_start)
1505     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1507   status = send_response(sock, RESP_OK,
1508                          "Go ahead.  End with dot '.' on its own line.\n");
1509   sock->batch_start = time(NULL);
1510   sock->batch_cmd = 0;
1512   return status;
1513 } /* }}} static int batch_start */
1515 /* finish "BATCH" processing and return results to the client */
1516 static int batch_done (HANDLER_PROTO) /* {{{ */
1518   assert(sock->batch_start);
1519   sock->batch_start = 0;
1520   sock->batch_cmd  = 0;
1521   return send_response(sock, RESP_OK, "errors\n");
1522 } /* }}} static int batch_done */
1524 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1526   return -1;
1527 } /* }}} static int handle_request_quit */
1529 static command_t list_of_commands[] = { /* {{{ */
1530   {
1531     "UPDATE",
1532     handle_request_update,
1533     CMD_CONTEXT_ANY,
1534     "UPDATE <filename> <values> [<values> ...]\n"
1535     ,
1536     "Adds the given file to the internal cache if it is not yet known and\n"
1537     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1538     "for details.\n"
1539     "\n"
1540     "Each <values> has the following form:\n"
1541     "  <values> = <time>:<value>[:<value>[...]]\n"
1542     "See the rrdupdate(1) manpage for details.\n"
1543   },
1544   {
1545     "WROTE",
1546     handle_request_wrote,
1547     CMD_CONTEXT_JOURNAL,
1548     NULL,
1549     NULL
1550   },
1551   {
1552     "FLUSH",
1553     handle_request_flush,
1554     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1555     "FLUSH <filename>\n"
1556     ,
1557     "Adds the given filename to the head of the update queue and returns\n"
1558     "after it has been dequeued.\n"
1559   },
1560   {
1561     "FLUSHALL",
1562     handle_request_flushall,
1563     CMD_CONTEXT_CLIENT,
1564     "FLUSHALL\n"
1565     ,
1566     "Triggers writing of all pending updates.  Returns immediately.\n"
1567   },
1568   {
1569     "PENDING",
1570     handle_request_pending,
1571     CMD_CONTEXT_CLIENT,
1572     "PENDING <filename>\n"
1573     ,
1574     "Shows any 'pending' updates for a file, in order.\n"
1575     "The updates shown have not yet been written to the underlying RRD file.\n"
1576   },
1577   {
1578     "FORGET",
1579     handle_request_forget,
1580     CMD_CONTEXT_ANY,
1581     "FORGET <filename>\n"
1582     ,
1583     "Removes the file completely from the cache.\n"
1584     "Any pending updates for the file will be lost.\n"
1585   },
1586   {
1587     "QUEUE",
1588     handle_request_queue,
1589     CMD_CONTEXT_CLIENT,
1590     "QUEUE\n"
1591     ,
1592         "Shows all files in the output queue.\n"
1593     "The output is zero or more lines in the following format:\n"
1594     "(where <num_vals> is the number of values to be written)\n"
1595     "\n"
1596     "<num_vals> <filename>\n"
1597   },
1598   {
1599     "STATS",
1600     handle_request_stats,
1601     CMD_CONTEXT_CLIENT,
1602     "STATS\n"
1603     ,
1604     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1605     "a description of the values.\n"
1606   },
1607   {
1608     "HELP",
1609     handle_request_help,
1610     CMD_CONTEXT_CLIENT,
1611     "HELP [<command>]\n",
1612     NULL, /* special! */
1613   },
1614   {
1615     "BATCH",
1616     batch_start,
1617     CMD_CONTEXT_CLIENT,
1618     "BATCH\n"
1619     ,
1620     "The 'BATCH' command permits the client to initiate a bulk load\n"
1621     "   of commands to rrdcached.\n"
1622     "\n"
1623     "Usage:\n"
1624     "\n"
1625     "    client: BATCH\n"
1626     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1627     "    client: command #1\n"
1628     "    client: command #2\n"
1629     "    client: ... and so on\n"
1630     "    client: .\n"
1631     "    server: 2 errors\n"
1632     "    server: 7 message for command #7\n"
1633     "    server: 9 message for command #9\n"
1634     "\n"
1635     "For more information, consult the rrdcached(1) documentation.\n"
1636   },
1637   {
1638     ".",   /* BATCH terminator */
1639     batch_done,
1640     CMD_CONTEXT_BATCH,
1641     NULL,
1642     NULL
1643   },
1644   {
1645     "QUIT",
1646     handle_request_quit,
1647     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1648     "QUIT\n"
1649     ,
1650     "Disconnect from rrdcached.\n"
1651   }
1652 }; /* }}} command_t list_of_commands[] */
1653 static size_t list_of_commands_len = sizeof (list_of_commands)
1654   / sizeof (list_of_commands[0]);
1656 static command_t *find_command(char *cmd)
1658   size_t i;
1660   for (i = 0; i < list_of_commands_len; i++)
1661     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1662       return (&list_of_commands[i]);
1663   return NULL;
1666 /* We currently use the index in the `list_of_commands' array as a bit position
1667  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1668  * outside these functions so that switching to a more elegant storage method
1669  * is easily possible. */
1670 static ssize_t find_command_index (const char *cmd) /* {{{ */
1672   size_t i;
1674   for (i = 0; i < list_of_commands_len; i++)
1675     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1676       return ((ssize_t) i);
1677   return (-1);
1678 } /* }}} ssize_t find_command_index */
1680 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1681     const char *cmd)
1683   ssize_t i;
1685   if (JOURNAL_REPLAY(sock))
1686     return (1);
1688   if (cmd == NULL)
1689     return (-1);
1691   if ((strcasecmp ("QUIT", cmd) == 0)
1692       || (strcasecmp ("HELP", cmd) == 0))
1693     return (1);
1694   else if (strcmp (".", cmd) == 0)
1695     cmd = "BATCH";
1697   i = find_command_index (cmd);
1698   if (i < 0)
1699     return (-1);
1700   assert (i < 32);
1702   if ((sock->permissions & (1 << i)) != 0)
1703     return (1);
1704   return (0);
1705 } /* }}} int socket_permission_check */
1707 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1708     const char *cmd)
1710   ssize_t i;
1712   i = find_command_index (cmd);
1713   if (i < 0)
1714     return (-1);
1715   assert (i < 32);
1717   sock->permissions |= (1 << i);
1718   return (0);
1719 } /* }}} int socket_permission_add */
1721 /* check whether commands are received in the expected context */
1722 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1724   if (JOURNAL_REPLAY(sock))
1725     return (cmd->context & CMD_CONTEXT_JOURNAL);
1726   else if (sock->batch_start)
1727     return (cmd->context & CMD_CONTEXT_BATCH);
1728   else
1729     return (cmd->context & CMD_CONTEXT_CLIENT);
1731   /* NOTREACHED */
1732   assert(1==0);
1735 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1737   int status;
1738   char *cmd_str;
1739   char *resp_txt;
1740   command_t *help = NULL;
1742   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1743   if (status == 0)
1744     help = find_command(cmd_str);
1746   if (help && (help->syntax || help->help))
1747   {
1748     char tmp[CMD_MAX];
1750     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1751     resp_txt = tmp;
1753     if (help->syntax)
1754       add_response_info(sock, "Usage: %s\n", help->syntax);
1756     if (help->help)
1757       add_response_info(sock, "%s\n", help->help);
1758   }
1759   else
1760   {
1761     size_t i;
1763     resp_txt = "Command overview\n";
1765     for (i = 0; i < list_of_commands_len; i++)
1766     {
1767       if (list_of_commands[i].syntax == NULL)
1768         continue;
1769       add_response_info (sock, "%s", list_of_commands[i].syntax);
1770     }
1771   }
1773   return send_response(sock, RESP_OK, resp_txt);
1774 } /* }}} int handle_request_help */
1776 static int handle_request (DISPATCH_PROTO) /* {{{ */
1778   char *buffer_ptr = buffer;
1779   char *cmd_str = NULL;
1780   command_t *cmd = NULL;
1781   int status;
1783   assert (buffer[buffer_size - 1] == '\0');
1785   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1786   if (status != 0)
1787   {
1788     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1789     return (-1);
1790   }
1792   if (sock != NULL && sock->batch_start)
1793     sock->batch_cmd++;
1795   cmd = find_command(cmd_str);
1796   if (!cmd)
1797     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1799   if (!socket_permission_check (sock, cmd->cmd))
1800     return send_response(sock, RESP_ERR, "Permission denied.\n");
1802   if (!command_check_context(sock, cmd))
1803     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1805   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1806 } /* }}} int handle_request */
1808 static void journal_set_free (journal_set *js) /* {{{ */
1810   if (js == NULL)
1811     return;
1813   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1815   free(js);
1816 } /* }}} journal_set_free */
1818 static void journal_set_remove (journal_set *js) /* {{{ */
1820   if (js == NULL)
1821     return;
1823   for (uint i=0; i < js->files_num; i++)
1824   {
1825     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1826     unlink(js->files[i]);
1827   }
1828 } /* }}} journal_set_remove */
1830 /* close current journal file handle.
1831  * MUST hold journal_lock before calling */
1832 static void journal_close(void) /* {{{ */
1834   if (journal_fh != NULL)
1835   {
1836     if (fclose(journal_fh) != 0)
1837       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1838   }
1840   journal_fh = NULL;
1841   journal_size = 0;
1842 } /* }}} journal_close */
1844 /* MUST hold journal_lock before calling */
1845 static void journal_new_file(void) /* {{{ */
1847   struct timeval now;
1848   int  new_fd;
1849   char new_file[PATH_MAX + 1];
1851   assert(journal_dir != NULL);
1852   assert(journal_cur != NULL);
1854   journal_close();
1856   gettimeofday(&now, NULL);
1857   /* this format assures that the files sort in strcmp() order */
1858   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1859            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1861   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1862                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1863   if (new_fd < 0)
1864     goto error;
1866   journal_fh = fdopen(new_fd, "a");
1867   if (journal_fh == NULL)
1868     goto error;
1870   journal_size = ftell(journal_fh);
1871   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1873   /* record the file in the journal set */
1874   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1876   return;
1878 error:
1879   RRDD_LOG(LOG_CRIT,
1880            "JOURNALING DISABLED: Error while trying to create %s : %s",
1881            new_file, rrd_strerror(errno));
1882   RRDD_LOG(LOG_CRIT,
1883            "JOURNALING DISABLED: All values will be flushed at shutdown");
1885   close(new_fd);
1886   config_flush_at_shutdown = 1;
1888 } /* }}} journal_new_file */
1890 /* MUST NOT hold journal_lock before calling this */
1891 static void journal_rotate(void) /* {{{ */
1893   journal_set *old_js = NULL;
1895   if (journal_dir == NULL)
1896     return;
1898   RRDD_LOG(LOG_DEBUG, "rotating journals");
1900   pthread_mutex_lock(&stats_lock);
1901   ++stats_journal_rotate;
1902   pthread_mutex_unlock(&stats_lock);
1904   pthread_mutex_lock(&journal_lock);
1906   journal_close();
1908   /* rotate the journal sets */
1909   old_js = journal_old;
1910   journal_old = journal_cur;
1911   journal_cur = calloc(1, sizeof(journal_set));
1913   if (journal_cur != NULL)
1914     journal_new_file();
1915   else
1916     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1918   pthread_mutex_unlock(&journal_lock);
1920   journal_set_remove(old_js);
1921   journal_set_free  (old_js);
1923 } /* }}} static void journal_rotate */
1925 /* MUST hold journal_lock when calling */
1926 static void journal_done(void) /* {{{ */
1928   if (journal_cur == NULL)
1929     return;
1931   journal_close();
1933   if (config_flush_at_shutdown)
1934   {
1935     RRDD_LOG(LOG_INFO, "removing journals");
1936     journal_set_remove(journal_old);
1937     journal_set_remove(journal_cur);
1938   }
1939   else
1940   {
1941     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1942              "journals will be used at next startup");
1943   }
1945   journal_set_free(journal_cur);
1946   journal_set_free(journal_old);
1947   free(journal_dir);
1949 } /* }}} static void journal_done */
1951 static int journal_write(char *cmd, char *args) /* {{{ */
1953   int chars;
1955   if (journal_fh == NULL)
1956     return 0;
1958   pthread_mutex_lock(&journal_lock);
1959   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1960   journal_size += chars;
1962   if (journal_size > JOURNAL_MAX)
1963     journal_new_file();
1965   pthread_mutex_unlock(&journal_lock);
1967   if (chars > 0)
1968   {
1969     pthread_mutex_lock(&stats_lock);
1970     stats_journal_bytes += chars;
1971     pthread_mutex_unlock(&stats_lock);
1972   }
1974   return chars;
1975 } /* }}} static int journal_write */
1977 static int journal_replay (const char *file) /* {{{ */
1979   FILE *fh;
1980   int entry_cnt = 0;
1981   int fail_cnt = 0;
1982   uint64_t line = 0;
1983   char entry[CMD_MAX];
1984   time_t now;
1986   if (file == NULL) return 0;
1988   {
1989     char *reason = "unknown error";
1990     int status = 0;
1991     struct stat statbuf;
1993     memset(&statbuf, 0, sizeof(statbuf));
1994     if (stat(file, &statbuf) != 0)
1995     {
1996       reason = "stat error";
1997       status = errno;
1998     }
1999     else if (!S_ISREG(statbuf.st_mode))
2000     {
2001       reason = "not a regular file";
2002       status = EPERM;
2003     }
2004     if (statbuf.st_uid != daemon_uid)
2005     {
2006       reason = "not owned by daemon user";
2007       status = EACCES;
2008     }
2009     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2010     {
2011       reason = "must not be user/group writable";
2012       status = EACCES;
2013     }
2015     if (status != 0)
2016     {
2017       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2018                file, rrd_strerror(status), reason);
2019       return 0;
2020     }
2021   }
2023   fh = fopen(file, "r");
2024   if (fh == NULL)
2025   {
2026     if (errno != ENOENT)
2027       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2028                file, rrd_strerror(errno));
2029     return 0;
2030   }
2031   else
2032     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2034   now = time(NULL);
2036   while(!feof(fh))
2037   {
2038     size_t entry_len;
2040     ++line;
2041     if (fgets(entry, sizeof(entry), fh) == NULL)
2042       break;
2043     entry_len = strlen(entry);
2045     /* check \n termination in case journal writing crashed mid-line */
2046     if (entry_len == 0)
2047       continue;
2048     else if (entry[entry_len - 1] != '\n')
2049     {
2050       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2051       ++fail_cnt;
2052       continue;
2053     }
2055     entry[entry_len - 1] = '\0';
2057     if (handle_request(NULL, now, entry, entry_len) == 0)
2058       ++entry_cnt;
2059     else
2060       ++fail_cnt;
2061   }
2063   fclose(fh);
2065   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2066            entry_cnt, fail_cnt);
2068   return entry_cnt > 0 ? 1 : 0;
2069 } /* }}} static int journal_replay */
2071 static int journal_sort(const void *v1, const void *v2)
2073   char **jn1 = (char **) v1;
2074   char **jn2 = (char **) v2;
2076   return strcmp(*jn1,*jn2);
2079 static void journal_init(void) /* {{{ */
2081   int had_journal = 0;
2082   DIR *dir;
2083   struct dirent *dent;
2084   char path[PATH_MAX+1];
2086   if (journal_dir == NULL) return;
2088   pthread_mutex_lock(&journal_lock);
2090   journal_cur = calloc(1, sizeof(journal_set));
2091   if (journal_cur == NULL)
2092   {
2093     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2094     return;
2095   }
2097   RRDD_LOG(LOG_INFO, "checking for journal files");
2099   /* Handle old journal files during transition.  This gives them the
2100    * correct sort order.  TODO: remove after first release
2101    */
2102   {
2103     char old_path[PATH_MAX+1];
2104     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2105     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2106     rename(old_path, path);
2108     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2109     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2110     rename(old_path, path);
2111   }
2113   dir = opendir(journal_dir);
2114   while ((dent = readdir(dir)) != NULL)
2115   {
2116     /* looks like a journal file? */
2117     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2118       continue;
2120     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2122     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2123     {
2124       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2125                dent->d_name);
2126       break;
2127     }
2128   }
2129   closedir(dir);
2131   qsort(journal_cur->files, journal_cur->files_num,
2132         sizeof(journal_cur->files[0]), journal_sort);
2134   for (uint i=0; i < journal_cur->files_num; i++)
2135     had_journal += journal_replay(journal_cur->files[i]);
2137   journal_new_file();
2139   /* it must have been a crash.  start a flush */
2140   if (had_journal && config_flush_at_shutdown)
2141     flush_old_values(-1);
2143   pthread_mutex_unlock(&journal_lock);
2145   RRDD_LOG(LOG_INFO, "journal processing complete");
2147 } /* }}} static void journal_init */
2149 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2151   assert(sock != NULL);
2153   free(sock->rbuf);  sock->rbuf = NULL;
2154   free(sock->wbuf);  sock->wbuf = NULL;
2155   free(sock);
2156 } /* }}} void free_listen_socket */
2158 static void close_connection(listen_socket_t *sock) /* {{{ */
2160   if (sock->fd >= 0)
2161   {
2162     close(sock->fd);
2163     sock->fd = -1;
2164   }
2166   free_listen_socket(sock);
2168 } /* }}} void close_connection */
2170 static void *connection_thread_main (void *args) /* {{{ */
2172   listen_socket_t *sock;
2173   int fd;
2175   sock = (listen_socket_t *) args;
2176   fd = sock->fd;
2178   /* init read buffers */
2179   sock->next_read = sock->next_cmd = 0;
2180   sock->rbuf = malloc(RBUF_SIZE);
2181   if (sock->rbuf == NULL)
2182   {
2183     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2184     close_connection(sock);
2185     return NULL;
2186   }
2188   pthread_mutex_lock (&connection_threads_lock);
2189   connection_threads_num++;
2190   pthread_mutex_unlock (&connection_threads_lock);
2192   while (state == RUNNING)
2193   {
2194     char *cmd;
2195     ssize_t cmd_len;
2196     ssize_t rbytes;
2197     time_t now;
2199     struct pollfd pollfd;
2200     int status;
2202     pollfd.fd = fd;
2203     pollfd.events = POLLIN | POLLPRI;
2204     pollfd.revents = 0;
2206     status = poll (&pollfd, 1, /* timeout = */ 500);
2207     if (state != RUNNING)
2208       break;
2209     else if (status == 0) /* timeout */
2210       continue;
2211     else if (status < 0) /* error */
2212     {
2213       status = errno;
2214       if (status != EINTR)
2215         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2216       continue;
2217     }
2219     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2220       break;
2221     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2222     {
2223       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2224           "poll(2) returned something unexpected: %#04hx",
2225           pollfd.revents);
2226       break;
2227     }
2229     rbytes = read(fd, sock->rbuf + sock->next_read,
2230                   RBUF_SIZE - sock->next_read);
2231     if (rbytes < 0)
2232     {
2233       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2234       break;
2235     }
2236     else if (rbytes == 0)
2237       break; /* eof */
2239     sock->next_read += rbytes;
2241     if (sock->batch_start)
2242       now = sock->batch_start;
2243     else
2244       now = time(NULL);
2246     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2247     {
2248       status = handle_request (sock, now, cmd, cmd_len+1);
2249       if (status != 0)
2250         goto out_close;
2251     }
2252   }
2254 out_close:
2255   close_connection(sock);
2257   /* Remove this thread from the connection threads list */
2258   pthread_mutex_lock (&connection_threads_lock);
2259   connection_threads_num--;
2260   if (connection_threads_num <= 0)
2261     pthread_cond_broadcast(&connection_threads_done);
2262   pthread_mutex_unlock (&connection_threads_lock);
2264   return (NULL);
2265 } /* }}} void *connection_thread_main */
2267 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2269   int fd;
2270   struct sockaddr_un sa;
2271   listen_socket_t *temp;
2272   int status;
2273   const char *path;
2274   char *path_copy, *dir;
2276   path = sock->addr;
2277   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2278     path += strlen("unix:");
2280   /* dirname may modify its argument */
2281   path_copy = strdup(path);
2282   if (path_copy == NULL)
2283   {
2284     fprintf(stderr, "rrdcached: strdup(): %s\n",
2285         rrd_strerror(errno));
2286     return (-1);
2287   }
2289   dir = dirname(path_copy);
2290   if (rrd_mkdir_p(dir, 0777) != 0)
2291   {
2292     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2293         dir, rrd_strerror(errno));
2294     return (-1);
2295   }
2297   free(path_copy);
2299   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2300       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2301   if (temp == NULL)
2302   {
2303     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2304     return (-1);
2305   }
2306   listen_fds = temp;
2307   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2309   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2310   if (fd < 0)
2311   {
2312     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2313              rrd_strerror(errno));
2314     return (-1);
2315   }
2317   memset (&sa, 0, sizeof (sa));
2318   sa.sun_family = AF_UNIX;
2319   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2321   /* if we've gotten this far, we own the pid file.  any daemon started
2322    * with the same args must not be alive.  therefore, ensure that we can
2323    * create the socket...
2324    */
2325   unlink(path);
2327   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2328   if (status != 0)
2329   {
2330     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2331              path, rrd_strerror(errno));
2332     close (fd);
2333     return (-1);
2334   }
2336   /* tweak the sockets group ownership */
2337   if (sock->socket_group != (gid_t)-1)
2338   {
2339     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2340          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2341     {
2342       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2343     }
2344   }
2346   if (sock->socket_permissions != (mode_t)-1)
2347   {
2348     if (chmod(path, sock->socket_permissions) != 0)
2349       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2350           (unsigned int)sock->socket_permissions, strerror(errno));
2351   }
2353   status = listen (fd, /* backlog = */ 10);
2354   if (status != 0)
2355   {
2356     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2357              path, rrd_strerror(errno));
2358     close (fd);
2359     unlink (path);
2360     return (-1);
2361   }
2363   listen_fds[listen_fds_num].fd = fd;
2364   listen_fds[listen_fds_num].family = PF_UNIX;
2365   strncpy(listen_fds[listen_fds_num].addr, path,
2366           sizeof (listen_fds[listen_fds_num].addr) - 1);
2367   listen_fds_num++;
2369   return (0);
2370 } /* }}} int open_listen_socket_unix */
2372 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2374   struct addrinfo ai_hints;
2375   struct addrinfo *ai_res;
2376   struct addrinfo *ai_ptr;
2377   char addr_copy[NI_MAXHOST];
2378   char *addr;
2379   char *port;
2380   int status;
2382   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2383   addr_copy[sizeof (addr_copy) - 1] = 0;
2384   addr = addr_copy;
2386   memset (&ai_hints, 0, sizeof (ai_hints));
2387   ai_hints.ai_flags = 0;
2388 #ifdef AI_ADDRCONFIG
2389   ai_hints.ai_flags |= AI_ADDRCONFIG;
2390 #endif
2391   ai_hints.ai_family = AF_UNSPEC;
2392   ai_hints.ai_socktype = SOCK_STREAM;
2394   port = NULL;
2395   if (*addr == '[') /* IPv6+port format */
2396   {
2397     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2398     addr++;
2400     port = strchr (addr, ']');
2401     if (port == NULL)
2402     {
2403       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2404       return (-1);
2405     }
2406     *port = 0;
2407     port++;
2409     if (*port == ':')
2410       port++;
2411     else if (*port == 0)
2412       port = NULL;
2413     else
2414     {
2415       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2416       return (-1);
2417     }
2418   } /* if (*addr == '[') */
2419   else
2420   {
2421     port = rindex(addr, ':');
2422     if (port != NULL)
2423     {
2424       *port = 0;
2425       port++;
2426     }
2427   }
2428   ai_res = NULL;
2429   status = getaddrinfo (addr,
2430                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2431                         &ai_hints, &ai_res);
2432   if (status != 0)
2433   {
2434     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2435              addr, gai_strerror (status));
2436     return (-1);
2437   }
2439   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2440   {
2441     int fd;
2442     listen_socket_t *temp;
2443     int one = 1;
2445     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2446         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2447     if (temp == NULL)
2448     {
2449       fprintf (stderr,
2450                "rrdcached: open_listen_socket_network: realloc failed.\n");
2451       continue;
2452     }
2453     listen_fds = temp;
2454     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2456     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2457     if (fd < 0)
2458     {
2459       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2460                rrd_strerror(errno));
2461       continue;
2462     }
2464     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2466     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2467     if (status != 0)
2468     {
2469       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2470                sock->addr, rrd_strerror(errno));
2471       close (fd);
2472       continue;
2473     }
2475     status = listen (fd, /* backlog = */ 10);
2476     if (status != 0)
2477     {
2478       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2479                sock->addr, rrd_strerror(errno));
2480       close (fd);
2481       freeaddrinfo(ai_res);
2482       return (-1);
2483     }
2485     listen_fds[listen_fds_num].fd = fd;
2486     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2487     listen_fds_num++;
2488   } /* for (ai_ptr) */
2490   freeaddrinfo(ai_res);
2491   return (0);
2492 } /* }}} static int open_listen_socket_network */
2494 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2496   assert(sock != NULL);
2497   assert(sock->addr != NULL);
2499   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2500       || sock->addr[0] == '/')
2501     return (open_listen_socket_unix(sock));
2502   else
2503     return (open_listen_socket_network(sock));
2504 } /* }}} int open_listen_socket */
2506 static int close_listen_sockets (void) /* {{{ */
2508   size_t i;
2510   for (i = 0; i < listen_fds_num; i++)
2511   {
2512     close (listen_fds[i].fd);
2514     if (listen_fds[i].family == PF_UNIX)
2515       unlink(listen_fds[i].addr);
2516   }
2518   free (listen_fds);
2519   listen_fds = NULL;
2520   listen_fds_num = 0;
2522   return (0);
2523 } /* }}} int close_listen_sockets */
2525 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2527   struct pollfd *pollfds;
2528   int pollfds_num;
2529   int status;
2530   int i;
2532   if (listen_fds_num < 1)
2533   {
2534     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2535     return (NULL);
2536   }
2538   pollfds_num = listen_fds_num;
2539   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2540   if (pollfds == NULL)
2541   {
2542     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2543     return (NULL);
2544   }
2545   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2547   RRDD_LOG(LOG_INFO, "listening for connections");
2549   while (state == RUNNING)
2550   {
2551     for (i = 0; i < pollfds_num; i++)
2552     {
2553       pollfds[i].fd = listen_fds[i].fd;
2554       pollfds[i].events = POLLIN | POLLPRI;
2555       pollfds[i].revents = 0;
2556     }
2558     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2559     if (state != RUNNING)
2560       break;
2561     else if (status == 0) /* timeout */
2562       continue;
2563     else if (status < 0) /* error */
2564     {
2565       status = errno;
2566       if (status != EINTR)
2567       {
2568         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2569       }
2570       continue;
2571     }
2573     for (i = 0; i < pollfds_num; i++)
2574     {
2575       listen_socket_t *client_sock;
2576       struct sockaddr_storage client_sa;
2577       socklen_t client_sa_size;
2578       pthread_t tid;
2579       pthread_attr_t attr;
2581       if (pollfds[i].revents == 0)
2582         continue;
2584       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2585       {
2586         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2587             "poll(2) returned something unexpected for listen FD #%i.",
2588             pollfds[i].fd);
2589         continue;
2590       }
2592       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2593       if (client_sock == NULL)
2594       {
2595         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2596         continue;
2597       }
2598       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2600       client_sa_size = sizeof (client_sa);
2601       client_sock->fd = accept (pollfds[i].fd,
2602           (struct sockaddr *) &client_sa, &client_sa_size);
2603       if (client_sock->fd < 0)
2604       {
2605         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2606         free(client_sock);
2607         continue;
2608       }
2610       pthread_attr_init (&attr);
2611       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2613       status = pthread_create (&tid, &attr, connection_thread_main,
2614                                client_sock);
2615       if (status != 0)
2616       {
2617         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2618         close_connection(client_sock);
2619         continue;
2620       }
2621     } /* for (pollfds_num) */
2622   } /* while (state == RUNNING) */
2624   RRDD_LOG(LOG_INFO, "starting shutdown");
2626   close_listen_sockets ();
2628   pthread_mutex_lock (&connection_threads_lock);
2629   while (connection_threads_num > 0)
2630     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2631   pthread_mutex_unlock (&connection_threads_lock);
2633   free(pollfds);
2635   return (NULL);
2636 } /* }}} void *listen_thread_main */
2638 static int daemonize (void) /* {{{ */
2640   int pid_fd;
2641   char *base_dir;
2643   daemon_uid = geteuid();
2645   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2646   if (pid_fd < 0)
2647     pid_fd = check_pidfile();
2648   if (pid_fd < 0)
2649     return pid_fd;
2651   /* open all the listen sockets */
2652   if (config_listen_address_list_len > 0)
2653   {
2654     for (size_t i = 0; i < config_listen_address_list_len; i++)
2655       open_listen_socket (config_listen_address_list[i]);
2657     rrd_free_ptrs((void ***) &config_listen_address_list,
2658                   &config_listen_address_list_len);
2659   }
2660   else
2661   {
2662     listen_socket_t sock;
2663     memset(&sock, 0, sizeof(sock));
2664     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2665     open_listen_socket (&sock);
2666   }
2668   if (listen_fds_num < 1)
2669   {
2670     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2671     goto error;
2672   }
2674   if (!stay_foreground)
2675   {
2676     pid_t child;
2678     child = fork ();
2679     if (child < 0)
2680     {
2681       fprintf (stderr, "daemonize: fork(2) failed.\n");
2682       goto error;
2683     }
2684     else if (child > 0)
2685       exit(0);
2687     /* Become session leader */
2688     setsid ();
2690     /* Open the first three file descriptors to /dev/null */
2691     close (2);
2692     close (1);
2693     close (0);
2695     open ("/dev/null", O_RDWR);
2696     if (dup(0) == -1 || dup(0) == -1){
2697         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2698     }
2699   } /* if (!stay_foreground) */
2701   /* Change into the /tmp directory. */
2702   base_dir = (config_base_dir != NULL)
2703     ? config_base_dir
2704     : "/tmp";
2706   if (chdir (base_dir) != 0)
2707   {
2708     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2709     goto error;
2710   }
2712   install_signal_handlers();
2714   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2715   RRDD_LOG(LOG_INFO, "starting up");
2717   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2718                                 (GDestroyNotify) free_cache_item);
2719   if (cache_tree == NULL)
2720   {
2721     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2722     goto error;
2723   }
2725   return write_pidfile (pid_fd);
2727 error:
2728   remove_pidfile();
2729   return -1;
2730 } /* }}} int daemonize */
2732 static int cleanup (void) /* {{{ */
2734   pthread_cond_broadcast (&flush_cond);
2735   pthread_join (flush_thread, NULL);
2737   pthread_cond_broadcast (&queue_cond);
2738   for (int i = 0; i < config_queue_threads; i++)
2739     pthread_join (queue_threads[i], NULL);
2741   if (config_flush_at_shutdown)
2742   {
2743     assert(cache_queue_head == NULL);
2744     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2745   }
2747   free(queue_threads);
2748   free(config_base_dir);
2750   pthread_mutex_lock(&cache_lock);
2751   g_tree_destroy(cache_tree);
2753   pthread_mutex_lock(&journal_lock);
2754   journal_done();
2756   RRDD_LOG(LOG_INFO, "goodbye");
2757   closelog ();
2759   remove_pidfile ();
2760   free(config_pid_file);
2762   return (0);
2763 } /* }}} int cleanup */
2765 static int read_options (int argc, char **argv) /* {{{ */
2767   int option;
2768   int status = 0;
2770   char **permissions = NULL;
2771   size_t permissions_len = 0;
2773   gid_t  socket_group = (gid_t)-1;
2774   mode_t socket_permissions = (mode_t)-1;
2776   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2777   {
2778     switch (option)
2779     {
2780       case 'g':
2781         stay_foreground=1;
2782         break;
2784       case 'l':
2785       {
2786         listen_socket_t *new;
2788         new = malloc(sizeof(listen_socket_t));
2789         if (new == NULL)
2790         {
2791           fprintf(stderr, "read_options: malloc failed.\n");
2792           return(2);
2793         }
2794         memset(new, 0, sizeof(listen_socket_t));
2796         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2798         /* Add permissions to the socket {{{ */
2799         if (permissions_len != 0)
2800         {
2801           size_t i;
2802           for (i = 0; i < permissions_len; i++)
2803           {
2804             status = socket_permission_add (new, permissions[i]);
2805             if (status != 0)
2806             {
2807               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2808                   "socket failed. Most likely, this permission doesn't "
2809                   "exist. Check your command line.\n", permissions[i]);
2810               status = 4;
2811             }
2812           }
2813         }
2814         else /* if (permissions_len == 0) */
2815         {
2816           /* Add permission for ALL commands to the socket. */
2817           size_t i;
2818           for (i = 0; i < list_of_commands_len; i++)
2819           {
2820             status = socket_permission_add (new, list_of_commands[i].cmd);
2821             if (status != 0)
2822             {
2823               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2824                   "socket failed. This should never happen, ever! Sorry.\n",
2825                   permissions[i]);
2826               status = 4;
2827             }
2828           }
2829         }
2830         /* }}} Done adding permissions. */
2832         new->socket_group = socket_group;
2833         new->socket_permissions = socket_permissions;
2835         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2836                          &config_listen_address_list_len, new))
2837         {
2838           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2839           return (2);
2840         }
2841       }
2842       break;
2844       /* set socket group permissions */
2845       case 's':
2846       {
2847         gid_t group_gid;
2848         struct group *grp;
2850         group_gid = strtoul(optarg, NULL, 10);
2851         if (errno != EINVAL && group_gid>0)
2852         {
2853           /* we were passed a number */
2854           grp = getgrgid(group_gid);
2855         }
2856         else
2857         {
2858           grp = getgrnam(optarg);
2859         }
2861         if (grp)
2862         {
2863           socket_group = grp->gr_gid;
2864         }
2865         else
2866         {
2867           /* no idea what the user wanted... */
2868           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2869           return (5);
2870         }
2871       }
2872       break;
2874       /* set socket file permissions */
2875       case 'm':
2876       {
2877         long  tmp;
2878         char *endptr = NULL;
2880         tmp = strtol (optarg, &endptr, 8);
2881         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2882             || (tmp > 07777) || (tmp < 0)) {
2883           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2884               optarg);
2885           return (5);
2886         }
2888         socket_permissions = (mode_t)tmp;
2889       }
2890       break;
2892       case 'P':
2893       {
2894         char *optcopy;
2895         char *saveptr;
2896         char *dummy;
2897         char *ptr;
2899         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2901         optcopy = strdup (optarg);
2902         dummy = optcopy;
2903         saveptr = NULL;
2904         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2905         {
2906           dummy = NULL;
2907           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2908         }
2910         free (optcopy);
2911       }
2912       break;
2914       case 'f':
2915       {
2916         int temp;
2918         temp = atoi (optarg);
2919         if (temp > 0)
2920           config_flush_interval = temp;
2921         else
2922         {
2923           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2924           status = 3;
2925         }
2926       }
2927       break;
2929       case 'w':
2930       {
2931         int temp;
2933         temp = atoi (optarg);
2934         if (temp > 0)
2935           config_write_interval = temp;
2936         else
2937         {
2938           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2939           status = 2;
2940         }
2941       }
2942       break;
2944       case 'z':
2945       {
2946         int temp;
2948         temp = atoi(optarg);
2949         if (temp > 0)
2950           config_write_jitter = temp;
2951         else
2952         {
2953           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2954           status = 2;
2955         }
2957         break;
2958       }
2960       case 't':
2961       {
2962         int threads;
2963         threads = atoi(optarg);
2964         if (threads >= 1)
2965           config_queue_threads = threads;
2966         else
2967         {
2968           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2969           return 1;
2970         }
2971       }
2972       break;
2974       case 'B':
2975         config_write_base_only = 1;
2976         break;
2978       case 'b':
2979       {
2980         size_t len;
2981         char base_realpath[PATH_MAX];
2983         if (config_base_dir != NULL)
2984           free (config_base_dir);
2985         config_base_dir = strdup (optarg);
2986         if (config_base_dir == NULL)
2987         {
2988           fprintf (stderr, "read_options: strdup failed.\n");
2989           return (3);
2990         }
2992         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2993         {
2994           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2995               config_base_dir, rrd_strerror (errno));
2996           return (3);
2997         }
2999         /* make sure that the base directory is not resolved via
3000          * symbolic links.  this makes some performance-enhancing
3001          * assumptions possible (we don't have to resolve paths
3002          * that start with a "/")
3003          */
3004         if (realpath(config_base_dir, base_realpath) == NULL)
3005         {
3006           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3007               "%s\n", config_base_dir, rrd_strerror(errno));
3008           return 5;
3009         }
3011         len = strlen (config_base_dir);
3012         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3013         {
3014           config_base_dir[len - 1] = 0;
3015           len--;
3016         }
3018         if (len < 1)
3019         {
3020           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3021           return (4);
3022         }
3024         _config_base_dir_len = len;
3026         len = strlen (base_realpath);
3027         while ((len > 0) && (base_realpath[len - 1] == '/'))
3028         {
3029           base_realpath[len - 1] = '\0';
3030           len--;
3031         }
3033         if (strncmp(config_base_dir,
3034                          base_realpath, sizeof(base_realpath)) != 0)
3035         {
3036           fprintf(stderr,
3037                   "Base directory (-b) resolved via file system links!\n"
3038                   "Please consult rrdcached '-b' documentation!\n"
3039                   "Consider specifying the real directory (%s)\n",
3040                   base_realpath);
3041           return 5;
3042         }
3043       }
3044       break;
3046       case 'p':
3047       {
3048         if (config_pid_file != NULL)
3049           free (config_pid_file);
3050         config_pid_file = strdup (optarg);
3051         if (config_pid_file == NULL)
3052         {
3053           fprintf (stderr, "read_options: strdup failed.\n");
3054           return (3);
3055         }
3056       }
3057       break;
3059       case 'F':
3060         config_flush_at_shutdown = 1;
3061         break;
3063       case 'j':
3064       {
3065         const char *dir = journal_dir = strdup(optarg);
3067         status = rrd_mkdir_p(dir, 0777);
3068         if (status != 0)
3069         {
3070           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3071               dir, rrd_strerror(errno));
3072           return 6;
3073         }
3075         if (access(dir, R_OK|W_OK|X_OK) != 0)
3076         {
3077           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3078                   errno ? rrd_strerror(errno) : "");
3079           return 6;
3080         }
3081       }
3082       break;
3084       case 'h':
3085       case '?':
3086         printf ("RRDCacheD %s\n"
3087             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3088             "\n"
3089             "Usage: rrdcached [options]\n"
3090             "\n"
3091             "Valid options are:\n"
3092             "  -l <address>  Socket address to listen to.\n"
3093             "  -P <perms>    Sets the permissions to assign to all following "
3094                             "sockets\n"
3095             "  -w <seconds>  Interval in which to write data.\n"
3096             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3097             "  -t <threads>  Number of write threads.\n"
3098             "  -f <seconds>  Interval in which to flush dead data.\n"
3099             "  -p <file>     Location of the PID-file.\n"
3100             "  -b <dir>      Base directory to change to.\n"
3101             "  -B            Restrict file access to paths within -b <dir>\n"
3102             "  -g            Do not fork and run in the foreground.\n"
3103             "  -j <dir>      Directory in which to create the journal files.\n"
3104             "  -F            Always flush all updates at shutdown\n"
3105             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3106             "                (the socket will also have read/write permissions "
3107                             "for that group)\n"
3108             "  -m <mode>     File permissions (octal) of all following UNIX "
3109                             "sockets\n"
3110             "\n"
3111             "For more information and a detailed description of all options "
3112             "please refer\n"
3113             "to the rrdcached(1) manual page.\n",
3114             VERSION);
3115         if (option == 'h')
3116           status = -1;
3117         else
3118           status = 1;
3119         break;
3120     } /* switch (option) */
3121   } /* while (getopt) */
3123   /* advise the user when values are not sane */
3124   if (config_flush_interval < 2 * config_write_interval)
3125     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3126             " 2x write interval (-w) !\n");
3127   if (config_write_jitter > config_write_interval)
3128     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3129             " write interval (-w) !\n");
3131   if (config_write_base_only && config_base_dir == NULL)
3132     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3133             "  Consult the rrdcached documentation\n");
3135   if (journal_dir == NULL)
3136     config_flush_at_shutdown = 1;
3138   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3140   return (status);
3141 } /* }}} int read_options */
3143 int main (int argc, char **argv)
3145   int status;
3147   status = read_options (argc, argv);
3148   if (status != 0)
3149   {
3150     if (status < 0)
3151       status = 0;
3152     return (status);
3153   }
3155   status = daemonize ();
3156   if (status != 0)
3157   {
3158     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3159     return (1);
3160   }
3162   journal_init();
3164   /* start the queue threads */
3165   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3166   if (queue_threads == NULL)
3167   {
3168     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3169     cleanup();
3170     return (1);
3171   }
3172   for (int i = 0; i < config_queue_threads; i++)
3173   {
3174     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3175     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3176     if (status != 0)
3177     {
3178       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3179       cleanup();
3180       return (1);
3181     }
3182   }
3184   /* start the flush thread */
3185   memset(&flush_thread, 0, sizeof(flush_thread));
3186   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3187   if (status != 0)
3188   {
3189     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3190     cleanup();
3191     return (1);
3192   }
3194   listen_thread_main (NULL);
3195   cleanup ();
3197   return (0);
3198 } /* int main */
3200 /*
3201  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3202  */