Code

add hosts_access support to rrdcached -- Shaun Reitan mailinglists@unix-scripts.com
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
147   char *wbuf;
148   ssize_t wbuf_len;
150   uint32_t permissions;
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
178   char *syntax;
179   char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
186   char *file;
187   char **values;
188   size_t values_num;
189   time_t last_flush_time;
190   time_t last_update_stamp;
191 #define CI_FLAGS_IN_TREE  (1<<0)
192 #define CI_FLAGS_IN_QUEUE (1<<1)
193   int flags;
194   pthread_cond_t  flushed;
195   cache_item_t *prev;
196   cache_item_t *next;
197 };
199 struct callback_flush_data_s
201   time_t now;
202   time_t abs_timeout;
203   char **keys;
204   size_t keys_num;
205 };
206 typedef struct callback_flush_data_s callback_flush_data_t;
208 enum queue_side_e
210   HEAD,
211   TAIL
212 };
213 typedef enum queue_side_e queue_side_t;
215 /* describe a set of journal files */
216 typedef struct {
217   char **files;
218   size_t files_num;
219 } journal_set;
221 /* max length of socket command or response */
222 #define CMD_MAX 4096
223 #define RBUF_SIZE (CMD_MAX*2)
225 /*
226  * Variables
227  */
228 static int stay_foreground = 0;
229 static uid_t daemon_uid;
231 static listen_socket_t *listen_fds = NULL;
232 static size_t listen_fds_num = 0;
234 static listen_socket_t default_socket;
236 enum {
237   RUNNING,              /* normal operation */
238   FLUSHING,             /* flushing remaining values */
239   SHUTDOWN              /* shutting down */
240 } state = RUNNING;
242 static pthread_t *queue_threads;
243 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
244 static int config_queue_threads = 4;
246 static pthread_t flush_thread;
247 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
249 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
250 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
251 static int connection_threads_num = 0;
253 /* Cache stuff */
254 static GTree          *cache_tree = NULL;
255 static cache_item_t   *cache_queue_head = NULL;
256 static cache_item_t   *cache_queue_tail = NULL;
257 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
259 static int config_write_interval = 300;
260 static int config_write_jitter   = 0;
261 static int config_flush_interval = 3600;
262 static int config_flush_at_shutdown = 0;
263 static char *config_pid_file = NULL;
264 static char *config_base_dir = NULL;
265 static size_t _config_base_dir_len = 0;
266 static int config_write_base_only = 0;
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
280 /* Journaled updates */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL;         /* current journal file handle */
287 static long  journal_size = 0;          /* current journal size */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
294 /* prototypes for forward refernces */
295 static int handle_request_help (HANDLER_PROTO);
297 /* 
298  * Functions
299  */
300 static void sig_common (const char *sig) /* {{{ */
302   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
303   state = FLUSHING;
304   pthread_cond_broadcast(&flush_cond);
305   pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
310   sig_common("INT");
311 } /* }}} void sig_int_handler */
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
315   sig_common("TERM");
316 } /* }}} void sig_term_handler */
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
320   config_flush_at_shutdown = 1;
321   sig_common("USR1");
322 } /* }}} void sig_usr1_handler */
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
326   config_flush_at_shutdown = 0;
327   sig_common("USR2");
328 } /* }}} void sig_usr2_handler */
330 static void install_signal_handlers(void) /* {{{ */
332   /* These structures are static, because `sigaction' behaves weird if the are
333    * overwritten.. */
334   static struct sigaction sa_int;
335   static struct sigaction sa_term;
336   static struct sigaction sa_pipe;
337   static struct sigaction sa_usr1;
338   static struct sigaction sa_usr2;
340   /* Install signal handlers */
341   memset (&sa_int, 0, sizeof (sa_int));
342   sa_int.sa_handler = sig_int_handler;
343   sigaction (SIGINT, &sa_int, NULL);
345   memset (&sa_term, 0, sizeof (sa_term));
346   sa_term.sa_handler = sig_term_handler;
347   sigaction (SIGTERM, &sa_term, NULL);
349   memset (&sa_pipe, 0, sizeof (sa_pipe));
350   sa_pipe.sa_handler = SIG_IGN;
351   sigaction (SIGPIPE, &sa_pipe, NULL);
353   memset (&sa_pipe, 0, sizeof (sa_usr1));
354   sa_usr1.sa_handler = sig_usr1_handler;
355   sigaction (SIGUSR1, &sa_usr1, NULL);
357   memset (&sa_usr2, 0, sizeof (sa_usr2));
358   sa_usr2.sa_handler = sig_usr2_handler;
359   sigaction (SIGUSR2, &sa_usr2, NULL);
361 } /* }}} void install_signal_handlers */
363 static int open_pidfile(char *action, int oflag) /* {{{ */
365   int fd;
366   const char *file;
367   char *file_copy, *dir;
369   file = (config_pid_file != NULL)
370     ? config_pid_file
371     : LOCALSTATEDIR "/run/rrdcached.pid";
373   /* dirname may modify its argument */
374   file_copy = strdup(file);
375   if (file_copy == NULL)
376   {
377     fprintf(stderr, "rrdcached: strdup(): %s\n",
378         rrd_strerror(errno));
379     return -1;
380   }
382   dir = dirname(file_copy);
383   if (rrd_mkdir_p(dir, 0777) != 0)
384   {
385     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386         dir, rrd_strerror(errno));
387     return -1;
388   }
390   free(file_copy);
392   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
393   if (fd < 0)
394     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395             action, file, rrd_strerror(errno));
397   return(fd);
398 } /* }}} static int open_pidfile */
400 /* check existing pid file to see whether a daemon is running */
401 static int check_pidfile(void)
403   int pid_fd;
404   pid_t pid;
405   char pid_str[16];
407   pid_fd = open_pidfile("open", O_RDWR);
408   if (pid_fd < 0)
409     return pid_fd;
411   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
412     return -1;
414   pid = atoi(pid_str);
415   if (pid <= 0)
416     return -1;
418   /* another running process that we can signal COULD be
419    * a competing rrdcached */
420   if (pid != getpid() && kill(pid, 0) == 0)
421   {
422     fprintf(stderr,
423             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
424     close(pid_fd);
425     return -1;
426   }
428   lseek(pid_fd, 0, SEEK_SET);
429   if (ftruncate(pid_fd, 0) == -1)
430   {
431     fprintf(stderr,
432             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
433     close(pid_fd);
434     return -1;
435   }
437   fprintf(stderr,
438           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
439           "rrdcached: starting normally.\n", pid);
441   return pid_fd;
442 } /* }}} static int check_pidfile */
444 static int write_pidfile (int fd) /* {{{ */
446   pid_t pid;
447   FILE *fh;
449   pid = getpid ();
451   fh = fdopen (fd, "w");
452   if (fh == NULL)
453   {
454     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
455     close(fd);
456     return (-1);
457   }
459   fprintf (fh, "%i\n", (int) pid);
460   fclose (fh);
462   return (0);
463 } /* }}} int write_pidfile */
465 static int remove_pidfile (void) /* {{{ */
467   char *file;
468   int status;
470   file = (config_pid_file != NULL)
471     ? config_pid_file
472     : LOCALSTATEDIR "/run/rrdcached.pid";
474   status = unlink (file);
475   if (status == 0)
476     return (0);
477   return (errno);
478 } /* }}} int remove_pidfile */
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
482   char *eol;
484   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485                sock->next_read - sock->next_cmd);
487   if (eol == NULL)
488   {
489     /* no commands left, move remainder back to front of rbuf */
490     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491             sock->next_read - sock->next_cmd);
492     sock->next_read -= sock->next_cmd;
493     sock->next_cmd = 0;
494     *len = 0;
495     return NULL;
496   }
497   else
498   {
499     char *cmd = sock->rbuf + sock->next_cmd;
500     *eol = '\0';
502     sock->next_cmd = eol - sock->rbuf + 1;
504     if (eol > sock->rbuf && *(eol-1) == '\r')
505       *(--eol) = '\0'; /* handle "\r\n" EOL */
507     *len = eol - cmd;
509     return cmd;
510   }
512   /* NOTREACHED */
513   assert(1==0);
514 } /* }}} char *next_cmd */
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
519   char *new_buf;
521   assert(sock != NULL);
523   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
524   if (new_buf == NULL)
525   {
526     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527     return -1;
528   }
530   strncpy(new_buf + sock->wbuf_len, str, len + 1);
532   sock->wbuf = new_buf;
533   sock->wbuf_len += len;
535   return 0;
536 } /* }}} static int add_to_wbuf */
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
541   va_list argp;
542   char buffer[CMD_MAX];
543   int len;
545   if (JOURNAL_REPLAY(sock)) return 0;
546   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
548   va_start(argp, fmt);
549 #ifdef HAVE_VSNPRINTF
550   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
551 #else
552   len = vsprintf(buffer, fmt, argp);
553 #endif
554   va_end(argp);
555   if (len < 0)
556   {
557     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558     return -1;
559   }
561   return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
564 static int count_lines(char *str) /* {{{ */
566   int lines = 0;
568   if (str != NULL)
569   {
570     while ((str = strchr(str, '\n')) != NULL)
571     {
572       ++lines;
573       ++str;
574     }
575   }
577   return lines;
578 } /* }}} static int count_lines */
580 /* send the response back to the user.
581  * returns 0 on success, -1 on error
582  * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584                           char *fmt, ...) /* {{{ */
586   va_list argp;
587   char buffer[CMD_MAX];
588   int lines;
589   ssize_t wrote;
590   int rclen, len;
592   if (JOURNAL_REPLAY(sock)) return rc;
594   if (sock->batch_start)
595   {
596     if (rc == RESP_OK)
597       return rc; /* no response on success during BATCH */
598     lines = sock->batch_cmd;
599   }
600   else if (rc == RESP_OK)
601     lines = count_lines(sock->wbuf);
602   else
603     lines = -1;
605   rclen = sprintf(buffer, "%d ", lines);
606   va_start(argp, fmt);
607 #ifdef HAVE_VSNPRINTF
608   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
609 #else
610   len = vsprintf(buffer+rclen, fmt, argp);
611 #endif
612   va_end(argp);
613   if (len < 0)
614     return -1;
616   len += rclen;
618   /* append the result to the wbuf, don't write to the user */
619   if (sock->batch_start)
620     return add_to_wbuf(sock, buffer, len);
622   /* first write must be complete */
623   if (len != write(sock->fd, buffer, len))
624   {
625     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626     return -1;
627   }
629   if (sock->wbuf != NULL && rc == RESP_OK)
630   {
631     wrote = 0;
632     while (wrote < sock->wbuf_len)
633     {
634       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
635       if (wb <= 0)
636       {
637         RRDD_LOG(LOG_INFO, "send_response: could not write results");
638         return -1;
639       }
640       wrote += wb;
641     }
642   }
644   free(sock->wbuf); sock->wbuf = NULL;
645   sock->wbuf_len = 0;
647   return 0;
648 } /* }}} */
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
652   ci->values = NULL;
653   ci->values_num = 0;
655   ci->last_flush_time = when;
656   if (config_write_jitter > 0)
657     ci->last_flush_time += (rrd_random() % config_write_jitter);
660 /* remove_from_queue
661  * remove a "cache_item_t" item from the queue.
662  * must hold 'cache_lock' when calling this
663  */
664 static void remove_from_queue(cache_item_t *ci) /* {{{ */
666   if (ci == NULL) return;
667   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
669   if (ci->prev == NULL)
670     cache_queue_head = ci->next; /* reset head */
671   else
672     ci->prev->next = ci->next;
674   if (ci->next == NULL)
675     cache_queue_tail = ci->prev; /* reset the tail */
676   else
677     ci->next->prev = ci->prev;
679   ci->next = ci->prev = NULL;
680   ci->flags &= ~CI_FLAGS_IN_QUEUE;
682   pthread_mutex_lock (&stats_lock);
683   assert (stats_queue_length > 0);
684   stats_queue_length--;
685   pthread_mutex_unlock (&stats_lock);
687 } /* }}} static void remove_from_queue */
689 /* free the resources associated with the cache_item_t
690  * must hold cache_lock when calling this function
691  */
692 static void *free_cache_item(cache_item_t *ci) /* {{{ */
694   if (ci == NULL) return NULL;
696   remove_from_queue(ci);
698   for (size_t i=0; i < ci->values_num; i++)
699     free(ci->values[i]);
701   free (ci->values);
702   free (ci->file);
704   /* in case anyone is waiting */
705   pthread_cond_broadcast(&ci->flushed);
706   pthread_cond_destroy(&ci->flushed);
708   free (ci);
710   return NULL;
711 } /* }}} static void *free_cache_item */
713 /*
714  * enqueue_cache_item:
715  * `cache_lock' must be acquired before calling this function!
716  */
717 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
718     queue_side_t side)
720   if (ci == NULL)
721     return (-1);
723   if (ci->values_num == 0)
724     return (0);
726   if (side == HEAD)
727   {
728     if (cache_queue_head == ci)
729       return 0;
731     /* remove if further down in queue */
732     remove_from_queue(ci);
734     ci->prev = NULL;
735     ci->next = cache_queue_head;
736     if (ci->next != NULL)
737       ci->next->prev = ci;
738     cache_queue_head = ci;
740     if (cache_queue_tail == NULL)
741       cache_queue_tail = cache_queue_head;
742   }
743   else /* (side == TAIL) */
744   {
745     /* We don't move values back in the list.. */
746     if (ci->flags & CI_FLAGS_IN_QUEUE)
747       return (0);
749     assert (ci->next == NULL);
750     assert (ci->prev == NULL);
752     ci->prev = cache_queue_tail;
754     if (cache_queue_tail == NULL)
755       cache_queue_head = ci;
756     else
757       cache_queue_tail->next = ci;
759     cache_queue_tail = ci;
760   }
762   ci->flags |= CI_FLAGS_IN_QUEUE;
764   pthread_cond_signal(&queue_cond);
765   pthread_mutex_lock (&stats_lock);
766   stats_queue_length++;
767   pthread_mutex_unlock (&stats_lock);
769   return (0);
770 } /* }}} int enqueue_cache_item */
772 /*
773  * tree_callback_flush:
774  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
775  * while this is in progress.
776  */
777 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
778     gpointer data)
780   cache_item_t *ci;
781   callback_flush_data_t *cfd;
783   ci = (cache_item_t *) value;
784   cfd = (callback_flush_data_t *) data;
786   if (ci->flags & CI_FLAGS_IN_QUEUE)
787     return FALSE;
789   if (ci->values_num > 0
790       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
791   {
792     enqueue_cache_item (ci, TAIL);
793   }
794   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
795       && (ci->values_num <= 0))
796   {
797     assert ((char *) key == ci->file);
798     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
799     {
800       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
801       return (FALSE);
802     }
803   }
805   return (FALSE);
806 } /* }}} gboolean tree_callback_flush */
808 static int flush_old_values (int max_age)
810   callback_flush_data_t cfd;
811   size_t k;
813   memset (&cfd, 0, sizeof (cfd));
814   /* Pass the current time as user data so that we don't need to call
815    * `time' for each node. */
816   cfd.now = time (NULL);
817   cfd.keys = NULL;
818   cfd.keys_num = 0;
820   if (max_age > 0)
821     cfd.abs_timeout = cfd.now - max_age;
822   else
823     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
825   /* `tree_callback_flush' will return the keys of all values that haven't
826    * been touched in the last `config_flush_interval' seconds in `cfd'.
827    * The char*'s in this array point to the same memory as ci->file, so we
828    * don't need to free them separately. */
829   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
831   for (k = 0; k < cfd.keys_num; k++)
832   {
833     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
834     /* should never fail, since we have held the cache_lock
835      * the entire time */
836     assert(status == TRUE);
837   }
839   if (cfd.keys != NULL)
840   {
841     free (cfd.keys);
842     cfd.keys = NULL;
843   }
845   return (0);
846 } /* int flush_old_values */
848 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
850   struct timeval now;
851   struct timespec next_flush;
852   int status;
854   gettimeofday (&now, NULL);
855   next_flush.tv_sec = now.tv_sec + config_flush_interval;
856   next_flush.tv_nsec = 1000 * now.tv_usec;
858   pthread_mutex_lock(&cache_lock);
860   while (state == RUNNING)
861   {
862     gettimeofday (&now, NULL);
863     if ((now.tv_sec > next_flush.tv_sec)
864         || ((now.tv_sec == next_flush.tv_sec)
865           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
866     {
867       RRDD_LOG(LOG_DEBUG, "flushing old values");
869       /* Determine the time of the next cache flush. */
870       next_flush.tv_sec = now.tv_sec + config_flush_interval;
872       /* Flush all values that haven't been written in the last
873        * `config_write_interval' seconds. */
874       flush_old_values (config_write_interval);
876       /* unlock the cache while we rotate so we don't block incoming
877        * updates if the fsync() blocks on disk I/O */
878       pthread_mutex_unlock(&cache_lock);
879       journal_rotate();
880       pthread_mutex_lock(&cache_lock);
881     }
883     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
884     if (status != 0 && status != ETIMEDOUT)
885     {
886       RRDD_LOG (LOG_ERR, "flush_thread_main: "
887                 "pthread_cond_timedwait returned %i.", status);
888     }
889   }
891   if (config_flush_at_shutdown)
892     flush_old_values (-1); /* flush everything */
894   state = SHUTDOWN;
896   pthread_mutex_unlock(&cache_lock);
898   return NULL;
899 } /* void *flush_thread_main */
901 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
903   pthread_mutex_lock (&cache_lock);
905   while (state != SHUTDOWN
906          || (cache_queue_head != NULL && config_flush_at_shutdown))
907   {
908     cache_item_t *ci;
909     char *file;
910     char **values;
911     size_t values_num;
912     int status;
914     /* Now, check if there's something to store away. If not, wait until
915      * something comes in. */
916     if (cache_queue_head == NULL)
917     {
918       status = pthread_cond_wait (&queue_cond, &cache_lock);
919       if ((status != 0) && (status != ETIMEDOUT))
920       {
921         RRDD_LOG (LOG_ERR, "queue_thread_main: "
922             "pthread_cond_wait returned %i.", status);
923       }
924     }
926     /* Check if a value has arrived. This may be NULL if we timed out or there
927      * was an interrupt such as a signal. */
928     if (cache_queue_head == NULL)
929       continue;
931     ci = cache_queue_head;
933     /* copy the relevant parts */
934     file = strdup (ci->file);
935     if (file == NULL)
936     {
937       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
938       continue;
939     }
941     assert(ci->values != NULL);
942     assert(ci->values_num > 0);
944     values = ci->values;
945     values_num = ci->values_num;
947     wipe_ci_values(ci, time(NULL));
948     remove_from_queue(ci);
950     pthread_mutex_unlock (&cache_lock);
952     rrd_clear_error ();
953     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
954     if (status != 0)
955     {
956       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
957           "rrd_update_r (%s) failed with status %i. (%s)",
958           file, status, rrd_get_error());
959     }
961     journal_write("wrote", file);
963     /* Search again in the tree.  It's possible someone issued a "FORGET"
964      * while we were writing the update values. */
965     pthread_mutex_lock(&cache_lock);
966     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
967     if (ci)
968       pthread_cond_broadcast(&ci->flushed);
969     pthread_mutex_unlock(&cache_lock);
971     if (status == 0)
972     {
973       pthread_mutex_lock (&stats_lock);
974       stats_updates_written++;
975       stats_data_sets_written += values_num;
976       pthread_mutex_unlock (&stats_lock);
977     }
979     rrd_free_ptrs((void ***) &values, &values_num);
980     free(file);
982     pthread_mutex_lock (&cache_lock);
983   }
984   pthread_mutex_unlock (&cache_lock);
986   return (NULL);
987 } /* }}} void *queue_thread_main */
989 static int buffer_get_field (char **buffer_ret, /* {{{ */
990     size_t *buffer_size_ret, char **field_ret)
992   char *buffer;
993   size_t buffer_pos;
994   size_t buffer_size;
995   char *field;
996   size_t field_size;
997   int status;
999   buffer = *buffer_ret;
1000   buffer_pos = 0;
1001   buffer_size = *buffer_size_ret;
1002   field = *buffer_ret;
1003   field_size = 0;
1005   if (buffer_size <= 0)
1006     return (-1);
1008   /* This is ensured by `handle_request'. */
1009   assert (buffer[buffer_size - 1] == '\0');
1011   status = -1;
1012   while (buffer_pos < buffer_size)
1013   {
1014     /* Check for end-of-field or end-of-buffer */
1015     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1016     {
1017       field[field_size] = 0;
1018       field_size++;
1019       buffer_pos++;
1020       status = 0;
1021       break;
1022     }
1023     /* Handle escaped characters. */
1024     else if (buffer[buffer_pos] == '\\')
1025     {
1026       if (buffer_pos >= (buffer_size - 1))
1027         break;
1028       buffer_pos++;
1029       field[field_size] = buffer[buffer_pos];
1030       field_size++;
1031       buffer_pos++;
1032     }
1033     /* Normal operation */ 
1034     else
1035     {
1036       field[field_size] = buffer[buffer_pos];
1037       field_size++;
1038       buffer_pos++;
1039     }
1040   } /* while (buffer_pos < buffer_size) */
1042   if (status != 0)
1043     return (status);
1045   *buffer_ret = buffer + buffer_pos;
1046   *buffer_size_ret = buffer_size - buffer_pos;
1047   *field_ret = field;
1049   return (0);
1050 } /* }}} int buffer_get_field */
1052 /* if we're restricting writes to the base directory,
1053  * check whether the file falls within the dir
1054  * returns 1 if OK, otherwise 0
1055  */
1056 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1058   assert(file != NULL);
1060   if (!config_write_base_only
1061       || JOURNAL_REPLAY(sock)
1062       || config_base_dir == NULL)
1063     return 1;
1065   if (strstr(file, "../") != NULL) goto err;
1067   /* relative paths without "../" are ok */
1068   if (*file != '/') return 1;
1070   /* file must be of the format base + "/" + <1+ char filename> */
1071   if (strlen(file) < _config_base_dir_len + 2) goto err;
1072   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1073   if (*(file + _config_base_dir_len) != '/') goto err;
1075   return 1;
1077 err:
1078   if (sock != NULL && sock->fd >= 0)
1079     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1081   return 0;
1082 } /* }}} static int check_file_access */
1084 /* when using a base dir, convert relative paths to absolute paths.
1085  * if necessary, modifies the "filename" pointer to point
1086  * to the new path created in "tmp".  "tmp" is provided
1087  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1088  *
1089  * this allows us to optimize for the expected case (absolute path)
1090  * with a no-op.
1091  */
1092 static void get_abs_path(char **filename, char *tmp)
1094   assert(tmp != NULL);
1095   assert(filename != NULL && *filename != NULL);
1097   if (config_base_dir == NULL || **filename == '/')
1098     return;
1100   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1101   *filename = tmp;
1102 } /* }}} static int get_abs_path */
1104 static int flush_file (const char *filename) /* {{{ */
1106   cache_item_t *ci;
1108   pthread_mutex_lock (&cache_lock);
1110   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1111   if (ci == NULL)
1112   {
1113     pthread_mutex_unlock (&cache_lock);
1114     return (ENOENT);
1115   }
1117   if (ci->values_num > 0)
1118   {
1119     /* Enqueue at head */
1120     enqueue_cache_item (ci, HEAD);
1121     pthread_cond_wait(&ci->flushed, &cache_lock);
1122   }
1124   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1125    * may have been purged during our cond_wait() */
1127   pthread_mutex_unlock(&cache_lock);
1129   return (0);
1130 } /* }}} int flush_file */
1132 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1134   char *err = "Syntax error.\n";
1136   if (cmd && cmd->syntax)
1137     err = cmd->syntax;
1139   return send_response(sock, RESP_ERR, "Usage: %s", err);
1140 } /* }}} static int syntax_error() */
1142 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1144   uint64_t copy_queue_length;
1145   uint64_t copy_updates_received;
1146   uint64_t copy_flush_received;
1147   uint64_t copy_updates_written;
1148   uint64_t copy_data_sets_written;
1149   uint64_t copy_journal_bytes;
1150   uint64_t copy_journal_rotate;
1152   uint64_t tree_nodes_number;
1153   uint64_t tree_depth;
1155   pthread_mutex_lock (&stats_lock);
1156   copy_queue_length       = stats_queue_length;
1157   copy_updates_received   = stats_updates_received;
1158   copy_flush_received     = stats_flush_received;
1159   copy_updates_written    = stats_updates_written;
1160   copy_data_sets_written  = stats_data_sets_written;
1161   copy_journal_bytes      = stats_journal_bytes;
1162   copy_journal_rotate     = stats_journal_rotate;
1163   pthread_mutex_unlock (&stats_lock);
1165   pthread_mutex_lock (&cache_lock);
1166   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1167   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1168   pthread_mutex_unlock (&cache_lock);
1170   add_response_info(sock,
1171                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1172   add_response_info(sock,
1173                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1174   add_response_info(sock,
1175                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1176   add_response_info(sock,
1177                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1178   add_response_info(sock,
1179                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1180   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1181   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1182   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1183   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1185   send_response(sock, RESP_OK, "Statistics follow\n");
1187   return (0);
1188 } /* }}} int handle_request_stats */
1190 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1192   char *file, file_tmp[PATH_MAX];
1193   int status;
1195   status = buffer_get_field (&buffer, &buffer_size, &file);
1196   if (status != 0)
1197   {
1198     return syntax_error(sock,cmd);
1199   }
1200   else
1201   {
1202     pthread_mutex_lock(&stats_lock);
1203     stats_flush_received++;
1204     pthread_mutex_unlock(&stats_lock);
1206     get_abs_path(&file, file_tmp);
1207     if (!check_file_access(file, sock)) return 0;
1209     status = flush_file (file);
1210     if (status == 0)
1211       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1212     else if (status == ENOENT)
1213     {
1214       /* no file in our tree; see whether it exists at all */
1215       struct stat statbuf;
1217       memset(&statbuf, 0, sizeof(statbuf));
1218       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1219         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1220       else
1221         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1222     }
1223     else if (status < 0)
1224       return send_response(sock, RESP_ERR, "Internal error.\n");
1225     else
1226       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1227   }
1229   /* NOTREACHED */
1230   assert(1==0);
1231 } /* }}} int handle_request_flush */
1233 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1235   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1237   pthread_mutex_lock(&cache_lock);
1238   flush_old_values(-1);
1239   pthread_mutex_unlock(&cache_lock);
1241   return send_response(sock, RESP_OK, "Started flush.\n");
1242 } /* }}} static int handle_request_flushall */
1244 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1246   int status;
1247   char *file, file_tmp[PATH_MAX];
1248   cache_item_t *ci;
1250   status = buffer_get_field(&buffer, &buffer_size, &file);
1251   if (status != 0)
1252     return syntax_error(sock,cmd);
1254   get_abs_path(&file, file_tmp);
1256   pthread_mutex_lock(&cache_lock);
1257   ci = g_tree_lookup(cache_tree, file);
1258   if (ci == NULL)
1259   {
1260     pthread_mutex_unlock(&cache_lock);
1261     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1262   }
1264   for (size_t i=0; i < ci->values_num; i++)
1265     add_response_info(sock, "%s\n", ci->values[i]);
1267   pthread_mutex_unlock(&cache_lock);
1268   return send_response(sock, RESP_OK, "updates pending\n");
1269 } /* }}} static int handle_request_pending */
1271 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1273   int status;
1274   gboolean found;
1275   char *file, file_tmp[PATH_MAX];
1277   status = buffer_get_field(&buffer, &buffer_size, &file);
1278   if (status != 0)
1279     return syntax_error(sock,cmd);
1281   get_abs_path(&file, file_tmp);
1282   if (!check_file_access(file, sock)) return 0;
1284   pthread_mutex_lock(&cache_lock);
1285   found = g_tree_remove(cache_tree, file);
1286   pthread_mutex_unlock(&cache_lock);
1288   if (found == TRUE)
1289   {
1290     if (!JOURNAL_REPLAY(sock))
1291       journal_write("forget", file);
1293     return send_response(sock, RESP_OK, "Gone!\n");
1294   }
1295   else
1296     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1298   /* NOTREACHED */
1299   assert(1==0);
1300 } /* }}} static int handle_request_forget */
1302 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1304   cache_item_t *ci;
1306   pthread_mutex_lock(&cache_lock);
1308   ci = cache_queue_head;
1309   while (ci != NULL)
1310   {
1311     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1312     ci = ci->next;
1313   }
1315   pthread_mutex_unlock(&cache_lock);
1317   return send_response(sock, RESP_OK, "in queue.\n");
1318 } /* }}} int handle_request_queue */
1320 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1322   char *file, file_tmp[PATH_MAX];
1323   int values_num = 0;
1324   int status;
1325   char orig_buf[CMD_MAX];
1327   cache_item_t *ci;
1329   /* save it for the journal later */
1330   if (!JOURNAL_REPLAY(sock))
1331     strncpy(orig_buf, buffer, buffer_size);
1333   status = buffer_get_field (&buffer, &buffer_size, &file);
1334   if (status != 0)
1335     return syntax_error(sock,cmd);
1337   pthread_mutex_lock(&stats_lock);
1338   stats_updates_received++;
1339   pthread_mutex_unlock(&stats_lock);
1341   get_abs_path(&file, file_tmp);
1342   if (!check_file_access(file, sock)) return 0;
1344   pthread_mutex_lock (&cache_lock);
1345   ci = g_tree_lookup (cache_tree, file);
1347   if (ci == NULL) /* {{{ */
1348   {
1349     struct stat statbuf;
1350     cache_item_t *tmp;
1352     /* don't hold the lock while we setup; stat(2) might block */
1353     pthread_mutex_unlock(&cache_lock);
1355     memset (&statbuf, 0, sizeof (statbuf));
1356     status = stat (file, &statbuf);
1357     if (status != 0)
1358     {
1359       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1361       status = errno;
1362       if (status == ENOENT)
1363         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1364       else
1365         return send_response(sock, RESP_ERR,
1366                              "stat failed with error %i.\n", status);
1367     }
1368     if (!S_ISREG (statbuf.st_mode))
1369       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1371     if (access(file, R_OK|W_OK) != 0)
1372       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1373                            file, rrd_strerror(errno));
1375     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1376     if (ci == NULL)
1377     {
1378       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1380       return send_response(sock, RESP_ERR, "malloc failed.\n");
1381     }
1382     memset (ci, 0, sizeof (cache_item_t));
1384     ci->file = strdup (file);
1385     if (ci->file == NULL)
1386     {
1387       free (ci);
1388       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1390       return send_response(sock, RESP_ERR, "strdup failed.\n");
1391     }
1393     wipe_ci_values(ci, now);
1394     ci->flags = CI_FLAGS_IN_TREE;
1395     pthread_cond_init(&ci->flushed, NULL);
1397     pthread_mutex_lock(&cache_lock);
1399     /* another UPDATE might have added this entry in the meantime */
1400     tmp = g_tree_lookup (cache_tree, file);
1401     if (tmp == NULL)
1402       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1403     else
1404     {
1405       free_cache_item (ci);
1406       ci = tmp;
1407     }
1409     /* state may have changed while we were unlocked */
1410     if (state == SHUTDOWN)
1411       return -1;
1412   } /* }}} */
1413   assert (ci != NULL);
1415   /* don't re-write updates in replay mode */
1416   if (!JOURNAL_REPLAY(sock))
1417     journal_write("update", orig_buf);
1419   while (buffer_size > 0)
1420   {
1421     char *value;
1422     time_t stamp;
1423     char *eostamp;
1425     status = buffer_get_field (&buffer, &buffer_size, &value);
1426     if (status != 0)
1427     {
1428       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1429       break;
1430     }
1432     /* make sure update time is always moving forward */
1433     stamp = strtol(value, &eostamp, 10);
1434     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1435     {
1436       pthread_mutex_unlock(&cache_lock);
1437       return send_response(sock, RESP_ERR,
1438                            "Cannot find timestamp in '%s'!\n", value);
1439     }
1440     else if (stamp <= ci->last_update_stamp)
1441     {
1442       pthread_mutex_unlock(&cache_lock);
1443       return send_response(sock, RESP_ERR,
1444                            "illegal attempt to update using time %ld when last"
1445                            " update time is %ld (minimum one second step)\n",
1446                            stamp, ci->last_update_stamp);
1447     }
1448     else
1449       ci->last_update_stamp = stamp;
1451     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1452     {
1453       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454       continue;
1455     }
1457     values_num++;
1458   }
1460   if (((now - ci->last_flush_time) >= config_write_interval)
1461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462       && (ci->values_num > 0))
1463   {
1464     enqueue_cache_item (ci, TAIL);
1465   }
1467   pthread_mutex_unlock (&cache_lock);
1469   if (values_num < 1)
1470     return send_response(sock, RESP_ERR, "No values updated.\n");
1471   else
1472     return send_response(sock, RESP_OK,
1473                          "errors, enqueued %i value(s).\n", values_num);
1475   /* NOTREACHED */
1476   assert(1==0);
1478 } /* }}} int handle_request_update */
1480 /* we came across a "WROTE" entry during journal replay.
1481  * throw away any values that we have accumulated for this file
1482  */
1483 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1485   cache_item_t *ci;
1486   const char *file = buffer;
1488   pthread_mutex_lock(&cache_lock);
1490   ci = g_tree_lookup(cache_tree, file);
1491   if (ci == NULL)
1492   {
1493     pthread_mutex_unlock(&cache_lock);
1494     return (0);
1495   }
1497   if (ci->values)
1498     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1500   wipe_ci_values(ci, now);
1501   remove_from_queue(ci);
1503   pthread_mutex_unlock(&cache_lock);
1504   return (0);
1505 } /* }}} int handle_request_wrote */
1507 /* start "BATCH" processing */
1508 static int batch_start (HANDLER_PROTO) /* {{{ */
1510   int status;
1511   if (sock->batch_start)
1512     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1514   status = send_response(sock, RESP_OK,
1515                          "Go ahead.  End with dot '.' on its own line.\n");
1516   sock->batch_start = time(NULL);
1517   sock->batch_cmd = 0;
1519   return status;
1520 } /* }}} static int batch_start */
1522 /* finish "BATCH" processing and return results to the client */
1523 static int batch_done (HANDLER_PROTO) /* {{{ */
1525   assert(sock->batch_start);
1526   sock->batch_start = 0;
1527   sock->batch_cmd  = 0;
1528   return send_response(sock, RESP_OK, "errors\n");
1529 } /* }}} static int batch_done */
1531 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1533   return -1;
1534 } /* }}} static int handle_request_quit */
1536 static command_t list_of_commands[] = { /* {{{ */
1537   {
1538     "UPDATE",
1539     handle_request_update,
1540     CMD_CONTEXT_ANY,
1541     "UPDATE <filename> <values> [<values> ...]\n"
1542     ,
1543     "Adds the given file to the internal cache if it is not yet known and\n"
1544     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1545     "for details.\n"
1546     "\n"
1547     "Each <values> has the following form:\n"
1548     "  <values> = <time>:<value>[:<value>[...]]\n"
1549     "See the rrdupdate(1) manpage for details.\n"
1550   },
1551   {
1552     "WROTE",
1553     handle_request_wrote,
1554     CMD_CONTEXT_JOURNAL,
1555     NULL,
1556     NULL
1557   },
1558   {
1559     "FLUSH",
1560     handle_request_flush,
1561     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1562     "FLUSH <filename>\n"
1563     ,
1564     "Adds the given filename to the head of the update queue and returns\n"
1565     "after it has been dequeued.\n"
1566   },
1567   {
1568     "FLUSHALL",
1569     handle_request_flushall,
1570     CMD_CONTEXT_CLIENT,
1571     "FLUSHALL\n"
1572     ,
1573     "Triggers writing of all pending updates.  Returns immediately.\n"
1574   },
1575   {
1576     "PENDING",
1577     handle_request_pending,
1578     CMD_CONTEXT_CLIENT,
1579     "PENDING <filename>\n"
1580     ,
1581     "Shows any 'pending' updates for a file, in order.\n"
1582     "The updates shown have not yet been written to the underlying RRD file.\n"
1583   },
1584   {
1585     "FORGET",
1586     handle_request_forget,
1587     CMD_CONTEXT_ANY,
1588     "FORGET <filename>\n"
1589     ,
1590     "Removes the file completely from the cache.\n"
1591     "Any pending updates for the file will be lost.\n"
1592   },
1593   {
1594     "QUEUE",
1595     handle_request_queue,
1596     CMD_CONTEXT_CLIENT,
1597     "QUEUE\n"
1598     ,
1599         "Shows all files in the output queue.\n"
1600     "The output is zero or more lines in the following format:\n"
1601     "(where <num_vals> is the number of values to be written)\n"
1602     "\n"
1603     "<num_vals> <filename>\n"
1604   },
1605   {
1606     "STATS",
1607     handle_request_stats,
1608     CMD_CONTEXT_CLIENT,
1609     "STATS\n"
1610     ,
1611     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1612     "a description of the values.\n"
1613   },
1614   {
1615     "HELP",
1616     handle_request_help,
1617     CMD_CONTEXT_CLIENT,
1618     "HELP [<command>]\n",
1619     NULL, /* special! */
1620   },
1621   {
1622     "BATCH",
1623     batch_start,
1624     CMD_CONTEXT_CLIENT,
1625     "BATCH\n"
1626     ,
1627     "The 'BATCH' command permits the client to initiate a bulk load\n"
1628     "   of commands to rrdcached.\n"
1629     "\n"
1630     "Usage:\n"
1631     "\n"
1632     "    client: BATCH\n"
1633     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1634     "    client: command #1\n"
1635     "    client: command #2\n"
1636     "    client: ... and so on\n"
1637     "    client: .\n"
1638     "    server: 2 errors\n"
1639     "    server: 7 message for command #7\n"
1640     "    server: 9 message for command #9\n"
1641     "\n"
1642     "For more information, consult the rrdcached(1) documentation.\n"
1643   },
1644   {
1645     ".",   /* BATCH terminator */
1646     batch_done,
1647     CMD_CONTEXT_BATCH,
1648     NULL,
1649     NULL
1650   },
1651   {
1652     "QUIT",
1653     handle_request_quit,
1654     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1655     "QUIT\n"
1656     ,
1657     "Disconnect from rrdcached.\n"
1658   }
1659 }; /* }}} command_t list_of_commands[] */
1660 static size_t list_of_commands_len = sizeof (list_of_commands)
1661   / sizeof (list_of_commands[0]);
1663 static command_t *find_command(char *cmd)
1665   size_t i;
1667   for (i = 0; i < list_of_commands_len; i++)
1668     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1669       return (&list_of_commands[i]);
1670   return NULL;
1673 /* We currently use the index in the `list_of_commands' array as a bit position
1674  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1675  * outside these functions so that switching to a more elegant storage method
1676  * is easily possible. */
1677 static ssize_t find_command_index (const char *cmd) /* {{{ */
1679   size_t i;
1681   for (i = 0; i < list_of_commands_len; i++)
1682     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1683       return ((ssize_t) i);
1684   return (-1);
1685 } /* }}} ssize_t find_command_index */
1687 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1688     const char *cmd)
1690   ssize_t i;
1692   if (JOURNAL_REPLAY(sock))
1693     return (1);
1695   if (cmd == NULL)
1696     return (-1);
1698   if ((strcasecmp ("QUIT", cmd) == 0)
1699       || (strcasecmp ("HELP", cmd) == 0))
1700     return (1);
1701   else if (strcmp (".", cmd) == 0)
1702     cmd = "BATCH";
1704   i = find_command_index (cmd);
1705   if (i < 0)
1706     return (-1);
1707   assert (i < 32);
1709   if ((sock->permissions & (1 << i)) != 0)
1710     return (1);
1711   return (0);
1712 } /* }}} int socket_permission_check */
1714 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1715     const char *cmd)
1717   ssize_t i;
1719   i = find_command_index (cmd);
1720   if (i < 0)
1721     return (-1);
1722   assert (i < 32);
1724   sock->permissions |= (1 << i);
1725   return (0);
1726 } /* }}} int socket_permission_add */
1728 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1730   sock->permissions = 0;
1731 } /* }}} socket_permission_clear */
1733 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1734     listen_socket_t *src)
1736   dest->permissions = src->permissions;
1737 } /* }}} socket_permission_copy */
1739 /* check whether commands are received in the expected context */
1740 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1742   if (JOURNAL_REPLAY(sock))
1743     return (cmd->context & CMD_CONTEXT_JOURNAL);
1744   else if (sock->batch_start)
1745     return (cmd->context & CMD_CONTEXT_BATCH);
1746   else
1747     return (cmd->context & CMD_CONTEXT_CLIENT);
1749   /* NOTREACHED */
1750   assert(1==0);
1753 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1755   int status;
1756   char *cmd_str;
1757   char *resp_txt;
1758   command_t *help = NULL;
1760   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1761   if (status == 0)
1762     help = find_command(cmd_str);
1764   if (help && (help->syntax || help->help))
1765   {
1766     char tmp[CMD_MAX];
1768     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1769     resp_txt = tmp;
1771     if (help->syntax)
1772       add_response_info(sock, "Usage: %s\n", help->syntax);
1774     if (help->help)
1775       add_response_info(sock, "%s\n", help->help);
1776   }
1777   else
1778   {
1779     size_t i;
1781     resp_txt = "Command overview\n";
1783     for (i = 0; i < list_of_commands_len; i++)
1784     {
1785       if (list_of_commands[i].syntax == NULL)
1786         continue;
1787       add_response_info (sock, "%s", list_of_commands[i].syntax);
1788     }
1789   }
1791   return send_response(sock, RESP_OK, resp_txt);
1792 } /* }}} int handle_request_help */
1794 static int handle_request (DISPATCH_PROTO) /* {{{ */
1796   char *buffer_ptr = buffer;
1797   char *cmd_str = NULL;
1798   command_t *cmd = NULL;
1799   int status;
1801   assert (buffer[buffer_size - 1] == '\0');
1803   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1804   if (status != 0)
1805   {
1806     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1807     return (-1);
1808   }
1810   if (sock != NULL && sock->batch_start)
1811     sock->batch_cmd++;
1813   cmd = find_command(cmd_str);
1814   if (!cmd)
1815     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1817   if (!socket_permission_check (sock, cmd->cmd))
1818     return send_response(sock, RESP_ERR, "Permission denied.\n");
1820   if (!command_check_context(sock, cmd))
1821     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1823   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1824 } /* }}} int handle_request */
1826 static void journal_set_free (journal_set *js) /* {{{ */
1828   if (js == NULL)
1829     return;
1831   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1833   free(js);
1834 } /* }}} journal_set_free */
1836 static void journal_set_remove (journal_set *js) /* {{{ */
1838   if (js == NULL)
1839     return;
1841   for (uint i=0; i < js->files_num; i++)
1842   {
1843     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1844     unlink(js->files[i]);
1845   }
1846 } /* }}} journal_set_remove */
1848 /* close current journal file handle.
1849  * MUST hold journal_lock before calling */
1850 static void journal_close(void) /* {{{ */
1852   if (journal_fh != NULL)
1853   {
1854     if (fclose(journal_fh) != 0)
1855       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1856   }
1858   journal_fh = NULL;
1859   journal_size = 0;
1860 } /* }}} journal_close */
1862 /* MUST hold journal_lock before calling */
1863 static void journal_new_file(void) /* {{{ */
1865   struct timeval now;
1866   int  new_fd;
1867   char new_file[PATH_MAX + 1];
1869   assert(journal_dir != NULL);
1870   assert(journal_cur != NULL);
1872   journal_close();
1874   gettimeofday(&now, NULL);
1875   /* this format assures that the files sort in strcmp() order */
1876   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1877            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1879   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1880                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1881   if (new_fd < 0)
1882     goto error;
1884   journal_fh = fdopen(new_fd, "a");
1885   if (journal_fh == NULL)
1886     goto error;
1888   journal_size = ftell(journal_fh);
1889   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1891   /* record the file in the journal set */
1892   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1894   return;
1896 error:
1897   RRDD_LOG(LOG_CRIT,
1898            "JOURNALING DISABLED: Error while trying to create %s : %s",
1899            new_file, rrd_strerror(errno));
1900   RRDD_LOG(LOG_CRIT,
1901            "JOURNALING DISABLED: All values will be flushed at shutdown");
1903   close(new_fd);
1904   config_flush_at_shutdown = 1;
1906 } /* }}} journal_new_file */
1908 /* MUST NOT hold journal_lock before calling this */
1909 static void journal_rotate(void) /* {{{ */
1911   journal_set *old_js = NULL;
1913   if (journal_dir == NULL)
1914     return;
1916   RRDD_LOG(LOG_DEBUG, "rotating journals");
1918   pthread_mutex_lock(&stats_lock);
1919   ++stats_journal_rotate;
1920   pthread_mutex_unlock(&stats_lock);
1922   pthread_mutex_lock(&journal_lock);
1924   journal_close();
1926   /* rotate the journal sets */
1927   old_js = journal_old;
1928   journal_old = journal_cur;
1929   journal_cur = calloc(1, sizeof(journal_set));
1931   if (journal_cur != NULL)
1932     journal_new_file();
1933   else
1934     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1936   pthread_mutex_unlock(&journal_lock);
1938   journal_set_remove(old_js);
1939   journal_set_free  (old_js);
1941 } /* }}} static void journal_rotate */
1943 /* MUST hold journal_lock when calling */
1944 static void journal_done(void) /* {{{ */
1946   if (journal_cur == NULL)
1947     return;
1949   journal_close();
1951   if (config_flush_at_shutdown)
1952   {
1953     RRDD_LOG(LOG_INFO, "removing journals");
1954     journal_set_remove(journal_old);
1955     journal_set_remove(journal_cur);
1956   }
1957   else
1958   {
1959     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1960              "journals will be used at next startup");
1961   }
1963   journal_set_free(journal_cur);
1964   journal_set_free(journal_old);
1965   free(journal_dir);
1967 } /* }}} static void journal_done */
1969 static int journal_write(char *cmd, char *args) /* {{{ */
1971   int chars;
1973   if (journal_fh == NULL)
1974     return 0;
1976   pthread_mutex_lock(&journal_lock);
1977   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1978   journal_size += chars;
1980   if (journal_size > JOURNAL_MAX)
1981     journal_new_file();
1983   pthread_mutex_unlock(&journal_lock);
1985   if (chars > 0)
1986   {
1987     pthread_mutex_lock(&stats_lock);
1988     stats_journal_bytes += chars;
1989     pthread_mutex_unlock(&stats_lock);
1990   }
1992   return chars;
1993 } /* }}} static int journal_write */
1995 static int journal_replay (const char *file) /* {{{ */
1997   FILE *fh;
1998   int entry_cnt = 0;
1999   int fail_cnt = 0;
2000   uint64_t line = 0;
2001   char entry[CMD_MAX];
2002   time_t now;
2004   if (file == NULL) return 0;
2006   {
2007     char *reason = "unknown error";
2008     int status = 0;
2009     struct stat statbuf;
2011     memset(&statbuf, 0, sizeof(statbuf));
2012     if (stat(file, &statbuf) != 0)
2013     {
2014       reason = "stat error";
2015       status = errno;
2016     }
2017     else if (!S_ISREG(statbuf.st_mode))
2018     {
2019       reason = "not a regular file";
2020       status = EPERM;
2021     }
2022     if (statbuf.st_uid != daemon_uid)
2023     {
2024       reason = "not owned by daemon user";
2025       status = EACCES;
2026     }
2027     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2028     {
2029       reason = "must not be user/group writable";
2030       status = EACCES;
2031     }
2033     if (status != 0)
2034     {
2035       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2036                file, rrd_strerror(status), reason);
2037       return 0;
2038     }
2039   }
2041   fh = fopen(file, "r");
2042   if (fh == NULL)
2043   {
2044     if (errno != ENOENT)
2045       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2046                file, rrd_strerror(errno));
2047     return 0;
2048   }
2049   else
2050     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2052   now = time(NULL);
2054   while(!feof(fh))
2055   {
2056     size_t entry_len;
2058     ++line;
2059     if (fgets(entry, sizeof(entry), fh) == NULL)
2060       break;
2061     entry_len = strlen(entry);
2063     /* check \n termination in case journal writing crashed mid-line */
2064     if (entry_len == 0)
2065       continue;
2066     else if (entry[entry_len - 1] != '\n')
2067     {
2068       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2069       ++fail_cnt;
2070       continue;
2071     }
2073     entry[entry_len - 1] = '\0';
2075     if (handle_request(NULL, now, entry, entry_len) == 0)
2076       ++entry_cnt;
2077     else
2078       ++fail_cnt;
2079   }
2081   fclose(fh);
2083   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2084            entry_cnt, fail_cnt);
2086   return entry_cnt > 0 ? 1 : 0;
2087 } /* }}} static int journal_replay */
2089 static int journal_sort(const void *v1, const void *v2)
2091   char **jn1 = (char **) v1;
2092   char **jn2 = (char **) v2;
2094   return strcmp(*jn1,*jn2);
2097 static void journal_init(void) /* {{{ */
2099   int had_journal = 0;
2100   DIR *dir;
2101   struct dirent *dent;
2102   char path[PATH_MAX+1];
2104   if (journal_dir == NULL) return;
2106   pthread_mutex_lock(&journal_lock);
2108   journal_cur = calloc(1, sizeof(journal_set));
2109   if (journal_cur == NULL)
2110   {
2111     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2112     return;
2113   }
2115   RRDD_LOG(LOG_INFO, "checking for journal files");
2117   /* Handle old journal files during transition.  This gives them the
2118    * correct sort order.  TODO: remove after first release
2119    */
2120   {
2121     char old_path[PATH_MAX+1];
2122     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2123     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2124     rename(old_path, path);
2126     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2127     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2128     rename(old_path, path);
2129   }
2131   dir = opendir(journal_dir);
2132   if (!dir) {
2133     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2134     return;
2135   }
2136   while ((dent = readdir(dir)) != NULL)
2137   {
2138     /* looks like a journal file? */
2139     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2140       continue;
2142     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2144     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2145     {
2146       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2147                dent->d_name);
2148       break;
2149     }
2150   }
2151   closedir(dir);
2153   qsort(journal_cur->files, journal_cur->files_num,
2154         sizeof(journal_cur->files[0]), journal_sort);
2156   for (uint i=0; i < journal_cur->files_num; i++)
2157     had_journal += journal_replay(journal_cur->files[i]);
2159   journal_new_file();
2161   /* it must have been a crash.  start a flush */
2162   if (had_journal && config_flush_at_shutdown)
2163     flush_old_values(-1);
2165   pthread_mutex_unlock(&journal_lock);
2167   RRDD_LOG(LOG_INFO, "journal processing complete");
2169 } /* }}} static void journal_init */
2171 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2173   assert(sock != NULL);
2175   free(sock->rbuf);  sock->rbuf = NULL;
2176   free(sock->wbuf);  sock->wbuf = NULL;
2177   free(sock);
2178 } /* }}} void free_listen_socket */
2180 static void close_connection(listen_socket_t *sock) /* {{{ */
2182   if (sock->fd >= 0)
2183   {
2184     close(sock->fd);
2185     sock->fd = -1;
2186   }
2188   free_listen_socket(sock);
2190 } /* }}} void close_connection */
2192 static void *connection_thread_main (void *args) /* {{{ */
2194   listen_socket_t *sock;
2195   int fd;
2197   sock = (listen_socket_t *) args;
2198   fd = sock->fd;
2200   /* init read buffers */
2201   sock->next_read = sock->next_cmd = 0;
2202   sock->rbuf = malloc(RBUF_SIZE);
2203   if (sock->rbuf == NULL)
2204   {
2205     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2206     close_connection(sock);
2207     return NULL;
2208   }
2210   pthread_mutex_lock (&connection_threads_lock);
2211 #ifdef HAVE_LIBWRAP
2212   /* LIBWRAP does not support multiple threads! By putting this code
2213      inside pthread_mutex_lock we do not have to worry about request_info
2214      getting overwritten by another thread.
2215   */
2216   struct request_info req;
2217   request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2218   fromhost(&req);
2219   if(!hosts_access(&req)) {
2220     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2221     pthread_mutex_unlock (&connection_threads_lock);
2222     close_connection(sock);
2223     return NULL;
2224   }
2225 #endif /* HAVE_LIBWRAP */
2226   connection_threads_num++;
2227   pthread_mutex_unlock (&connection_threads_lock);
2229   while (state == RUNNING)
2230   {
2231     char *cmd;
2232     ssize_t cmd_len;
2233     ssize_t rbytes;
2234     time_t now;
2236     struct pollfd pollfd;
2237     int status;
2239     pollfd.fd = fd;
2240     pollfd.events = POLLIN | POLLPRI;
2241     pollfd.revents = 0;
2243     status = poll (&pollfd, 1, /* timeout = */ 500);
2244     if (state != RUNNING)
2245       break;
2246     else if (status == 0) /* timeout */
2247       continue;
2248     else if (status < 0) /* error */
2249     {
2250       status = errno;
2251       if (status != EINTR)
2252         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2253       continue;
2254     }
2256     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2257       break;
2258     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2259     {
2260       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2261           "poll(2) returned something unexpected: %#04hx",
2262           pollfd.revents);
2263       break;
2264     }
2266     rbytes = read(fd, sock->rbuf + sock->next_read,
2267                   RBUF_SIZE - sock->next_read);
2268     if (rbytes < 0)
2269     {
2270       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2271       break;
2272     }
2273     else if (rbytes == 0)
2274       break; /* eof */
2276     sock->next_read += rbytes;
2278     if (sock->batch_start)
2279       now = sock->batch_start;
2280     else
2281       now = time(NULL);
2283     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2284     {
2285       status = handle_request (sock, now, cmd, cmd_len+1);
2286       if (status != 0)
2287         goto out_close;
2288     }
2289   }
2291 out_close:
2292   close_connection(sock);
2294   /* Remove this thread from the connection threads list */
2295   pthread_mutex_lock (&connection_threads_lock);
2296   connection_threads_num--;
2297   if (connection_threads_num <= 0)
2298     pthread_cond_broadcast(&connection_threads_done);
2299   pthread_mutex_unlock (&connection_threads_lock);
2301   return (NULL);
2302 } /* }}} void *connection_thread_main */
2304 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2306   int fd;
2307   struct sockaddr_un sa;
2308   listen_socket_t *temp;
2309   int status;
2310   const char *path;
2311   char *path_copy, *dir;
2313   path = sock->addr;
2314   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2315     path += strlen("unix:");
2317   /* dirname may modify its argument */
2318   path_copy = strdup(path);
2319   if (path_copy == NULL)
2320   {
2321     fprintf(stderr, "rrdcached: strdup(): %s\n",
2322         rrd_strerror(errno));
2323     return (-1);
2324   }
2326   dir = dirname(path_copy);
2327   if (rrd_mkdir_p(dir, 0777) != 0)
2328   {
2329     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2330         dir, rrd_strerror(errno));
2331     return (-1);
2332   }
2334   free(path_copy);
2336   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2337       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2338   if (temp == NULL)
2339   {
2340     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2341     return (-1);
2342   }
2343   listen_fds = temp;
2344   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2346   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2347   if (fd < 0)
2348   {
2349     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2350              rrd_strerror(errno));
2351     return (-1);
2352   }
2354   memset (&sa, 0, sizeof (sa));
2355   sa.sun_family = AF_UNIX;
2356   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2358   /* if we've gotten this far, we own the pid file.  any daemon started
2359    * with the same args must not be alive.  therefore, ensure that we can
2360    * create the socket...
2361    */
2362   unlink(path);
2364   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2365   if (status != 0)
2366   {
2367     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2368              path, rrd_strerror(errno));
2369     close (fd);
2370     return (-1);
2371   }
2373   /* tweak the sockets group ownership */
2374   if (sock->socket_group != (gid_t)-1)
2375   {
2376     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2377          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2378     {
2379       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2380     }
2381   }
2383   if (sock->socket_permissions != (mode_t)-1)
2384   {
2385     if (chmod(path, sock->socket_permissions) != 0)
2386       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2387           (unsigned int)sock->socket_permissions, strerror(errno));
2388   }
2390   status = listen (fd, /* backlog = */ 10);
2391   if (status != 0)
2392   {
2393     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2394              path, rrd_strerror(errno));
2395     close (fd);
2396     unlink (path);
2397     return (-1);
2398   }
2400   listen_fds[listen_fds_num].fd = fd;
2401   listen_fds[listen_fds_num].family = PF_UNIX;
2402   strncpy(listen_fds[listen_fds_num].addr, path,
2403           sizeof (listen_fds[listen_fds_num].addr) - 1);
2404   listen_fds_num++;
2406   return (0);
2407 } /* }}} int open_listen_socket_unix */
2409 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2411   struct addrinfo ai_hints;
2412   struct addrinfo *ai_res;
2413   struct addrinfo *ai_ptr;
2414   char addr_copy[NI_MAXHOST];
2415   char *addr;
2416   char *port;
2417   int status;
2419   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2420   addr_copy[sizeof (addr_copy) - 1] = 0;
2421   addr = addr_copy;
2423   memset (&ai_hints, 0, sizeof (ai_hints));
2424   ai_hints.ai_flags = 0;
2425 #ifdef AI_ADDRCONFIG
2426   ai_hints.ai_flags |= AI_ADDRCONFIG;
2427 #endif
2428   ai_hints.ai_family = AF_UNSPEC;
2429   ai_hints.ai_socktype = SOCK_STREAM;
2431   port = NULL;
2432   if (*addr == '[') /* IPv6+port format */
2433   {
2434     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2435     addr++;
2437     port = strchr (addr, ']');
2438     if (port == NULL)
2439     {
2440       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2441       return (-1);
2442     }
2443     *port = 0;
2444     port++;
2446     if (*port == ':')
2447       port++;
2448     else if (*port == 0)
2449       port = NULL;
2450     else
2451     {
2452       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2453       return (-1);
2454     }
2455   } /* if (*addr == '[') */
2456   else
2457   {
2458     port = rindex(addr, ':');
2459     if (port != NULL)
2460     {
2461       *port = 0;
2462       port++;
2463     }
2464   }
2465   ai_res = NULL;
2466   status = getaddrinfo (addr,
2467                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2468                         &ai_hints, &ai_res);
2469   if (status != 0)
2470   {
2471     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2472              addr, gai_strerror (status));
2473     return (-1);
2474   }
2476   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2477   {
2478     int fd;
2479     listen_socket_t *temp;
2480     int one = 1;
2482     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2483         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2484     if (temp == NULL)
2485     {
2486       fprintf (stderr,
2487                "rrdcached: open_listen_socket_network: realloc failed.\n");
2488       continue;
2489     }
2490     listen_fds = temp;
2491     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2493     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2494     if (fd < 0)
2495     {
2496       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2497                rrd_strerror(errno));
2498       continue;
2499     }
2501     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2503     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2504     if (status != 0)
2505     {
2506       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2507                sock->addr, rrd_strerror(errno));
2508       close (fd);
2509       continue;
2510     }
2512     status = listen (fd, /* backlog = */ 10);
2513     if (status != 0)
2514     {
2515       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2516                sock->addr, rrd_strerror(errno));
2517       close (fd);
2518       freeaddrinfo(ai_res);
2519       return (-1);
2520     }
2522     listen_fds[listen_fds_num].fd = fd;
2523     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2524     listen_fds_num++;
2525   } /* for (ai_ptr) */
2527   freeaddrinfo(ai_res);
2528   return (0);
2529 } /* }}} static int open_listen_socket_network */
2531 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2533   assert(sock != NULL);
2534   assert(sock->addr != NULL);
2536   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2537       || sock->addr[0] == '/')
2538     return (open_listen_socket_unix(sock));
2539   else
2540     return (open_listen_socket_network(sock));
2541 } /* }}} int open_listen_socket */
2543 static int close_listen_sockets (void) /* {{{ */
2545   size_t i;
2547   for (i = 0; i < listen_fds_num; i++)
2548   {
2549     close (listen_fds[i].fd);
2551     if (listen_fds[i].family == PF_UNIX)
2552       unlink(listen_fds[i].addr);
2553   }
2555   free (listen_fds);
2556   listen_fds = NULL;
2557   listen_fds_num = 0;
2559   return (0);
2560 } /* }}} int close_listen_sockets */
2562 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2564   struct pollfd *pollfds;
2565   int pollfds_num;
2566   int status;
2567   int i;
2569   if (listen_fds_num < 1)
2570   {
2571     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2572     return (NULL);
2573   }
2575   pollfds_num = listen_fds_num;
2576   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2577   if (pollfds == NULL)
2578   {
2579     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2580     return (NULL);
2581   }
2582   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2584   RRDD_LOG(LOG_INFO, "listening for connections");
2586   while (state == RUNNING)
2587   {
2588     for (i = 0; i < pollfds_num; i++)
2589     {
2590       pollfds[i].fd = listen_fds[i].fd;
2591       pollfds[i].events = POLLIN | POLLPRI;
2592       pollfds[i].revents = 0;
2593     }
2595     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2596     if (state != RUNNING)
2597       break;
2598     else if (status == 0) /* timeout */
2599       continue;
2600     else if (status < 0) /* error */
2601     {
2602       status = errno;
2603       if (status != EINTR)
2604       {
2605         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2606       }
2607       continue;
2608     }
2610     for (i = 0; i < pollfds_num; i++)
2611     {
2612       listen_socket_t *client_sock;
2613       struct sockaddr_storage client_sa;
2614       socklen_t client_sa_size;
2615       pthread_t tid;
2616       pthread_attr_t attr;
2618       if (pollfds[i].revents == 0)
2619         continue;
2621       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2622       {
2623         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2624             "poll(2) returned something unexpected for listen FD #%i.",
2625             pollfds[i].fd);
2626         continue;
2627       }
2629       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2630       if (client_sock == NULL)
2631       {
2632         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2633         continue;
2634       }
2635       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2637       client_sa_size = sizeof (client_sa);
2638       client_sock->fd = accept (pollfds[i].fd,
2639           (struct sockaddr *) &client_sa, &client_sa_size);
2640       if (client_sock->fd < 0)
2641       {
2642         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2643         free(client_sock);
2644         continue;
2645       }
2647       pthread_attr_init (&attr);
2648       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2650       status = pthread_create (&tid, &attr, connection_thread_main,
2651                                client_sock);
2652       if (status != 0)
2653       {
2654         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2655         close_connection(client_sock);
2656         continue;
2657       }
2658     } /* for (pollfds_num) */
2659   } /* while (state == RUNNING) */
2661   RRDD_LOG(LOG_INFO, "starting shutdown");
2663   close_listen_sockets ();
2665   pthread_mutex_lock (&connection_threads_lock);
2666   while (connection_threads_num > 0)
2667     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2668   pthread_mutex_unlock (&connection_threads_lock);
2670   free(pollfds);
2672   return (NULL);
2673 } /* }}} void *listen_thread_main */
2675 static int daemonize (void) /* {{{ */
2677   int pid_fd;
2678   char *base_dir;
2680   daemon_uid = geteuid();
2682   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2683   if (pid_fd < 0)
2684     pid_fd = check_pidfile();
2685   if (pid_fd < 0)
2686     return pid_fd;
2688   /* open all the listen sockets */
2689   if (config_listen_address_list_len > 0)
2690   {
2691     for (size_t i = 0; i < config_listen_address_list_len; i++)
2692       open_listen_socket (config_listen_address_list[i]);
2694     rrd_free_ptrs((void ***) &config_listen_address_list,
2695                   &config_listen_address_list_len);
2696   }
2697   else
2698   {
2699     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2700         sizeof(default_socket.addr) - 1);
2701     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2702     open_listen_socket (&default_socket);
2703   }
2705   if (listen_fds_num < 1)
2706   {
2707     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2708     goto error;
2709   }
2711   if (!stay_foreground)
2712   {
2713     pid_t child;
2715     child = fork ();
2716     if (child < 0)
2717     {
2718       fprintf (stderr, "daemonize: fork(2) failed.\n");
2719       goto error;
2720     }
2721     else if (child > 0)
2722       exit(0);
2724     /* Become session leader */
2725     setsid ();
2727     /* Open the first three file descriptors to /dev/null */
2728     close (2);
2729     close (1);
2730     close (0);
2732     open ("/dev/null", O_RDWR);
2733     if (dup(0) == -1 || dup(0) == -1){
2734         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2735     }
2736   } /* if (!stay_foreground) */
2738   /* Change into the /tmp directory. */
2739   base_dir = (config_base_dir != NULL)
2740     ? config_base_dir
2741     : "/tmp";
2743   if (chdir (base_dir) != 0)
2744   {
2745     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2746     goto error;
2747   }
2749   install_signal_handlers();
2751   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2752   RRDD_LOG(LOG_INFO, "starting up");
2754   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2755                                 (GDestroyNotify) free_cache_item);
2756   if (cache_tree == NULL)
2757   {
2758     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2759     goto error;
2760   }
2762   return write_pidfile (pid_fd);
2764 error:
2765   remove_pidfile();
2766   return -1;
2767 } /* }}} int daemonize */
2769 static int cleanup (void) /* {{{ */
2771   pthread_cond_broadcast (&flush_cond);
2772   pthread_join (flush_thread, NULL);
2774   pthread_cond_broadcast (&queue_cond);
2775   for (int i = 0; i < config_queue_threads; i++)
2776     pthread_join (queue_threads[i], NULL);
2778   if (config_flush_at_shutdown)
2779   {
2780     assert(cache_queue_head == NULL);
2781     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2782   }
2784   free(queue_threads);
2785   free(config_base_dir);
2787   pthread_mutex_lock(&cache_lock);
2788   g_tree_destroy(cache_tree);
2790   pthread_mutex_lock(&journal_lock);
2791   journal_done();
2793   RRDD_LOG(LOG_INFO, "goodbye");
2794   closelog ();
2796   remove_pidfile ();
2797   free(config_pid_file);
2799   return (0);
2800 } /* }}} int cleanup */
2802 static int read_options (int argc, char **argv) /* {{{ */
2804   int option;
2805   int status = 0;
2807   socket_permission_clear (&default_socket);
2809   default_socket.socket_group = (gid_t)-1;
2810   default_socket.socket_permissions = (mode_t)-1;
2812   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2813   {
2814     switch (option)
2815     {
2816       case 'g':
2817         stay_foreground=1;
2818         break;
2820       case 'l':
2821       {
2822         listen_socket_t *new;
2824         new = malloc(sizeof(listen_socket_t));
2825         if (new == NULL)
2826         {
2827           fprintf(stderr, "read_options: malloc failed.\n");
2828           return(2);
2829         }
2830         memset(new, 0, sizeof(listen_socket_t));
2832         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2834         /* Add permissions to the socket {{{ */
2835         if (default_socket.permissions != 0)
2836         {
2837           socket_permission_copy (new, &default_socket);
2838         }
2839         else /* if (default_socket.permissions == 0) */
2840         {
2841           /* Add permission for ALL commands to the socket. */
2842           size_t i;
2843           for (i = 0; i < list_of_commands_len; i++)
2844           {
2845             status = socket_permission_add (new, list_of_commands[i].cmd);
2846             if (status != 0)
2847             {
2848               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2849                   "socket failed. This should never happen, ever! Sorry.\n",
2850                   list_of_commands[i].cmd);
2851               status = 4;
2852             }
2853           }
2854         }
2855         /* }}} Done adding permissions. */
2857         new->socket_group = default_socket.socket_group;
2858         new->socket_permissions = default_socket.socket_permissions;
2860         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2861                          &config_listen_address_list_len, new))
2862         {
2863           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2864           return (2);
2865         }
2866       }
2867       break;
2869       /* set socket group permissions */
2870       case 's':
2871       {
2872         gid_t group_gid;
2873         struct group *grp;
2875         group_gid = strtoul(optarg, NULL, 10);
2876         if (errno != EINVAL && group_gid>0)
2877         {
2878           /* we were passed a number */
2879           grp = getgrgid(group_gid);
2880         }
2881         else
2882         {
2883           grp = getgrnam(optarg);
2884         }
2886         if (grp)
2887         {
2888           default_socket.socket_group = grp->gr_gid;
2889         }
2890         else
2891         {
2892           /* no idea what the user wanted... */
2893           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2894           return (5);
2895         }
2896       }
2897       break;
2899       /* set socket file permissions */
2900       case 'm':
2901       {
2902         long  tmp;
2903         char *endptr = NULL;
2905         tmp = strtol (optarg, &endptr, 8);
2906         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2907             || (tmp > 07777) || (tmp < 0)) {
2908           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2909               optarg);
2910           return (5);
2911         }
2913         default_socket.socket_permissions = (mode_t)tmp;
2914       }
2915       break;
2917       case 'P':
2918       {
2919         char *optcopy;
2920         char *saveptr;
2921         char *dummy;
2922         char *ptr;
2924         socket_permission_clear (&default_socket);
2926         optcopy = strdup (optarg);
2927         dummy = optcopy;
2928         saveptr = NULL;
2929         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2930         {
2931           dummy = NULL;
2932           status = socket_permission_add (&default_socket, ptr);
2933           if (status != 0)
2934           {
2935             fprintf (stderr, "read_options: Adding permission \"%s\" to "
2936                 "socket failed. Most likely, this permission doesn't "
2937                 "exist. Check your command line.\n", ptr);
2938             status = 4;
2939           }
2940         }
2942         free (optcopy);
2943       }
2944       break;
2946       case 'f':
2947       {
2948         int temp;
2950         temp = atoi (optarg);
2951         if (temp > 0)
2952           config_flush_interval = temp;
2953         else
2954         {
2955           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2956           status = 3;
2957         }
2958       }
2959       break;
2961       case 'w':
2962       {
2963         int temp;
2965         temp = atoi (optarg);
2966         if (temp > 0)
2967           config_write_interval = temp;
2968         else
2969         {
2970           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2971           status = 2;
2972         }
2973       }
2974       break;
2976       case 'z':
2977       {
2978         int temp;
2980         temp = atoi(optarg);
2981         if (temp > 0)
2982           config_write_jitter = temp;
2983         else
2984         {
2985           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2986           status = 2;
2987         }
2989         break;
2990       }
2992       case 't':
2993       {
2994         int threads;
2995         threads = atoi(optarg);
2996         if (threads >= 1)
2997           config_queue_threads = threads;
2998         else
2999         {
3000           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3001           return 1;
3002         }
3003       }
3004       break;
3006       case 'B':
3007         config_write_base_only = 1;
3008         break;
3010       case 'b':
3011       {
3012         size_t len;
3013         char base_realpath[PATH_MAX];
3015         if (config_base_dir != NULL)
3016           free (config_base_dir);
3017         config_base_dir = strdup (optarg);
3018         if (config_base_dir == NULL)
3019         {
3020           fprintf (stderr, "read_options: strdup failed.\n");
3021           return (3);
3022         }
3024         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3025         {
3026           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3027               config_base_dir, rrd_strerror (errno));
3028           return (3);
3029         }
3031         /* make sure that the base directory is not resolved via
3032          * symbolic links.  this makes some performance-enhancing
3033          * assumptions possible (we don't have to resolve paths
3034          * that start with a "/")
3035          */
3036         if (realpath(config_base_dir, base_realpath) == NULL)
3037         {
3038           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3039               "%s\n", config_base_dir, rrd_strerror(errno));
3040           return 5;
3041         }
3043         len = strlen (config_base_dir);
3044         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3045         {
3046           config_base_dir[len - 1] = 0;
3047           len--;
3048         }
3050         if (len < 1)
3051         {
3052           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3053           return (4);
3054         }
3056         _config_base_dir_len = len;
3058         len = strlen (base_realpath);
3059         while ((len > 0) && (base_realpath[len - 1] == '/'))
3060         {
3061           base_realpath[len - 1] = '\0';
3062           len--;
3063         }
3065         if (strncmp(config_base_dir,
3066                          base_realpath, sizeof(base_realpath)) != 0)
3067         {
3068           fprintf(stderr,
3069                   "Base directory (-b) resolved via file system links!\n"
3070                   "Please consult rrdcached '-b' documentation!\n"
3071                   "Consider specifying the real directory (%s)\n",
3072                   base_realpath);
3073           return 5;
3074         }
3075       }
3076       break;
3078       case 'p':
3079       {
3080         if (config_pid_file != NULL)
3081           free (config_pid_file);
3082         config_pid_file = strdup (optarg);
3083         if (config_pid_file == NULL)
3084         {
3085           fprintf (stderr, "read_options: strdup failed.\n");
3086           return (3);
3087         }
3088       }
3089       break;
3091       case 'F':
3092         config_flush_at_shutdown = 1;
3093         break;
3095       case 'j':
3096       {
3097         char journal_dir_actual[PATH_MAX];
3098         const char *dir;
3099         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3101         status = rrd_mkdir_p(dir, 0777);
3102         if (status != 0)
3103         {
3104           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3105               dir, rrd_strerror(errno));
3106           return 6;
3107         }
3109         if (access(dir, R_OK|W_OK|X_OK) != 0)
3110         {
3111           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3112                   errno ? rrd_strerror(errno) : "");
3113           return 6;
3114         }
3115       }
3116       break;
3118       case 'h':
3119       case '?':
3120         printf ("RRDCacheD %s\n"
3121             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3122             "\n"
3123             "Usage: rrdcached [options]\n"
3124             "\n"
3125             "Valid options are:\n"
3126             "  -l <address>  Socket address to listen to.\n"
3127             "  -P <perms>    Sets the permissions to assign to all following "
3128                             "sockets\n"
3129             "  -w <seconds>  Interval in which to write data.\n"
3130             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3131             "  -t <threads>  Number of write threads.\n"
3132             "  -f <seconds>  Interval in which to flush dead data.\n"
3133             "  -p <file>     Location of the PID-file.\n"
3134             "  -b <dir>      Base directory to change to.\n"
3135             "  -B            Restrict file access to paths within -b <dir>\n"
3136             "  -g            Do not fork and run in the foreground.\n"
3137             "  -j <dir>      Directory in which to create the journal files.\n"
3138             "  -F            Always flush all updates at shutdown\n"
3139             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3140             "                (the socket will also have read/write permissions "
3141                             "for that group)\n"
3142             "  -m <mode>     File permissions (octal) of all following UNIX "
3143                             "sockets\n"
3144             "\n"
3145             "For more information and a detailed description of all options "
3146             "please refer\n"
3147             "to the rrdcached(1) manual page.\n",
3148             VERSION);
3149         if (option == 'h')
3150           status = -1;
3151         else
3152           status = 1;
3153         break;
3154     } /* switch (option) */
3155   } /* while (getopt) */
3157   /* advise the user when values are not sane */
3158   if (config_flush_interval < 2 * config_write_interval)
3159     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3160             " 2x write interval (-w) !\n");
3161   if (config_write_jitter > config_write_interval)
3162     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3163             " write interval (-w) !\n");
3165   if (config_write_base_only && config_base_dir == NULL)
3166     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3167             "  Consult the rrdcached documentation\n");
3169   if (journal_dir == NULL)
3170     config_flush_at_shutdown = 1;
3172   return (status);
3173 } /* }}} int read_options */
3175 int main (int argc, char **argv)
3177   int status;
3179   status = read_options (argc, argv);
3180   if (status != 0)
3181   {
3182     if (status < 0)
3183       status = 0;
3184     return (status);
3185   }
3187   status = daemonize ();
3188   if (status != 0)
3189   {
3190     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3191     return (1);
3192   }
3194   journal_init();
3196   /* start the queue threads */
3197   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3198   if (queue_threads == NULL)
3199   {
3200     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3201     cleanup();
3202     return (1);
3203   }
3204   for (int i = 0; i < config_queue_threads; i++)
3205   {
3206     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3207     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3208     if (status != 0)
3209     {
3210       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3211       cleanup();
3212       return (1);
3213     }
3214   }
3216   /* start the flush thread */
3217   memset(&flush_thread, 0, sizeof(flush_thread));
3218   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3219   if (status != 0)
3220   {
3221     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3222     cleanup();
3223     return (1);
3224   }
3226   listen_thread_main (NULL);
3227   cleanup ();
3229   return (0);
3230 } /* int main */
3232 /*
3233  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3234  */