Code

Add a "FETCH" command to RRDCacheD which behaves like a (simplified
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
111 #include <glib-2.0/glib.h>
112 /* }}} */
114 #define RRDD_LOG(severity, ...) \
115   do { \
116     if (stay_foreground) \
117       fprintf(stderr, __VA_ARGS__); \
118     syslog ((severity), __VA_ARGS__); \
119   } while (0)
121 #ifndef __GNUC__
122 # define __attribute__(x) /**/
123 #endif
125 /*
126  * Types
127  */
128 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
130 struct listen_socket_s
132   int fd;
133   char addr[PATH_MAX + 1];
134   int family;
136   /* state for BATCH processing */
137   time_t batch_start;
138   int batch_cmd;
140   /* buffered IO */
141   char *rbuf;
142   off_t next_cmd;
143   off_t next_read;
145   char *wbuf;
146   ssize_t wbuf_len;
148   uint32_t permissions;
150   gid_t  socket_group;
151   mode_t socket_permissions;
152 };
153 typedef struct listen_socket_s listen_socket_t;
155 struct command_s;
156 typedef struct command_s command_t;
157 /* note: guard against "unused" warnings in the handlers */
158 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
159                         time_t now              __attribute__((unused)),\
160                         char  *buffer           __attribute__((unused)),\
161                         size_t buffer_size      __attribute__((unused))
163 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
164                         DISPATCH_PROTO
166 struct command_s {
167   char   *cmd;
168   int (*handler)(HANDLER_PROTO);
170   char  context;                /* where we expect to see it */
171 #define CMD_CONTEXT_CLIENT      (1<<0)
172 #define CMD_CONTEXT_BATCH       (1<<1)
173 #define CMD_CONTEXT_JOURNAL     (1<<2)
174 #define CMD_CONTEXT_ANY         (0x7f)
176   char *syntax;
177   char *help;
178 };
180 struct cache_item_s;
181 typedef struct cache_item_s cache_item_t;
182 struct cache_item_s
184   char *file;
185   char **values;
186   size_t values_num;            /* number of valid pointers */
187   size_t values_alloc;          /* number of allocated pointers */
188   time_t last_flush_time;
189   time_t last_update_stamp;
190 #define CI_FLAGS_IN_TREE  (1<<0)
191 #define CI_FLAGS_IN_QUEUE (1<<1)
192   int flags;
193   pthread_cond_t  flushed;
194   cache_item_t *prev;
195   cache_item_t *next;
196 };
198 struct callback_flush_data_s
200   time_t now;
201   time_t abs_timeout;
202   char **keys;
203   size_t keys_num;
204 };
205 typedef struct callback_flush_data_s callback_flush_data_t;
207 enum queue_side_e
209   HEAD,
210   TAIL
211 };
212 typedef enum queue_side_e queue_side_t;
214 /* describe a set of journal files */
215 typedef struct {
216   char **files;
217   size_t files_num;
218 } journal_set;
220 /* max length of socket command or response */
221 #define CMD_MAX 4096
222 #define RBUF_SIZE (CMD_MAX*2)
224 /*
225  * Variables
226  */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
233 enum {
234   RUNNING,              /* normal operation */
235   FLUSHING,             /* flushing remaining values */
236   SHUTDOWN              /* shutting down */
237 } state = RUNNING;
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
250 /* Cache stuff */
251 static GTree          *cache_tree = NULL;
252 static cache_item_t   *cache_queue_head = NULL;
253 static cache_item_t   *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
256 static int config_write_interval = 300;
257 static int config_write_jitter   = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
278 /* Journaled updates */
279 #define JOURNAL_REPLAY(s) ((s) == NULL)
280 #define JOURNAL_BASE "rrd.journal"
281 static journal_set *journal_cur = NULL;
282 static journal_set *journal_old = NULL;
283 static char *journal_dir = NULL;
284 static FILE *journal_fh = NULL;         /* current journal file handle */
285 static long  journal_size = 0;          /* current journal size */
286 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
287 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
288 static int journal_write(char *cmd, char *args);
289 static void journal_done(void);
290 static void journal_rotate(void);
292 /* prototypes for forward refernces */
293 static int handle_request_help (HANDLER_PROTO);
295 /* 
296  * Functions
297  */
298 static void sig_common (const char *sig) /* {{{ */
300   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
301   state = FLUSHING;
302   pthread_cond_broadcast(&flush_cond);
303   pthread_cond_broadcast(&queue_cond);
304 } /* }}} void sig_common */
306 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
308   sig_common("INT");
309 } /* }}} void sig_int_handler */
311 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
313   sig_common("TERM");
314 } /* }}} void sig_term_handler */
316 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
318   config_flush_at_shutdown = 1;
319   sig_common("USR1");
320 } /* }}} void sig_usr1_handler */
322 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
324   config_flush_at_shutdown = 0;
325   sig_common("USR2");
326 } /* }}} void sig_usr2_handler */
328 static void install_signal_handlers(void) /* {{{ */
330   /* These structures are static, because `sigaction' behaves weird if the are
331    * overwritten.. */
332   static struct sigaction sa_int;
333   static struct sigaction sa_term;
334   static struct sigaction sa_pipe;
335   static struct sigaction sa_usr1;
336   static struct sigaction sa_usr2;
338   /* Install signal handlers */
339   memset (&sa_int, 0, sizeof (sa_int));
340   sa_int.sa_handler = sig_int_handler;
341   sigaction (SIGINT, &sa_int, NULL);
343   memset (&sa_term, 0, sizeof (sa_term));
344   sa_term.sa_handler = sig_term_handler;
345   sigaction (SIGTERM, &sa_term, NULL);
347   memset (&sa_pipe, 0, sizeof (sa_pipe));
348   sa_pipe.sa_handler = SIG_IGN;
349   sigaction (SIGPIPE, &sa_pipe, NULL);
351   memset (&sa_pipe, 0, sizeof (sa_usr1));
352   sa_usr1.sa_handler = sig_usr1_handler;
353   sigaction (SIGUSR1, &sa_usr1, NULL);
355   memset (&sa_usr2, 0, sizeof (sa_usr2));
356   sa_usr2.sa_handler = sig_usr2_handler;
357   sigaction (SIGUSR2, &sa_usr2, NULL);
359 } /* }}} void install_signal_handlers */
361 static int open_pidfile(char *action, int oflag) /* {{{ */
363   int fd;
364   const char *file;
365   char *file_copy, *dir;
367   file = (config_pid_file != NULL)
368     ? config_pid_file
369     : LOCALSTATEDIR "/run/rrdcached.pid";
371   /* dirname may modify its argument */
372   file_copy = strdup(file);
373   if (file_copy == NULL)
374   {
375     fprintf(stderr, "rrdcached: strdup(): %s\n",
376         rrd_strerror(errno));
377     return -1;
378   }
380   dir = dirname(file_copy);
381   if (rrd_mkdir_p(dir, 0777) != 0)
382   {
383     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
384         dir, rrd_strerror(errno));
385     return -1;
386   }
388   free(file_copy);
390   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
391   if (fd < 0)
392     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
393             action, file, rrd_strerror(errno));
395   return(fd);
396 } /* }}} static int open_pidfile */
398 /* check existing pid file to see whether a daemon is running */
399 static int check_pidfile(void)
401   int pid_fd;
402   pid_t pid;
403   char pid_str[16];
405   pid_fd = open_pidfile("open", O_RDWR);
406   if (pid_fd < 0)
407     return pid_fd;
409   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
410     return -1;
412   pid = atoi(pid_str);
413   if (pid <= 0)
414     return -1;
416   /* another running process that we can signal COULD be
417    * a competing rrdcached */
418   if (pid != getpid() && kill(pid, 0) == 0)
419   {
420     fprintf(stderr,
421             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
422     close(pid_fd);
423     return -1;
424   }
426   lseek(pid_fd, 0, SEEK_SET);
427   if (ftruncate(pid_fd, 0) == -1)
428   {
429     fprintf(stderr,
430             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
431     close(pid_fd);
432     return -1;
433   }
435   fprintf(stderr,
436           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
437           "rrdcached: starting normally.\n", pid);
439   return pid_fd;
440 } /* }}} static int check_pidfile */
442 static int write_pidfile (int fd) /* {{{ */
444   pid_t pid;
445   FILE *fh;
447   pid = getpid ();
449   fh = fdopen (fd, "w");
450   if (fh == NULL)
451   {
452     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
453     close(fd);
454     return (-1);
455   }
457   fprintf (fh, "%i\n", (int) pid);
458   fclose (fh);
460   return (0);
461 } /* }}} int write_pidfile */
463 static int remove_pidfile (void) /* {{{ */
465   char *file;
466   int status;
468   file = (config_pid_file != NULL)
469     ? config_pid_file
470     : LOCALSTATEDIR "/run/rrdcached.pid";
472   status = unlink (file);
473   if (status == 0)
474     return (0);
475   return (errno);
476 } /* }}} int remove_pidfile */
478 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
480   char *eol;
482   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
483                sock->next_read - sock->next_cmd);
485   if (eol == NULL)
486   {
487     /* no commands left, move remainder back to front of rbuf */
488     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
489             sock->next_read - sock->next_cmd);
490     sock->next_read -= sock->next_cmd;
491     sock->next_cmd = 0;
492     *len = 0;
493     return NULL;
494   }
495   else
496   {
497     char *cmd = sock->rbuf + sock->next_cmd;
498     *eol = '\0';
500     sock->next_cmd = eol - sock->rbuf + 1;
502     if (eol > sock->rbuf && *(eol-1) == '\r')
503       *(--eol) = '\0'; /* handle "\r\n" EOL */
505     *len = eol - cmd;
507     return cmd;
508   }
510   /* NOTREACHED */
511   assert(1==0);
512 } /* }}} char *next_cmd */
514 /* add the characters directly to the write buffer */
515 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
517   char *new_buf;
519   assert(sock != NULL);
521   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
522   if (new_buf == NULL)
523   {
524     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
525     return -1;
526   }
528   strncpy(new_buf + sock->wbuf_len, str, len + 1);
530   sock->wbuf = new_buf;
531   sock->wbuf_len += len;
533   return 0;
534 } /* }}} static int add_to_wbuf */
536 /* add the text to the "extra" info that's sent after the status line */
537 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
539   va_list argp;
540   char buffer[CMD_MAX];
541   int len;
543   if (JOURNAL_REPLAY(sock)) return 0;
544   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
546   va_start(argp, fmt);
547 #ifdef HAVE_VSNPRINTF
548   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
549 #else
550   len = vsprintf(buffer, fmt, argp);
551 #endif
552   va_end(argp);
553   if (len < 0)
554   {
555     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
556     return -1;
557   }
559   return add_to_wbuf(sock, buffer, len);
560 } /* }}} static int add_response_info */
562 static int count_lines(char *str) /* {{{ */
564   int lines = 0;
566   if (str != NULL)
567   {
568     while ((str = strchr(str, '\n')) != NULL)
569     {
570       ++lines;
571       ++str;
572     }
573   }
575   return lines;
576 } /* }}} static int count_lines */
578 /* send the response back to the user.
579  * returns 0 on success, -1 on error
580  * write buffer is always zeroed after this call */
581 static int send_response (listen_socket_t *sock, response_code rc,
582                           char *fmt, ...) /* {{{ */
584   va_list argp;
585   char buffer[CMD_MAX];
586   int lines;
587   ssize_t wrote;
588   int rclen, len;
590   if (JOURNAL_REPLAY(sock)) return rc;
592   if (sock->batch_start)
593   {
594     if (rc == RESP_OK)
595       return rc; /* no response on success during BATCH */
596     lines = sock->batch_cmd;
597   }
598   else if (rc == RESP_OK)
599     lines = count_lines(sock->wbuf);
600   else
601     lines = -1;
603   rclen = sprintf(buffer, "%d ", lines);
604   va_start(argp, fmt);
605 #ifdef HAVE_VSNPRINTF
606   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
607 #else
608   len = vsprintf(buffer+rclen, fmt, argp);
609 #endif
610   va_end(argp);
611   if (len < 0)
612     return -1;
614   len += rclen;
616   /* append the result to the wbuf, don't write to the user */
617   if (sock->batch_start)
618     return add_to_wbuf(sock, buffer, len);
620   /* first write must be complete */
621   if (len != write(sock->fd, buffer, len))
622   {
623     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
624     return -1;
625   }
627   if (sock->wbuf != NULL && rc == RESP_OK)
628   {
629     wrote = 0;
630     while (wrote < sock->wbuf_len)
631     {
632       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
633       if (wb <= 0)
634       {
635         RRDD_LOG(LOG_INFO, "send_response: could not write results");
636         return -1;
637       }
638       wrote += wb;
639     }
640   }
642   free(sock->wbuf); sock->wbuf = NULL;
643   sock->wbuf_len = 0;
645   return 0;
646 } /* }}} */
648 static void wipe_ci_values(cache_item_t *ci, time_t when)
650   ci->values = NULL;
651   ci->values_num = 0;
652   ci->values_alloc = 0;
654   ci->last_flush_time = when;
655   if (config_write_jitter > 0)
656     ci->last_flush_time += (rrd_random() % config_write_jitter);
659 /* remove_from_queue
660  * remove a "cache_item_t" item from the queue.
661  * must hold 'cache_lock' when calling this
662  */
663 static void remove_from_queue(cache_item_t *ci) /* {{{ */
665   if (ci == NULL) return;
666   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
668   if (ci->prev == NULL)
669     cache_queue_head = ci->next; /* reset head */
670   else
671     ci->prev->next = ci->next;
673   if (ci->next == NULL)
674     cache_queue_tail = ci->prev; /* reset the tail */
675   else
676     ci->next->prev = ci->prev;
678   ci->next = ci->prev = NULL;
679   ci->flags &= ~CI_FLAGS_IN_QUEUE;
681   pthread_mutex_lock (&stats_lock);
682   assert (stats_queue_length > 0);
683   stats_queue_length--;
684   pthread_mutex_unlock (&stats_lock);
686 } /* }}} static void remove_from_queue */
688 /* free the resources associated with the cache_item_t
689  * must hold cache_lock when calling this function
690  */
691 static void *free_cache_item(cache_item_t *ci) /* {{{ */
693   if (ci == NULL) return NULL;
695   remove_from_queue(ci);
697   for (size_t i=0; i < ci->values_num; i++)
698     free(ci->values[i]);
700   free (ci->values);
701   free (ci->file);
703   /* in case anyone is waiting */
704   pthread_cond_broadcast(&ci->flushed);
705   pthread_cond_destroy(&ci->flushed);
707   free (ci);
709   return NULL;
710 } /* }}} static void *free_cache_item */
712 /*
713  * enqueue_cache_item:
714  * `cache_lock' must be acquired before calling this function!
715  */
716 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
717     queue_side_t side)
719   if (ci == NULL)
720     return (-1);
722   if (ci->values_num == 0)
723     return (0);
725   if (side == HEAD)
726   {
727     if (cache_queue_head == ci)
728       return 0;
730     /* remove if further down in queue */
731     remove_from_queue(ci);
733     ci->prev = NULL;
734     ci->next = cache_queue_head;
735     if (ci->next != NULL)
736       ci->next->prev = ci;
737     cache_queue_head = ci;
739     if (cache_queue_tail == NULL)
740       cache_queue_tail = cache_queue_head;
741   }
742   else /* (side == TAIL) */
743   {
744     /* We don't move values back in the list.. */
745     if (ci->flags & CI_FLAGS_IN_QUEUE)
746       return (0);
748     assert (ci->next == NULL);
749     assert (ci->prev == NULL);
751     ci->prev = cache_queue_tail;
753     if (cache_queue_tail == NULL)
754       cache_queue_head = ci;
755     else
756       cache_queue_tail->next = ci;
758     cache_queue_tail = ci;
759   }
761   ci->flags |= CI_FLAGS_IN_QUEUE;
763   pthread_cond_signal(&queue_cond);
764   pthread_mutex_lock (&stats_lock);
765   stats_queue_length++;
766   pthread_mutex_unlock (&stats_lock);
768   return (0);
769 } /* }}} int enqueue_cache_item */
771 /*
772  * tree_callback_flush:
773  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
774  * while this is in progress.
775  */
776 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
777     gpointer data)
779   cache_item_t *ci;
780   callback_flush_data_t *cfd;
782   ci = (cache_item_t *) value;
783   cfd = (callback_flush_data_t *) data;
785   if (ci->flags & CI_FLAGS_IN_QUEUE)
786     return FALSE;
788   if (ci->values_num > 0
789       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
790   {
791     enqueue_cache_item (ci, TAIL);
792   }
793   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
794       && (ci->values_num <= 0))
795   {
796     assert ((char *) key == ci->file);
797     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
798     {
799       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
800       return (FALSE);
801     }
802   }
804   return (FALSE);
805 } /* }}} gboolean tree_callback_flush */
807 static int flush_old_values (int max_age)
809   callback_flush_data_t cfd;
810   size_t k;
812   memset (&cfd, 0, sizeof (cfd));
813   /* Pass the current time as user data so that we don't need to call
814    * `time' for each node. */
815   cfd.now = time (NULL);
816   cfd.keys = NULL;
817   cfd.keys_num = 0;
819   if (max_age > 0)
820     cfd.abs_timeout = cfd.now - max_age;
821   else
822     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
824   /* `tree_callback_flush' will return the keys of all values that haven't
825    * been touched in the last `config_flush_interval' seconds in `cfd'.
826    * The char*'s in this array point to the same memory as ci->file, so we
827    * don't need to free them separately. */
828   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
830   for (k = 0; k < cfd.keys_num; k++)
831   {
832     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
833     /* should never fail, since we have held the cache_lock
834      * the entire time */
835     assert(status == TRUE);
836   }
838   if (cfd.keys != NULL)
839   {
840     free (cfd.keys);
841     cfd.keys = NULL;
842   }
844   return (0);
845 } /* int flush_old_values */
847 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
849   struct timeval now;
850   struct timespec next_flush;
851   int status;
853   gettimeofday (&now, NULL);
854   next_flush.tv_sec = now.tv_sec + config_flush_interval;
855   next_flush.tv_nsec = 1000 * now.tv_usec;
857   pthread_mutex_lock(&cache_lock);
859   while (state == RUNNING)
860   {
861     gettimeofday (&now, NULL);
862     if ((now.tv_sec > next_flush.tv_sec)
863         || ((now.tv_sec == next_flush.tv_sec)
864           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
865     {
866       RRDD_LOG(LOG_DEBUG, "flushing old values");
868       /* Determine the time of the next cache flush. */
869       next_flush.tv_sec = now.tv_sec + config_flush_interval;
871       /* Flush all values that haven't been written in the last
872        * `config_write_interval' seconds. */
873       flush_old_values (config_write_interval);
875       /* unlock the cache while we rotate so we don't block incoming
876        * updates if the fsync() blocks on disk I/O */
877       pthread_mutex_unlock(&cache_lock);
878       journal_rotate();
879       pthread_mutex_lock(&cache_lock);
880     }
882     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
883     if (status != 0 && status != ETIMEDOUT)
884     {
885       RRDD_LOG (LOG_ERR, "flush_thread_main: "
886                 "pthread_cond_timedwait returned %i.", status);
887     }
888   }
890   if (config_flush_at_shutdown)
891     flush_old_values (-1); /* flush everything */
893   state = SHUTDOWN;
895   pthread_mutex_unlock(&cache_lock);
897   return NULL;
898 } /* void *flush_thread_main */
900 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
902   pthread_mutex_lock (&cache_lock);
904   while (state != SHUTDOWN
905          || (cache_queue_head != NULL && config_flush_at_shutdown))
906   {
907     cache_item_t *ci;
908     char *file;
909     char **values;
910     size_t values_num;
911     int status;
913     /* Now, check if there's something to store away. If not, wait until
914      * something comes in. */
915     if (cache_queue_head == NULL)
916     {
917       status = pthread_cond_wait (&queue_cond, &cache_lock);
918       if ((status != 0) && (status != ETIMEDOUT))
919       {
920         RRDD_LOG (LOG_ERR, "queue_thread_main: "
921             "pthread_cond_wait returned %i.", status);
922       }
923     }
925     /* Check if a value has arrived. This may be NULL if we timed out or there
926      * was an interrupt such as a signal. */
927     if (cache_queue_head == NULL)
928       continue;
930     ci = cache_queue_head;
932     /* copy the relevant parts */
933     file = strdup (ci->file);
934     if (file == NULL)
935     {
936       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
937       continue;
938     }
940     assert(ci->values != NULL);
941     assert(ci->values_num > 0);
943     values = ci->values;
944     values_num = ci->values_num;
946     wipe_ci_values(ci, time(NULL));
947     remove_from_queue(ci);
949     pthread_mutex_unlock (&cache_lock);
951     rrd_clear_error ();
952     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
953     if (status != 0)
954     {
955       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
956           "rrd_update_r (%s) failed with status %i. (%s)",
957           file, status, rrd_get_error());
958     }
960     journal_write("wrote", file);
962     /* Search again in the tree.  It's possible someone issued a "FORGET"
963      * while we were writing the update values. */
964     pthread_mutex_lock(&cache_lock);
965     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
966     if (ci)
967       pthread_cond_broadcast(&ci->flushed);
968     pthread_mutex_unlock(&cache_lock);
970     if (status == 0)
971     {
972       pthread_mutex_lock (&stats_lock);
973       stats_updates_written++;
974       stats_data_sets_written += values_num;
975       pthread_mutex_unlock (&stats_lock);
976     }
978     rrd_free_ptrs((void ***) &values, &values_num);
979     free(file);
981     pthread_mutex_lock (&cache_lock);
982   }
983   pthread_mutex_unlock (&cache_lock);
985   return (NULL);
986 } /* }}} void *queue_thread_main */
988 static int buffer_get_field (char **buffer_ret, /* {{{ */
989     size_t *buffer_size_ret, char **field_ret)
991   char *buffer;
992   size_t buffer_pos;
993   size_t buffer_size;
994   char *field;
995   size_t field_size;
996   int status;
998   buffer = *buffer_ret;
999   buffer_pos = 0;
1000   buffer_size = *buffer_size_ret;
1001   field = *buffer_ret;
1002   field_size = 0;
1004   if (buffer_size <= 0)
1005     return (-1);
1007   /* This is ensured by `handle_request'. */
1008   assert (buffer[buffer_size - 1] == '\0');
1010   status = -1;
1011   while (buffer_pos < buffer_size)
1012   {
1013     /* Check for end-of-field or end-of-buffer */
1014     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1015     {
1016       field[field_size] = 0;
1017       field_size++;
1018       buffer_pos++;
1019       status = 0;
1020       break;
1021     }
1022     /* Handle escaped characters. */
1023     else if (buffer[buffer_pos] == '\\')
1024     {
1025       if (buffer_pos >= (buffer_size - 1))
1026         break;
1027       buffer_pos++;
1028       field[field_size] = buffer[buffer_pos];
1029       field_size++;
1030       buffer_pos++;
1031     }
1032     /* Normal operation */ 
1033     else
1034     {
1035       field[field_size] = buffer[buffer_pos];
1036       field_size++;
1037       buffer_pos++;
1038     }
1039   } /* while (buffer_pos < buffer_size) */
1041   if (status != 0)
1042     return (status);
1044   *buffer_ret = buffer + buffer_pos;
1045   *buffer_size_ret = buffer_size - buffer_pos;
1046   *field_ret = field;
1048   return (0);
1049 } /* }}} int buffer_get_field */
1051 /* if we're restricting writes to the base directory,
1052  * check whether the file falls within the dir
1053  * returns 1 if OK, otherwise 0
1054  */
1055 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1057   assert(file != NULL);
1059   if (!config_write_base_only
1060       || JOURNAL_REPLAY(sock)
1061       || config_base_dir == NULL)
1062     return 1;
1064   if (strstr(file, "../") != NULL) goto err;
1066   /* relative paths without "../" are ok */
1067   if (*file != '/') return 1;
1069   /* file must be of the format base + "/" + <1+ char filename> */
1070   if (strlen(file) < _config_base_dir_len + 2) goto err;
1071   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1072   if (*(file + _config_base_dir_len) != '/') goto err;
1074   return 1;
1076 err:
1077   if (sock != NULL && sock->fd >= 0)
1078     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1080   return 0;
1081 } /* }}} static int check_file_access */
1083 /* when using a base dir, convert relative paths to absolute paths.
1084  * if necessary, modifies the "filename" pointer to point
1085  * to the new path created in "tmp".  "tmp" is provided
1086  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1087  *
1088  * this allows us to optimize for the expected case (absolute path)
1089  * with a no-op.
1090  */
1091 static void get_abs_path(char **filename, char *tmp)
1093   assert(tmp != NULL);
1094   assert(filename != NULL && *filename != NULL);
1096   if (config_base_dir == NULL || **filename == '/')
1097     return;
1099   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1100   *filename = tmp;
1101 } /* }}} static int get_abs_path */
1103 static int flush_file (const char *filename) /* {{{ */
1105   cache_item_t *ci;
1107   pthread_mutex_lock (&cache_lock);
1109   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1110   if (ci == NULL)
1111   {
1112     pthread_mutex_unlock (&cache_lock);
1113     return (ENOENT);
1114   }
1116   if (ci->values_num > 0)
1117   {
1118     /* Enqueue at head */
1119     enqueue_cache_item (ci, HEAD);
1120     pthread_cond_wait(&ci->flushed, &cache_lock);
1121   }
1123   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1124    * may have been purged during our cond_wait() */
1126   pthread_mutex_unlock(&cache_lock);
1128   return (0);
1129 } /* }}} int flush_file */
1131 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1133   char *err = "Syntax error.\n";
1135   if (cmd && cmd->syntax)
1136     err = cmd->syntax;
1138   return send_response(sock, RESP_ERR, "Usage: %s", err);
1139 } /* }}} static int syntax_error() */
1141 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1143   uint64_t copy_queue_length;
1144   uint64_t copy_updates_received;
1145   uint64_t copy_flush_received;
1146   uint64_t copy_updates_written;
1147   uint64_t copy_data_sets_written;
1148   uint64_t copy_journal_bytes;
1149   uint64_t copy_journal_rotate;
1151   uint64_t tree_nodes_number;
1152   uint64_t tree_depth;
1154   pthread_mutex_lock (&stats_lock);
1155   copy_queue_length       = stats_queue_length;
1156   copy_updates_received   = stats_updates_received;
1157   copy_flush_received     = stats_flush_received;
1158   copy_updates_written    = stats_updates_written;
1159   copy_data_sets_written  = stats_data_sets_written;
1160   copy_journal_bytes      = stats_journal_bytes;
1161   copy_journal_rotate     = stats_journal_rotate;
1162   pthread_mutex_unlock (&stats_lock);
1164   pthread_mutex_lock (&cache_lock);
1165   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1166   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1167   pthread_mutex_unlock (&cache_lock);
1169   add_response_info(sock,
1170                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1171   add_response_info(sock,
1172                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1173   add_response_info(sock,
1174                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1175   add_response_info(sock,
1176                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1177   add_response_info(sock,
1178                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1179   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1180   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1181   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1182   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1184   send_response(sock, RESP_OK, "Statistics follow\n");
1186   return (0);
1187 } /* }}} int handle_request_stats */
1189 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1191   char *file, file_tmp[PATH_MAX];
1192   int status;
1194   status = buffer_get_field (&buffer, &buffer_size, &file);
1195   if (status != 0)
1196   {
1197     return syntax_error(sock,cmd);
1198   }
1199   else
1200   {
1201     pthread_mutex_lock(&stats_lock);
1202     stats_flush_received++;
1203     pthread_mutex_unlock(&stats_lock);
1205     get_abs_path(&file, file_tmp);
1206     if (!check_file_access(file, sock)) return 0;
1208     status = flush_file (file);
1209     if (status == 0)
1210       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1211     else if (status == ENOENT)
1212     {
1213       /* no file in our tree; see whether it exists at all */
1214       struct stat statbuf;
1216       memset(&statbuf, 0, sizeof(statbuf));
1217       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1218         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1219       else
1220         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1221     }
1222     else if (status < 0)
1223       return send_response(sock, RESP_ERR, "Internal error.\n");
1224     else
1225       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1226   }
1228   /* NOTREACHED */
1229   assert(1==0);
1230 } /* }}} int handle_request_flush */
1232 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1234   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1236   pthread_mutex_lock(&cache_lock);
1237   flush_old_values(-1);
1238   pthread_mutex_unlock(&cache_lock);
1240   return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1243 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1245   int status;
1246   char *file, file_tmp[PATH_MAX];
1247   cache_item_t *ci;
1249   status = buffer_get_field(&buffer, &buffer_size, &file);
1250   if (status != 0)
1251     return syntax_error(sock,cmd);
1253   get_abs_path(&file, file_tmp);
1255   pthread_mutex_lock(&cache_lock);
1256   ci = g_tree_lookup(cache_tree, file);
1257   if (ci == NULL)
1258   {
1259     pthread_mutex_unlock(&cache_lock);
1260     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1261   }
1263   for (size_t i=0; i < ci->values_num; i++)
1264     add_response_info(sock, "%s\n", ci->values[i]);
1266   pthread_mutex_unlock(&cache_lock);
1267   return send_response(sock, RESP_OK, "updates pending\n");
1268 } /* }}} static int handle_request_pending */
1270 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1272   int status;
1273   gboolean found;
1274   char *file, file_tmp[PATH_MAX];
1276   status = buffer_get_field(&buffer, &buffer_size, &file);
1277   if (status != 0)
1278     return syntax_error(sock,cmd);
1280   get_abs_path(&file, file_tmp);
1281   if (!check_file_access(file, sock)) return 0;
1283   pthread_mutex_lock(&cache_lock);
1284   found = g_tree_remove(cache_tree, file);
1285   pthread_mutex_unlock(&cache_lock);
1287   if (found == TRUE)
1288   {
1289     if (!JOURNAL_REPLAY(sock))
1290       journal_write("forget", file);
1292     return send_response(sock, RESP_OK, "Gone!\n");
1293   }
1294   else
1295     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1297   /* NOTREACHED */
1298   assert(1==0);
1299 } /* }}} static int handle_request_forget */
1301 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1303   cache_item_t *ci;
1305   pthread_mutex_lock(&cache_lock);
1307   ci = cache_queue_head;
1308   while (ci != NULL)
1309   {
1310     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1311     ci = ci->next;
1312   }
1314   pthread_mutex_unlock(&cache_lock);
1316   return send_response(sock, RESP_OK, "in queue.\n");
1317 } /* }}} int handle_request_queue */
1319 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1321   char *file, file_tmp[PATH_MAX];
1322   int values_num = 0;
1323   int status;
1324   char orig_buf[CMD_MAX];
1326   cache_item_t *ci;
1328   /* save it for the journal later */
1329   if (!JOURNAL_REPLAY(sock))
1330     strncpy(orig_buf, buffer, buffer_size);
1332   status = buffer_get_field (&buffer, &buffer_size, &file);
1333   if (status != 0)
1334     return syntax_error(sock,cmd);
1336   pthread_mutex_lock(&stats_lock);
1337   stats_updates_received++;
1338   pthread_mutex_unlock(&stats_lock);
1340   get_abs_path(&file, file_tmp);
1341   if (!check_file_access(file, sock)) return 0;
1343   pthread_mutex_lock (&cache_lock);
1344   ci = g_tree_lookup (cache_tree, file);
1346   if (ci == NULL) /* {{{ */
1347   {
1348     struct stat statbuf;
1349     cache_item_t *tmp;
1351     /* don't hold the lock while we setup; stat(2) might block */
1352     pthread_mutex_unlock(&cache_lock);
1354     memset (&statbuf, 0, sizeof (statbuf));
1355     status = stat (file, &statbuf);
1356     if (status != 0)
1357     {
1358       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1360       status = errno;
1361       if (status == ENOENT)
1362         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1363       else
1364         return send_response(sock, RESP_ERR,
1365                              "stat failed with error %i.\n", status);
1366     }
1367     if (!S_ISREG (statbuf.st_mode))
1368       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1370     if (access(file, R_OK|W_OK) != 0)
1371       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1372                            file, rrd_strerror(errno));
1374     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1375     if (ci == NULL)
1376     {
1377       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1379       return send_response(sock, RESP_ERR, "malloc failed.\n");
1380     }
1381     memset (ci, 0, sizeof (cache_item_t));
1383     ci->file = strdup (file);
1384     if (ci->file == NULL)
1385     {
1386       free (ci);
1387       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1389       return send_response(sock, RESP_ERR, "strdup failed.\n");
1390     }
1392     wipe_ci_values(ci, now);
1393     ci->flags = CI_FLAGS_IN_TREE;
1394     pthread_cond_init(&ci->flushed, NULL);
1396     pthread_mutex_lock(&cache_lock);
1398     /* another UPDATE might have added this entry in the meantime */
1399     tmp = g_tree_lookup (cache_tree, file);
1400     if (tmp == NULL)
1401       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1402     else
1403     {
1404       free_cache_item (ci);
1405       ci = tmp;
1406     }
1408     /* state may have changed while we were unlocked */
1409     if (state == SHUTDOWN)
1410       return -1;
1411   } /* }}} */
1412   assert (ci != NULL);
1414   /* don't re-write updates in replay mode */
1415   if (!JOURNAL_REPLAY(sock))
1416     journal_write("update", orig_buf);
1418   while (buffer_size > 0)
1419   {
1420     char *value;
1421     time_t stamp;
1422     char *eostamp;
1424     status = buffer_get_field (&buffer, &buffer_size, &value);
1425     if (status != 0)
1426     {
1427       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1428       break;
1429     }
1431     /* make sure update time is always moving forward */
1432     stamp = strtol(value, &eostamp, 10);
1433     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "Cannot find timestamp in '%s'!\n", value);
1438     }
1439     else if (stamp <= ci->last_update_stamp)
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "illegal attempt to update using time %ld when last"
1444                            " update time is %ld (minimum one second step)\n",
1445                            stamp, ci->last_update_stamp);
1446     }
1447     else
1448       ci->last_update_stamp = stamp;
1450     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451                               &ci->values_alloc, config_alloc_chunk))
1452     {
1453       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454       continue;
1455     }
1457     values_num++;
1458   }
1460   if (((now - ci->last_flush_time) >= config_write_interval)
1461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462       && (ci->values_num > 0))
1463   {
1464     enqueue_cache_item (ci, TAIL);
1465   }
1467   pthread_mutex_unlock (&cache_lock);
1469   if (values_num < 1)
1470     return send_response(sock, RESP_ERR, "No values updated.\n");
1471   else
1472     return send_response(sock, RESP_OK,
1473                          "errors, enqueued %i value(s).\n", values_num);
1475   /* NOTREACHED */
1476   assert(1==0);
1478 } /* }}} int handle_request_update */
1480 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1482   char *file;
1483   char *cf;
1485   char *start_str;
1486   char *end_str;
1487   rrd_time_value_t start_tv;
1488   rrd_time_value_t end_tv;
1489   time_t start_tm;
1490   time_t end_tm;
1492   unsigned long step;
1493   unsigned long ds_cnt;
1494   char **ds_namv;
1495   rrd_value_t *data;
1497   int status;
1498   unsigned long i;
1499   time_t t;
1500   rrd_value_t *data_ptr;
1502   file = NULL;
1503   cf = NULL;
1504   start_str = NULL;
1505   end_str = NULL;
1507   /* Read the arguments */
1508   do /* while (0) */
1509   {
1510     status = buffer_get_field (&buffer, &buffer_size, &file);
1511     if (status != 0)
1512       break;
1514     status = buffer_get_field (&buffer, &buffer_size, &cf);
1515     if (status != 0)
1516       break;
1518     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1519     if (status != 0)
1520     {
1521       start_str = NULL;
1522       status = 0;
1523       break;
1524     }
1526     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1527     if (status != 0)
1528     {
1529       end_str = NULL;
1530       status = 0;
1531       break;
1532     }
1533   } while (0);
1535   if (status != 0)
1536     return (syntax_error(sock,cmd));
1538   status = flush_file (file);
1539   if ((status != 0) && (status != ENOENT))
1540     return (send_response (sock, RESP_ERR,
1541           "flush_file (%s) failed with status %i.\n", file, status));
1543   /* Parse start time */
1544   if (start_str != NULL)
1545   {
1546     const char *errmsg;
1548     errmsg = rrd_parsetime (start_str, &start_tv);
1549     if (errmsg != NULL)
1550       return (send_response(sock, RESP_ERR,
1551             "Cannot parse start time `%s': %s\n", start_str, errmsg));
1552   }
1553   else
1554     rrd_parsetime ("-86400", &start_tv);
1556   /* Parse end time */
1557   if (end_str != NULL)
1558   {
1559     const char *errmsg;
1561     errmsg = rrd_parsetime (end_str, &end_tv);
1562     if (errmsg != NULL)
1563       return (send_response(sock, RESP_ERR,
1564             "Cannot parse end time `%s': %s\n", end_str, errmsg));
1565   }
1566   else
1567     rrd_parsetime ("now", &end_tv);
1569   start_tm = 0;
1570   end_tm = 0;
1571   status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm);
1572   if (status != 0)
1573     return (send_response(sock, RESP_ERR,
1574           "rrd_proc_start_end failed: %s\n", rrd_get_error ()));
1576   step = -1;
1577   ds_cnt = 0;
1578   ds_namv = NULL;
1579   data = NULL;
1581   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1582       &ds_cnt, &ds_namv, &data);
1583   if (status != 0)
1584     return (send_response(sock, RESP_ERR,
1585           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1587   add_response_info (sock, "FlushVersion: %lu\n", 1);
1588   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1589   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1590   add_response_info (sock, "Step: %lu\n", step);
1591   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1593 #define SSTRCAT(buffer,str,buffer_fill) do { \
1594     size_t str_len = strlen (str); \
1595     if ((buffer_fill + str_len) > sizeof (buffer)) \
1596       str_len = sizeof (buffer) - buffer_fill; \
1597     if (str_len > 0) { \
1598       strncpy (buffer + buffer_fill, str, str_len); \
1599       buffer_fill += str_len; \
1600       assert (buffer_fill <= sizeof (buffer)); \
1601       if (buffer_fill == sizeof (buffer)) \
1602         buffer[buffer_fill - 1] = 0; \
1603       else \
1604         buffer[buffer_fill] = 0; \
1605     } \
1606   } while (0)
1608   { /* Add list of DS names */
1609     char linebuf[1024];
1610     size_t linebuf_fill;
1612     memset (linebuf, 0, sizeof (linebuf));
1613     linebuf_fill = 0;
1614     for (i = 0; i < ds_cnt; i++)
1615     {
1616       if (i > 0)
1617         SSTRCAT (linebuf, " ", linebuf_fill);
1618       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1619     }
1620     add_response_info (sock, "DSName: %s\n", linebuf);
1621   }
1623   /* Add the actual data */
1624   assert (step > 0);
1625   data_ptr = data;
1626   for (t = start_tm + step; t <= end_tm; t += step)
1627   {
1628     char linebuf[1024];
1629     size_t linebuf_fill;
1630     char tmp[128];
1632     memset (linebuf, 0, sizeof (linebuf));
1633     linebuf_fill = 0;
1634     for (i = 0; i < ds_cnt; i++)
1635     {
1636       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1637       tmp[sizeof (tmp) - 1] = 0;
1638       SSTRCAT (linebuf, tmp, linebuf_fill);
1640       data_ptr++;
1641     }
1643     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1644   } /* for (t) */
1646   return (send_response (sock, RESP_OK, "Success\n"));
1647 #undef SSTRCAT
1648 } /* }}} int handle_request_fetch */
1650 /* we came across a "WROTE" entry during journal replay.
1651  * throw away any values that we have accumulated for this file
1652  */
1653 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1655   cache_item_t *ci;
1656   const char *file = buffer;
1658   pthread_mutex_lock(&cache_lock);
1660   ci = g_tree_lookup(cache_tree, file);
1661   if (ci == NULL)
1662   {
1663     pthread_mutex_unlock(&cache_lock);
1664     return (0);
1665   }
1667   if (ci->values)
1668     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1670   wipe_ci_values(ci, now);
1671   remove_from_queue(ci);
1673   pthread_mutex_unlock(&cache_lock);
1674   return (0);
1675 } /* }}} int handle_request_wrote */
1677 /* start "BATCH" processing */
1678 static int batch_start (HANDLER_PROTO) /* {{{ */
1680   int status;
1681   if (sock->batch_start)
1682     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1684   status = send_response(sock, RESP_OK,
1685                          "Go ahead.  End with dot '.' on its own line.\n");
1686   sock->batch_start = time(NULL);
1687   sock->batch_cmd = 0;
1689   return status;
1690 } /* }}} static int batch_start */
1692 /* finish "BATCH" processing and return results to the client */
1693 static int batch_done (HANDLER_PROTO) /* {{{ */
1695   assert(sock->batch_start);
1696   sock->batch_start = 0;
1697   sock->batch_cmd  = 0;
1698   return send_response(sock, RESP_OK, "errors\n");
1699 } /* }}} static int batch_done */
1701 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1703   return -1;
1704 } /* }}} static int handle_request_quit */
1706 static command_t list_of_commands[] = { /* {{{ */
1707   {
1708     "UPDATE",
1709     handle_request_update,
1710     CMD_CONTEXT_ANY,
1711     "UPDATE <filename> <values> [<values> ...]\n"
1712     ,
1713     "Adds the given file to the internal cache if it is not yet known and\n"
1714     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1715     "for details.\n"
1716     "\n"
1717     "Each <values> has the following form:\n"
1718     "  <values> = <time>:<value>[:<value>[...]]\n"
1719     "See the rrdupdate(1) manpage for details.\n"
1720   },
1721   {
1722     "WROTE",
1723     handle_request_wrote,
1724     CMD_CONTEXT_JOURNAL,
1725     NULL,
1726     NULL
1727   },
1728   {
1729     "FLUSH",
1730     handle_request_flush,
1731     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1732     "FLUSH <filename>\n"
1733     ,
1734     "Adds the given filename to the head of the update queue and returns\n"
1735     "after it has been dequeued.\n"
1736   },
1737   {
1738     "FLUSHALL",
1739     handle_request_flushall,
1740     CMD_CONTEXT_CLIENT,
1741     "FLUSHALL\n"
1742     ,
1743     "Triggers writing of all pending updates.  Returns immediately.\n"
1744   },
1745   {
1746     "PENDING",
1747     handle_request_pending,
1748     CMD_CONTEXT_CLIENT,
1749     "PENDING <filename>\n"
1750     ,
1751     "Shows any 'pending' updates for a file, in order.\n"
1752     "The updates shown have not yet been written to the underlying RRD file.\n"
1753   },
1754   {
1755     "FORGET",
1756     handle_request_forget,
1757     CMD_CONTEXT_ANY,
1758     "FORGET <filename>\n"
1759     ,
1760     "Removes the file completely from the cache.\n"
1761     "Any pending updates for the file will be lost.\n"
1762   },
1763   {
1764     "QUEUE",
1765     handle_request_queue,
1766     CMD_CONTEXT_CLIENT,
1767     "QUEUE\n"
1768     ,
1769         "Shows all files in the output queue.\n"
1770     "The output is zero or more lines in the following format:\n"
1771     "(where <num_vals> is the number of values to be written)\n"
1772     "\n"
1773     "<num_vals> <filename>\n"
1774   },
1775   {
1776     "STATS",
1777     handle_request_stats,
1778     CMD_CONTEXT_CLIENT,
1779     "STATS\n"
1780     ,
1781     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1782     "a description of the values.\n"
1783   },
1784   {
1785     "HELP",
1786     handle_request_help,
1787     CMD_CONTEXT_CLIENT,
1788     "HELP [<command>]\n",
1789     NULL, /* special! */
1790   },
1791   {
1792     "BATCH",
1793     batch_start,
1794     CMD_CONTEXT_CLIENT,
1795     "BATCH\n"
1796     ,
1797     "The 'BATCH' command permits the client to initiate a bulk load\n"
1798     "   of commands to rrdcached.\n"
1799     "\n"
1800     "Usage:\n"
1801     "\n"
1802     "    client: BATCH\n"
1803     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1804     "    client: command #1\n"
1805     "    client: command #2\n"
1806     "    client: ... and so on\n"
1807     "    client: .\n"
1808     "    server: 2 errors\n"
1809     "    server: 7 message for command #7\n"
1810     "    server: 9 message for command #9\n"
1811     "\n"
1812     "For more information, consult the rrdcached(1) documentation.\n"
1813   },
1814   {
1815     ".",   /* BATCH terminator */
1816     batch_done,
1817     CMD_CONTEXT_BATCH,
1818     NULL,
1819     NULL
1820   },
1821   {
1822     "FETCH",
1823     handle_request_fetch,
1824     CMD_CONTEXT_CLIENT,
1825     "FETCH <file> <CF> [<start> [<end>]]\n"
1826     ,
1827     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1828   },
1829   {
1830     "QUIT",
1831     handle_request_quit,
1832     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1833     "QUIT\n"
1834     ,
1835     "Disconnect from rrdcached.\n"
1836   }
1837 }; /* }}} command_t list_of_commands[] */
1838 static size_t list_of_commands_len = sizeof (list_of_commands)
1839   / sizeof (list_of_commands[0]);
1841 static command_t *find_command(char *cmd)
1843   size_t i;
1845   for (i = 0; i < list_of_commands_len; i++)
1846     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1847       return (&list_of_commands[i]);
1848   return NULL;
1851 /* We currently use the index in the `list_of_commands' array as a bit position
1852  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1853  * outside these functions so that switching to a more elegant storage method
1854  * is easily possible. */
1855 static ssize_t find_command_index (const char *cmd) /* {{{ */
1857   size_t i;
1859   for (i = 0; i < list_of_commands_len; i++)
1860     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1861       return ((ssize_t) i);
1862   return (-1);
1863 } /* }}} ssize_t find_command_index */
1865 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1866     const char *cmd)
1868   ssize_t i;
1870   if (JOURNAL_REPLAY(sock))
1871     return (1);
1873   if (cmd == NULL)
1874     return (-1);
1876   if ((strcasecmp ("QUIT", cmd) == 0)
1877       || (strcasecmp ("HELP", cmd) == 0))
1878     return (1);
1879   else if (strcmp (".", cmd) == 0)
1880     cmd = "BATCH";
1882   i = find_command_index (cmd);
1883   if (i < 0)
1884     return (-1);
1885   assert (i < 32);
1887   if ((sock->permissions & (1 << i)) != 0)
1888     return (1);
1889   return (0);
1890 } /* }}} int socket_permission_check */
1892 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1893     const char *cmd)
1895   ssize_t i;
1897   i = find_command_index (cmd);
1898   if (i < 0)
1899     return (-1);
1900   assert (i < 32);
1902   sock->permissions |= (1 << i);
1903   return (0);
1904 } /* }}} int socket_permission_add */
1906 /* check whether commands are received in the expected context */
1907 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1909   if (JOURNAL_REPLAY(sock))
1910     return (cmd->context & CMD_CONTEXT_JOURNAL);
1911   else if (sock->batch_start)
1912     return (cmd->context & CMD_CONTEXT_BATCH);
1913   else
1914     return (cmd->context & CMD_CONTEXT_CLIENT);
1916   /* NOTREACHED */
1917   assert(1==0);
1920 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1922   int status;
1923   char *cmd_str;
1924   char *resp_txt;
1925   command_t *help = NULL;
1927   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1928   if (status == 0)
1929     help = find_command(cmd_str);
1931   if (help && (help->syntax || help->help))
1932   {
1933     char tmp[CMD_MAX];
1935     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1936     resp_txt = tmp;
1938     if (help->syntax)
1939       add_response_info(sock, "Usage: %s\n", help->syntax);
1941     if (help->help)
1942       add_response_info(sock, "%s\n", help->help);
1943   }
1944   else
1945   {
1946     size_t i;
1948     resp_txt = "Command overview\n";
1950     for (i = 0; i < list_of_commands_len; i++)
1951     {
1952       if (list_of_commands[i].syntax == NULL)
1953         continue;
1954       add_response_info (sock, "%s", list_of_commands[i].syntax);
1955     }
1956   }
1958   return send_response(sock, RESP_OK, resp_txt);
1959 } /* }}} int handle_request_help */
1961 static int handle_request (DISPATCH_PROTO) /* {{{ */
1963   char *buffer_ptr = buffer;
1964   char *cmd_str = NULL;
1965   command_t *cmd = NULL;
1966   int status;
1968   assert (buffer[buffer_size - 1] == '\0');
1970   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1971   if (status != 0)
1972   {
1973     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1974     return (-1);
1975   }
1977   if (sock != NULL && sock->batch_start)
1978     sock->batch_cmd++;
1980   cmd = find_command(cmd_str);
1981   if (!cmd)
1982     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1984   if (!socket_permission_check (sock, cmd->cmd))
1985     return send_response(sock, RESP_ERR, "Permission denied.\n");
1987   if (!command_check_context(sock, cmd))
1988     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1990   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1991 } /* }}} int handle_request */
1993 static void journal_set_free (journal_set *js) /* {{{ */
1995   if (js == NULL)
1996     return;
1998   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2000   free(js);
2001 } /* }}} journal_set_free */
2003 static void journal_set_remove (journal_set *js) /* {{{ */
2005   if (js == NULL)
2006     return;
2008   for (uint i=0; i < js->files_num; i++)
2009   {
2010     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2011     unlink(js->files[i]);
2012   }
2013 } /* }}} journal_set_remove */
2015 /* close current journal file handle.
2016  * MUST hold journal_lock before calling */
2017 static void journal_close(void) /* {{{ */
2019   if (journal_fh != NULL)
2020   {
2021     if (fclose(journal_fh) != 0)
2022       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2023   }
2025   journal_fh = NULL;
2026   journal_size = 0;
2027 } /* }}} journal_close */
2029 /* MUST hold journal_lock before calling */
2030 static void journal_new_file(void) /* {{{ */
2032   struct timeval now;
2033   int  new_fd;
2034   char new_file[PATH_MAX + 1];
2036   assert(journal_dir != NULL);
2037   assert(journal_cur != NULL);
2039   journal_close();
2041   gettimeofday(&now, NULL);
2042   /* this format assures that the files sort in strcmp() order */
2043   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2044            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2046   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2047                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2048   if (new_fd < 0)
2049     goto error;
2051   journal_fh = fdopen(new_fd, "a");
2052   if (journal_fh == NULL)
2053     goto error;
2055   journal_size = ftell(journal_fh);
2056   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2058   /* record the file in the journal set */
2059   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2061   return;
2063 error:
2064   RRDD_LOG(LOG_CRIT,
2065            "JOURNALING DISABLED: Error while trying to create %s : %s",
2066            new_file, rrd_strerror(errno));
2067   RRDD_LOG(LOG_CRIT,
2068            "JOURNALING DISABLED: All values will be flushed at shutdown");
2070   close(new_fd);
2071   config_flush_at_shutdown = 1;
2073 } /* }}} journal_new_file */
2075 /* MUST NOT hold journal_lock before calling this */
2076 static void journal_rotate(void) /* {{{ */
2078   journal_set *old_js = NULL;
2080   if (journal_dir == NULL)
2081     return;
2083   RRDD_LOG(LOG_DEBUG, "rotating journals");
2085   pthread_mutex_lock(&stats_lock);
2086   ++stats_journal_rotate;
2087   pthread_mutex_unlock(&stats_lock);
2089   pthread_mutex_lock(&journal_lock);
2091   journal_close();
2093   /* rotate the journal sets */
2094   old_js = journal_old;
2095   journal_old = journal_cur;
2096   journal_cur = calloc(1, sizeof(journal_set));
2098   if (journal_cur != NULL)
2099     journal_new_file();
2100   else
2101     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2103   pthread_mutex_unlock(&journal_lock);
2105   journal_set_remove(old_js);
2106   journal_set_free  (old_js);
2108 } /* }}} static void journal_rotate */
2110 /* MUST hold journal_lock when calling */
2111 static void journal_done(void) /* {{{ */
2113   if (journal_cur == NULL)
2114     return;
2116   journal_close();
2118   if (config_flush_at_shutdown)
2119   {
2120     RRDD_LOG(LOG_INFO, "removing journals");
2121     journal_set_remove(journal_old);
2122     journal_set_remove(journal_cur);
2123   }
2124   else
2125   {
2126     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2127              "journals will be used at next startup");
2128   }
2130   journal_set_free(journal_cur);
2131   journal_set_free(journal_old);
2132   free(journal_dir);
2134 } /* }}} static void journal_done */
2136 static int journal_write(char *cmd, char *args) /* {{{ */
2138   int chars;
2140   if (journal_fh == NULL)
2141     return 0;
2143   pthread_mutex_lock(&journal_lock);
2144   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2145   journal_size += chars;
2147   if (journal_size > JOURNAL_MAX)
2148     journal_new_file();
2150   pthread_mutex_unlock(&journal_lock);
2152   if (chars > 0)
2153   {
2154     pthread_mutex_lock(&stats_lock);
2155     stats_journal_bytes += chars;
2156     pthread_mutex_unlock(&stats_lock);
2157   }
2159   return chars;
2160 } /* }}} static int journal_write */
2162 static int journal_replay (const char *file) /* {{{ */
2164   FILE *fh;
2165   int entry_cnt = 0;
2166   int fail_cnt = 0;
2167   uint64_t line = 0;
2168   char entry[CMD_MAX];
2169   time_t now;
2171   if (file == NULL) return 0;
2173   {
2174     char *reason = "unknown error";
2175     int status = 0;
2176     struct stat statbuf;
2178     memset(&statbuf, 0, sizeof(statbuf));
2179     if (stat(file, &statbuf) != 0)
2180     {
2181       reason = "stat error";
2182       status = errno;
2183     }
2184     else if (!S_ISREG(statbuf.st_mode))
2185     {
2186       reason = "not a regular file";
2187       status = EPERM;
2188     }
2189     if (statbuf.st_uid != daemon_uid)
2190     {
2191       reason = "not owned by daemon user";
2192       status = EACCES;
2193     }
2194     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2195     {
2196       reason = "must not be user/group writable";
2197       status = EACCES;
2198     }
2200     if (status != 0)
2201     {
2202       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2203                file, rrd_strerror(status), reason);
2204       return 0;
2205     }
2206   }
2208   fh = fopen(file, "r");
2209   if (fh == NULL)
2210   {
2211     if (errno != ENOENT)
2212       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2213                file, rrd_strerror(errno));
2214     return 0;
2215   }
2216   else
2217     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2219   now = time(NULL);
2221   while(!feof(fh))
2222   {
2223     size_t entry_len;
2225     ++line;
2226     if (fgets(entry, sizeof(entry), fh) == NULL)
2227       break;
2228     entry_len = strlen(entry);
2230     /* check \n termination in case journal writing crashed mid-line */
2231     if (entry_len == 0)
2232       continue;
2233     else if (entry[entry_len - 1] != '\n')
2234     {
2235       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2236       ++fail_cnt;
2237       continue;
2238     }
2240     entry[entry_len - 1] = '\0';
2242     if (handle_request(NULL, now, entry, entry_len) == 0)
2243       ++entry_cnt;
2244     else
2245       ++fail_cnt;
2246   }
2248   fclose(fh);
2250   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2251            entry_cnt, fail_cnt);
2253   return entry_cnt > 0 ? 1 : 0;
2254 } /* }}} static int journal_replay */
2256 static int journal_sort(const void *v1, const void *v2)
2258   char **jn1 = (char **) v1;
2259   char **jn2 = (char **) v2;
2261   return strcmp(*jn1,*jn2);
2264 static void journal_init(void) /* {{{ */
2266   int had_journal = 0;
2267   DIR *dir;
2268   struct dirent *dent;
2269   char path[PATH_MAX+1];
2271   if (journal_dir == NULL) return;
2273   pthread_mutex_lock(&journal_lock);
2275   journal_cur = calloc(1, sizeof(journal_set));
2276   if (journal_cur == NULL)
2277   {
2278     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2279     return;
2280   }
2282   RRDD_LOG(LOG_INFO, "checking for journal files");
2284   /* Handle old journal files during transition.  This gives them the
2285    * correct sort order.  TODO: remove after first release
2286    */
2287   {
2288     char old_path[PATH_MAX+1];
2289     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2290     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2291     rename(old_path, path);
2293     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2294     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2295     rename(old_path, path);
2296   }
2298   dir = opendir(journal_dir);
2299   while ((dent = readdir(dir)) != NULL)
2300   {
2301     /* looks like a journal file? */
2302     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2303       continue;
2305     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2307     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2308     {
2309       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2310                dent->d_name);
2311       break;
2312     }
2313   }
2314   closedir(dir);
2316   qsort(journal_cur->files, journal_cur->files_num,
2317         sizeof(journal_cur->files[0]), journal_sort);
2319   for (uint i=0; i < journal_cur->files_num; i++)
2320     had_journal += journal_replay(journal_cur->files[i]);
2322   journal_new_file();
2324   /* it must have been a crash.  start a flush */
2325   if (had_journal && config_flush_at_shutdown)
2326     flush_old_values(-1);
2328   pthread_mutex_unlock(&journal_lock);
2330   RRDD_LOG(LOG_INFO, "journal processing complete");
2332 } /* }}} static void journal_init */
2334 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2336   assert(sock != NULL);
2338   free(sock->rbuf);  sock->rbuf = NULL;
2339   free(sock->wbuf);  sock->wbuf = NULL;
2340   free(sock);
2341 } /* }}} void free_listen_socket */
2343 static void close_connection(listen_socket_t *sock) /* {{{ */
2345   if (sock->fd >= 0)
2346   {
2347     close(sock->fd);
2348     sock->fd = -1;
2349   }
2351   free_listen_socket(sock);
2353 } /* }}} void close_connection */
2355 static void *connection_thread_main (void *args) /* {{{ */
2357   listen_socket_t *sock;
2358   int fd;
2360   sock = (listen_socket_t *) args;
2361   fd = sock->fd;
2363   /* init read buffers */
2364   sock->next_read = sock->next_cmd = 0;
2365   sock->rbuf = malloc(RBUF_SIZE);
2366   if (sock->rbuf == NULL)
2367   {
2368     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2369     close_connection(sock);
2370     return NULL;
2371   }
2373   pthread_mutex_lock (&connection_threads_lock);
2374   connection_threads_num++;
2375   pthread_mutex_unlock (&connection_threads_lock);
2377   while (state == RUNNING)
2378   {
2379     char *cmd;
2380     ssize_t cmd_len;
2381     ssize_t rbytes;
2382     time_t now;
2384     struct pollfd pollfd;
2385     int status;
2387     pollfd.fd = fd;
2388     pollfd.events = POLLIN | POLLPRI;
2389     pollfd.revents = 0;
2391     status = poll (&pollfd, 1, /* timeout = */ 500);
2392     if (state != RUNNING)
2393       break;
2394     else if (status == 0) /* timeout */
2395       continue;
2396     else if (status < 0) /* error */
2397     {
2398       status = errno;
2399       if (status != EINTR)
2400         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2401       continue;
2402     }
2404     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2405       break;
2406     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2407     {
2408       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2409           "poll(2) returned something unexpected: %#04hx",
2410           pollfd.revents);
2411       break;
2412     }
2414     rbytes = read(fd, sock->rbuf + sock->next_read,
2415                   RBUF_SIZE - sock->next_read);
2416     if (rbytes < 0)
2417     {
2418       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2419       break;
2420     }
2421     else if (rbytes == 0)
2422       break; /* eof */
2424     sock->next_read += rbytes;
2426     if (sock->batch_start)
2427       now = sock->batch_start;
2428     else
2429       now = time(NULL);
2431     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2432     {
2433       status = handle_request (sock, now, cmd, cmd_len+1);
2434       if (status != 0)
2435         goto out_close;
2436     }
2437   }
2439 out_close:
2440   close_connection(sock);
2442   /* Remove this thread from the connection threads list */
2443   pthread_mutex_lock (&connection_threads_lock);
2444   connection_threads_num--;
2445   if (connection_threads_num <= 0)
2446     pthread_cond_broadcast(&connection_threads_done);
2447   pthread_mutex_unlock (&connection_threads_lock);
2449   return (NULL);
2450 } /* }}} void *connection_thread_main */
2452 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2454   int fd;
2455   struct sockaddr_un sa;
2456   listen_socket_t *temp;
2457   int status;
2458   const char *path;
2459   char *path_copy, *dir;
2461   path = sock->addr;
2462   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2463     path += strlen("unix:");
2465   /* dirname may modify its argument */
2466   path_copy = strdup(path);
2467   if (path_copy == NULL)
2468   {
2469     fprintf(stderr, "rrdcached: strdup(): %s\n",
2470         rrd_strerror(errno));
2471     return (-1);
2472   }
2474   dir = dirname(path_copy);
2475   if (rrd_mkdir_p(dir, 0777) != 0)
2476   {
2477     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2478         dir, rrd_strerror(errno));
2479     return (-1);
2480   }
2482   free(path_copy);
2484   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2485       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2486   if (temp == NULL)
2487   {
2488     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2489     return (-1);
2490   }
2491   listen_fds = temp;
2492   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2494   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2495   if (fd < 0)
2496   {
2497     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2498              rrd_strerror(errno));
2499     return (-1);
2500   }
2502   memset (&sa, 0, sizeof (sa));
2503   sa.sun_family = AF_UNIX;
2504   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2506   /* if we've gotten this far, we own the pid file.  any daemon started
2507    * with the same args must not be alive.  therefore, ensure that we can
2508    * create the socket...
2509    */
2510   unlink(path);
2512   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2513   if (status != 0)
2514   {
2515     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2516              path, rrd_strerror(errno));
2517     close (fd);
2518     return (-1);
2519   }
2521   /* tweak the sockets group ownership */
2522   if (sock->socket_group != (gid_t)-1)
2523   {
2524     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2525          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2526     {
2527       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2528     }
2529   }
2531   if (sock->socket_permissions != (mode_t)-1)
2532   {
2533     if (chmod(path, sock->socket_permissions) != 0)
2534       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2535           (unsigned int)sock->socket_permissions, strerror(errno));
2536   }
2538   status = listen (fd, /* backlog = */ 10);
2539   if (status != 0)
2540   {
2541     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2542              path, rrd_strerror(errno));
2543     close (fd);
2544     unlink (path);
2545     return (-1);
2546   }
2548   listen_fds[listen_fds_num].fd = fd;
2549   listen_fds[listen_fds_num].family = PF_UNIX;
2550   strncpy(listen_fds[listen_fds_num].addr, path,
2551           sizeof (listen_fds[listen_fds_num].addr) - 1);
2552   listen_fds_num++;
2554   return (0);
2555 } /* }}} int open_listen_socket_unix */
2557 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2559   struct addrinfo ai_hints;
2560   struct addrinfo *ai_res;
2561   struct addrinfo *ai_ptr;
2562   char addr_copy[NI_MAXHOST];
2563   char *addr;
2564   char *port;
2565   int status;
2567   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2568   addr_copy[sizeof (addr_copy) - 1] = 0;
2569   addr = addr_copy;
2571   memset (&ai_hints, 0, sizeof (ai_hints));
2572   ai_hints.ai_flags = 0;
2573 #ifdef AI_ADDRCONFIG
2574   ai_hints.ai_flags |= AI_ADDRCONFIG;
2575 #endif
2576   ai_hints.ai_family = AF_UNSPEC;
2577   ai_hints.ai_socktype = SOCK_STREAM;
2579   port = NULL;
2580   if (*addr == '[') /* IPv6+port format */
2581   {
2582     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2583     addr++;
2585     port = strchr (addr, ']');
2586     if (port == NULL)
2587     {
2588       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2589       return (-1);
2590     }
2591     *port = 0;
2592     port++;
2594     if (*port == ':')
2595       port++;
2596     else if (*port == 0)
2597       port = NULL;
2598     else
2599     {
2600       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2601       return (-1);
2602     }
2603   } /* if (*addr == '[') */
2604   else
2605   {
2606     port = rindex(addr, ':');
2607     if (port != NULL)
2608     {
2609       *port = 0;
2610       port++;
2611     }
2612   }
2613   ai_res = NULL;
2614   status = getaddrinfo (addr,
2615                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2616                         &ai_hints, &ai_res);
2617   if (status != 0)
2618   {
2619     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2620              addr, gai_strerror (status));
2621     return (-1);
2622   }
2624   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2625   {
2626     int fd;
2627     listen_socket_t *temp;
2628     int one = 1;
2630     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2631         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2632     if (temp == NULL)
2633     {
2634       fprintf (stderr,
2635                "rrdcached: open_listen_socket_network: realloc failed.\n");
2636       continue;
2637     }
2638     listen_fds = temp;
2639     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2641     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2642     if (fd < 0)
2643     {
2644       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2645                rrd_strerror(errno));
2646       continue;
2647     }
2649     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2651     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2652     if (status != 0)
2653     {
2654       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2655                sock->addr, rrd_strerror(errno));
2656       close (fd);
2657       continue;
2658     }
2660     status = listen (fd, /* backlog = */ 10);
2661     if (status != 0)
2662     {
2663       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2664                sock->addr, rrd_strerror(errno));
2665       close (fd);
2666       freeaddrinfo(ai_res);
2667       return (-1);
2668     }
2670     listen_fds[listen_fds_num].fd = fd;
2671     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2672     listen_fds_num++;
2673   } /* for (ai_ptr) */
2675   freeaddrinfo(ai_res);
2676   return (0);
2677 } /* }}} static int open_listen_socket_network */
2679 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2681   assert(sock != NULL);
2682   assert(sock->addr != NULL);
2684   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2685       || sock->addr[0] == '/')
2686     return (open_listen_socket_unix(sock));
2687   else
2688     return (open_listen_socket_network(sock));
2689 } /* }}} int open_listen_socket */
2691 static int close_listen_sockets (void) /* {{{ */
2693   size_t i;
2695   for (i = 0; i < listen_fds_num; i++)
2696   {
2697     close (listen_fds[i].fd);
2699     if (listen_fds[i].family == PF_UNIX)
2700       unlink(listen_fds[i].addr);
2701   }
2703   free (listen_fds);
2704   listen_fds = NULL;
2705   listen_fds_num = 0;
2707   return (0);
2708 } /* }}} int close_listen_sockets */
2710 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2712   struct pollfd *pollfds;
2713   int pollfds_num;
2714   int status;
2715   int i;
2717   if (listen_fds_num < 1)
2718   {
2719     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2720     return (NULL);
2721   }
2723   pollfds_num = listen_fds_num;
2724   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2725   if (pollfds == NULL)
2726   {
2727     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2728     return (NULL);
2729   }
2730   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2732   RRDD_LOG(LOG_INFO, "listening for connections");
2734   while (state == RUNNING)
2735   {
2736     for (i = 0; i < pollfds_num; i++)
2737     {
2738       pollfds[i].fd = listen_fds[i].fd;
2739       pollfds[i].events = POLLIN | POLLPRI;
2740       pollfds[i].revents = 0;
2741     }
2743     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2744     if (state != RUNNING)
2745       break;
2746     else if (status == 0) /* timeout */
2747       continue;
2748     else if (status < 0) /* error */
2749     {
2750       status = errno;
2751       if (status != EINTR)
2752       {
2753         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2754       }
2755       continue;
2756     }
2758     for (i = 0; i < pollfds_num; i++)
2759     {
2760       listen_socket_t *client_sock;
2761       struct sockaddr_storage client_sa;
2762       socklen_t client_sa_size;
2763       pthread_t tid;
2764       pthread_attr_t attr;
2766       if (pollfds[i].revents == 0)
2767         continue;
2769       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2770       {
2771         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2772             "poll(2) returned something unexpected for listen FD #%i.",
2773             pollfds[i].fd);
2774         continue;
2775       }
2777       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2778       if (client_sock == NULL)
2779       {
2780         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2781         continue;
2782       }
2783       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2785       client_sa_size = sizeof (client_sa);
2786       client_sock->fd = accept (pollfds[i].fd,
2787           (struct sockaddr *) &client_sa, &client_sa_size);
2788       if (client_sock->fd < 0)
2789       {
2790         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2791         free(client_sock);
2792         continue;
2793       }
2795       pthread_attr_init (&attr);
2796       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2798       status = pthread_create (&tid, &attr, connection_thread_main,
2799                                client_sock);
2800       if (status != 0)
2801       {
2802         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2803         close_connection(client_sock);
2804         continue;
2805       }
2806     } /* for (pollfds_num) */
2807   } /* while (state == RUNNING) */
2809   RRDD_LOG(LOG_INFO, "starting shutdown");
2811   close_listen_sockets ();
2813   pthread_mutex_lock (&connection_threads_lock);
2814   while (connection_threads_num > 0)
2815     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2816   pthread_mutex_unlock (&connection_threads_lock);
2818   free(pollfds);
2820   return (NULL);
2821 } /* }}} void *listen_thread_main */
2823 static int daemonize (void) /* {{{ */
2825   int pid_fd;
2826   char *base_dir;
2828   daemon_uid = geteuid();
2830   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2831   if (pid_fd < 0)
2832     pid_fd = check_pidfile();
2833   if (pid_fd < 0)
2834     return pid_fd;
2836   /* open all the listen sockets */
2837   if (config_listen_address_list_len > 0)
2838   {
2839     for (size_t i = 0; i < config_listen_address_list_len; i++)
2840       open_listen_socket (config_listen_address_list[i]);
2842     rrd_free_ptrs((void ***) &config_listen_address_list,
2843                   &config_listen_address_list_len);
2844   }
2845   else
2846   {
2847     listen_socket_t sock;
2848     memset(&sock, 0, sizeof(sock));
2849     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2850     open_listen_socket (&sock);
2851   }
2853   if (listen_fds_num < 1)
2854   {
2855     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2856     goto error;
2857   }
2859   if (!stay_foreground)
2860   {
2861     pid_t child;
2863     child = fork ();
2864     if (child < 0)
2865     {
2866       fprintf (stderr, "daemonize: fork(2) failed.\n");
2867       goto error;
2868     }
2869     else if (child > 0)
2870       exit(0);
2872     /* Become session leader */
2873     setsid ();
2875     /* Open the first three file descriptors to /dev/null */
2876     close (2);
2877     close (1);
2878     close (0);
2880     open ("/dev/null", O_RDWR);
2881     if (dup(0) == -1 || dup(0) == -1){
2882         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2883     }
2884   } /* if (!stay_foreground) */
2886   /* Change into the /tmp directory. */
2887   base_dir = (config_base_dir != NULL)
2888     ? config_base_dir
2889     : "/tmp";
2891   if (chdir (base_dir) != 0)
2892   {
2893     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2894     goto error;
2895   }
2897   install_signal_handlers();
2899   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2900   RRDD_LOG(LOG_INFO, "starting up");
2902   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2903                                 (GDestroyNotify) free_cache_item);
2904   if (cache_tree == NULL)
2905   {
2906     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2907     goto error;
2908   }
2910   return write_pidfile (pid_fd);
2912 error:
2913   remove_pidfile();
2914   return -1;
2915 } /* }}} int daemonize */
2917 static int cleanup (void) /* {{{ */
2919   pthread_cond_broadcast (&flush_cond);
2920   pthread_join (flush_thread, NULL);
2922   pthread_cond_broadcast (&queue_cond);
2923   for (int i = 0; i < config_queue_threads; i++)
2924     pthread_join (queue_threads[i], NULL);
2926   if (config_flush_at_shutdown)
2927   {
2928     assert(cache_queue_head == NULL);
2929     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2930   }
2932   free(queue_threads);
2933   free(config_base_dir);
2935   pthread_mutex_lock(&cache_lock);
2936   g_tree_destroy(cache_tree);
2938   pthread_mutex_lock(&journal_lock);
2939   journal_done();
2941   RRDD_LOG(LOG_INFO, "goodbye");
2942   closelog ();
2944   remove_pidfile ();
2945   free(config_pid_file);
2947   return (0);
2948 } /* }}} int cleanup */
2950 static int read_options (int argc, char **argv) /* {{{ */
2952   int option;
2953   int status = 0;
2955   char **permissions = NULL;
2956   size_t permissions_len = 0;
2958   gid_t  socket_group = (gid_t)-1;
2959   mode_t socket_permissions = (mode_t)-1;
2961   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2962   {
2963     switch (option)
2964     {
2965       case 'g':
2966         stay_foreground=1;
2967         break;
2969       case 'l':
2970       {
2971         listen_socket_t *new;
2973         new = malloc(sizeof(listen_socket_t));
2974         if (new == NULL)
2975         {
2976           fprintf(stderr, "read_options: malloc failed.\n");
2977           return(2);
2978         }
2979         memset(new, 0, sizeof(listen_socket_t));
2981         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2983         /* Add permissions to the socket {{{ */
2984         if (permissions_len != 0)
2985         {
2986           size_t i;
2987           for (i = 0; i < permissions_len; i++)
2988           {
2989             status = socket_permission_add (new, permissions[i]);
2990             if (status != 0)
2991             {
2992               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2993                   "socket failed. Most likely, this permission doesn't "
2994                   "exist. Check your command line.\n", permissions[i]);
2995               status = 4;
2996             }
2997           }
2998         }
2999         else /* if (permissions_len == 0) */
3000         {
3001           /* Add permission for ALL commands to the socket. */
3002           size_t i;
3003           for (i = 0; i < list_of_commands_len; i++)
3004           {
3005             status = socket_permission_add (new, list_of_commands[i].cmd);
3006             if (status != 0)
3007             {
3008               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3009                   "socket failed. This should never happen, ever! Sorry.\n",
3010                   permissions[i]);
3011               status = 4;
3012             }
3013           }
3014         }
3015         /* }}} Done adding permissions. */
3017         new->socket_group = socket_group;
3018         new->socket_permissions = socket_permissions;
3020         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3021                          &config_listen_address_list_len, new))
3022         {
3023           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3024           return (2);
3025         }
3026       }
3027       break;
3029       /* set socket group permissions */
3030       case 's':
3031       {
3032         gid_t group_gid;
3033         struct group *grp;
3035         group_gid = strtoul(optarg, NULL, 10);
3036         if (errno != EINVAL && group_gid>0)
3037         {
3038           /* we were passed a number */
3039           grp = getgrgid(group_gid);
3040         }
3041         else
3042         {
3043           grp = getgrnam(optarg);
3044         }
3046         if (grp)
3047         {
3048           socket_group = grp->gr_gid;
3049         }
3050         else
3051         {
3052           /* no idea what the user wanted... */
3053           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3054           return (5);
3055         }
3056       }
3057       break;
3059       /* set socket file permissions */
3060       case 'm':
3061       {
3062         long  tmp;
3063         char *endptr = NULL;
3065         tmp = strtol (optarg, &endptr, 8);
3066         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3067             || (tmp > 07777) || (tmp < 0)) {
3068           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3069               optarg);
3070           return (5);
3071         }
3073         socket_permissions = (mode_t)tmp;
3074       }
3075       break;
3077       case 'P':
3078       {
3079         char *optcopy;
3080         char *saveptr;
3081         char *dummy;
3082         char *ptr;
3084         rrd_free_ptrs ((void *) &permissions, &permissions_len);
3086         optcopy = strdup (optarg);
3087         dummy = optcopy;
3088         saveptr = NULL;
3089         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3090         {
3091           dummy = NULL;
3092           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3093         }
3095         free (optcopy);
3096       }
3097       break;
3099       case 'f':
3100       {
3101         int temp;
3103         temp = atoi (optarg);
3104         if (temp > 0)
3105           config_flush_interval = temp;
3106         else
3107         {
3108           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3109           status = 3;
3110         }
3111       }
3112       break;
3114       case 'w':
3115       {
3116         int temp;
3118         temp = atoi (optarg);
3119         if (temp > 0)
3120           config_write_interval = temp;
3121         else
3122         {
3123           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3124           status = 2;
3125         }
3126       }
3127       break;
3129       case 'z':
3130       {
3131         int temp;
3133         temp = atoi(optarg);
3134         if (temp > 0)
3135           config_write_jitter = temp;
3136         else
3137         {
3138           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3139           status = 2;
3140         }
3142         break;
3143       }
3145       case 't':
3146       {
3147         int threads;
3148         threads = atoi(optarg);
3149         if (threads >= 1)
3150           config_queue_threads = threads;
3151         else
3152         {
3153           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3154           return 1;
3155         }
3156       }
3157       break;
3159       case 'B':
3160         config_write_base_only = 1;
3161         break;
3163       case 'b':
3164       {
3165         size_t len;
3166         char base_realpath[PATH_MAX];
3168         if (config_base_dir != NULL)
3169           free (config_base_dir);
3170         config_base_dir = strdup (optarg);
3171         if (config_base_dir == NULL)
3172         {
3173           fprintf (stderr, "read_options: strdup failed.\n");
3174           return (3);
3175         }
3177         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3178         {
3179           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3180               config_base_dir, rrd_strerror (errno));
3181           return (3);
3182         }
3184         /* make sure that the base directory is not resolved via
3185          * symbolic links.  this makes some performance-enhancing
3186          * assumptions possible (we don't have to resolve paths
3187          * that start with a "/")
3188          */
3189         if (realpath(config_base_dir, base_realpath) == NULL)
3190         {
3191           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3192               "%s\n", config_base_dir, rrd_strerror(errno));
3193           return 5;
3194         }
3196         len = strlen (config_base_dir);
3197         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3198         {
3199           config_base_dir[len - 1] = 0;
3200           len--;
3201         }
3203         if (len < 1)
3204         {
3205           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3206           return (4);
3207         }
3209         _config_base_dir_len = len;
3211         len = strlen (base_realpath);
3212         while ((len > 0) && (base_realpath[len - 1] == '/'))
3213         {
3214           base_realpath[len - 1] = '\0';
3215           len--;
3216         }
3218         if (strncmp(config_base_dir,
3219                          base_realpath, sizeof(base_realpath)) != 0)
3220         {
3221           fprintf(stderr,
3222                   "Base directory (-b) resolved via file system links!\n"
3223                   "Please consult rrdcached '-b' documentation!\n"
3224                   "Consider specifying the real directory (%s)\n",
3225                   base_realpath);
3226           return 5;
3227         }
3228       }
3229       break;
3231       case 'p':
3232       {
3233         if (config_pid_file != NULL)
3234           free (config_pid_file);
3235         config_pid_file = strdup (optarg);
3236         if (config_pid_file == NULL)
3237         {
3238           fprintf (stderr, "read_options: strdup failed.\n");
3239           return (3);
3240         }
3241       }
3242       break;
3244       case 'F':
3245         config_flush_at_shutdown = 1;
3246         break;
3248       case 'j':
3249       {
3250         const char *dir = journal_dir = strdup(optarg);
3252         status = rrd_mkdir_p(dir, 0777);
3253         if (status != 0)
3254         {
3255           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3256               dir, rrd_strerror(errno));
3257           return 6;
3258         }
3260         if (access(dir, R_OK|W_OK|X_OK) != 0)
3261         {
3262           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3263                   errno ? rrd_strerror(errno) : "");
3264           return 6;
3265         }
3266       }
3267       break;
3269       case 'a':
3270       {
3271         int temp = atoi(optarg);
3272         if (temp > 0)
3273           config_alloc_chunk = temp;
3274         else
3275         {
3276           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3277           return 10;
3278         }
3279       }
3280       break;
3282       case 'h':
3283       case '?':
3284         printf ("RRDCacheD %s\n"
3285             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3286             "\n"
3287             "Usage: rrdcached [options]\n"
3288             "\n"
3289             "Valid options are:\n"
3290             "  -l <address>  Socket address to listen to.\n"
3291             "  -P <perms>    Sets the permissions to assign to all following "
3292                             "sockets\n"
3293             "  -w <seconds>  Interval in which to write data.\n"
3294             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3295             "  -t <threads>  Number of write threads.\n"
3296             "  -f <seconds>  Interval in which to flush dead data.\n"
3297             "  -p <file>     Location of the PID-file.\n"
3298             "  -b <dir>      Base directory to change to.\n"
3299             "  -B            Restrict file access to paths within -b <dir>\n"
3300             "  -g            Do not fork and run in the foreground.\n"
3301             "  -j <dir>      Directory in which to create the journal files.\n"
3302             "  -F            Always flush all updates at shutdown\n"
3303             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3304             "                (the socket will also have read/write permissions "
3305                             "for that group)\n"
3306             "  -m <mode>     File permissions (octal) of all following UNIX "
3307                             "sockets\n"
3308             "  -a <size>     Memory allocation chunk size. Default is 1."
3309             "\n"
3310             "For more information and a detailed description of all options "
3311             "please refer\n"
3312             "to the rrdcached(1) manual page.\n",
3313             VERSION);
3314         if (option == 'h')
3315           status = -1;
3316         else
3317           status = 1;
3318         break;
3319     } /* switch (option) */
3320   } /* while (getopt) */
3322   /* advise the user when values are not sane */
3323   if (config_flush_interval < 2 * config_write_interval)
3324     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3325             " 2x write interval (-w) !\n");
3326   if (config_write_jitter > config_write_interval)
3327     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3328             " write interval (-w) !\n");
3330   if (config_write_base_only && config_base_dir == NULL)
3331     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3332             "  Consult the rrdcached documentation\n");
3334   if (journal_dir == NULL)
3335     config_flush_at_shutdown = 1;
3337   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3339   return (status);
3340 } /* }}} int read_options */
3342 int main (int argc, char **argv)
3344   int status;
3346   status = read_options (argc, argv);
3347   if (status != 0)
3348   {
3349     if (status < 0)
3350       status = 0;
3351     return (status);
3352   }
3354   status = daemonize ();
3355   if (status != 0)
3356   {
3357     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3358     return (1);
3359   }
3361   journal_init();
3363   /* start the queue threads */
3364   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3365   if (queue_threads == NULL)
3366   {
3367     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3368     cleanup();
3369     return (1);
3370   }
3371   for (int i = 0; i < config_queue_threads; i++)
3372   {
3373     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3374     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3375     if (status != 0)
3376     {
3377       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3378       cleanup();
3379       return (1);
3380     }
3381   }
3383   /* start the flush thread */
3384   memset(&flush_thread, 0, sizeof(flush_thread));
3385   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3386   if (status != 0)
3387   {
3388     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3389     cleanup();
3390     return (1);
3391   }
3393   listen_thread_main (NULL);
3394   cleanup ();
3396   return (0);
3397 } /* int main */
3399 /*
3400  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3401  */