Code

fix buffer overflow for LONG lines in journal handling code for update requests.
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
147   char *wbuf;
148   ssize_t wbuf_len;
150   uint32_t permissions;
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
178   char *syntax;
179   char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
186   char *file;
187   char **values;
188   size_t values_num;
189   time_t last_flush_time;
190   double last_update_stamp;
191 #define CI_FLAGS_IN_TREE  (1<<0)
192 #define CI_FLAGS_IN_QUEUE (1<<1)
193   int flags;
194   pthread_cond_t  flushed;
195   cache_item_t *prev;
196   cache_item_t *next;
197 };
199 struct callback_flush_data_s
201   time_t now;
202   time_t abs_timeout;
203   char **keys;
204   size_t keys_num;
205 };
206 typedef struct callback_flush_data_s callback_flush_data_t;
208 enum queue_side_e
210   HEAD,
211   TAIL
212 };
213 typedef enum queue_side_e queue_side_t;
215 /* describe a set of journal files */
216 typedef struct {
217   char **files;
218   size_t files_num;
219 } journal_set;
221 /* max length of socket command or response */
222 #define CMD_MAX 4096
223 #define RBUF_SIZE (CMD_MAX*2)
225 /*
226  * Variables
227  */
228 static int stay_foreground = 0;
229 static uid_t daemon_uid;
231 static listen_socket_t *listen_fds = NULL;
232 static size_t listen_fds_num = 0;
234 static listen_socket_t default_socket;
236 enum {
237   RUNNING,              /* normal operation */
238   FLUSHING,             /* flushing remaining values */
239   SHUTDOWN              /* shutting down */
240 } state = RUNNING;
242 static pthread_t *queue_threads;
243 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
244 static int config_queue_threads = 4;
246 static pthread_t flush_thread;
247 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
249 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
250 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
251 static int connection_threads_num = 0;
253 /* Cache stuff */
254 static GTree          *cache_tree = NULL;
255 static cache_item_t   *cache_queue_head = NULL;
256 static cache_item_t   *cache_queue_tail = NULL;
257 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
259 static int config_write_interval = 300;
260 static int config_write_jitter   = 0;
261 static int config_flush_interval = 3600;
262 static int config_flush_at_shutdown = 0;
263 static char *config_pid_file = NULL;
264 static char *config_base_dir = NULL;
265 static size_t _config_base_dir_len = 0;
266 static int config_write_base_only = 0;
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
280 /* Journaled updates */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL;         /* current journal file handle */
287 static long  journal_size = 0;          /* current journal size */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
294 /* prototypes for forward refernces */
295 static int handle_request_help (HANDLER_PROTO);
297 /* 
298  * Functions
299  */
300 static void sig_common (const char *sig) /* {{{ */
302   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
303   state = FLUSHING;
304   pthread_cond_broadcast(&flush_cond);
305   pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
310   sig_common("INT");
311 } /* }}} void sig_int_handler */
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
315   sig_common("TERM");
316 } /* }}} void sig_term_handler */
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
320   config_flush_at_shutdown = 1;
321   sig_common("USR1");
322 } /* }}} void sig_usr1_handler */
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
326   config_flush_at_shutdown = 0;
327   sig_common("USR2");
328 } /* }}} void sig_usr2_handler */
330 static void install_signal_handlers(void) /* {{{ */
332   /* These structures are static, because `sigaction' behaves weird if the are
333    * overwritten.. */
334   static struct sigaction sa_int;
335   static struct sigaction sa_term;
336   static struct sigaction sa_pipe;
337   static struct sigaction sa_usr1;
338   static struct sigaction sa_usr2;
340   /* Install signal handlers */
341   memset (&sa_int, 0, sizeof (sa_int));
342   sa_int.sa_handler = sig_int_handler;
343   sigaction (SIGINT, &sa_int, NULL);
345   memset (&sa_term, 0, sizeof (sa_term));
346   sa_term.sa_handler = sig_term_handler;
347   sigaction (SIGTERM, &sa_term, NULL);
349   memset (&sa_pipe, 0, sizeof (sa_pipe));
350   sa_pipe.sa_handler = SIG_IGN;
351   sigaction (SIGPIPE, &sa_pipe, NULL);
353   memset (&sa_pipe, 0, sizeof (sa_usr1));
354   sa_usr1.sa_handler = sig_usr1_handler;
355   sigaction (SIGUSR1, &sa_usr1, NULL);
357   memset (&sa_usr2, 0, sizeof (sa_usr2));
358   sa_usr2.sa_handler = sig_usr2_handler;
359   sigaction (SIGUSR2, &sa_usr2, NULL);
361 } /* }}} void install_signal_handlers */
363 static int open_pidfile(char *action, int oflag) /* {{{ */
365   int fd;
366   const char *file;
367   char *file_copy, *dir;
369   file = (config_pid_file != NULL)
370     ? config_pid_file
371     : LOCALSTATEDIR "/run/rrdcached.pid";
373   /* dirname may modify its argument */
374   file_copy = strdup(file);
375   if (file_copy == NULL)
376   {
377     fprintf(stderr, "rrdcached: strdup(): %s\n",
378         rrd_strerror(errno));
379     return -1;
380   }
382   dir = dirname(file_copy);
383   if (rrd_mkdir_p(dir, 0777) != 0)
384   {
385     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386         dir, rrd_strerror(errno));
387     return -1;
388   }
390   free(file_copy);
392   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
393   if (fd < 0)
394     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395             action, file, rrd_strerror(errno));
397   return(fd);
398 } /* }}} static int open_pidfile */
400 /* check existing pid file to see whether a daemon is running */
401 static int check_pidfile(void)
403   int pid_fd;
404   pid_t pid;
405   char pid_str[16];
407   pid_fd = open_pidfile("open", O_RDWR);
408   if (pid_fd < 0)
409     return pid_fd;
411   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
412     return -1;
414   pid = atoi(pid_str);
415   if (pid <= 0)
416     return -1;
418   /* another running process that we can signal COULD be
419    * a competing rrdcached */
420   if (pid != getpid() && kill(pid, 0) == 0)
421   {
422     fprintf(stderr,
423             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
424     close(pid_fd);
425     return -1;
426   }
428   lseek(pid_fd, 0, SEEK_SET);
429   if (ftruncate(pid_fd, 0) == -1)
430   {
431     fprintf(stderr,
432             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
433     close(pid_fd);
434     return -1;
435   }
437   fprintf(stderr,
438           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
439           "rrdcached: starting normally.\n", pid);
441   return pid_fd;
442 } /* }}} static int check_pidfile */
444 static int write_pidfile (int fd) /* {{{ */
446   pid_t pid;
447   FILE *fh;
449   pid = getpid ();
451   fh = fdopen (fd, "w");
452   if (fh == NULL)
453   {
454     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
455     close(fd);
456     return (-1);
457   }
459   fprintf (fh, "%i\n", (int) pid);
460   fclose (fh);
462   return (0);
463 } /* }}} int write_pidfile */
465 static int remove_pidfile (void) /* {{{ */
467   char *file;
468   int status;
470   file = (config_pid_file != NULL)
471     ? config_pid_file
472     : LOCALSTATEDIR "/run/rrdcached.pid";
474   status = unlink (file);
475   if (status == 0)
476     return (0);
477   return (errno);
478 } /* }}} int remove_pidfile */
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
482   char *eol;
484   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485                sock->next_read - sock->next_cmd);
487   if (eol == NULL)
488   {
489     /* no commands left, move remainder back to front of rbuf */
490     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491             sock->next_read - sock->next_cmd);
492     sock->next_read -= sock->next_cmd;
493     sock->next_cmd = 0;
494     *len = 0;
495     return NULL;
496   }
497   else
498   {
499     char *cmd = sock->rbuf + sock->next_cmd;
500     *eol = '\0';
502     sock->next_cmd = eol - sock->rbuf + 1;
504     if (eol > sock->rbuf && *(eol-1) == '\r')
505       *(--eol) = '\0'; /* handle "\r\n" EOL */
507     *len = eol - cmd;
509     return cmd;
510   }
512   /* NOTREACHED */
513   assert(1==0);
514 } /* }}} char *next_cmd */
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
519   char *new_buf;
521   assert(sock != NULL);
523   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
524   if (new_buf == NULL)
525   {
526     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527     return -1;
528   }
530   strncpy(new_buf + sock->wbuf_len, str, len + 1);
532   sock->wbuf = new_buf;
533   sock->wbuf_len += len;
535   return 0;
536 } /* }}} static int add_to_wbuf */
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
541   va_list argp;
542   char buffer[CMD_MAX];
543   int len;
545   if (JOURNAL_REPLAY(sock)) return 0;
546   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
548   va_start(argp, fmt);
549 #ifdef HAVE_VSNPRINTF
550   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
551 #else
552   len = vsprintf(buffer, fmt, argp);
553 #endif
554   va_end(argp);
555   if (len < 0)
556   {
557     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558     return -1;
559   }
561   return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
564 static int count_lines(char *str) /* {{{ */
566   int lines = 0;
568   if (str != NULL)
569   {
570     while ((str = strchr(str, '\n')) != NULL)
571     {
572       ++lines;
573       ++str;
574     }
575   }
577   return lines;
578 } /* }}} static int count_lines */
580 /* send the response back to the user.
581  * returns 0 on success, -1 on error
582  * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584                           char *fmt, ...) /* {{{ */
586   va_list argp;
587   char buffer[CMD_MAX];
588   int lines;
589   ssize_t wrote;
590   int rclen, len;
592   if (JOURNAL_REPLAY(sock)) return rc;
594   if (sock->batch_start)
595   {
596     if (rc == RESP_OK)
597       return rc; /* no response on success during BATCH */
598     lines = sock->batch_cmd;
599   }
600   else if (rc == RESP_OK)
601     lines = count_lines(sock->wbuf);
602   else
603     lines = -1;
605   rclen = sprintf(buffer, "%d ", lines);
606   va_start(argp, fmt);
607 #ifdef HAVE_VSNPRINTF
608   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
609 #else
610   len = vsprintf(buffer+rclen, fmt, argp);
611 #endif
612   va_end(argp);
613   if (len < 0)
614     return -1;
616   len += rclen;
618   /* append the result to the wbuf, don't write to the user */
619   if (sock->batch_start)
620     return add_to_wbuf(sock, buffer, len);
622   /* first write must be complete */
623   if (len != write(sock->fd, buffer, len))
624   {
625     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626     return -1;
627   }
629   if (sock->wbuf != NULL && rc == RESP_OK)
630   {
631     wrote = 0;
632     while (wrote < sock->wbuf_len)
633     {
634       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
635       if (wb <= 0)
636       {
637         RRDD_LOG(LOG_INFO, "send_response: could not write results");
638         return -1;
639       }
640       wrote += wb;
641     }
642   }
644   free(sock->wbuf); sock->wbuf = NULL;
645   sock->wbuf_len = 0;
647   return 0;
648 } /* }}} */
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
652   ci->values = NULL;
653   ci->values_num = 0;
655   ci->last_flush_time = when;
656   if (config_write_jitter > 0)
657     ci->last_flush_time += (rrd_random() % config_write_jitter);
660 /* remove_from_queue
661  * remove a "cache_item_t" item from the queue.
662  * must hold 'cache_lock' when calling this
663  */
664 static void remove_from_queue(cache_item_t *ci) /* {{{ */
666   if (ci == NULL) return;
667   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
669   if (ci->prev == NULL)
670     cache_queue_head = ci->next; /* reset head */
671   else
672     ci->prev->next = ci->next;
674   if (ci->next == NULL)
675     cache_queue_tail = ci->prev; /* reset the tail */
676   else
677     ci->next->prev = ci->prev;
679   ci->next = ci->prev = NULL;
680   ci->flags &= ~CI_FLAGS_IN_QUEUE;
682   pthread_mutex_lock (&stats_lock);
683   assert (stats_queue_length > 0);
684   stats_queue_length--;
685   pthread_mutex_unlock (&stats_lock);
687 } /* }}} static void remove_from_queue */
689 /* free the resources associated with the cache_item_t
690  * must hold cache_lock when calling this function
691  */
692 static void *free_cache_item(cache_item_t *ci) /* {{{ */
694   if (ci == NULL) return NULL;
696   remove_from_queue(ci);
698   for (size_t i=0; i < ci->values_num; i++)
699     free(ci->values[i]);
701   free (ci->values);
702   free (ci->file);
704   /* in case anyone is waiting */
705   pthread_cond_broadcast(&ci->flushed);
706   pthread_cond_destroy(&ci->flushed);
708   free (ci);
710   return NULL;
711 } /* }}} static void *free_cache_item */
713 /*
714  * enqueue_cache_item:
715  * `cache_lock' must be acquired before calling this function!
716  */
717 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
718     queue_side_t side)
720   if (ci == NULL)
721     return (-1);
723   if (ci->values_num == 0)
724     return (0);
726   if (side == HEAD)
727   {
728     if (cache_queue_head == ci)
729       return 0;
731     /* remove if further down in queue */
732     remove_from_queue(ci);
734     ci->prev = NULL;
735     ci->next = cache_queue_head;
736     if (ci->next != NULL)
737       ci->next->prev = ci;
738     cache_queue_head = ci;
740     if (cache_queue_tail == NULL)
741       cache_queue_tail = cache_queue_head;
742   }
743   else /* (side == TAIL) */
744   {
745     /* We don't move values back in the list.. */
746     if (ci->flags & CI_FLAGS_IN_QUEUE)
747       return (0);
749     assert (ci->next == NULL);
750     assert (ci->prev == NULL);
752     ci->prev = cache_queue_tail;
754     if (cache_queue_tail == NULL)
755       cache_queue_head = ci;
756     else
757       cache_queue_tail->next = ci;
759     cache_queue_tail = ci;
760   }
762   ci->flags |= CI_FLAGS_IN_QUEUE;
764   pthread_cond_signal(&queue_cond);
765   pthread_mutex_lock (&stats_lock);
766   stats_queue_length++;
767   pthread_mutex_unlock (&stats_lock);
769   return (0);
770 } /* }}} int enqueue_cache_item */
772 /*
773  * tree_callback_flush:
774  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
775  * while this is in progress.
776  */
777 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
778     gpointer data)
780   cache_item_t *ci;
781   callback_flush_data_t *cfd;
783   ci = (cache_item_t *) value;
784   cfd = (callback_flush_data_t *) data;
786   if (ci->flags & CI_FLAGS_IN_QUEUE)
787     return FALSE;
789   if (ci->values_num > 0
790       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
791   {
792     enqueue_cache_item (ci, TAIL);
793   }
794   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
795       && (ci->values_num <= 0))
796   {
797     assert ((char *) key == ci->file);
798     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
799     {
800       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
801       return (FALSE);
802     }
803   }
805   return (FALSE);
806 } /* }}} gboolean tree_callback_flush */
808 static int flush_old_values (int max_age)
810   callback_flush_data_t cfd;
811   size_t k;
813   memset (&cfd, 0, sizeof (cfd));
814   /* Pass the current time as user data so that we don't need to call
815    * `time' for each node. */
816   cfd.now = time (NULL);
817   cfd.keys = NULL;
818   cfd.keys_num = 0;
820   if (max_age > 0)
821     cfd.abs_timeout = cfd.now - max_age;
822   else
823     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
825   /* `tree_callback_flush' will return the keys of all values that haven't
826    * been touched in the last `config_flush_interval' seconds in `cfd'.
827    * The char*'s in this array point to the same memory as ci->file, so we
828    * don't need to free them separately. */
829   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
831   for (k = 0; k < cfd.keys_num; k++)
832   {
833     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
834     /* should never fail, since we have held the cache_lock
835      * the entire time */
836     assert(status == TRUE);
837   }
839   if (cfd.keys != NULL)
840   {
841     free (cfd.keys);
842     cfd.keys = NULL;
843   }
845   return (0);
846 } /* int flush_old_values */
848 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
850   struct timeval now;
851   struct timespec next_flush;
852   int status;
854   gettimeofday (&now, NULL);
855   next_flush.tv_sec = now.tv_sec + config_flush_interval;
856   next_flush.tv_nsec = 1000 * now.tv_usec;
858   pthread_mutex_lock(&cache_lock);
860   while (state == RUNNING)
861   {
862     gettimeofday (&now, NULL);
863     if ((now.tv_sec > next_flush.tv_sec)
864         || ((now.tv_sec == next_flush.tv_sec)
865           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
866     {
867       RRDD_LOG(LOG_DEBUG, "flushing old values");
869       /* Determine the time of the next cache flush. */
870       next_flush.tv_sec = now.tv_sec + config_flush_interval;
872       /* Flush all values that haven't been written in the last
873        * `config_write_interval' seconds. */
874       flush_old_values (config_write_interval);
876       /* unlock the cache while we rotate so we don't block incoming
877        * updates if the fsync() blocks on disk I/O */
878       pthread_mutex_unlock(&cache_lock);
879       journal_rotate();
880       pthread_mutex_lock(&cache_lock);
881     }
883     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
884     if (status != 0 && status != ETIMEDOUT)
885     {
886       RRDD_LOG (LOG_ERR, "flush_thread_main: "
887                 "pthread_cond_timedwait returned %i.", status);
888     }
889   }
891   if (config_flush_at_shutdown)
892     flush_old_values (-1); /* flush everything */
894   state = SHUTDOWN;
896   pthread_mutex_unlock(&cache_lock);
898   return NULL;
899 } /* void *flush_thread_main */
901 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
903   pthread_mutex_lock (&cache_lock);
905   while (state != SHUTDOWN
906          || (cache_queue_head != NULL && config_flush_at_shutdown))
907   {
908     cache_item_t *ci;
909     char *file;
910     char **values;
911     size_t values_num;
912     int status;
914     /* Now, check if there's something to store away. If not, wait until
915      * something comes in. */
916     if (cache_queue_head == NULL)
917     {
918       status = pthread_cond_wait (&queue_cond, &cache_lock);
919       if ((status != 0) && (status != ETIMEDOUT))
920       {
921         RRDD_LOG (LOG_ERR, "queue_thread_main: "
922             "pthread_cond_wait returned %i.", status);
923       }
924     }
926     /* Check if a value has arrived. This may be NULL if we timed out or there
927      * was an interrupt such as a signal. */
928     if (cache_queue_head == NULL)
929       continue;
931     ci = cache_queue_head;
933     /* copy the relevant parts */
934     file = strdup (ci->file);
935     if (file == NULL)
936     {
937       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
938       continue;
939     }
941     assert(ci->values != NULL);
942     assert(ci->values_num > 0);
944     values = ci->values;
945     values_num = ci->values_num;
947     wipe_ci_values(ci, time(NULL));
948     remove_from_queue(ci);
950     pthread_mutex_unlock (&cache_lock);
952     rrd_clear_error ();
953     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
954     if (status != 0)
955     {
956       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
957           "rrd_update_r (%s) failed with status %i. (%s)",
958           file, status, rrd_get_error());
959     }
961     journal_write("wrote", file);
963     /* Search again in the tree.  It's possible someone issued a "FORGET"
964      * while we were writing the update values. */
965     pthread_mutex_lock(&cache_lock);
966     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
967     if (ci)
968       pthread_cond_broadcast(&ci->flushed);
969     pthread_mutex_unlock(&cache_lock);
971     if (status == 0)
972     {
973       pthread_mutex_lock (&stats_lock);
974       stats_updates_written++;
975       stats_data_sets_written += values_num;
976       pthread_mutex_unlock (&stats_lock);
977     }
979     rrd_free_ptrs((void ***) &values, &values_num);
980     free(file);
982     pthread_mutex_lock (&cache_lock);
983   }
984   pthread_mutex_unlock (&cache_lock);
986   return (NULL);
987 } /* }}} void *queue_thread_main */
989 static int buffer_get_field (char **buffer_ret, /* {{{ */
990     size_t *buffer_size_ret, char **field_ret)
992   char *buffer;
993   size_t buffer_pos;
994   size_t buffer_size;
995   char *field;
996   size_t field_size;
997   int status;
999   buffer = *buffer_ret;
1000   buffer_pos = 0;
1001   buffer_size = *buffer_size_ret;
1002   field = *buffer_ret;
1003   field_size = 0;
1005   if (buffer_size <= 0)
1006     return (-1);
1008   /* This is ensured by `handle_request'. */
1009   assert (buffer[buffer_size - 1] == '\0');
1011   status = -1;
1012   while (buffer_pos < buffer_size)
1013   {
1014     /* Check for end-of-field or end-of-buffer */
1015     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1016     {
1017       field[field_size] = 0;
1018       field_size++;
1019       buffer_pos++;
1020       status = 0;
1021       break;
1022     }
1023     /* Handle escaped characters. */
1024     else if (buffer[buffer_pos] == '\\')
1025     {
1026       if (buffer_pos >= (buffer_size - 1))
1027         break;
1028       buffer_pos++;
1029       field[field_size] = buffer[buffer_pos];
1030       field_size++;
1031       buffer_pos++;
1032     }
1033     /* Normal operation */ 
1034     else
1035     {
1036       field[field_size] = buffer[buffer_pos];
1037       field_size++;
1038       buffer_pos++;
1039     }
1040   } /* while (buffer_pos < buffer_size) */
1042   if (status != 0)
1043     return (status);
1045   *buffer_ret = buffer + buffer_pos;
1046   *buffer_size_ret = buffer_size - buffer_pos;
1047   *field_ret = field;
1049   return (0);
1050 } /* }}} int buffer_get_field */
1052 /* if we're restricting writes to the base directory,
1053  * check whether the file falls within the dir
1054  * returns 1 if OK, otherwise 0
1055  */
1056 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1058   assert(file != NULL);
1060   if (!config_write_base_only
1061       || JOURNAL_REPLAY(sock)
1062       || config_base_dir == NULL)
1063     return 1;
1065   if (strstr(file, "../") != NULL) goto err;
1067   /* relative paths without "../" are ok */
1068   if (*file != '/') return 1;
1070   /* file must be of the format base + "/" + <1+ char filename> */
1071   if (strlen(file) < _config_base_dir_len + 2) goto err;
1072   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1073   if (*(file + _config_base_dir_len) != '/') goto err;
1075   return 1;
1077 err:
1078   if (sock != NULL && sock->fd >= 0)
1079     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1081   return 0;
1082 } /* }}} static int check_file_access */
1084 /* when using a base dir, convert relative paths to absolute paths.
1085  * if necessary, modifies the "filename" pointer to point
1086  * to the new path created in "tmp".  "tmp" is provided
1087  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1088  *
1089  * this allows us to optimize for the expected case (absolute path)
1090  * with a no-op.
1091  */
1092 static void get_abs_path(char **filename, char *tmp)
1094   assert(tmp != NULL);
1095   assert(filename != NULL && *filename != NULL);
1097   if (config_base_dir == NULL || **filename == '/')
1098     return;
1100   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1101   *filename = tmp;
1102 } /* }}} static int get_abs_path */
1104 static int flush_file (const char *filename) /* {{{ */
1106   cache_item_t *ci;
1108   pthread_mutex_lock (&cache_lock);
1110   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1111   if (ci == NULL)
1112   {
1113     pthread_mutex_unlock (&cache_lock);
1114     return (ENOENT);
1115   }
1117   if (ci->values_num > 0)
1118   {
1119     /* Enqueue at head */
1120     enqueue_cache_item (ci, HEAD);
1121     pthread_cond_wait(&ci->flushed, &cache_lock);
1122   }
1124   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1125    * may have been purged during our cond_wait() */
1127   pthread_mutex_unlock(&cache_lock);
1129   return (0);
1130 } /* }}} int flush_file */
1132 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1134   char *err = "Syntax error.\n";
1136   if (cmd && cmd->syntax)
1137     err = cmd->syntax;
1139   return send_response(sock, RESP_ERR, "Usage: %s", err);
1140 } /* }}} static int syntax_error() */
1142 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1144   uint64_t copy_queue_length;
1145   uint64_t copy_updates_received;
1146   uint64_t copy_flush_received;
1147   uint64_t copy_updates_written;
1148   uint64_t copy_data_sets_written;
1149   uint64_t copy_journal_bytes;
1150   uint64_t copy_journal_rotate;
1152   uint64_t tree_nodes_number;
1153   uint64_t tree_depth;
1155   pthread_mutex_lock (&stats_lock);
1156   copy_queue_length       = stats_queue_length;
1157   copy_updates_received   = stats_updates_received;
1158   copy_flush_received     = stats_flush_received;
1159   copy_updates_written    = stats_updates_written;
1160   copy_data_sets_written  = stats_data_sets_written;
1161   copy_journal_bytes      = stats_journal_bytes;
1162   copy_journal_rotate     = stats_journal_rotate;
1163   pthread_mutex_unlock (&stats_lock);
1165   pthread_mutex_lock (&cache_lock);
1166   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1167   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1168   pthread_mutex_unlock (&cache_lock);
1170   add_response_info(sock,
1171                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1172   add_response_info(sock,
1173                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1174   add_response_info(sock,
1175                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1176   add_response_info(sock,
1177                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1178   add_response_info(sock,
1179                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1180   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1181   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1182   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1183   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1185   send_response(sock, RESP_OK, "Statistics follow\n");
1187   return (0);
1188 } /* }}} int handle_request_stats */
1190 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1192   char *file, file_tmp[PATH_MAX];
1193   int status;
1195   status = buffer_get_field (&buffer, &buffer_size, &file);
1196   if (status != 0)
1197   {
1198     return syntax_error(sock,cmd);
1199   }
1200   else
1201   {
1202     pthread_mutex_lock(&stats_lock);
1203     stats_flush_received++;
1204     pthread_mutex_unlock(&stats_lock);
1206     get_abs_path(&file, file_tmp);
1207     if (!check_file_access(file, sock)) return 0;
1209     status = flush_file (file);
1210     if (status == 0)
1211       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1212     else if (status == ENOENT)
1213     {
1214       /* no file in our tree; see whether it exists at all */
1215       struct stat statbuf;
1217       memset(&statbuf, 0, sizeof(statbuf));
1218       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1219         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1220       else
1221         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1222     }
1223     else if (status < 0)
1224       return send_response(sock, RESP_ERR, "Internal error.\n");
1225     else
1226       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1227   }
1229   /* NOTREACHED */
1230   assert(1==0);
1231 } /* }}} int handle_request_flush */
1233 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1235   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1237   pthread_mutex_lock(&cache_lock);
1238   flush_old_values(-1);
1239   pthread_mutex_unlock(&cache_lock);
1241   return send_response(sock, RESP_OK, "Started flush.\n");
1242 } /* }}} static int handle_request_flushall */
1244 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1246   int status;
1247   char *file, file_tmp[PATH_MAX];
1248   cache_item_t *ci;
1250   status = buffer_get_field(&buffer, &buffer_size, &file);
1251   if (status != 0)
1252     return syntax_error(sock,cmd);
1254   get_abs_path(&file, file_tmp);
1256   pthread_mutex_lock(&cache_lock);
1257   ci = g_tree_lookup(cache_tree, file);
1258   if (ci == NULL)
1259   {
1260     pthread_mutex_unlock(&cache_lock);
1261     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1262   }
1264   for (size_t i=0; i < ci->values_num; i++)
1265     add_response_info(sock, "%s\n", ci->values[i]);
1267   pthread_mutex_unlock(&cache_lock);
1268   return send_response(sock, RESP_OK, "updates pending\n");
1269 } /* }}} static int handle_request_pending */
1271 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1273   int status;
1274   gboolean found;
1275   char *file, file_tmp[PATH_MAX];
1277   status = buffer_get_field(&buffer, &buffer_size, &file);
1278   if (status != 0)
1279     return syntax_error(sock,cmd);
1281   get_abs_path(&file, file_tmp);
1282   if (!check_file_access(file, sock)) return 0;
1284   pthread_mutex_lock(&cache_lock);
1285   found = g_tree_remove(cache_tree, file);
1286   pthread_mutex_unlock(&cache_lock);
1288   if (found == TRUE)
1289   {
1290     if (!JOURNAL_REPLAY(sock))
1291       journal_write("forget", file);
1293     return send_response(sock, RESP_OK, "Gone!\n");
1294   }
1295   else
1296     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1298   /* NOTREACHED */
1299   assert(1==0);
1300 } /* }}} static int handle_request_forget */
1302 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1304   cache_item_t *ci;
1306   pthread_mutex_lock(&cache_lock);
1308   ci = cache_queue_head;
1309   while (ci != NULL)
1310   {
1311     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1312     ci = ci->next;
1313   }
1315   pthread_mutex_unlock(&cache_lock);
1317   return send_response(sock, RESP_OK, "in queue.\n");
1318 } /* }}} int handle_request_queue */
1320 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1322   char *file, file_tmp[PATH_MAX];
1323   int values_num = 0;
1324   int status;
1325   char orig_buf[CMD_MAX];
1327   cache_item_t *ci;
1329   /* save it for the journal later */
1330   if (!JOURNAL_REPLAY(sock))
1331     strncpy(orig_buf, buffer, min(CMD_MAX,buffer_size));
1333   status = buffer_get_field (&buffer, &buffer_size, &file);
1334   if (status != 0)
1335     return syntax_error(sock,cmd);
1337   pthread_mutex_lock(&stats_lock);
1338   stats_updates_received++;
1339   pthread_mutex_unlock(&stats_lock);
1341   get_abs_path(&file, file_tmp);
1342   if (!check_file_access(file, sock)) return 0;
1344   pthread_mutex_lock (&cache_lock);
1345   ci = g_tree_lookup (cache_tree, file);
1347   if (ci == NULL) /* {{{ */
1348   {
1349     struct stat statbuf;
1350     cache_item_t *tmp;
1352     /* don't hold the lock while we setup; stat(2) might block */
1353     pthread_mutex_unlock(&cache_lock);
1355     memset (&statbuf, 0, sizeof (statbuf));
1356     status = stat (file, &statbuf);
1357     if (status != 0)
1358     {
1359       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1361       status = errno;
1362       if (status == ENOENT)
1363         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1364       else
1365         return send_response(sock, RESP_ERR,
1366                              "stat failed with error %i.\n", status);
1367     }
1368     if (!S_ISREG (statbuf.st_mode))
1369       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1371     if (access(file, R_OK|W_OK) != 0)
1372       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1373                            file, rrd_strerror(errno));
1375     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1376     if (ci == NULL)
1377     {
1378       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1380       return send_response(sock, RESP_ERR, "malloc failed.\n");
1381     }
1382     memset (ci, 0, sizeof (cache_item_t));
1384     ci->file = strdup (file);
1385     if (ci->file == NULL)
1386     {
1387       free (ci);
1388       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1390       return send_response(sock, RESP_ERR, "strdup failed.\n");
1391     }
1393     wipe_ci_values(ci, now);
1394     ci->flags = CI_FLAGS_IN_TREE;
1395     pthread_cond_init(&ci->flushed, NULL);
1397     pthread_mutex_lock(&cache_lock);
1399     /* another UPDATE might have added this entry in the meantime */
1400     tmp = g_tree_lookup (cache_tree, file);
1401     if (tmp == NULL)
1402       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1403     else
1404     {
1405       free_cache_item (ci);
1406       ci = tmp;
1407     }
1409     /* state may have changed while we were unlocked */
1410     if (state == SHUTDOWN)
1411       return -1;
1412   } /* }}} */
1413   assert (ci != NULL);
1415   /* don't re-write updates in replay mode */
1416   if (!JOURNAL_REPLAY(sock))
1417     journal_write("update", orig_buf);
1419   while (buffer_size > 0)
1420   {
1421     char *value;
1422     double stamp;
1423     char *eostamp;
1425     status = buffer_get_field (&buffer, &buffer_size, &value);
1426     if (status != 0)
1427     {
1428       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1429       break;
1430     }
1432     /* make sure update time is always moving forward. We use double here since
1433        update does support subsecond precision for timestamps ... */
1434     stamp = strtod(value, &eostamp);
1435     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1436     {
1437       pthread_mutex_unlock(&cache_lock);
1438       return send_response(sock, RESP_ERR,
1439                            "Cannot find timestamp in '%s'!\n", value);
1440     }
1441     else if (stamp <= ci->last_update_stamp)
1442     {
1443       pthread_mutex_unlock(&cache_lock);
1444       return send_response(sock, RESP_ERR,
1445                            "illegal attempt to update using time %lf when last"
1446                            " update time is %lf (minimum one second step)\n",
1447                            stamp, ci->last_update_stamp);
1448     }
1449     else
1450       ci->last_update_stamp = stamp;
1452     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1453     {
1454       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1455       continue;
1456     }
1458     values_num++;
1459   }
1461   if (((now - ci->last_flush_time) >= config_write_interval)
1462       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1463       && (ci->values_num > 0))
1464   {
1465     enqueue_cache_item (ci, TAIL);
1466   }
1468   pthread_mutex_unlock (&cache_lock);
1470   if (values_num < 1)
1471     return send_response(sock, RESP_ERR, "No values updated.\n");
1472   else
1473     return send_response(sock, RESP_OK,
1474                          "errors, enqueued %i value(s).\n", values_num);
1476   /* NOTREACHED */
1477   assert(1==0);
1479 } /* }}} int handle_request_update */
1481 /* we came across a "WROTE" entry during journal replay.
1482  * throw away any values that we have accumulated for this file
1483  */
1484 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1486   cache_item_t *ci;
1487   const char *file = buffer;
1489   pthread_mutex_lock(&cache_lock);
1491   ci = g_tree_lookup(cache_tree, file);
1492   if (ci == NULL)
1493   {
1494     pthread_mutex_unlock(&cache_lock);
1495     return (0);
1496   }
1498   if (ci->values)
1499     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1501   wipe_ci_values(ci, now);
1502   remove_from_queue(ci);
1504   pthread_mutex_unlock(&cache_lock);
1505   return (0);
1506 } /* }}} int handle_request_wrote */
1508 /* start "BATCH" processing */
1509 static int batch_start (HANDLER_PROTO) /* {{{ */
1511   int status;
1512   if (sock->batch_start)
1513     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1515   status = send_response(sock, RESP_OK,
1516                          "Go ahead.  End with dot '.' on its own line.\n");
1517   sock->batch_start = time(NULL);
1518   sock->batch_cmd = 0;
1520   return status;
1521 } /* }}} static int batch_start */
1523 /* finish "BATCH" processing and return results to the client */
1524 static int batch_done (HANDLER_PROTO) /* {{{ */
1526   assert(sock->batch_start);
1527   sock->batch_start = 0;
1528   sock->batch_cmd  = 0;
1529   return send_response(sock, RESP_OK, "errors\n");
1530 } /* }}} static int batch_done */
1532 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1534   return -1;
1535 } /* }}} static int handle_request_quit */
1537 static command_t list_of_commands[] = { /* {{{ */
1538   {
1539     "UPDATE",
1540     handle_request_update,
1541     CMD_CONTEXT_ANY,
1542     "UPDATE <filename> <values> [<values> ...]\n"
1543     ,
1544     "Adds the given file to the internal cache if it is not yet known and\n"
1545     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1546     "for details.\n"
1547     "\n"
1548     "Each <values> has the following form:\n"
1549     "  <values> = <time>:<value>[:<value>[...]]\n"
1550     "See the rrdupdate(1) manpage for details.\n"
1551   },
1552   {
1553     "WROTE",
1554     handle_request_wrote,
1555     CMD_CONTEXT_JOURNAL,
1556     NULL,
1557     NULL
1558   },
1559   {
1560     "FLUSH",
1561     handle_request_flush,
1562     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1563     "FLUSH <filename>\n"
1564     ,
1565     "Adds the given filename to the head of the update queue and returns\n"
1566     "after it has been dequeued.\n"
1567   },
1568   {
1569     "FLUSHALL",
1570     handle_request_flushall,
1571     CMD_CONTEXT_CLIENT,
1572     "FLUSHALL\n"
1573     ,
1574     "Triggers writing of all pending updates.  Returns immediately.\n"
1575   },
1576   {
1577     "PENDING",
1578     handle_request_pending,
1579     CMD_CONTEXT_CLIENT,
1580     "PENDING <filename>\n"
1581     ,
1582     "Shows any 'pending' updates for a file, in order.\n"
1583     "The updates shown have not yet been written to the underlying RRD file.\n"
1584   },
1585   {
1586     "FORGET",
1587     handle_request_forget,
1588     CMD_CONTEXT_ANY,
1589     "FORGET <filename>\n"
1590     ,
1591     "Removes the file completely from the cache.\n"
1592     "Any pending updates for the file will be lost.\n"
1593   },
1594   {
1595     "QUEUE",
1596     handle_request_queue,
1597     CMD_CONTEXT_CLIENT,
1598     "QUEUE\n"
1599     ,
1600         "Shows all files in the output queue.\n"
1601     "The output is zero or more lines in the following format:\n"
1602     "(where <num_vals> is the number of values to be written)\n"
1603     "\n"
1604     "<num_vals> <filename>\n"
1605   },
1606   {
1607     "STATS",
1608     handle_request_stats,
1609     CMD_CONTEXT_CLIENT,
1610     "STATS\n"
1611     ,
1612     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1613     "a description of the values.\n"
1614   },
1615   {
1616     "HELP",
1617     handle_request_help,
1618     CMD_CONTEXT_CLIENT,
1619     "HELP [<command>]\n",
1620     NULL, /* special! */
1621   },
1622   {
1623     "BATCH",
1624     batch_start,
1625     CMD_CONTEXT_CLIENT,
1626     "BATCH\n"
1627     ,
1628     "The 'BATCH' command permits the client to initiate a bulk load\n"
1629     "   of commands to rrdcached.\n"
1630     "\n"
1631     "Usage:\n"
1632     "\n"
1633     "    client: BATCH\n"
1634     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1635     "    client: command #1\n"
1636     "    client: command #2\n"
1637     "    client: ... and so on\n"
1638     "    client: .\n"
1639     "    server: 2 errors\n"
1640     "    server: 7 message for command #7\n"
1641     "    server: 9 message for command #9\n"
1642     "\n"
1643     "For more information, consult the rrdcached(1) documentation.\n"
1644   },
1645   {
1646     ".",   /* BATCH terminator */
1647     batch_done,
1648     CMD_CONTEXT_BATCH,
1649     NULL,
1650     NULL
1651   },
1652   {
1653     "QUIT",
1654     handle_request_quit,
1655     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1656     "QUIT\n"
1657     ,
1658     "Disconnect from rrdcached.\n"
1659   }
1660 }; /* }}} command_t list_of_commands[] */
1661 static size_t list_of_commands_len = sizeof (list_of_commands)
1662   / sizeof (list_of_commands[0]);
1664 static command_t *find_command(char *cmd)
1666   size_t i;
1668   for (i = 0; i < list_of_commands_len; i++)
1669     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1670       return (&list_of_commands[i]);
1671   return NULL;
1674 /* We currently use the index in the `list_of_commands' array as a bit position
1675  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1676  * outside these functions so that switching to a more elegant storage method
1677  * is easily possible. */
1678 static ssize_t find_command_index (const char *cmd) /* {{{ */
1680   size_t i;
1682   for (i = 0; i < list_of_commands_len; i++)
1683     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1684       return ((ssize_t) i);
1685   return (-1);
1686 } /* }}} ssize_t find_command_index */
1688 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1689     const char *cmd)
1691   ssize_t i;
1693   if (JOURNAL_REPLAY(sock))
1694     return (1);
1696   if (cmd == NULL)
1697     return (-1);
1699   if ((strcasecmp ("QUIT", cmd) == 0)
1700       || (strcasecmp ("HELP", cmd) == 0))
1701     return (1);
1702   else if (strcmp (".", cmd) == 0)
1703     cmd = "BATCH";
1705   i = find_command_index (cmd);
1706   if (i < 0)
1707     return (-1);
1708   assert (i < 32);
1710   if ((sock->permissions & (1 << i)) != 0)
1711     return (1);
1712   return (0);
1713 } /* }}} int socket_permission_check */
1715 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1716     const char *cmd)
1718   ssize_t i;
1720   i = find_command_index (cmd);
1721   if (i < 0)
1722     return (-1);
1723   assert (i < 32);
1725   sock->permissions |= (1 << i);
1726   return (0);
1727 } /* }}} int socket_permission_add */
1729 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1731   sock->permissions = 0;
1732 } /* }}} socket_permission_clear */
1734 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1735     listen_socket_t *src)
1737   dest->permissions = src->permissions;
1738 } /* }}} socket_permission_copy */
1740 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
1742   size_t i;
1744   sock->permissions = 0;
1745   for (i = 0; i < list_of_commands_len; i++)
1746     sock->permissions |= (1 << i);
1747 } /* }}} void socket_permission_set_all */
1749 /* check whether commands are received in the expected context */
1750 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1752   if (JOURNAL_REPLAY(sock))
1753     return (cmd->context & CMD_CONTEXT_JOURNAL);
1754   else if (sock->batch_start)
1755     return (cmd->context & CMD_CONTEXT_BATCH);
1756   else
1757     return (cmd->context & CMD_CONTEXT_CLIENT);
1759   /* NOTREACHED */
1760   assert(1==0);
1763 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1765   int status;
1766   char *cmd_str;
1767   char *resp_txt;
1768   command_t *help = NULL;
1770   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1771   if (status == 0)
1772     help = find_command(cmd_str);
1774   if (help && (help->syntax || help->help))
1775   {
1776     char tmp[CMD_MAX];
1778     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1779     resp_txt = tmp;
1781     if (help->syntax)
1782       add_response_info(sock, "Usage: %s\n", help->syntax);
1784     if (help->help)
1785       add_response_info(sock, "%s\n", help->help);
1786   }
1787   else
1788   {
1789     size_t i;
1791     resp_txt = "Command overview\n";
1793     for (i = 0; i < list_of_commands_len; i++)
1794     {
1795       if (list_of_commands[i].syntax == NULL)
1796         continue;
1797       add_response_info (sock, "%s", list_of_commands[i].syntax);
1798     }
1799   }
1801   return send_response(sock, RESP_OK, resp_txt);
1802 } /* }}} int handle_request_help */
1804 static int handle_request (DISPATCH_PROTO) /* {{{ */
1806   char *buffer_ptr = buffer;
1807   char *cmd_str = NULL;
1808   command_t *cmd = NULL;
1809   int status;
1811   assert (buffer[buffer_size - 1] == '\0');
1813   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1814   if (status != 0)
1815   {
1816     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1817     return (-1);
1818   }
1820   if (sock != NULL && sock->batch_start)
1821     sock->batch_cmd++;
1823   cmd = find_command(cmd_str);
1824   if (!cmd)
1825     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1827   if (!socket_permission_check (sock, cmd->cmd))
1828     return send_response(sock, RESP_ERR, "Permission denied.\n");
1830   if (!command_check_context(sock, cmd))
1831     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1833   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1834 } /* }}} int handle_request */
1836 static void journal_set_free (journal_set *js) /* {{{ */
1838   if (js == NULL)
1839     return;
1841   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1843   free(js);
1844 } /* }}} journal_set_free */
1846 static void journal_set_remove (journal_set *js) /* {{{ */
1848   if (js == NULL)
1849     return;
1851   for (uint i=0; i < js->files_num; i++)
1852   {
1853     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1854     unlink(js->files[i]);
1855   }
1856 } /* }}} journal_set_remove */
1858 /* close current journal file handle.
1859  * MUST hold journal_lock before calling */
1860 static void journal_close(void) /* {{{ */
1862   if (journal_fh != NULL)
1863   {
1864     if (fclose(journal_fh) != 0)
1865       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1866   }
1868   journal_fh = NULL;
1869   journal_size = 0;
1870 } /* }}} journal_close */
1872 /* MUST hold journal_lock before calling */
1873 static void journal_new_file(void) /* {{{ */
1875   struct timeval now;
1876   int  new_fd;
1877   char new_file[PATH_MAX + 1];
1879   assert(journal_dir != NULL);
1880   assert(journal_cur != NULL);
1882   journal_close();
1884   gettimeofday(&now, NULL);
1885   /* this format assures that the files sort in strcmp() order */
1886   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1887            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1889   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1890                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1891   if (new_fd < 0)
1892     goto error;
1894   journal_fh = fdopen(new_fd, "a");
1895   if (journal_fh == NULL)
1896     goto error;
1898   journal_size = ftell(journal_fh);
1899   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1901   /* record the file in the journal set */
1902   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1904   return;
1906 error:
1907   RRDD_LOG(LOG_CRIT,
1908            "JOURNALING DISABLED: Error while trying to create %s : %s",
1909            new_file, rrd_strerror(errno));
1910   RRDD_LOG(LOG_CRIT,
1911            "JOURNALING DISABLED: All values will be flushed at shutdown");
1913   close(new_fd);
1914   config_flush_at_shutdown = 1;
1916 } /* }}} journal_new_file */
1918 /* MUST NOT hold journal_lock before calling this */
1919 static void journal_rotate(void) /* {{{ */
1921   journal_set *old_js = NULL;
1923   if (journal_dir == NULL)
1924     return;
1926   RRDD_LOG(LOG_DEBUG, "rotating journals");
1928   pthread_mutex_lock(&stats_lock);
1929   ++stats_journal_rotate;
1930   pthread_mutex_unlock(&stats_lock);
1932   pthread_mutex_lock(&journal_lock);
1934   journal_close();
1936   /* rotate the journal sets */
1937   old_js = journal_old;
1938   journal_old = journal_cur;
1939   journal_cur = calloc(1, sizeof(journal_set));
1941   if (journal_cur != NULL)
1942     journal_new_file();
1943   else
1944     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1946   pthread_mutex_unlock(&journal_lock);
1948   journal_set_remove(old_js);
1949   journal_set_free  (old_js);
1951 } /* }}} static void journal_rotate */
1953 /* MUST hold journal_lock when calling */
1954 static void journal_done(void) /* {{{ */
1956   if (journal_cur == NULL)
1957     return;
1959   journal_close();
1961   if (config_flush_at_shutdown)
1962   {
1963     RRDD_LOG(LOG_INFO, "removing journals");
1964     journal_set_remove(journal_old);
1965     journal_set_remove(journal_cur);
1966   }
1967   else
1968   {
1969     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1970              "journals will be used at next startup");
1971   }
1973   journal_set_free(journal_cur);
1974   journal_set_free(journal_old);
1975   free(journal_dir);
1977 } /* }}} static void journal_done */
1979 static int journal_write(char *cmd, char *args) /* {{{ */
1981   int chars;
1983   if (journal_fh == NULL)
1984     return 0;
1986   pthread_mutex_lock(&journal_lock);
1987   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1988   journal_size += chars;
1990   if (journal_size > JOURNAL_MAX)
1991     journal_new_file();
1993   pthread_mutex_unlock(&journal_lock);
1995   if (chars > 0)
1996   {
1997     pthread_mutex_lock(&stats_lock);
1998     stats_journal_bytes += chars;
1999     pthread_mutex_unlock(&stats_lock);
2000   }
2002   return chars;
2003 } /* }}} static int journal_write */
2005 static int journal_replay (const char *file) /* {{{ */
2007   FILE *fh;
2008   int entry_cnt = 0;
2009   int fail_cnt = 0;
2010   uint64_t line = 0;
2011   char entry[CMD_MAX];
2012   time_t now;
2014   if (file == NULL) return 0;
2016   {
2017     char *reason = "unknown error";
2018     int status = 0;
2019     struct stat statbuf;
2021     memset(&statbuf, 0, sizeof(statbuf));
2022     if (stat(file, &statbuf) != 0)
2023     {
2024       reason = "stat error";
2025       status = errno;
2026     }
2027     else if (!S_ISREG(statbuf.st_mode))
2028     {
2029       reason = "not a regular file";
2030       status = EPERM;
2031     }
2032     if (statbuf.st_uid != daemon_uid)
2033     {
2034       reason = "not owned by daemon user";
2035       status = EACCES;
2036     }
2037     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2038     {
2039       reason = "must not be user/group writable";
2040       status = EACCES;
2041     }
2043     if (status != 0)
2044     {
2045       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2046                file, rrd_strerror(status), reason);
2047       return 0;
2048     }
2049   }
2051   fh = fopen(file, "r");
2052   if (fh == NULL)
2053   {
2054     if (errno != ENOENT)
2055       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2056                file, rrd_strerror(errno));
2057     return 0;
2058   }
2059   else
2060     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2062   now = time(NULL);
2064   while(!feof(fh))
2065   {
2066     size_t entry_len;
2068     ++line;
2069     if (fgets(entry, sizeof(entry), fh) == NULL)
2070       break;
2071     entry_len = strlen(entry);
2073     /* check \n termination in case journal writing crashed mid-line */
2074     if (entry_len == 0)
2075       continue;
2076     else if (entry[entry_len - 1] != '\n')
2077     {
2078       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2079       ++fail_cnt;
2080       continue;
2081     }
2083     entry[entry_len - 1] = '\0';
2085     if (handle_request(NULL, now, entry, entry_len) == 0)
2086       ++entry_cnt;
2087     else
2088       ++fail_cnt;
2089   }
2091   fclose(fh);
2093   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2094            entry_cnt, fail_cnt);
2096   return entry_cnt > 0 ? 1 : 0;
2097 } /* }}} static int journal_replay */
2099 static int journal_sort(const void *v1, const void *v2)
2101   char **jn1 = (char **) v1;
2102   char **jn2 = (char **) v2;
2104   return strcmp(*jn1,*jn2);
2107 static void journal_init(void) /* {{{ */
2109   int had_journal = 0;
2110   DIR *dir;
2111   struct dirent *dent;
2112   char path[PATH_MAX+1];
2114   if (journal_dir == NULL) return;
2116   pthread_mutex_lock(&journal_lock);
2118   journal_cur = calloc(1, sizeof(journal_set));
2119   if (journal_cur == NULL)
2120   {
2121     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2122     return;
2123   }
2125   RRDD_LOG(LOG_INFO, "checking for journal files");
2127   /* Handle old journal files during transition.  This gives them the
2128    * correct sort order.  TODO: remove after first release
2129    */
2130   {
2131     char old_path[PATH_MAX+1];
2132     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2133     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2134     rename(old_path, path);
2136     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2137     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2138     rename(old_path, path);
2139   }
2141   dir = opendir(journal_dir);
2142   if (!dir) {
2143     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2144     return;
2145   }
2146   while ((dent = readdir(dir)) != NULL)
2147   {
2148     /* looks like a journal file? */
2149     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2150       continue;
2152     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2154     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2155     {
2156       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2157                dent->d_name);
2158       break;
2159     }
2160   }
2161   closedir(dir);
2163   qsort(journal_cur->files, journal_cur->files_num,
2164         sizeof(journal_cur->files[0]), journal_sort);
2166   for (uint i=0; i < journal_cur->files_num; i++)
2167     had_journal += journal_replay(journal_cur->files[i]);
2169   journal_new_file();
2171   /* it must have been a crash.  start a flush */
2172   if (had_journal && config_flush_at_shutdown)
2173     flush_old_values(-1);
2175   pthread_mutex_unlock(&journal_lock);
2177   RRDD_LOG(LOG_INFO, "journal processing complete");
2179 } /* }}} static void journal_init */
2181 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2183   assert(sock != NULL);
2185   free(sock->rbuf);  sock->rbuf = NULL;
2186   free(sock->wbuf);  sock->wbuf = NULL;
2187   free(sock);
2188 } /* }}} void free_listen_socket */
2190 static void close_connection(listen_socket_t *sock) /* {{{ */
2192   if (sock->fd >= 0)
2193   {
2194     close(sock->fd);
2195     sock->fd = -1;
2196   }
2198   free_listen_socket(sock);
2200 } /* }}} void close_connection */
2202 static void *connection_thread_main (void *args) /* {{{ */
2204   listen_socket_t *sock;
2205   int fd;
2207   sock = (listen_socket_t *) args;
2208   fd = sock->fd;
2210   /* init read buffers */
2211   sock->next_read = sock->next_cmd = 0;
2212   sock->rbuf = malloc(RBUF_SIZE);
2213   if (sock->rbuf == NULL)
2214   {
2215     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2216     close_connection(sock);
2217     return NULL;
2218   }
2220   pthread_mutex_lock (&connection_threads_lock);
2221 #ifdef HAVE_LIBWRAP
2222   /* LIBWRAP does not support multiple threads! By putting this code
2223      inside pthread_mutex_lock we do not have to worry about request_info
2224      getting overwritten by another thread.
2225   */
2226   struct request_info req;
2227   request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2228   fromhost(&req);
2229   if(!hosts_access(&req)) {
2230     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2231     pthread_mutex_unlock (&connection_threads_lock);
2232     close_connection(sock);
2233     return NULL;
2234   }
2235 #endif /* HAVE_LIBWRAP */
2236   connection_threads_num++;
2237   pthread_mutex_unlock (&connection_threads_lock);
2239   while (state == RUNNING)
2240   {
2241     char *cmd;
2242     ssize_t cmd_len;
2243     ssize_t rbytes;
2244     time_t now;
2246     struct pollfd pollfd;
2247     int status;
2249     pollfd.fd = fd;
2250     pollfd.events = POLLIN | POLLPRI;
2251     pollfd.revents = 0;
2253     status = poll (&pollfd, 1, /* timeout = */ 500);
2254     if (state != RUNNING)
2255       break;
2256     else if (status == 0) /* timeout */
2257       continue;
2258     else if (status < 0) /* error */
2259     {
2260       status = errno;
2261       if (status != EINTR)
2262         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2263       continue;
2264     }
2266     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2267       break;
2268     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2269     {
2270       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2271           "poll(2) returned something unexpected: %#04hx",
2272           pollfd.revents);
2273       break;
2274     }
2276     rbytes = read(fd, sock->rbuf + sock->next_read,
2277                   RBUF_SIZE - sock->next_read);
2278     if (rbytes < 0)
2279     {
2280       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2281       break;
2282     }
2283     else if (rbytes == 0)
2284       break; /* eof */
2286     sock->next_read += rbytes;
2288     if (sock->batch_start)
2289       now = sock->batch_start;
2290     else
2291       now = time(NULL);
2293     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2294     {
2295       status = handle_request (sock, now, cmd, cmd_len+1);
2296       if (status != 0)
2297         goto out_close;
2298     }
2299   }
2301 out_close:
2302   close_connection(sock);
2304   /* Remove this thread from the connection threads list */
2305   pthread_mutex_lock (&connection_threads_lock);
2306   connection_threads_num--;
2307   if (connection_threads_num <= 0)
2308     pthread_cond_broadcast(&connection_threads_done);
2309   pthread_mutex_unlock (&connection_threads_lock);
2311   return (NULL);
2312 } /* }}} void *connection_thread_main */
2314 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2316   int fd;
2317   struct sockaddr_un sa;
2318   listen_socket_t *temp;
2319   int status;
2320   const char *path;
2321   char *path_copy, *dir;
2323   path = sock->addr;
2324   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2325     path += strlen("unix:");
2327   /* dirname may modify its argument */
2328   path_copy = strdup(path);
2329   if (path_copy == NULL)
2330   {
2331     fprintf(stderr, "rrdcached: strdup(): %s\n",
2332         rrd_strerror(errno));
2333     return (-1);
2334   }
2336   dir = dirname(path_copy);
2337   if (rrd_mkdir_p(dir, 0777) != 0)
2338   {
2339     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2340         dir, rrd_strerror(errno));
2341     return (-1);
2342   }
2344   free(path_copy);
2346   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2347       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2348   if (temp == NULL)
2349   {
2350     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2351     return (-1);
2352   }
2353   listen_fds = temp;
2354   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2356   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2357   if (fd < 0)
2358   {
2359     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2360              rrd_strerror(errno));
2361     return (-1);
2362   }
2364   memset (&sa, 0, sizeof (sa));
2365   sa.sun_family = AF_UNIX;
2366   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2368   /* if we've gotten this far, we own the pid file.  any daemon started
2369    * with the same args must not be alive.  therefore, ensure that we can
2370    * create the socket...
2371    */
2372   unlink(path);
2374   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2375   if (status != 0)
2376   {
2377     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2378              path, rrd_strerror(errno));
2379     close (fd);
2380     return (-1);
2381   }
2383   /* tweak the sockets group ownership */
2384   if (sock->socket_group != (gid_t)-1)
2385   {
2386     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2387          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2388     {
2389       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2390     }
2391   }
2393   if (sock->socket_permissions != (mode_t)-1)
2394   {
2395     if (chmod(path, sock->socket_permissions) != 0)
2396       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2397           (unsigned int)sock->socket_permissions, strerror(errno));
2398   }
2400   status = listen (fd, /* backlog = */ 10);
2401   if (status != 0)
2402   {
2403     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2404              path, rrd_strerror(errno));
2405     close (fd);
2406     unlink (path);
2407     return (-1);
2408   }
2410   listen_fds[listen_fds_num].fd = fd;
2411   listen_fds[listen_fds_num].family = PF_UNIX;
2412   strncpy(listen_fds[listen_fds_num].addr, path,
2413           sizeof (listen_fds[listen_fds_num].addr) - 1);
2414   listen_fds_num++;
2416   return (0);
2417 } /* }}} int open_listen_socket_unix */
2419 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2421   struct addrinfo ai_hints;
2422   struct addrinfo *ai_res;
2423   struct addrinfo *ai_ptr;
2424   char addr_copy[NI_MAXHOST];
2425   char *addr;
2426   char *port;
2427   int status;
2429   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2430   addr_copy[sizeof (addr_copy) - 1] = 0;
2431   addr = addr_copy;
2433   memset (&ai_hints, 0, sizeof (ai_hints));
2434   ai_hints.ai_flags = 0;
2435 #ifdef AI_ADDRCONFIG
2436   ai_hints.ai_flags |= AI_ADDRCONFIG;
2437 #endif
2438   ai_hints.ai_family = AF_UNSPEC;
2439   ai_hints.ai_socktype = SOCK_STREAM;
2441   port = NULL;
2442   if (*addr == '[') /* IPv6+port format */
2443   {
2444     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2445     addr++;
2447     port = strchr (addr, ']');
2448     if (port == NULL)
2449     {
2450       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2451       return (-1);
2452     }
2453     *port = 0;
2454     port++;
2456     if (*port == ':')
2457       port++;
2458     else if (*port == 0)
2459       port = NULL;
2460     else
2461     {
2462       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2463       return (-1);
2464     }
2465   } /* if (*addr == '[') */
2466   else
2467   {
2468     port = rindex(addr, ':');
2469     if (port != NULL)
2470     {
2471       *port = 0;
2472       port++;
2473     }
2474   }
2475   ai_res = NULL;
2476   status = getaddrinfo (addr,
2477                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2478                         &ai_hints, &ai_res);
2479   if (status != 0)
2480   {
2481     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2482              addr, gai_strerror (status));
2483     return (-1);
2484   }
2486   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2487   {
2488     int fd;
2489     listen_socket_t *temp;
2490     int one = 1;
2492     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2493         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2494     if (temp == NULL)
2495     {
2496       fprintf (stderr,
2497                "rrdcached: open_listen_socket_network: realloc failed.\n");
2498       continue;
2499     }
2500     listen_fds = temp;
2501     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2503     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2504     if (fd < 0)
2505     {
2506       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2507                rrd_strerror(errno));
2508       continue;
2509     }
2511     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2513     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2514     if (status != 0)
2515     {
2516       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2517                sock->addr, rrd_strerror(errno));
2518       close (fd);
2519       continue;
2520     }
2522     status = listen (fd, /* backlog = */ 10);
2523     if (status != 0)
2524     {
2525       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2526                sock->addr, rrd_strerror(errno));
2527       close (fd);
2528       freeaddrinfo(ai_res);
2529       return (-1);
2530     }
2532     listen_fds[listen_fds_num].fd = fd;
2533     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2534     listen_fds_num++;
2535   } /* for (ai_ptr) */
2537   freeaddrinfo(ai_res);
2538   return (0);
2539 } /* }}} static int open_listen_socket_network */
2541 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2543   assert(sock != NULL);
2544   assert(sock->addr != NULL);
2546   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2547       || sock->addr[0] == '/')
2548     return (open_listen_socket_unix(sock));
2549   else
2550     return (open_listen_socket_network(sock));
2551 } /* }}} int open_listen_socket */
2553 static int close_listen_sockets (void) /* {{{ */
2555   size_t i;
2557   for (i = 0; i < listen_fds_num; i++)
2558   {
2559     close (listen_fds[i].fd);
2561     if (listen_fds[i].family == PF_UNIX)
2562       unlink(listen_fds[i].addr);
2563   }
2565   free (listen_fds);
2566   listen_fds = NULL;
2567   listen_fds_num = 0;
2569   return (0);
2570 } /* }}} int close_listen_sockets */
2572 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2574   struct pollfd *pollfds;
2575   int pollfds_num;
2576   int status;
2577   int i;
2579   if (listen_fds_num < 1)
2580   {
2581     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2582     return (NULL);
2583   }
2585   pollfds_num = listen_fds_num;
2586   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2587   if (pollfds == NULL)
2588   {
2589     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2590     return (NULL);
2591   }
2592   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2594   RRDD_LOG(LOG_INFO, "listening for connections");
2596   while (state == RUNNING)
2597   {
2598     for (i = 0; i < pollfds_num; i++)
2599     {
2600       pollfds[i].fd = listen_fds[i].fd;
2601       pollfds[i].events = POLLIN | POLLPRI;
2602       pollfds[i].revents = 0;
2603     }
2605     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2606     if (state != RUNNING)
2607       break;
2608     else if (status == 0) /* timeout */
2609       continue;
2610     else if (status < 0) /* error */
2611     {
2612       status = errno;
2613       if (status != EINTR)
2614       {
2615         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2616       }
2617       continue;
2618     }
2620     for (i = 0; i < pollfds_num; i++)
2621     {
2622       listen_socket_t *client_sock;
2623       struct sockaddr_storage client_sa;
2624       socklen_t client_sa_size;
2625       pthread_t tid;
2626       pthread_attr_t attr;
2628       if (pollfds[i].revents == 0)
2629         continue;
2631       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2632       {
2633         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2634             "poll(2) returned something unexpected for listen FD #%i.",
2635             pollfds[i].fd);
2636         continue;
2637       }
2639       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2640       if (client_sock == NULL)
2641       {
2642         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2643         continue;
2644       }
2645       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2647       client_sa_size = sizeof (client_sa);
2648       client_sock->fd = accept (pollfds[i].fd,
2649           (struct sockaddr *) &client_sa, &client_sa_size);
2650       if (client_sock->fd < 0)
2651       {
2652         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2653         free(client_sock);
2654         continue;
2655       }
2657       pthread_attr_init (&attr);
2658       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2660       status = pthread_create (&tid, &attr, connection_thread_main,
2661                                client_sock);
2662       if (status != 0)
2663       {
2664         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2665         close_connection(client_sock);
2666         continue;
2667       }
2668     } /* for (pollfds_num) */
2669   } /* while (state == RUNNING) */
2671   RRDD_LOG(LOG_INFO, "starting shutdown");
2673   close_listen_sockets ();
2675   pthread_mutex_lock (&connection_threads_lock);
2676   while (connection_threads_num > 0)
2677     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2678   pthread_mutex_unlock (&connection_threads_lock);
2680   free(pollfds);
2682   return (NULL);
2683 } /* }}} void *listen_thread_main */
2685 static int daemonize (void) /* {{{ */
2687   int pid_fd;
2688   char *base_dir;
2690   daemon_uid = geteuid();
2692   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2693   if (pid_fd < 0)
2694     pid_fd = check_pidfile();
2695   if (pid_fd < 0)
2696     return pid_fd;
2698   /* open all the listen sockets */
2699   if (config_listen_address_list_len > 0)
2700   {
2701     for (size_t i = 0; i < config_listen_address_list_len; i++)
2702       open_listen_socket (config_listen_address_list[i]);
2704     rrd_free_ptrs((void ***) &config_listen_address_list,
2705                   &config_listen_address_list_len);
2706   }
2707   else
2708   {
2709     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2710         sizeof(default_socket.addr) - 1);
2711     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2713     if (default_socket.permissions == 0)
2714       socket_permission_set_all (&default_socket);
2716     open_listen_socket (&default_socket);
2717   }
2719   if (listen_fds_num < 1)
2720   {
2721     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2722     goto error;
2723   }
2725   if (!stay_foreground)
2726   {
2727     pid_t child;
2729     child = fork ();
2730     if (child < 0)
2731     {
2732       fprintf (stderr, "daemonize: fork(2) failed.\n");
2733       goto error;
2734     }
2735     else if (child > 0)
2736       exit(0);
2738     /* Become session leader */
2739     setsid ();
2741     /* Open the first three file descriptors to /dev/null */
2742     close (2);
2743     close (1);
2744     close (0);
2746     open ("/dev/null", O_RDWR);
2747     if (dup(0) == -1 || dup(0) == -1){
2748         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2749     }
2750   } /* if (!stay_foreground) */
2752   /* Change into the /tmp directory. */
2753   base_dir = (config_base_dir != NULL)
2754     ? config_base_dir
2755     : "/tmp";
2757   if (chdir (base_dir) != 0)
2758   {
2759     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2760     goto error;
2761   }
2763   install_signal_handlers();
2765   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2766   RRDD_LOG(LOG_INFO, "starting up");
2768   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2769                                 (GDestroyNotify) free_cache_item);
2770   if (cache_tree == NULL)
2771   {
2772     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2773     goto error;
2774   }
2776   return write_pidfile (pid_fd);
2778 error:
2779   remove_pidfile();
2780   return -1;
2781 } /* }}} int daemonize */
2783 static int cleanup (void) /* {{{ */
2785   pthread_cond_broadcast (&flush_cond);
2786   pthread_join (flush_thread, NULL);
2788   pthread_cond_broadcast (&queue_cond);
2789   for (int i = 0; i < config_queue_threads; i++)
2790     pthread_join (queue_threads[i], NULL);
2792   if (config_flush_at_shutdown)
2793   {
2794     assert(cache_queue_head == NULL);
2795     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2796   }
2798   free(queue_threads);
2799   free(config_base_dir);
2801   pthread_mutex_lock(&cache_lock);
2802   g_tree_destroy(cache_tree);
2804   pthread_mutex_lock(&journal_lock);
2805   journal_done();
2807   RRDD_LOG(LOG_INFO, "goodbye");
2808   closelog ();
2810   remove_pidfile ();
2811   free(config_pid_file);
2813   return (0);
2814 } /* }}} int cleanup */
2816 static int read_options (int argc, char **argv) /* {{{ */
2818   int option;
2819   int status = 0;
2821   socket_permission_clear (&default_socket);
2823   default_socket.socket_group = (gid_t)-1;
2824   default_socket.socket_permissions = (mode_t)-1;
2826   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2827   {
2828     switch (option)
2829     {
2830       case 'g':
2831         stay_foreground=1;
2832         break;
2834       case 'l':
2835       {
2836         listen_socket_t *new;
2838         new = malloc(sizeof(listen_socket_t));
2839         if (new == NULL)
2840         {
2841           fprintf(stderr, "read_options: malloc failed.\n");
2842           return(2);
2843         }
2844         memset(new, 0, sizeof(listen_socket_t));
2846         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2848         /* Add permissions to the socket {{{ */
2849         if (default_socket.permissions != 0)
2850         {
2851           socket_permission_copy (new, &default_socket);
2852         }
2853         else /* if (default_socket.permissions == 0) */
2854         {
2855           /* Add permission for ALL commands to the socket. */
2856           socket_permission_set_all (new);
2857         }
2858         /* }}} Done adding permissions. */
2860         new->socket_group = default_socket.socket_group;
2861         new->socket_permissions = default_socket.socket_permissions;
2863         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2864                          &config_listen_address_list_len, new))
2865         {
2866           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2867           return (2);
2868         }
2869       }
2870       break;
2872       /* set socket group permissions */
2873       case 's':
2874       {
2875         gid_t group_gid;
2876         struct group *grp;
2878         group_gid = strtoul(optarg, NULL, 10);
2879         if (errno != EINVAL && group_gid>0)
2880         {
2881           /* we were passed a number */
2882           grp = getgrgid(group_gid);
2883         }
2884         else
2885         {
2886           grp = getgrnam(optarg);
2887         }
2889         if (grp)
2890         {
2891           default_socket.socket_group = grp->gr_gid;
2892         }
2893         else
2894         {
2895           /* no idea what the user wanted... */
2896           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2897           return (5);
2898         }
2899       }
2900       break;
2902       /* set socket file permissions */
2903       case 'm':
2904       {
2905         long  tmp;
2906         char *endptr = NULL;
2908         tmp = strtol (optarg, &endptr, 8);
2909         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2910             || (tmp > 07777) || (tmp < 0)) {
2911           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2912               optarg);
2913           return (5);
2914         }
2916         default_socket.socket_permissions = (mode_t)tmp;
2917       }
2918       break;
2920       case 'P':
2921       {
2922         char *optcopy;
2923         char *saveptr;
2924         char *dummy;
2925         char *ptr;
2927         socket_permission_clear (&default_socket);
2929         optcopy = strdup (optarg);
2930         dummy = optcopy;
2931         saveptr = NULL;
2932         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2933         {
2934           dummy = NULL;
2935           status = socket_permission_add (&default_socket, ptr);
2936           if (status != 0)
2937           {
2938             fprintf (stderr, "read_options: Adding permission \"%s\" to "
2939                 "socket failed. Most likely, this permission doesn't "
2940                 "exist. Check your command line.\n", ptr);
2941             status = 4;
2942           }
2943         }
2945         free (optcopy);
2946       }
2947       break;
2949       case 'f':
2950       {
2951         int temp;
2953         temp = atoi (optarg);
2954         if (temp > 0)
2955           config_flush_interval = temp;
2956         else
2957         {
2958           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2959           status = 3;
2960         }
2961       }
2962       break;
2964       case 'w':
2965       {
2966         int temp;
2968         temp = atoi (optarg);
2969         if (temp > 0)
2970           config_write_interval = temp;
2971         else
2972         {
2973           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2974           status = 2;
2975         }
2976       }
2977       break;
2979       case 'z':
2980       {
2981         int temp;
2983         temp = atoi(optarg);
2984         if (temp > 0)
2985           config_write_jitter = temp;
2986         else
2987         {
2988           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2989           status = 2;
2990         }
2992         break;
2993       }
2995       case 't':
2996       {
2997         int threads;
2998         threads = atoi(optarg);
2999         if (threads >= 1)
3000           config_queue_threads = threads;
3001         else
3002         {
3003           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3004           return 1;
3005         }
3006       }
3007       break;
3009       case 'B':
3010         config_write_base_only = 1;
3011         break;
3013       case 'b':
3014       {
3015         size_t len;
3016         char base_realpath[PATH_MAX];
3018         if (config_base_dir != NULL)
3019           free (config_base_dir);
3020         config_base_dir = strdup (optarg);
3021         if (config_base_dir == NULL)
3022         {
3023           fprintf (stderr, "read_options: strdup failed.\n");
3024           return (3);
3025         }
3027         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3028         {
3029           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3030               config_base_dir, rrd_strerror (errno));
3031           return (3);
3032         }
3034         /* make sure that the base directory is not resolved via
3035          * symbolic links.  this makes some performance-enhancing
3036          * assumptions possible (we don't have to resolve paths
3037          * that start with a "/")
3038          */
3039         if (realpath(config_base_dir, base_realpath) == NULL)
3040         {
3041           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3042               "%s\n", config_base_dir, rrd_strerror(errno));
3043           return 5;
3044         }
3046         len = strlen (config_base_dir);
3047         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3048         {
3049           config_base_dir[len - 1] = 0;
3050           len--;
3051         }
3053         if (len < 1)
3054         {
3055           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3056           return (4);
3057         }
3059         _config_base_dir_len = len;
3061         len = strlen (base_realpath);
3062         while ((len > 0) && (base_realpath[len - 1] == '/'))
3063         {
3064           base_realpath[len - 1] = '\0';
3065           len--;
3066         }
3068         if (strncmp(config_base_dir,
3069                          base_realpath, sizeof(base_realpath)) != 0)
3070         {
3071           fprintf(stderr,
3072                   "Base directory (-b) resolved via file system links!\n"
3073                   "Please consult rrdcached '-b' documentation!\n"
3074                   "Consider specifying the real directory (%s)\n",
3075                   base_realpath);
3076           return 5;
3077         }
3078       }
3079       break;
3081       case 'p':
3082       {
3083         if (config_pid_file != NULL)
3084           free (config_pid_file);
3085         config_pid_file = strdup (optarg);
3086         if (config_pid_file == NULL)
3087         {
3088           fprintf (stderr, "read_options: strdup failed.\n");
3089           return (3);
3090         }
3091       }
3092       break;
3094       case 'F':
3095         config_flush_at_shutdown = 1;
3096         break;
3098       case 'j':
3099       {
3100         char journal_dir_actual[PATH_MAX];
3101         const char *dir;
3102         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3104         status = rrd_mkdir_p(dir, 0777);
3105         if (status != 0)
3106         {
3107           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3108               dir, rrd_strerror(errno));
3109           return 6;
3110         }
3112         if (access(dir, R_OK|W_OK|X_OK) != 0)
3113         {
3114           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3115                   errno ? rrd_strerror(errno) : "");
3116           return 6;
3117         }
3118       }
3119       break;
3121       case 'h':
3122       case '?':
3123         printf ("RRDCacheD %s\n"
3124             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3125             "\n"
3126             "Usage: rrdcached [options]\n"
3127             "\n"
3128             "Valid options are:\n"
3129             "  -l <address>  Socket address to listen to.\n"
3130             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3131             "  -P <perms>    Sets the permissions to assign to all following "
3132                             "sockets\n"
3133             "  -w <seconds>  Interval in which to write data.\n"
3134             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3135             "  -t <threads>  Number of write threads.\n"
3136             "  -f <seconds>  Interval in which to flush dead data.\n"
3137             "  -p <file>     Location of the PID-file.\n"
3138             "  -b <dir>      Base directory to change to.\n"
3139             "  -B            Restrict file access to paths within -b <dir>\n"
3140             "  -g            Do not fork and run in the foreground.\n"
3141             "  -j <dir>      Directory in which to create the journal files.\n"
3142             "  -F            Always flush all updates at shutdown\n"
3143             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3144             "                (the socket will also have read/write permissions "
3145                             "for that group)\n"
3146             "  -m <mode>     File permissions (octal) of all following UNIX "
3147                             "sockets\n"
3148             "\n"
3149             "For more information and a detailed description of all options "
3150             "please refer\n"
3151             "to the rrdcached(1) manual page.\n",
3152             VERSION);
3153         if (option == 'h')
3154           status = -1;
3155         else
3156           status = 1;
3157         break;
3158     } /* switch (option) */
3159   } /* while (getopt) */
3161   /* advise the user when values are not sane */
3162   if (config_flush_interval < 2 * config_write_interval)
3163     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3164             " 2x write interval (-w) !\n");
3165   if (config_write_jitter > config_write_interval)
3166     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3167             " write interval (-w) !\n");
3169   if (config_write_base_only && config_base_dir == NULL)
3170     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3171             "  Consult the rrdcached documentation\n");
3173   if (journal_dir == NULL)
3174     config_flush_at_shutdown = 1;
3176   return (status);
3177 } /* }}} int read_options */
3179 int main (int argc, char **argv)
3181   int status;
3183   status = read_options (argc, argv);
3184   if (status != 0)
3185   {
3186     if (status < 0)
3187       status = 0;
3188     return (status);
3189   }
3191   status = daemonize ();
3192   if (status != 0)
3193   {
3194     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3195     return (1);
3196   }
3198   journal_init();
3200   /* start the queue threads */
3201   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3202   if (queue_threads == NULL)
3203   {
3204     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3205     cleanup();
3206     return (1);
3207   }
3208   for (int i = 0; i < config_queue_threads; i++)
3209   {
3210     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3211     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3212     if (status != 0)
3213     {
3214       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3215       cleanup();
3216       return (1);
3217     }
3218   }
3220   /* start the flush thread */
3221   memset(&flush_thread, 0, sizeof(flush_thread));
3222   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3223   if (status != 0)
3224   {
3225     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3226     cleanup();
3227     return (1);
3228   }
3230   listen_thread_main (NULL);
3231   cleanup ();
3233   return (0);
3234 } /* int main */
3236 /*
3237  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3238  */