Code

495a313d28e4cfd240bd5408c4c7cbf8293e5100
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
147   char *wbuf;
148   ssize_t wbuf_len;
150   uint32_t permissions;
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
178   char *syntax;
179   char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
186   char *file;
187   char **values;
188   size_t values_num;            /* number of valid pointers */
189   size_t values_alloc;          /* number of allocated pointers */
190   time_t last_flush_time;
191   double last_update_stamp;
192 #define CI_FLAGS_IN_TREE  (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194   int flags;
195   pthread_cond_t  flushed;
196   cache_item_t *prev;
197   cache_item_t *next;
198 };
200 struct callback_flush_data_s
202   time_t now;
203   time_t abs_timeout;
204   char **keys;
205   size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
209 enum queue_side_e
211   HEAD,
212   TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
216 /* describe a set of journal files */
217 typedef struct {
218   char **files;
219   size_t files_num;
220 } journal_set;
222 #define RBUF_SIZE (RRD_CMD_MAX*2)
224 /*
225  * Variables
226  */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
233 static listen_socket_t default_socket;
235 enum {
236   RUNNING,              /* normal operation */
237   FLUSHING,             /* flushing remaining values */
238   SHUTDOWN              /* shutting down */
239 } state = RUNNING;
241 static pthread_t *queue_threads;
242 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
243 static int config_queue_threads = 4;
245 static pthread_t flush_thread;
246 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
248 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
249 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
250 static int connection_threads_num = 0;
252 /* Cache stuff */
253 static GTree          *cache_tree = NULL;
254 static cache_item_t   *cache_queue_head = NULL;
255 static cache_item_t   *cache_queue_tail = NULL;
256 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
258 static int config_write_interval = 300;
259 static int config_write_jitter   = 0;
260 static int config_flush_interval = 3600;
261 static int config_flush_at_shutdown = 0;
262 static char *config_pid_file = NULL;
263 static char *config_base_dir = NULL;
264 static size_t _config_base_dir_len = 0;
265 static int config_write_base_only = 0;
266 static size_t config_alloc_chunk = 1;
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
280 static int opt_no_overwrite = 0; /* default for the daemon */
282 /* Journaled updates */
283 #define JOURNAL_REPLAY(s) ((s) == NULL)
284 #define JOURNAL_BASE "rrd.journal"
285 static journal_set *journal_cur = NULL;
286 static journal_set *journal_old = NULL;
287 static char *journal_dir = NULL;
288 static FILE *journal_fh = NULL;         /* current journal file handle */
289 static long  journal_size = 0;          /* current journal size */
290 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
291 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
292 static int journal_write(char *cmd, char *args);
293 static void journal_done(void);
294 static void journal_rotate(void);
296 /* prototypes for forward refernces */
297 static int handle_request_help (HANDLER_PROTO);
299 /* 
300  * Functions
301  */
302 static void sig_common (const char *sig) /* {{{ */
304   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
305   state = FLUSHING;
306   pthread_cond_broadcast(&flush_cond);
307   pthread_cond_broadcast(&queue_cond);
308 } /* }}} void sig_common */
310 static void sig_int_handler (int UNUSED(s)) /* {{{ */
312   sig_common("INT");
313 } /* }}} void sig_int_handler */
315 static void sig_term_handler (int UNUSED(s)) /* {{{ */
317   sig_common("TERM");
318 } /* }}} void sig_term_handler */
320 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
322   config_flush_at_shutdown = 1;
323   sig_common("USR1");
324 } /* }}} void sig_usr1_handler */
326 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
328   config_flush_at_shutdown = 0;
329   sig_common("USR2");
330 } /* }}} void sig_usr2_handler */
332 static void install_signal_handlers(void) /* {{{ */
334   /* These structures are static, because `sigaction' behaves weird if the are
335    * overwritten.. */
336   static struct sigaction sa_int;
337   static struct sigaction sa_term;
338   static struct sigaction sa_pipe;
339   static struct sigaction sa_usr1;
340   static struct sigaction sa_usr2;
342   /* Install signal handlers */
343   memset (&sa_int, 0, sizeof (sa_int));
344   sa_int.sa_handler = sig_int_handler;
345   sigaction (SIGINT, &sa_int, NULL);
347   memset (&sa_term, 0, sizeof (sa_term));
348   sa_term.sa_handler = sig_term_handler;
349   sigaction (SIGTERM, &sa_term, NULL);
351   memset (&sa_pipe, 0, sizeof (sa_pipe));
352   sa_pipe.sa_handler = SIG_IGN;
353   sigaction (SIGPIPE, &sa_pipe, NULL);
355   memset (&sa_pipe, 0, sizeof (sa_usr1));
356   sa_usr1.sa_handler = sig_usr1_handler;
357   sigaction (SIGUSR1, &sa_usr1, NULL);
359   memset (&sa_usr2, 0, sizeof (sa_usr2));
360   sa_usr2.sa_handler = sig_usr2_handler;
361   sigaction (SIGUSR2, &sa_usr2, NULL);
363 } /* }}} void install_signal_handlers */
365 static int open_pidfile(char *action, int oflag) /* {{{ */
367   int fd;
368   const char *file;
369   char *file_copy, *dir;
371   file = (config_pid_file != NULL)
372     ? config_pid_file
373     : LOCALSTATEDIR "/run/rrdcached.pid";
375   /* dirname may modify its argument */
376   file_copy = strdup(file);
377   if (file_copy == NULL)
378   {
379     fprintf(stderr, "rrdcached: strdup(): %s\n",
380         rrd_strerror(errno));
381     return -1;
382   }
384   dir = dirname(file_copy);
385   if (rrd_mkdir_p(dir, 0777) != 0)
386   {
387     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
388         dir, rrd_strerror(errno));
389     return -1;
390   }
392   free(file_copy);
394   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
395   if (fd < 0)
396     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
397             action, file, rrd_strerror(errno));
399   return(fd);
400 } /* }}} static int open_pidfile */
402 /* check existing pid file to see whether a daemon is running */
403 static int check_pidfile(void)
405   int pid_fd;
406   pid_t pid;
407   char pid_str[16];
409   pid_fd = open_pidfile("open", O_RDWR);
410   if (pid_fd < 0)
411     return pid_fd;
413   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
414     return -1;
416   pid = atoi(pid_str);
417   if (pid <= 0)
418     return -1;
420   /* another running process that we can signal COULD be
421    * a competing rrdcached */
422   if (pid != getpid() && kill(pid, 0) == 0)
423   {
424     fprintf(stderr,
425             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
426     close(pid_fd);
427     return -1;
428   }
430   lseek(pid_fd, 0, SEEK_SET);
431   if (ftruncate(pid_fd, 0) == -1)
432   {
433     fprintf(stderr,
434             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
435     close(pid_fd);
436     return -1;
437   }
439   fprintf(stderr,
440           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
441           "rrdcached: starting normally.\n", pid);
443   return pid_fd;
444 } /* }}} static int check_pidfile */
446 static int write_pidfile (int fd) /* {{{ */
448   pid_t pid;
449   FILE *fh;
451   pid = getpid ();
453   fh = fdopen (fd, "w");
454   if (fh == NULL)
455   {
456     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
457     close(fd);
458     return (-1);
459   }
461   fprintf (fh, "%i\n", (int) pid);
462   fclose (fh);
464   return (0);
465 } /* }}} int write_pidfile */
467 static int remove_pidfile (void) /* {{{ */
469   char *file;
470   int status;
472   file = (config_pid_file != NULL)
473     ? config_pid_file
474     : LOCALSTATEDIR "/run/rrdcached.pid";
476   status = unlink (file);
477   if (status == 0)
478     return (0);
479   return (errno);
480 } /* }}} int remove_pidfile */
482 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
484   char *eol;
486   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
487                sock->next_read - sock->next_cmd);
489   if (eol == NULL)
490   {
491     /* no commands left, move remainder back to front of rbuf */
492     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
493             sock->next_read - sock->next_cmd);
494     sock->next_read -= sock->next_cmd;
495     sock->next_cmd = 0;
496     *len = 0;
497     return NULL;
498   }
499   else
500   {
501     char *cmd = sock->rbuf + sock->next_cmd;
502     *eol = '\0';
504     sock->next_cmd = eol - sock->rbuf + 1;
506     if (eol > sock->rbuf && *(eol-1) == '\r')
507       *(--eol) = '\0'; /* handle "\r\n" EOL */
509     *len = eol - cmd;
511     return cmd;
512   }
514   /* NOTREACHED */
515   assert(1==0);
516 } /* }}} char *next_cmd */
518 /* add the characters directly to the write buffer */
519 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
521   char *new_buf;
523   assert(sock != NULL);
525   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
526   if (new_buf == NULL)
527   {
528     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
529     return -1;
530   }
532   strncpy(new_buf + sock->wbuf_len, str, len + 1);
534   sock->wbuf = new_buf;
535   sock->wbuf_len += len;
537   return 0;
538 } /* }}} static int add_to_wbuf */
540 /* add the text to the "extra" info that's sent after the status line */
541 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
543   va_list argp;
544   char buffer[RRD_CMD_MAX];
545   int len;
547   if (JOURNAL_REPLAY(sock)) return 0;
548   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
550   va_start(argp, fmt);
551 #ifdef HAVE_VSNPRINTF
552   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
553 #else
554   len = vsprintf(buffer, fmt, argp);
555 #endif
556   va_end(argp);
557   if (len < 0)
558   {
559     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
560     return -1;
561   }
563   return add_to_wbuf(sock, buffer, len);
564 } /* }}} static int add_response_info */
566 static int count_lines(char *str) /* {{{ */
568   int lines = 0;
570   if (str != NULL)
571   {
572     while ((str = strchr(str, '\n')) != NULL)
573     {
574       ++lines;
575       ++str;
576     }
577   }
579   return lines;
580 } /* }}} static int count_lines */
582 /* send the response back to the user.
583  * returns 0 on success, -1 on error
584  * write buffer is always zeroed after this call */
585 static int send_response (listen_socket_t *sock, response_code rc,
586                           char *fmt, ...) /* {{{ */
588   va_list argp;
589   char buffer[RRD_CMD_MAX];
590   int lines;
591   ssize_t wrote;
592   int rclen, len;
594   if (JOURNAL_REPLAY(sock)) return rc;
596   if (sock->batch_start)
597   {
598     if (rc == RESP_OK)
599       return rc; /* no response on success during BATCH */
600     lines = sock->batch_cmd;
601   }
602   else if (rc == RESP_OK)
603     lines = count_lines(sock->wbuf);
604   else
605     lines = -1;
607   rclen = sprintf(buffer, "%d ", lines);
608   va_start(argp, fmt);
609 #ifdef HAVE_VSNPRINTF
610   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
611 #else
612   len = vsprintf(buffer+rclen, fmt, argp);
613 #endif
614   va_end(argp);
615   if (len < 0)
616     return -1;
618   len += rclen;
620   /* append the result to the wbuf, don't write to the user */
621   if (sock->batch_start)
622     return add_to_wbuf(sock, buffer, len);
624   /* first write must be complete */
625   if (len != write(sock->fd, buffer, len))
626   {
627     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
628     return -1;
629   }
631   if (sock->wbuf != NULL && rc == RESP_OK)
632   {
633     wrote = 0;
634     while (wrote < sock->wbuf_len)
635     {
636       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
637       if (wb <= 0)
638       {
639         RRDD_LOG(LOG_INFO, "send_response: could not write results");
640         return -1;
641       }
642       wrote += wb;
643     }
644   }
646   free(sock->wbuf); sock->wbuf = NULL;
647   sock->wbuf_len = 0;
649   return 0;
650 } /* }}} */
652 static void wipe_ci_values(cache_item_t *ci, time_t when)
654   ci->values = NULL;
655   ci->values_num = 0;
656   ci->values_alloc = 0;
658   ci->last_flush_time = when;
659   if (config_write_jitter > 0)
660     ci->last_flush_time += (rrd_random() % config_write_jitter);
663 /* remove_from_queue
664  * remove a "cache_item_t" item from the queue.
665  * must hold 'cache_lock' when calling this
666  */
667 static void remove_from_queue(cache_item_t *ci) /* {{{ */
669   if (ci == NULL) return;
670   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
672   if (ci->prev == NULL)
673     cache_queue_head = ci->next; /* reset head */
674   else
675     ci->prev->next = ci->next;
677   if (ci->next == NULL)
678     cache_queue_tail = ci->prev; /* reset the tail */
679   else
680     ci->next->prev = ci->prev;
682   ci->next = ci->prev = NULL;
683   ci->flags &= ~CI_FLAGS_IN_QUEUE;
685   pthread_mutex_lock (&stats_lock);
686   assert (stats_queue_length > 0);
687   stats_queue_length--;
688   pthread_mutex_unlock (&stats_lock);
690 } /* }}} static void remove_from_queue */
692 /* free the resources associated with the cache_item_t
693  * must hold cache_lock when calling this function
694  */
695 static void *free_cache_item(cache_item_t *ci) /* {{{ */
697   if (ci == NULL) return NULL;
699   remove_from_queue(ci);
701   for (size_t i=0; i < ci->values_num; i++)
702     free(ci->values[i]);
704   free (ci->values);
705   free (ci->file);
707   /* in case anyone is waiting */
708   pthread_cond_broadcast(&ci->flushed);
709   pthread_cond_destroy(&ci->flushed);
711   free (ci);
713   return NULL;
714 } /* }}} static void *free_cache_item */
716 /*
717  * enqueue_cache_item:
718  * `cache_lock' must be acquired before calling this function!
719  */
720 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
721     queue_side_t side)
723   if (ci == NULL)
724     return (-1);
726   if (ci->values_num == 0)
727     return (0);
729   if (side == HEAD)
730   {
731     if (cache_queue_head == ci)
732       return 0;
734     /* remove if further down in queue */
735     remove_from_queue(ci);
737     ci->prev = NULL;
738     ci->next = cache_queue_head;
739     if (ci->next != NULL)
740       ci->next->prev = ci;
741     cache_queue_head = ci;
743     if (cache_queue_tail == NULL)
744       cache_queue_tail = cache_queue_head;
745   }
746   else /* (side == TAIL) */
747   {
748     /* We don't move values back in the list.. */
749     if (ci->flags & CI_FLAGS_IN_QUEUE)
750       return (0);
752     assert (ci->next == NULL);
753     assert (ci->prev == NULL);
755     ci->prev = cache_queue_tail;
757     if (cache_queue_tail == NULL)
758       cache_queue_head = ci;
759     else
760       cache_queue_tail->next = ci;
762     cache_queue_tail = ci;
763   }
765   ci->flags |= CI_FLAGS_IN_QUEUE;
767   pthread_cond_signal(&queue_cond);
768   pthread_mutex_lock (&stats_lock);
769   stats_queue_length++;
770   pthread_mutex_unlock (&stats_lock);
772   return (0);
773 } /* }}} int enqueue_cache_item */
775 /*
776  * tree_callback_flush:
777  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
778  * while this is in progress.
779  */
780 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
781     gpointer data)
783   cache_item_t *ci;
784   callback_flush_data_t *cfd;
786   ci = (cache_item_t *) value;
787   cfd = (callback_flush_data_t *) data;
789   if (ci->flags & CI_FLAGS_IN_QUEUE)
790     return FALSE;
792   if (ci->values_num > 0
793       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
794   {
795     enqueue_cache_item (ci, TAIL);
796   }
797   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
798       && (ci->values_num <= 0))
799   {
800     assert ((char *) key == ci->file);
801     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
802     {
803       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
804       return (FALSE);
805     }
806   }
808   return (FALSE);
809 } /* }}} gboolean tree_callback_flush */
811 static int flush_old_values (int max_age)
813   callback_flush_data_t cfd;
814   size_t k;
816   memset (&cfd, 0, sizeof (cfd));
817   /* Pass the current time as user data so that we don't need to call
818    * `time' for each node. */
819   cfd.now = time (NULL);
820   cfd.keys = NULL;
821   cfd.keys_num = 0;
823   if (max_age > 0)
824     cfd.abs_timeout = cfd.now - max_age;
825   else
826     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
828   /* `tree_callback_flush' will return the keys of all values that haven't
829    * been touched in the last `config_flush_interval' seconds in `cfd'.
830    * The char*'s in this array point to the same memory as ci->file, so we
831    * don't need to free them separately. */
832   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
834   for (k = 0; k < cfd.keys_num; k++)
835   {
836     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
837     /* should never fail, since we have held the cache_lock
838      * the entire time */
839     assert(status == TRUE);
840   }
842   if (cfd.keys != NULL)
843   {
844     free (cfd.keys);
845     cfd.keys = NULL;
846   }
848   return (0);
849 } /* int flush_old_values */
851 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
853   struct timeval now;
854   struct timespec next_flush;
855   int status;
857   gettimeofday (&now, NULL);
858   next_flush.tv_sec = now.tv_sec + config_flush_interval;
859   next_flush.tv_nsec = 1000 * now.tv_usec;
861   pthread_mutex_lock(&cache_lock);
863   while (state == RUNNING)
864   {
865     gettimeofday (&now, NULL);
866     if ((now.tv_sec > next_flush.tv_sec)
867         || ((now.tv_sec == next_flush.tv_sec)
868           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
869     {
870       RRDD_LOG(LOG_DEBUG, "flushing old values");
872       /* Determine the time of the next cache flush. */
873       next_flush.tv_sec = now.tv_sec + config_flush_interval;
875       /* Flush all values that haven't been written in the last
876        * `config_write_interval' seconds. */
877       flush_old_values (config_write_interval);
879       /* unlock the cache while we rotate so we don't block incoming
880        * updates if the fsync() blocks on disk I/O */
881       pthread_mutex_unlock(&cache_lock);
882       journal_rotate();
883       pthread_mutex_lock(&cache_lock);
884     }
886     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
887     if (status != 0 && status != ETIMEDOUT)
888     {
889       RRDD_LOG (LOG_ERR, "flush_thread_main: "
890                 "pthread_cond_timedwait returned %i.", status);
891     }
892   }
894   if (config_flush_at_shutdown)
895     flush_old_values (-1); /* flush everything */
897   state = SHUTDOWN;
899   pthread_mutex_unlock(&cache_lock);
901   return NULL;
902 } /* void *flush_thread_main */
904 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
906   pthread_mutex_lock (&cache_lock);
908   while (state != SHUTDOWN
909          || (cache_queue_head != NULL && config_flush_at_shutdown))
910   {
911     cache_item_t *ci;
912     char *file;
913     char **values;
914     size_t values_num;
915     int status;
917     /* Now, check if there's something to store away. If not, wait until
918      * something comes in. */
919     if (cache_queue_head == NULL)
920     {
921       status = pthread_cond_wait (&queue_cond, &cache_lock);
922       if ((status != 0) && (status != ETIMEDOUT))
923       {
924         RRDD_LOG (LOG_ERR, "queue_thread_main: "
925             "pthread_cond_wait returned %i.", status);
926       }
927     }
929     /* Check if a value has arrived. This may be NULL if we timed out or there
930      * was an interrupt such as a signal. */
931     if (cache_queue_head == NULL)
932       continue;
934     ci = cache_queue_head;
936     /* copy the relevant parts */
937     file = strdup (ci->file);
938     if (file == NULL)
939     {
940       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
941       continue;
942     }
944     assert(ci->values != NULL);
945     assert(ci->values_num > 0);
947     values = ci->values;
948     values_num = ci->values_num;
950     wipe_ci_values(ci, time(NULL));
951     remove_from_queue(ci);
953     pthread_mutex_unlock (&cache_lock);
955     rrd_clear_error ();
956     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
957     if (status != 0)
958     {
959       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
960           "rrd_update_r (%s) failed with status %i. (%s)",
961           file, status, rrd_get_error());
962     }
964     journal_write("wrote", file);
966     /* Search again in the tree.  It's possible someone issued a "FORGET"
967      * while we were writing the update values. */
968     pthread_mutex_lock(&cache_lock);
969     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
970     if (ci)
971       pthread_cond_broadcast(&ci->flushed);
972     pthread_mutex_unlock(&cache_lock);
974     if (status == 0)
975     {
976       pthread_mutex_lock (&stats_lock);
977       stats_updates_written++;
978       stats_data_sets_written += values_num;
979       pthread_mutex_unlock (&stats_lock);
980     }
982     rrd_free_ptrs((void ***) &values, &values_num);
983     free(file);
985     pthread_mutex_lock (&cache_lock);
986   }
987   pthread_mutex_unlock (&cache_lock);
989   return (NULL);
990 } /* }}} void *queue_thread_main */
992 static int buffer_get_field (char **buffer_ret, /* {{{ */
993     size_t *buffer_size_ret, char **field_ret)
995   char *buffer;
996   size_t buffer_pos;
997   size_t buffer_size;
998   char *field;
999   size_t field_size;
1000   int status;
1002   buffer = *buffer_ret;
1003   buffer_pos = 0;
1004   buffer_size = *buffer_size_ret;
1005   field = *buffer_ret;
1006   field_size = 0;
1008   if (buffer_size <= 0)
1009     return (-1);
1011   /* This is ensured by `handle_request'. */
1012   assert (buffer[buffer_size - 1] == '\0');
1014   status = -1;
1015   while (buffer_pos < buffer_size)
1016   {
1017     /* Check for end-of-field or end-of-buffer */
1018     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1019     {
1020       field[field_size] = 0;
1021       field_size++;
1022       buffer_pos++;
1023       status = 0;
1024       break;
1025     }
1026     /* Handle escaped characters. */
1027     else if (buffer[buffer_pos] == '\\')
1028     {
1029       if (buffer_pos >= (buffer_size - 1))
1030         break;
1031       buffer_pos++;
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036     /* Normal operation */ 
1037     else
1038     {
1039       field[field_size] = buffer[buffer_pos];
1040       field_size++;
1041       buffer_pos++;
1042     }
1043   } /* while (buffer_pos < buffer_size) */
1045   if (status != 0)
1046     return (status);
1048   *buffer_ret = buffer + buffer_pos;
1049   *buffer_size_ret = buffer_size - buffer_pos;
1050   *field_ret = field;
1052   return (0);
1053 } /* }}} int buffer_get_field */
1055 /* if we're restricting writes to the base directory,
1056  * check whether the file falls within the dir
1057  * returns 1 if OK, otherwise 0
1058  */
1059 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1061   assert(file != NULL);
1063   if (!config_write_base_only
1064       || JOURNAL_REPLAY(sock)
1065       || config_base_dir == NULL)
1066     return 1;
1068   if (strstr(file, "../") != NULL) goto err;
1070   /* relative paths without "../" are ok */
1071   if (*file != '/') return 1;
1073   /* file must be of the format base + "/" + <1+ char filename> */
1074   if (strlen(file) < _config_base_dir_len + 2) goto err;
1075   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1076   if (*(file + _config_base_dir_len) != '/') goto err;
1078   return 1;
1080 err:
1081   if (sock != NULL && sock->fd >= 0)
1082     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1084   return 0;
1085 } /* }}} static int check_file_access */
1087 /* when using a base dir, convert relative paths to absolute paths.
1088  * if necessary, modifies the "filename" pointer to point
1089  * to the new path created in "tmp".  "tmp" is provided
1090  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1091  *
1092  * this allows us to optimize for the expected case (absolute path)
1093  * with a no-op.
1094  */
1095 static void get_abs_path(char **filename, char *tmp)
1097   assert(tmp != NULL);
1098   assert(filename != NULL && *filename != NULL);
1100   if (config_base_dir == NULL || **filename == '/')
1101     return;
1103   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1104   *filename = tmp;
1105 } /* }}} static int get_abs_path */
1107 static int flush_file (const char *filename) /* {{{ */
1109   cache_item_t *ci;
1111   pthread_mutex_lock (&cache_lock);
1113   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1114   if (ci == NULL)
1115   {
1116     pthread_mutex_unlock (&cache_lock);
1117     return (ENOENT);
1118   }
1120   if (ci->values_num > 0)
1121   {
1122     /* Enqueue at head */
1123     enqueue_cache_item (ci, HEAD);
1124     pthread_cond_wait(&ci->flushed, &cache_lock);
1125   }
1127   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1128    * may have been purged during our cond_wait() */
1130   pthread_mutex_unlock(&cache_lock);
1132   return (0);
1133 } /* }}} int flush_file */
1135 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1137   char *err = "Syntax error.\n";
1139   if (cmd && cmd->syntax)
1140     err = cmd->syntax;
1142   return send_response(sock, RESP_ERR, "Usage: %s", err);
1143 } /* }}} static int syntax_error() */
1145 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1147   uint64_t copy_queue_length;
1148   uint64_t copy_updates_received;
1149   uint64_t copy_flush_received;
1150   uint64_t copy_updates_written;
1151   uint64_t copy_data_sets_written;
1152   uint64_t copy_journal_bytes;
1153   uint64_t copy_journal_rotate;
1155   uint64_t tree_nodes_number;
1156   uint64_t tree_depth;
1158   pthread_mutex_lock (&stats_lock);
1159   copy_queue_length       = stats_queue_length;
1160   copy_updates_received   = stats_updates_received;
1161   copy_flush_received     = stats_flush_received;
1162   copy_updates_written    = stats_updates_written;
1163   copy_data_sets_written  = stats_data_sets_written;
1164   copy_journal_bytes      = stats_journal_bytes;
1165   copy_journal_rotate     = stats_journal_rotate;
1166   pthread_mutex_unlock (&stats_lock);
1168   pthread_mutex_lock (&cache_lock);
1169   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1170   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1171   pthread_mutex_unlock (&cache_lock);
1173   add_response_info(sock,
1174                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1175   add_response_info(sock,
1176                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1177   add_response_info(sock,
1178                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1179   add_response_info(sock,
1180                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1181   add_response_info(sock,
1182                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1183   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1184   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1185   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1186   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1188   send_response(sock, RESP_OK, "Statistics follow\n");
1190   return (0);
1191 } /* }}} int handle_request_stats */
1193 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1195   char *file, file_tmp[PATH_MAX];
1196   int status;
1198   status = buffer_get_field (&buffer, &buffer_size, &file);
1199   if (status != 0)
1200   {
1201     return syntax_error(sock,cmd);
1202   }
1203   else
1204   {
1205     pthread_mutex_lock(&stats_lock);
1206     stats_flush_received++;
1207     pthread_mutex_unlock(&stats_lock);
1209     get_abs_path(&file, file_tmp);
1210     if (!check_file_access(file, sock)) return 0;
1212     status = flush_file (file);
1213     if (status == 0)
1214       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1215     else if (status == ENOENT)
1216     {
1217       /* no file in our tree; see whether it exists at all */
1218       struct stat statbuf;
1220       memset(&statbuf, 0, sizeof(statbuf));
1221       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1222         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1223       else
1224         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1225     }
1226     else if (status < 0)
1227       return send_response(sock, RESP_ERR, "Internal error.\n");
1228     else
1229       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1230   }
1232   /* NOTREACHED */
1233   assert(1==0);
1234 } /* }}} int handle_request_flush */
1236 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1238   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1240   pthread_mutex_lock(&cache_lock);
1241   flush_old_values(-1);
1242   pthread_mutex_unlock(&cache_lock);
1244   return send_response(sock, RESP_OK, "Started flush.\n");
1245 } /* }}} static int handle_request_flushall */
1247 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1249   int status;
1250   char *file, file_tmp[PATH_MAX];
1251   cache_item_t *ci;
1253   status = buffer_get_field(&buffer, &buffer_size, &file);
1254   if (status != 0)
1255     return syntax_error(sock,cmd);
1257   get_abs_path(&file, file_tmp);
1259   pthread_mutex_lock(&cache_lock);
1260   ci = g_tree_lookup(cache_tree, file);
1261   if (ci == NULL)
1262   {
1263     pthread_mutex_unlock(&cache_lock);
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265   }
1267   for (size_t i=0; i < ci->values_num; i++)
1268     add_response_info(sock, "%s\n", ci->values[i]);
1270   pthread_mutex_unlock(&cache_lock);
1271   return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1274 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1276   int status;
1277   gboolean found;
1278   char *file, file_tmp[PATH_MAX];
1280   status = buffer_get_field(&buffer, &buffer_size, &file);
1281   if (status != 0)
1282     return syntax_error(sock,cmd);
1284   get_abs_path(&file, file_tmp);
1285   if (!check_file_access(file, sock)) return 0;
1287   pthread_mutex_lock(&cache_lock);
1288   found = g_tree_remove(cache_tree, file);
1289   pthread_mutex_unlock(&cache_lock);
1291   if (found == TRUE)
1292   {
1293     if (!JOURNAL_REPLAY(sock))
1294       journal_write("forget", file);
1296     return send_response(sock, RESP_OK, "Gone!\n");
1297   }
1298   else
1299     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1301   /* NOTREACHED */
1302   assert(1==0);
1303 } /* }}} static int handle_request_forget */
1305 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1307   cache_item_t *ci;
1309   pthread_mutex_lock(&cache_lock);
1311   ci = cache_queue_head;
1312   while (ci != NULL)
1313   {
1314     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1315     ci = ci->next;
1316   }
1318   pthread_mutex_unlock(&cache_lock);
1320   return send_response(sock, RESP_OK, "in queue.\n");
1321 } /* }}} int handle_request_queue */
1323 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1325   char *file, file_tmp[PATH_MAX];
1326   int values_num = 0;
1327   int status;
1328   char orig_buf[RRD_CMD_MAX];
1330   cache_item_t *ci;
1332   /* save it for the journal later */
1333   if (!JOURNAL_REPLAY(sock))
1334     strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1336   status = buffer_get_field (&buffer, &buffer_size, &file);
1337   if (status != 0)
1338     return syntax_error(sock,cmd);
1340   pthread_mutex_lock(&stats_lock);
1341   stats_updates_received++;
1342   pthread_mutex_unlock(&stats_lock);
1344   get_abs_path(&file, file_tmp);
1345   if (!check_file_access(file, sock)) return 0;
1347   pthread_mutex_lock (&cache_lock);
1348   ci = g_tree_lookup (cache_tree, file);
1350   if (ci == NULL) /* {{{ */
1351   {
1352     struct stat statbuf;
1353     cache_item_t *tmp;
1355     /* don't hold the lock while we setup; stat(2) might block */
1356     pthread_mutex_unlock(&cache_lock);
1358     memset (&statbuf, 0, sizeof (statbuf));
1359     status = stat (file, &statbuf);
1360     if (status != 0)
1361     {
1362       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1364       status = errno;
1365       if (status == ENOENT)
1366         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1367       else
1368         return send_response(sock, RESP_ERR,
1369                              "stat failed with error %i.\n", status);
1370     }
1371     if (!S_ISREG (statbuf.st_mode))
1372       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1374     if (access(file, R_OK|W_OK) != 0)
1375       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1376                            file, rrd_strerror(errno));
1378     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1379     if (ci == NULL)
1380     {
1381       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1383       return send_response(sock, RESP_ERR, "malloc failed.\n");
1384     }
1385     memset (ci, 0, sizeof (cache_item_t));
1387     ci->file = strdup (file);
1388     if (ci->file == NULL)
1389     {
1390       free (ci);
1391       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1393       return send_response(sock, RESP_ERR, "strdup failed.\n");
1394     }
1396     wipe_ci_values(ci, now);
1397     ci->flags = CI_FLAGS_IN_TREE;
1398     pthread_cond_init(&ci->flushed, NULL);
1400     pthread_mutex_lock(&cache_lock);
1402     /* another UPDATE might have added this entry in the meantime */
1403     tmp = g_tree_lookup (cache_tree, file);
1404     if (tmp == NULL)
1405       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1406     else
1407     {
1408       free_cache_item (ci);
1409       ci = tmp;
1410     }
1412     /* state may have changed while we were unlocked */
1413     if (state == SHUTDOWN)
1414       return -1;
1415   } /* }}} */
1416   assert (ci != NULL);
1418   /* don't re-write updates in replay mode */
1419   if (!JOURNAL_REPLAY(sock))
1420     journal_write("update", orig_buf);
1422   while (buffer_size > 0)
1423   {
1424     char *value;
1425     double stamp;
1426     char *eostamp;
1428     status = buffer_get_field (&buffer, &buffer_size, &value);
1429     if (status != 0)
1430     {
1431       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1432       break;
1433     }
1435     /* make sure update time is always moving forward. We use double here since
1436        update does support subsecond precision for timestamps ... */
1437     stamp = strtod(value, &eostamp);
1438     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1439     {
1440       pthread_mutex_unlock(&cache_lock);
1441       return send_response(sock, RESP_ERR,
1442                            "Cannot find timestamp in '%s'!\n", value);
1443     }
1444     else if (stamp <= ci->last_update_stamp)
1445     {
1446       pthread_mutex_unlock(&cache_lock);
1447       return send_response(sock, RESP_ERR,
1448                            "illegal attempt to update using time %lf when last"
1449                            " update time is %lf (minimum one second step)\n",
1450                            stamp, ci->last_update_stamp);
1451     }
1452     else
1453       ci->last_update_stamp = stamp;
1455     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1456                               &ci->values_alloc, config_alloc_chunk))
1457     {
1458       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1459       continue;
1460     }
1462     values_num++;
1463   }
1465   if (((now - ci->last_flush_time) >= config_write_interval)
1466       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1467       && (ci->values_num > 0))
1468   {
1469     enqueue_cache_item (ci, TAIL);
1470   }
1472   pthread_mutex_unlock (&cache_lock);
1474   if (values_num < 1)
1475     return send_response(sock, RESP_ERR, "No values updated.\n");
1476   else
1477     return send_response(sock, RESP_OK,
1478                          "errors, enqueued %i value(s).\n", values_num);
1480   /* NOTREACHED */
1481   assert(1==0);
1483 } /* }}} int handle_request_update */
1485 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1487   char *file, file_tmp[PATH_MAX];
1488   char *cf;
1490   char *start_str;
1491   char *end_str;
1492   time_t start_tm;
1493   time_t end_tm;
1495   unsigned long step;
1496   unsigned long ds_cnt;
1497   char **ds_namv;
1498   rrd_value_t *data;
1500   int status;
1501   unsigned long i;
1502   time_t t;
1503   rrd_value_t *data_ptr;
1505   file = NULL;
1506   cf = NULL;
1507   start_str = NULL;
1508   end_str = NULL;
1510   /* Read the arguments */
1511   do /* while (0) */
1512   {
1513     status = buffer_get_field (&buffer, &buffer_size, &file);
1514     if (status != 0)
1515       break;
1517     status = buffer_get_field (&buffer, &buffer_size, &cf);
1518     if (status != 0)
1519       break;
1521     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1522     if (status != 0)
1523     {
1524       start_str = NULL;
1525       status = 0;
1526       break;
1527     }
1529     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1530     if (status != 0)
1531     {
1532       end_str = NULL;
1533       status = 0;
1534       break;
1535     }
1536   } while (0);
1538   if (status != 0)
1539     return (syntax_error(sock,cmd));
1541   get_abs_path(&file, file_tmp);
1542   if (!check_file_access(file, sock)) return 0;
1544   status = flush_file (file);
1545   if ((status != 0) && (status != ENOENT))
1546     return (send_response (sock, RESP_ERR,
1547           "flush_file (%s) failed with status %i.\n", file, status));
1549   t = time (NULL); /* "now" */
1551   /* Parse start time */
1552   if (start_str != NULL)
1553   {
1554     char *endptr;
1555     long value;
1557     endptr = NULL;
1558     errno = 0;
1559     value = strtol (start_str, &endptr, /* base = */ 0);
1560     if ((endptr == start_str) || (errno != 0))
1561       return (send_response(sock, RESP_ERR,
1562             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1563             start_str));
1565     if (value > 0)
1566       start_tm = (time_t) value;
1567     else
1568       start_tm = (time_t) (t + value);
1569   }
1570   else
1571   {
1572     start_tm = t - 86400;
1573   }
1575   /* Parse end time */
1576   if (end_str != NULL)
1577   {
1578     char *endptr;
1579     long value;
1581     endptr = NULL;
1582     errno = 0;
1583     value = strtol (end_str, &endptr, /* base = */ 0);
1584     if ((endptr == end_str) || (errno != 0))
1585       return (send_response(sock, RESP_ERR,
1586             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1587             end_str));
1589     if (value > 0)
1590       end_tm = (time_t) value;
1591     else
1592       end_tm = (time_t) (t + value);
1593   }
1594   else
1595   {
1596     end_tm = t;
1597   }
1599   step = -1;
1600   ds_cnt = 0;
1601   ds_namv = NULL;
1602   data = NULL;
1604   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1605       &ds_cnt, &ds_namv, &data);
1606   if (status != 0)
1607     return (send_response(sock, RESP_ERR,
1608           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1610   add_response_info (sock, "FlushVersion: %lu\n", 1);
1611   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1612   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1613   add_response_info (sock, "Step: %lu\n", step);
1614   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1616 #define SSTRCAT(buffer,str,buffer_fill) do { \
1617     size_t str_len = strlen (str); \
1618     if ((buffer_fill + str_len) > sizeof (buffer)) \
1619       str_len = sizeof (buffer) - buffer_fill; \
1620     if (str_len > 0) { \
1621       strncpy (buffer + buffer_fill, str, str_len); \
1622       buffer_fill += str_len; \
1623       assert (buffer_fill <= sizeof (buffer)); \
1624       if (buffer_fill == sizeof (buffer)) \
1625         buffer[buffer_fill - 1] = 0; \
1626       else \
1627         buffer[buffer_fill] = 0; \
1628     } \
1629   } while (0)
1631   { /* Add list of DS names */
1632     char linebuf[1024];
1633     size_t linebuf_fill;
1635     memset (linebuf, 0, sizeof (linebuf));
1636     linebuf_fill = 0;
1637     for (i = 0; i < ds_cnt; i++)
1638     {
1639       if (i > 0)
1640         SSTRCAT (linebuf, " ", linebuf_fill);
1641       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1642       rrd_freemem(ds_namv[i]);
1643     }
1644     rrd_freemem(ds_namv);
1645     add_response_info (sock, "DSName: %s\n", linebuf);
1646   }
1648   /* Add the actual data */
1649   assert (step > 0);
1650   data_ptr = data;
1651   for (t = start_tm + step; t <= end_tm; t += step)
1652   {
1653     char linebuf[1024];
1654     size_t linebuf_fill;
1655     char tmp[128];
1657     memset (linebuf, 0, sizeof (linebuf));
1658     linebuf_fill = 0;
1659     for (i = 0; i < ds_cnt; i++)
1660     {
1661       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1662       tmp[sizeof (tmp) - 1] = 0;
1663       SSTRCAT (linebuf, tmp, linebuf_fill);
1665       data_ptr++;
1666     }
1668     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1669   } /* for (t) */
1670   rrd_freemem(data);
1672   return (send_response (sock, RESP_OK, "Success\n"));
1673 #undef SSTRCAT
1674 } /* }}} int handle_request_fetch */
1676 /* we came across a "WROTE" entry during journal replay.
1677  * throw away any values that we have accumulated for this file
1678  */
1679 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1681   cache_item_t *ci;
1682   const char *file = buffer;
1684   pthread_mutex_lock(&cache_lock);
1686   ci = g_tree_lookup(cache_tree, file);
1687   if (ci == NULL)
1688   {
1689     pthread_mutex_unlock(&cache_lock);
1690     return (0);
1691   }
1693   if (ci->values)
1694     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1696   wipe_ci_values(ci, now);
1697   remove_from_queue(ci);
1699   pthread_mutex_unlock(&cache_lock);
1700   return (0);
1701 } /* }}} int handle_request_wrote */
1703 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1705   char *file, file_tmp[PATH_MAX];
1706   int status;
1707   rrd_info_t *info;
1709   /* obtain filename */
1710   status = buffer_get_field(&buffer, &buffer_size, &file);
1711   if (status != 0)
1712     return syntax_error(sock,cmd);
1713   /* get full pathname */
1714   get_abs_path(&file, file_tmp);
1715   if (!check_file_access(file, sock)) {
1716     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1717   }
1718   /* get data */
1719   rrd_clear_error ();
1720   info = rrd_info_r(file);
1721   if(!info) {
1722     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1723   }
1724   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1725       switch (data->type) {
1726       case RD_I_VAL:
1727           if (isnan(data->value.u_val))
1728               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1729           else
1730               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1731           break;
1732       case RD_I_CNT:
1733           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1734           break;
1735       case RD_I_INT:
1736           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1737           break;
1738       case RD_I_STR:
1739           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1740           break;
1741       case RD_I_BLO:
1742           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1743           break;
1744       }
1745   }
1747   rrd_info_free(info);
1749   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1750 } /* }}} static int handle_request_info  */
1752 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1754   char *i, *file, file_tmp[PATH_MAX];
1755   int status;
1756   int idx;
1757   time_t t;
1759   /* obtain filename */
1760   status = buffer_get_field(&buffer, &buffer_size, &file);
1761   if (status != 0)
1762     return syntax_error(sock,cmd);
1763   /* get full pathname */
1764   get_abs_path(&file, file_tmp);
1765   if (!check_file_access(file, sock)) {
1766     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1767   }
1769   status = buffer_get_field(&buffer, &buffer_size, &i);
1770   if (status != 0)
1771     return syntax_error(sock,cmd);
1772   idx = atoi(i);
1773   if(idx<0) { 
1774     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1775   }
1777   /* get data */
1778   rrd_clear_error ();
1779   t = rrd_first_r(file,idx);
1780   if(t<1) {
1781     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1782   }
1783   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1784 } /* }}} static int handle_request_first  */
1787 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1789   char *file, file_tmp[PATH_MAX];
1790   int status;
1791   time_t t, from_file, step;
1792   rrd_file_t * rrd_file;
1793   cache_item_t * ci;
1794   rrd_t rrd; 
1796   /* obtain filename */
1797   status = buffer_get_field(&buffer, &buffer_size, &file);
1798   if (status != 0)
1799     return syntax_error(sock,cmd);
1800   /* get full pathname */
1801   get_abs_path(&file, file_tmp);
1802   if (!check_file_access(file, sock)) {
1803     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1804   }
1805   rrd_clear_error();
1806   rrd_init(&rrd);
1807   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1808   if(!rrd_file) {
1809     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1810   }
1811   from_file = rrd.live_head->last_up;
1812   step = rrd.stat_head->pdp_step;
1813   rrd_close(rrd_file);
1814   pthread_mutex_lock(&cache_lock);
1815   ci = g_tree_lookup(cache_tree, file);
1816   if (ci)
1817     t = ci->last_update_stamp;
1818   else
1819     t = from_file;
1820   pthread_mutex_unlock(&cache_lock);
1821   t -= t % step;
1822   rrd_free(&rrd);
1823   if(t<1) {
1824     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1825   }
1826   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1827 } /* }}} static int handle_request_last  */
1829 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1831   char *file, file_tmp[PATH_MAX];
1832   char *tok;
1833   int ac = 0;
1834   char *av[128];
1835   int status;
1836   unsigned long step = 300;
1837   time_t last_up = time(NULL)-10;
1838   int no_overwrite = opt_no_overwrite;
1841   /* obtain filename */
1842   status = buffer_get_field(&buffer, &buffer_size, &file);
1843   if (status != 0)
1844     return syntax_error(sock,cmd);
1845   /* get full pathname */
1846   get_abs_path(&file, file_tmp);
1847   if (!check_file_access(file, sock)) {
1848     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1849   }
1850   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1852   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1853     if( ! strncmp(tok,"-b",2) ) {
1854       status = buffer_get_field(&buffer, &buffer_size, &tok );
1855       if (status != 0) return syntax_error(sock,cmd);
1856       last_up = (time_t) atol(tok);
1857       continue;
1858     }
1859     if( ! strncmp(tok,"-s",2) ) {
1860       status = buffer_get_field(&buffer, &buffer_size, &tok );
1861       if (status != 0) return syntax_error(sock,cmd);
1862       step = atol(tok);
1863       continue;
1864     }
1865     if( ! strncmp(tok,"-O",2) ) {
1866       no_overwrite = 1;
1867       continue;
1868     }
1869     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1870     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1871     return syntax_error(sock,cmd);
1872   }
1873   if(step<1) {
1874     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1875   }
1876   if (last_up < 3600 * 24 * 365 * 10) {
1877     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1878   }
1880   rrd_clear_error ();
1881   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1883   if(!status) {
1884     return send_response(sock, RESP_OK, "RRD created OK\n");
1885   }
1886   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1887 } /* }}} static int handle_request_create  */
1889 /* start "BATCH" processing */
1890 static int batch_start (HANDLER_PROTO) /* {{{ */
1892   int status;
1893   if (sock->batch_start)
1894     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1896   status = send_response(sock, RESP_OK,
1897                          "Go ahead.  End with dot '.' on its own line.\n");
1898   sock->batch_start = time(NULL);
1899   sock->batch_cmd = 0;
1901   return status;
1902 } /* }}} static int batch_start */
1904 /* finish "BATCH" processing and return results to the client */
1905 static int batch_done (HANDLER_PROTO) /* {{{ */
1907   assert(sock->batch_start);
1908   sock->batch_start = 0;
1909   sock->batch_cmd  = 0;
1910   return send_response(sock, RESP_OK, "errors\n");
1911 } /* }}} static int batch_done */
1913 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1915   return -1;
1916 } /* }}} static int handle_request_quit */
1918 static command_t list_of_commands[] = { /* {{{ */
1919   {
1920     "UPDATE",
1921     handle_request_update,
1922     CMD_CONTEXT_ANY,
1923     "UPDATE <filename> <values> [<values> ...]\n"
1924     ,
1925     "Adds the given file to the internal cache if it is not yet known and\n"
1926     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1927     "for details.\n"
1928     "\n"
1929     "Each <values> has the following form:\n"
1930     "  <values> = <time>:<value>[:<value>[...]]\n"
1931     "See the rrdupdate(1) manpage for details.\n"
1932   },
1933   {
1934     "WROTE",
1935     handle_request_wrote,
1936     CMD_CONTEXT_JOURNAL,
1937     NULL,
1938     NULL
1939   },
1940   {
1941     "FLUSH",
1942     handle_request_flush,
1943     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1944     "FLUSH <filename>\n"
1945     ,
1946     "Adds the given filename to the head of the update queue and returns\n"
1947     "after it has been dequeued.\n"
1948   },
1949   {
1950     "FLUSHALL",
1951     handle_request_flushall,
1952     CMD_CONTEXT_CLIENT,
1953     "FLUSHALL\n"
1954     ,
1955     "Triggers writing of all pending updates.  Returns immediately.\n"
1956   },
1957   {
1958     "PENDING",
1959     handle_request_pending,
1960     CMD_CONTEXT_CLIENT,
1961     "PENDING <filename>\n"
1962     ,
1963     "Shows any 'pending' updates for a file, in order.\n"
1964     "The updates shown have not yet been written to the underlying RRD file.\n"
1965   },
1966   {
1967     "FORGET",
1968     handle_request_forget,
1969     CMD_CONTEXT_ANY,
1970     "FORGET <filename>\n"
1971     ,
1972     "Removes the file completely from the cache.\n"
1973     "Any pending updates for the file will be lost.\n"
1974   },
1975   {
1976     "QUEUE",
1977     handle_request_queue,
1978     CMD_CONTEXT_CLIENT,
1979     "QUEUE\n"
1980     ,
1981         "Shows all files in the output queue.\n"
1982     "The output is zero or more lines in the following format:\n"
1983     "(where <num_vals> is the number of values to be written)\n"
1984     "\n"
1985     "<num_vals> <filename>\n"
1986   },
1987   {
1988     "STATS",
1989     handle_request_stats,
1990     CMD_CONTEXT_CLIENT,
1991     "STATS\n"
1992     ,
1993     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1994     "a description of the values.\n"
1995   },
1996   {
1997     "HELP",
1998     handle_request_help,
1999     CMD_CONTEXT_CLIENT,
2000     "HELP [<command>]\n",
2001     NULL, /* special! */
2002   },
2003   {
2004     "BATCH",
2005     batch_start,
2006     CMD_CONTEXT_CLIENT,
2007     "BATCH\n"
2008     ,
2009     "The 'BATCH' command permits the client to initiate a bulk load\n"
2010     "   of commands to rrdcached.\n"
2011     "\n"
2012     "Usage:\n"
2013     "\n"
2014     "    client: BATCH\n"
2015     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2016     "    client: command #1\n"
2017     "    client: command #2\n"
2018     "    client: ... and so on\n"
2019     "    client: .\n"
2020     "    server: 2 errors\n"
2021     "    server: 7 message for command #7\n"
2022     "    server: 9 message for command #9\n"
2023     "\n"
2024     "For more information, consult the rrdcached(1) documentation.\n"
2025   },
2026   {
2027     ".",   /* BATCH terminator */
2028     batch_done,
2029     CMD_CONTEXT_BATCH,
2030     NULL,
2031     NULL
2032   },
2033   {
2034     "FETCH",
2035     handle_request_fetch,
2036     CMD_CONTEXT_CLIENT,
2037     "FETCH <file> <CF> [<start> [<end>]]\n"
2038     ,
2039     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2040   },
2041   {
2042     "INFO",
2043     handle_request_info,
2044     CMD_CONTEXT_CLIENT,
2045     "INFO <filename>\n",
2046     "The INFO command retrieves information about a specified RRD file.\n"
2047     "This is returned in standard rrdinfo format, a sequence of lines\n"
2048     "with the format <keyname> = <value>\n"
2049     "Note that this is the data as of the last update of the RRD file itself,\n"
2050     "not the last time data was received via rrdcached, so there may be pending\n"
2051     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2052   },
2053   {
2054     "FIRST",
2055     handle_request_first,
2056     CMD_CONTEXT_CLIENT,
2057     "FIRST <filename> <rra index>\n",
2058     "The FIRST command retrieves the first data time for a specified RRA in\n"
2059     "an RRD file.\n"
2060   },
2061   {
2062     "LAST",
2063     handle_request_last,
2064     CMD_CONTEXT_CLIENT,
2065     "LAST <filename>\n",
2066     "The LAST command retrieves the last update time for a specified RRD file.\n"
2067     "Note that this is the time of the last update of the RRD file itself, not\n"
2068     "the last time data was received via rrdcached, so there may be pending\n"
2069     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2070   },
2071   {
2072     "CREATE",
2073     handle_request_create,
2074     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2075     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2076     "The CREATE command will create an RRD file, overwriting any existing file\n"
2077     "unless the -O option is given or rrdcached was started with the -O option.\n"
2078     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2079     "not acceptable) and the step is in seconds (default is 300).\n"
2080     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2081   },
2082   {
2083     "QUIT",
2084     handle_request_quit,
2085     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2086     "QUIT\n"
2087     ,
2088     "Disconnect from rrdcached.\n"
2089   }
2090 }; /* }}} command_t list_of_commands[] */
2091 static size_t list_of_commands_len = sizeof (list_of_commands)
2092   / sizeof (list_of_commands[0]);
2094 static command_t *find_command(char *cmd)
2096   size_t i;
2098   for (i = 0; i < list_of_commands_len; i++)
2099     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2100       return (&list_of_commands[i]);
2101   return NULL;
2104 /* We currently use the index in the `list_of_commands' array as a bit position
2105  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2106  * outside these functions so that switching to a more elegant storage method
2107  * is easily possible. */
2108 static ssize_t find_command_index (const char *cmd) /* {{{ */
2110   size_t i;
2112   for (i = 0; i < list_of_commands_len; i++)
2113     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2114       return ((ssize_t) i);
2115   return (-1);
2116 } /* }}} ssize_t find_command_index */
2118 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2119     const char *cmd)
2121   ssize_t i;
2123   if (JOURNAL_REPLAY(sock))
2124     return (1);
2126   if (cmd == NULL)
2127     return (-1);
2129   if ((strcasecmp ("QUIT", cmd) == 0)
2130       || (strcasecmp ("HELP", cmd) == 0))
2131     return (1);
2132   else if (strcmp (".", cmd) == 0)
2133     cmd = "BATCH";
2135   i = find_command_index (cmd);
2136   if (i < 0)
2137     return (-1);
2138   assert (i < 32);
2140   if ((sock->permissions & (1 << i)) != 0)
2141     return (1);
2142   return (0);
2143 } /* }}} int socket_permission_check */
2145 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2146     const char *cmd)
2148   ssize_t i;
2150   i = find_command_index (cmd);
2151   if (i < 0)
2152     return (-1);
2153   assert (i < 32);
2155   sock->permissions |= (1 << i);
2156   return (0);
2157 } /* }}} int socket_permission_add */
2159 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2161   sock->permissions = 0;
2162 } /* }}} socket_permission_clear */
2164 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2165     listen_socket_t *src)
2167   dest->permissions = src->permissions;
2168 } /* }}} socket_permission_copy */
2170 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2172   size_t i;
2174   sock->permissions = 0;
2175   for (i = 0; i < list_of_commands_len; i++)
2176     sock->permissions |= (1 << i);
2177 } /* }}} void socket_permission_set_all */
2179 /* check whether commands are received in the expected context */
2180 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2182   if (JOURNAL_REPLAY(sock))
2183     return (cmd->context & CMD_CONTEXT_JOURNAL);
2184   else if (sock->batch_start)
2185     return (cmd->context & CMD_CONTEXT_BATCH);
2186   else
2187     return (cmd->context & CMD_CONTEXT_CLIENT);
2189   /* NOTREACHED */
2190   assert(1==0);
2193 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2195   int status;
2196   char *cmd_str;
2197   char *resp_txt;
2198   command_t *help = NULL;
2200   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2201   if (status == 0)
2202     help = find_command(cmd_str);
2204   if (help && (help->syntax || help->help))
2205   {
2206     char tmp[RRD_CMD_MAX];
2208     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2209     resp_txt = tmp;
2211     if (help->syntax)
2212       add_response_info(sock, "Usage: %s\n", help->syntax);
2214     if (help->help)
2215       add_response_info(sock, "%s\n", help->help);
2216   }
2217   else
2218   {
2219     size_t i;
2221     resp_txt = "Command overview\n";
2223     for (i = 0; i < list_of_commands_len; i++)
2224     {
2225       if (list_of_commands[i].syntax == NULL)
2226         continue;
2227       add_response_info (sock, "%s", list_of_commands[i].syntax);
2228     }
2229   }
2231   return send_response(sock, RESP_OK, resp_txt);
2232 } /* }}} int handle_request_help */
2234 static int handle_request (DISPATCH_PROTO) /* {{{ */
2236   char *buffer_ptr = buffer;
2237   char *cmd_str = NULL;
2238   command_t *cmd = NULL;
2239   int status;
2241   assert (buffer[buffer_size - 1] == '\0');
2243   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2244   if (status != 0)
2245   {
2246     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2247     return (-1);
2248   }
2250   if (sock != NULL && sock->batch_start)
2251     sock->batch_cmd++;
2253   cmd = find_command(cmd_str);
2254   if (!cmd)
2255     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2257   if (!socket_permission_check (sock, cmd->cmd))
2258     return send_response(sock, RESP_ERR, "Permission denied.\n");
2260   if (!command_check_context(sock, cmd))
2261     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2263   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2264 } /* }}} int handle_request */
2266 static void journal_set_free (journal_set *js) /* {{{ */
2268   if (js == NULL)
2269     return;
2271   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2273   free(js);
2274 } /* }}} journal_set_free */
2276 static void journal_set_remove (journal_set *js) /* {{{ */
2278   if (js == NULL)
2279     return;
2281   for (uint i=0; i < js->files_num; i++)
2282   {
2283     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2284     unlink(js->files[i]);
2285   }
2286 } /* }}} journal_set_remove */
2288 /* close current journal file handle.
2289  * MUST hold journal_lock before calling */
2290 static void journal_close(void) /* {{{ */
2292   if (journal_fh != NULL)
2293   {
2294     if (fclose(journal_fh) != 0)
2295       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2296   }
2298   journal_fh = NULL;
2299   journal_size = 0;
2300 } /* }}} journal_close */
2302 /* MUST hold journal_lock before calling */
2303 static void journal_new_file(void) /* {{{ */
2305   struct timeval now;
2306   int  new_fd;
2307   char new_file[PATH_MAX + 1];
2309   assert(journal_dir != NULL);
2310   assert(journal_cur != NULL);
2312   journal_close();
2314   gettimeofday(&now, NULL);
2315   /* this format assures that the files sort in strcmp() order */
2316   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2317            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2319   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2320                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2321   if (new_fd < 0)
2322     goto error;
2324   journal_fh = fdopen(new_fd, "a");
2325   if (journal_fh == NULL)
2326     goto error;
2328   journal_size = ftell(journal_fh);
2329   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2331   /* record the file in the journal set */
2332   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2334   return;
2336 error:
2337   RRDD_LOG(LOG_CRIT,
2338            "JOURNALING DISABLED: Error while trying to create %s : %s",
2339            new_file, rrd_strerror(errno));
2340   RRDD_LOG(LOG_CRIT,
2341            "JOURNALING DISABLED: All values will be flushed at shutdown");
2343   close(new_fd);
2344   config_flush_at_shutdown = 1;
2346 } /* }}} journal_new_file */
2348 /* MUST NOT hold journal_lock before calling this */
2349 static void journal_rotate(void) /* {{{ */
2351   journal_set *old_js = NULL;
2353   if (journal_dir == NULL)
2354     return;
2356   RRDD_LOG(LOG_DEBUG, "rotating journals");
2358   pthread_mutex_lock(&stats_lock);
2359   ++stats_journal_rotate;
2360   pthread_mutex_unlock(&stats_lock);
2362   pthread_mutex_lock(&journal_lock);
2364   journal_close();
2366   /* rotate the journal sets */
2367   old_js = journal_old;
2368   journal_old = journal_cur;
2369   journal_cur = calloc(1, sizeof(journal_set));
2371   if (journal_cur != NULL)
2372     journal_new_file();
2373   else
2374     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2376   pthread_mutex_unlock(&journal_lock);
2378   journal_set_remove(old_js);
2379   journal_set_free  (old_js);
2381 } /* }}} static void journal_rotate */
2383 /* MUST hold journal_lock when calling */
2384 static void journal_done(void) /* {{{ */
2386   if (journal_cur == NULL)
2387     return;
2389   journal_close();
2391   if (config_flush_at_shutdown)
2392   {
2393     RRDD_LOG(LOG_INFO, "removing journals");
2394     journal_set_remove(journal_old);
2395     journal_set_remove(journal_cur);
2396   }
2397   else
2398   {
2399     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2400              "journals will be used at next startup");
2401   }
2403   journal_set_free(journal_cur);
2404   journal_set_free(journal_old);
2405   free(journal_dir);
2407 } /* }}} static void journal_done */
2409 static int journal_write(char *cmd, char *args) /* {{{ */
2411   int chars;
2413   if (journal_fh == NULL)
2414     return 0;
2416   pthread_mutex_lock(&journal_lock);
2417   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2418   journal_size += chars;
2420   if (journal_size > JOURNAL_MAX)
2421     journal_new_file();
2423   pthread_mutex_unlock(&journal_lock);
2425   if (chars > 0)
2426   {
2427     pthread_mutex_lock(&stats_lock);
2428     stats_journal_bytes += chars;
2429     pthread_mutex_unlock(&stats_lock);
2430   }
2432   return chars;
2433 } /* }}} static int journal_write */
2435 static int journal_replay (const char *file) /* {{{ */
2437   FILE *fh;
2438   int entry_cnt = 0;
2439   int fail_cnt = 0;
2440   uint64_t line = 0;
2441   char entry[RRD_CMD_MAX];
2442   time_t now;
2444   if (file == NULL) return 0;
2446   {
2447     char *reason = "unknown error";
2448     int status = 0;
2449     struct stat statbuf;
2451     memset(&statbuf, 0, sizeof(statbuf));
2452     if (stat(file, &statbuf) != 0)
2453     {
2454       reason = "stat error";
2455       status = errno;
2456     }
2457     else if (!S_ISREG(statbuf.st_mode))
2458     {
2459       reason = "not a regular file";
2460       status = EPERM;
2461     }
2462     if (statbuf.st_uid != daemon_uid)
2463     {
2464       reason = "not owned by daemon user";
2465       status = EACCES;
2466     }
2467     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2468     {
2469       reason = "must not be user/group writable";
2470       status = EACCES;
2471     }
2473     if (status != 0)
2474     {
2475       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2476                file, rrd_strerror(status), reason);
2477       return 0;
2478     }
2479   }
2481   fh = fopen(file, "r");
2482   if (fh == NULL)
2483   {
2484     if (errno != ENOENT)
2485       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2486                file, rrd_strerror(errno));
2487     return 0;
2488   }
2489   else
2490     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2492   now = time(NULL);
2494   while(!feof(fh))
2495   {
2496     size_t entry_len;
2498     ++line;
2499     if (fgets(entry, sizeof(entry), fh) == NULL)
2500       break;
2501     entry_len = strlen(entry);
2503     /* check \n termination in case journal writing crashed mid-line */
2504     if (entry_len == 0)
2505       continue;
2506     else if (entry[entry_len - 1] != '\n')
2507     {
2508       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2509       ++fail_cnt;
2510       continue;
2511     }
2513     entry[entry_len - 1] = '\0';
2515     if (handle_request(NULL, now, entry, entry_len) == 0)
2516       ++entry_cnt;
2517     else
2518       ++fail_cnt;
2519   }
2521   fclose(fh);
2523   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2524            entry_cnt, fail_cnt);
2526   return entry_cnt > 0 ? 1 : 0;
2527 } /* }}} static int journal_replay */
2529 static int journal_sort(const void *v1, const void *v2)
2531   char **jn1 = (char **) v1;
2532   char **jn2 = (char **) v2;
2534   return strcmp(*jn1,*jn2);
2537 static void journal_init(void) /* {{{ */
2539   int had_journal = 0;
2540   DIR *dir;
2541   struct dirent *dent;
2542   char path[PATH_MAX+1];
2544   if (journal_dir == NULL) return;
2546   pthread_mutex_lock(&journal_lock);
2548   journal_cur = calloc(1, sizeof(journal_set));
2549   if (journal_cur == NULL)
2550   {
2551     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2552     return;
2553   }
2555   RRDD_LOG(LOG_INFO, "checking for journal files");
2557   /* Handle old journal files during transition.  This gives them the
2558    * correct sort order.  TODO: remove after first release
2559    */
2560   {
2561     char old_path[PATH_MAX+1];
2562     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2563     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2564     rename(old_path, path);
2566     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2567     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2568     rename(old_path, path);
2569   }
2571   dir = opendir(journal_dir);
2572   if (!dir) {
2573     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2574     return;
2575   }
2576   while ((dent = readdir(dir)) != NULL)
2577   {
2578     /* looks like a journal file? */
2579     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2580       continue;
2582     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2584     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2585     {
2586       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2587                dent->d_name);
2588       break;
2589     }
2590   }
2591   closedir(dir);
2593   qsort(journal_cur->files, journal_cur->files_num,
2594         sizeof(journal_cur->files[0]), journal_sort);
2596   for (uint i=0; i < journal_cur->files_num; i++)
2597     had_journal += journal_replay(journal_cur->files[i]);
2599   journal_new_file();
2601   /* it must have been a crash.  start a flush */
2602   if (had_journal && config_flush_at_shutdown)
2603     flush_old_values(-1);
2605   pthread_mutex_unlock(&journal_lock);
2607   RRDD_LOG(LOG_INFO, "journal processing complete");
2609 } /* }}} static void journal_init */
2611 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2613   assert(sock != NULL);
2615   free(sock->rbuf);  sock->rbuf = NULL;
2616   free(sock->wbuf);  sock->wbuf = NULL;
2617   free(sock);
2618 } /* }}} void free_listen_socket */
2620 static void close_connection(listen_socket_t *sock) /* {{{ */
2622   if (sock->fd >= 0)
2623   {
2624     close(sock->fd);
2625     sock->fd = -1;
2626   }
2628   free_listen_socket(sock);
2630 } /* }}} void close_connection */
2632 static void *connection_thread_main (void *args) /* {{{ */
2634   listen_socket_t *sock;
2635   int fd;
2637   sock = (listen_socket_t *) args;
2638   fd = sock->fd;
2640   /* init read buffers */
2641   sock->next_read = sock->next_cmd = 0;
2642   sock->rbuf = malloc(RBUF_SIZE);
2643   if (sock->rbuf == NULL)
2644   {
2645     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2646     close_connection(sock);
2647     return NULL;
2648   }
2650   pthread_mutex_lock (&connection_threads_lock);
2651 #ifdef HAVE_LIBWRAP
2652   /* LIBWRAP does not support multiple threads! By putting this code
2653      inside pthread_mutex_lock we do not have to worry about request_info
2654      getting overwritten by another thread.
2655   */
2656   struct request_info req;
2657   request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2658   fromhost(&req);
2659   if(!hosts_access(&req)) {
2660     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2661     pthread_mutex_unlock (&connection_threads_lock);
2662     close_connection(sock);
2663     return NULL;
2664   }
2665 #endif /* HAVE_LIBWRAP */
2666   connection_threads_num++;
2667   pthread_mutex_unlock (&connection_threads_lock);
2669   while (state == RUNNING)
2670   {
2671     char *cmd;
2672     ssize_t cmd_len;
2673     ssize_t rbytes;
2674     time_t now;
2676     struct pollfd pollfd;
2677     int status;
2679     pollfd.fd = fd;
2680     pollfd.events = POLLIN | POLLPRI;
2681     pollfd.revents = 0;
2683     status = poll (&pollfd, 1, /* timeout = */ 500);
2684     if (state != RUNNING)
2685       break;
2686     else if (status == 0) /* timeout */
2687       continue;
2688     else if (status < 0) /* error */
2689     {
2690       status = errno;
2691       if (status != EINTR)
2692         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2693       continue;
2694     }
2696     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2697       break;
2698     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2699     {
2700       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2701           "poll(2) returned something unexpected: %#04hx",
2702           pollfd.revents);
2703       break;
2704     }
2706     rbytes = read(fd, sock->rbuf + sock->next_read,
2707                   RBUF_SIZE - sock->next_read);
2708     if (rbytes < 0)
2709     {
2710       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2711       break;
2712     }
2713     else if (rbytes == 0)
2714       break; /* eof */
2716     sock->next_read += rbytes;
2718     if (sock->batch_start)
2719       now = sock->batch_start;
2720     else
2721       now = time(NULL);
2723     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2724     {
2725       status = handle_request (sock, now, cmd, cmd_len+1);
2726       if (status != 0)
2727         goto out_close;
2728     }
2729   }
2731 out_close:
2732   close_connection(sock);
2734   /* Remove this thread from the connection threads list */
2735   pthread_mutex_lock (&connection_threads_lock);
2736   connection_threads_num--;
2737   if (connection_threads_num <= 0)
2738     pthread_cond_broadcast(&connection_threads_done);
2739   pthread_mutex_unlock (&connection_threads_lock);
2741   return (NULL);
2742 } /* }}} void *connection_thread_main */
2744 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2746   int fd;
2747   struct sockaddr_un sa;
2748   listen_socket_t *temp;
2749   int status;
2750   const char *path;
2751   char *path_copy, *dir;
2753   path = sock->addr;
2754   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2755     path += strlen("unix:");
2757   /* dirname may modify its argument */
2758   path_copy = strdup(path);
2759   if (path_copy == NULL)
2760   {
2761     fprintf(stderr, "rrdcached: strdup(): %s\n",
2762         rrd_strerror(errno));
2763     return (-1);
2764   }
2766   dir = dirname(path_copy);
2767   if (rrd_mkdir_p(dir, 0777) != 0)
2768   {
2769     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2770         dir, rrd_strerror(errno));
2771     return (-1);
2772   }
2774   free(path_copy);
2776   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2777       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2778   if (temp == NULL)
2779   {
2780     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2781     return (-1);
2782   }
2783   listen_fds = temp;
2784   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2786   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2787   if (fd < 0)
2788   {
2789     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2790              rrd_strerror(errno));
2791     return (-1);
2792   }
2794   memset (&sa, 0, sizeof (sa));
2795   sa.sun_family = AF_UNIX;
2796   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2798   /* if we've gotten this far, we own the pid file.  any daemon started
2799    * with the same args must not be alive.  therefore, ensure that we can
2800    * create the socket...
2801    */
2802   unlink(path);
2804   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2805   if (status != 0)
2806   {
2807     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2808              path, rrd_strerror(errno));
2809     close (fd);
2810     return (-1);
2811   }
2813   /* tweak the sockets group ownership */
2814   if (sock->socket_group != (gid_t)-1)
2815   {
2816     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2817          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2818     {
2819       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2820     }
2821   }
2823   if (sock->socket_permissions != (mode_t)-1)
2824   {
2825     if (chmod(path, sock->socket_permissions) != 0)
2826       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2827           (unsigned int)sock->socket_permissions, strerror(errno));
2828   }
2830   status = listen (fd, /* backlog = */ 10);
2831   if (status != 0)
2832   {
2833     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2834              path, rrd_strerror(errno));
2835     close (fd);
2836     unlink (path);
2837     return (-1);
2838   }
2840   listen_fds[listen_fds_num].fd = fd;
2841   listen_fds[listen_fds_num].family = PF_UNIX;
2842   strncpy(listen_fds[listen_fds_num].addr, path,
2843           sizeof (listen_fds[listen_fds_num].addr) - 1);
2844   listen_fds_num++;
2846   return (0);
2847 } /* }}} int open_listen_socket_unix */
2849 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2851   struct addrinfo ai_hints;
2852   struct addrinfo *ai_res;
2853   struct addrinfo *ai_ptr;
2854   char addr_copy[NI_MAXHOST];
2855   char *addr;
2856   char *port;
2857   int status;
2859   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2860   addr_copy[sizeof (addr_copy) - 1] = 0;
2861   addr = addr_copy;
2863   memset (&ai_hints, 0, sizeof (ai_hints));
2864   ai_hints.ai_flags = 0;
2865 #ifdef AI_ADDRCONFIG
2866   ai_hints.ai_flags |= AI_ADDRCONFIG;
2867 #endif
2868   ai_hints.ai_family = AF_UNSPEC;
2869   ai_hints.ai_socktype = SOCK_STREAM;
2871   port = NULL;
2872   if (*addr == '[') /* IPv6+port format */
2873   {
2874     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2875     addr++;
2877     port = strchr (addr, ']');
2878     if (port == NULL)
2879     {
2880       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2881       return (-1);
2882     }
2883     *port = 0;
2884     port++;
2886     if (*port == ':')
2887       port++;
2888     else if (*port == 0)
2889       port = NULL;
2890     else
2891     {
2892       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2893       return (-1);
2894     }
2895   } /* if (*addr == '[') */
2896   else
2897   {
2898     port = rindex(addr, ':');
2899     if (port != NULL)
2900     {
2901       *port = 0;
2902       port++;
2903     }
2904   }
2905   ai_res = NULL;
2906   status = getaddrinfo (addr,
2907                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2908                         &ai_hints, &ai_res);
2909   if (status != 0)
2910   {
2911     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2912              addr, gai_strerror (status));
2913     return (-1);
2914   }
2916   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2917   {
2918     int fd;
2919     listen_socket_t *temp;
2920     int one = 1;
2922     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2923         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2924     if (temp == NULL)
2925     {
2926       fprintf (stderr,
2927                "rrdcached: open_listen_socket_network: realloc failed.\n");
2928       continue;
2929     }
2930     listen_fds = temp;
2931     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2933     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2934     if (fd < 0)
2935     {
2936       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2937                rrd_strerror(errno));
2938       continue;
2939     }
2941     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2943     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2944     if (status != 0)
2945     {
2946       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2947                sock->addr, rrd_strerror(errno));
2948       close (fd);
2949       continue;
2950     }
2952     status = listen (fd, /* backlog = */ 10);
2953     if (status != 0)
2954     {
2955       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2956                sock->addr, rrd_strerror(errno));
2957       close (fd);
2958       freeaddrinfo(ai_res);
2959       return (-1);
2960     }
2962     listen_fds[listen_fds_num].fd = fd;
2963     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2964     listen_fds_num++;
2965   } /* for (ai_ptr) */
2967   freeaddrinfo(ai_res);
2968   return (0);
2969 } /* }}} static int open_listen_socket_network */
2971 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2973   assert(sock != NULL);
2974   assert(sock->addr != NULL);
2976   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2977       || sock->addr[0] == '/')
2978     return (open_listen_socket_unix(sock));
2979   else
2980     return (open_listen_socket_network(sock));
2981 } /* }}} int open_listen_socket */
2983 static int close_listen_sockets (void) /* {{{ */
2985   size_t i;
2987   for (i = 0; i < listen_fds_num; i++)
2988   {
2989     close (listen_fds[i].fd);
2991     if (listen_fds[i].family == PF_UNIX)
2992       unlink(listen_fds[i].addr);
2993   }
2995   free (listen_fds);
2996   listen_fds = NULL;
2997   listen_fds_num = 0;
2999   return (0);
3000 } /* }}} int close_listen_sockets */
3002 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3004   struct pollfd *pollfds;
3005   int pollfds_num;
3006   int status;
3007   int i;
3009   if (listen_fds_num < 1)
3010   {
3011     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3012     return (NULL);
3013   }
3015   pollfds_num = listen_fds_num;
3016   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3017   if (pollfds == NULL)
3018   {
3019     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3020     return (NULL);
3021   }
3022   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3024   RRDD_LOG(LOG_INFO, "listening for connections");
3026   while (state == RUNNING)
3027   {
3028     for (i = 0; i < pollfds_num; i++)
3029     {
3030       pollfds[i].fd = listen_fds[i].fd;
3031       pollfds[i].events = POLLIN | POLLPRI;
3032       pollfds[i].revents = 0;
3033     }
3035     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3036     if (state != RUNNING)
3037       break;
3038     else if (status == 0) /* timeout */
3039       continue;
3040     else if (status < 0) /* error */
3041     {
3042       status = errno;
3043       if (status != EINTR)
3044       {
3045         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3046       }
3047       continue;
3048     }
3050     for (i = 0; i < pollfds_num; i++)
3051     {
3052       listen_socket_t *client_sock;
3053       struct sockaddr_storage client_sa;
3054       socklen_t client_sa_size;
3055       pthread_t tid;
3056       pthread_attr_t attr;
3058       if (pollfds[i].revents == 0)
3059         continue;
3061       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3062       {
3063         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3064             "poll(2) returned something unexpected for listen FD #%i.",
3065             pollfds[i].fd);
3066         continue;
3067       }
3069       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3070       if (client_sock == NULL)
3071       {
3072         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3073         continue;
3074       }
3075       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3077       client_sa_size = sizeof (client_sa);
3078       client_sock->fd = accept (pollfds[i].fd,
3079           (struct sockaddr *) &client_sa, &client_sa_size);
3080       if (client_sock->fd < 0)
3081       {
3082         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3083         free(client_sock);
3084         continue;
3085       }
3087       pthread_attr_init (&attr);
3088       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3090       status = pthread_create (&tid, &attr, connection_thread_main,
3091                                client_sock);
3092       if (status != 0)
3093       {
3094         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3095         close_connection(client_sock);
3096         continue;
3097       }
3098     } /* for (pollfds_num) */
3099   } /* while (state == RUNNING) */
3101   RRDD_LOG(LOG_INFO, "starting shutdown");
3103   close_listen_sockets ();
3105   pthread_mutex_lock (&connection_threads_lock);
3106   while (connection_threads_num > 0)
3107     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3108   pthread_mutex_unlock (&connection_threads_lock);
3110   free(pollfds);
3112   return (NULL);
3113 } /* }}} void *listen_thread_main */
3115 static int daemonize (void) /* {{{ */
3117   int pid_fd;
3118   char *base_dir;
3120   daemon_uid = geteuid();
3122   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3123   if (pid_fd < 0)
3124     pid_fd = check_pidfile();
3125   if (pid_fd < 0)
3126     return pid_fd;
3128   /* open all the listen sockets */
3129   if (config_listen_address_list_len > 0)
3130   {
3131     for (size_t i = 0; i < config_listen_address_list_len; i++)
3132       open_listen_socket (config_listen_address_list[i]);
3134     rrd_free_ptrs((void ***) &config_listen_address_list,
3135                   &config_listen_address_list_len);
3136   }
3137   else
3138   {
3139     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3140         sizeof(default_socket.addr) - 1);
3141     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3143     if (default_socket.permissions == 0)
3144       socket_permission_set_all (&default_socket);
3146     open_listen_socket (&default_socket);
3147   }
3149   if (listen_fds_num < 1)
3150   {
3151     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3152     goto error;
3153   }
3155   if (!stay_foreground)
3156   {
3157     pid_t child;
3159     child = fork ();
3160     if (child < 0)
3161     {
3162       fprintf (stderr, "daemonize: fork(2) failed.\n");
3163       goto error;
3164     }
3165     else if (child > 0)
3166       exit(0);
3168     /* Become session leader */
3169     setsid ();
3171     /* Open the first three file descriptors to /dev/null */
3172     close (2);
3173     close (1);
3174     close (0);
3176     open ("/dev/null", O_RDWR);
3177     if (dup(0) == -1 || dup(0) == -1){
3178         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3179     }
3180   } /* if (!stay_foreground) */
3182   /* Change into the /tmp directory. */
3183   base_dir = (config_base_dir != NULL)
3184     ? config_base_dir
3185     : "/tmp";
3187   if (chdir (base_dir) != 0)
3188   {
3189     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3190     goto error;
3191   }
3193   install_signal_handlers();
3195   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3196   RRDD_LOG(LOG_INFO, "starting up");
3198   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3199                                 (GDestroyNotify) free_cache_item);
3200   if (cache_tree == NULL)
3201   {
3202     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3203     goto error;
3204   }
3206   return write_pidfile (pid_fd);
3208 error:
3209   remove_pidfile();
3210   return -1;
3211 } /* }}} int daemonize */
3213 static int cleanup (void) /* {{{ */
3215   pthread_cond_broadcast (&flush_cond);
3216   pthread_join (flush_thread, NULL);
3218   pthread_cond_broadcast (&queue_cond);
3219   for (int i = 0; i < config_queue_threads; i++)
3220     pthread_join (queue_threads[i], NULL);
3222   if (config_flush_at_shutdown)
3223   {
3224     assert(cache_queue_head == NULL);
3225     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3226   }
3228   free(queue_threads);
3229   free(config_base_dir);
3231   pthread_mutex_lock(&cache_lock);
3232   g_tree_destroy(cache_tree);
3234   pthread_mutex_lock(&journal_lock);
3235   journal_done();
3237   RRDD_LOG(LOG_INFO, "goodbye");
3238   closelog ();
3240   remove_pidfile ();
3241   free(config_pid_file);
3243   return (0);
3244 } /* }}} int cleanup */
3246 static int read_options (int argc, char **argv) /* {{{ */
3248   int option;
3249   int status = 0;
3251   socket_permission_clear (&default_socket);
3253   default_socket.socket_group = (gid_t)-1;
3254   default_socket.socket_permissions = (mode_t)-1;
3256   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3257   {
3258     switch (option)
3259     {
3260       case 'O':
3261         opt_no_overwrite = 1;
3262         break;
3264       case 'g':
3265         stay_foreground=1;
3266         break;
3268       case 'l':
3269       {
3270         listen_socket_t *new;
3272         new = malloc(sizeof(listen_socket_t));
3273         if (new == NULL)
3274         {
3275           fprintf(stderr, "read_options: malloc failed.\n");
3276           return(2);
3277         }
3278         memset(new, 0, sizeof(listen_socket_t));
3280         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3282         /* Add permissions to the socket {{{ */
3283         if (default_socket.permissions != 0)
3284         {
3285           socket_permission_copy (new, &default_socket);
3286         }
3287         else /* if (default_socket.permissions == 0) */
3288         {
3289           /* Add permission for ALL commands to the socket. */
3290           socket_permission_set_all (new);
3291         }
3292         /* }}} Done adding permissions. */
3294         new->socket_group = default_socket.socket_group;
3295         new->socket_permissions = default_socket.socket_permissions;
3297         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3298                          &config_listen_address_list_len, new))
3299         {
3300           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3301           return (2);
3302         }
3303       }
3304       break;
3306       /* set socket group permissions */
3307       case 's':
3308       {
3309         gid_t group_gid;
3310         struct group *grp;
3312         group_gid = strtoul(optarg, NULL, 10);
3313         if (errno != EINVAL && group_gid>0)
3314         {
3315           /* we were passed a number */
3316           grp = getgrgid(group_gid);
3317         }
3318         else
3319         {
3320           grp = getgrnam(optarg);
3321         }
3323         if (grp)
3324         {
3325           default_socket.socket_group = grp->gr_gid;
3326         }
3327         else
3328         {
3329           /* no idea what the user wanted... */
3330           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3331           return (5);
3332         }
3333       }
3334       break;
3336       /* set socket file permissions */
3337       case 'm':
3338       {
3339         long  tmp;
3340         char *endptr = NULL;
3342         tmp = strtol (optarg, &endptr, 8);
3343         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3344             || (tmp > 07777) || (tmp < 0)) {
3345           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3346               optarg);
3347           return (5);
3348         }
3350         default_socket.socket_permissions = (mode_t)tmp;
3351       }
3352       break;
3354       case 'P':
3355       {
3356         char *optcopy;
3357         char *saveptr;
3358         char *dummy;
3359         char *ptr;
3361         socket_permission_clear (&default_socket);
3363         optcopy = strdup (optarg);
3364         dummy = optcopy;
3365         saveptr = NULL;
3366         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3367         {
3368           dummy = NULL;
3369           status = socket_permission_add (&default_socket, ptr);
3370           if (status != 0)
3371           {
3372             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3373                 "socket failed. Most likely, this permission doesn't "
3374                 "exist. Check your command line.\n", ptr);
3375             status = 4;
3376           }
3377         }
3379         free (optcopy);
3380       }
3381       break;
3383       case 'f':
3384       {
3385         int temp;
3387         temp = atoi (optarg);
3388         if (temp > 0)
3389           config_flush_interval = temp;
3390         else
3391         {
3392           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3393           status = 3;
3394         }
3395       }
3396       break;
3398       case 'w':
3399       {
3400         int temp;
3402         temp = atoi (optarg);
3403         if (temp > 0)
3404           config_write_interval = temp;
3405         else
3406         {
3407           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3408           status = 2;
3409         }
3410       }
3411       break;
3413       case 'z':
3414       {
3415         int temp;
3417         temp = atoi(optarg);
3418         if (temp > 0)
3419           config_write_jitter = temp;
3420         else
3421         {
3422           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3423           status = 2;
3424         }
3426         break;
3427       }
3429       case 't':
3430       {
3431         int threads;
3432         threads = atoi(optarg);
3433         if (threads >= 1)
3434           config_queue_threads = threads;
3435         else
3436         {
3437           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3438           return 1;
3439         }
3440       }
3441       break;
3443       case 'B':
3444         config_write_base_only = 1;
3445         break;
3447       case 'b':
3448       {
3449         size_t len;
3450         char base_realpath[PATH_MAX];
3452         if (config_base_dir != NULL)
3453           free (config_base_dir);
3454         config_base_dir = strdup (optarg);
3455         if (config_base_dir == NULL)
3456         {
3457           fprintf (stderr, "read_options: strdup failed.\n");
3458           return (3);
3459         }
3461         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3462         {
3463           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3464               config_base_dir, rrd_strerror (errno));
3465           return (3);
3466         }
3468         /* make sure that the base directory is not resolved via
3469          * symbolic links.  this makes some performance-enhancing
3470          * assumptions possible (we don't have to resolve paths
3471          * that start with a "/")
3472          */
3473         if (realpath(config_base_dir, base_realpath) == NULL)
3474         {
3475           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3476               "%s\n", config_base_dir, rrd_strerror(errno));
3477           return 5;
3478         }
3480         len = strlen (config_base_dir);
3481         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3482         {
3483           config_base_dir[len - 1] = 0;
3484           len--;
3485         }
3487         if (len < 1)
3488         {
3489           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3490           return (4);
3491         }
3493         _config_base_dir_len = len;
3495         len = strlen (base_realpath);
3496         while ((len > 0) && (base_realpath[len - 1] == '/'))
3497         {
3498           base_realpath[len - 1] = '\0';
3499           len--;
3500         }
3502         if (strncmp(config_base_dir,
3503                          base_realpath, sizeof(base_realpath)) != 0)
3504         {
3505           fprintf(stderr,
3506                   "Base directory (-b) resolved via file system links!\n"
3507                   "Please consult rrdcached '-b' documentation!\n"
3508                   "Consider specifying the real directory (%s)\n",
3509                   base_realpath);
3510           return 5;
3511         }
3512       }
3513       break;
3515       case 'p':
3516       {
3517         if (config_pid_file != NULL)
3518           free (config_pid_file);
3519         config_pid_file = strdup (optarg);
3520         if (config_pid_file == NULL)
3521         {
3522           fprintf (stderr, "read_options: strdup failed.\n");
3523           return (3);
3524         }
3525       }
3526       break;
3528       case 'F':
3529         config_flush_at_shutdown = 1;
3530         break;
3532       case 'j':
3533       {
3534         char journal_dir_actual[PATH_MAX];
3535         journal_dir = realpath((const char *)optarg, journal_dir_actual);
3536         if (journal_dir)
3537         {
3538           // if we were able to properly resolve the path, lets have a copy
3539           // for use outside this block.
3540           journal_dir = strdup(journal_dir);           
3541           status = rrd_mkdir_p(journal_dir, 0777);
3542           if (status != 0)
3543           {
3544             fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3545                     journal_dir, rrd_strerror(errno));
3546             return 6;
3547           }
3548           if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3549           {
3550             fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3551                     errno ? rrd_strerror(errno) : "");
3552             return 6;
3553           }
3554         } else {
3555           fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3556                   errno ? rrd_strerror(errno) : "");
3557           return 6;
3558         }
3559       }
3560       break;
3562       case 'a':
3563       {
3564         int temp = atoi(optarg);
3565         if (temp > 0)
3566           config_alloc_chunk = temp;
3567         else
3568         {
3569           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3570           return 10;
3571         }
3572       }
3573       break;
3575       case 'h':
3576       case '?':
3577         printf ("RRDCacheD %s\n"
3578             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3579             "\n"
3580             "Usage: rrdcached [options]\n"
3581             "\n"
3582             "Valid options are:\n"
3583             "  -l <address>  Socket address to listen to.\n"
3584             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3585             "  -P <perms>    Sets the permissions to assign to all following "
3586                             "sockets\n"
3587             "  -w <seconds>  Interval in which to write data.\n"
3588             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3589             "  -t <threads>  Number of write threads.\n"
3590             "  -f <seconds>  Interval in which to flush dead data.\n"
3591             "  -p <file>     Location of the PID-file.\n"
3592             "  -b <dir>      Base directory to change to.\n"
3593             "  -B            Restrict file access to paths within -b <dir>\n"
3594             "  -g            Do not fork and run in the foreground.\n"
3595             "  -j <dir>      Directory in which to create the journal files.\n"
3596             "  -F            Always flush all updates at shutdown\n"
3597             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3598             "                (the socket will also have read/write permissions "
3599                             "for that group)\n"
3600             "  -m <mode>     File permissions (octal) of all following UNIX "
3601                             "sockets\n"
3602             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3603             "  -O            Do not allow CREATE commands to overwrite existing\n"
3604             "                files, even if asked to.\n"
3605             "\n"
3606             "For more information and a detailed description of all options "
3607             "please refer\n"
3608             "to the rrdcached(1) manual page.\n",
3609             VERSION);
3610         if (option == 'h')
3611           status = -1;
3612         else
3613           status = 1;
3614         break;
3615     } /* switch (option) */
3616   } /* while (getopt) */
3618   /* advise the user when values are not sane */
3619   if (config_flush_interval < 2 * config_write_interval)
3620     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3621             " 2x write interval (-w) !\n");
3622   if (config_write_jitter > config_write_interval)
3623     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3624             " write interval (-w) !\n");
3626   if (config_write_base_only && config_base_dir == NULL)
3627     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3628             "  Consult the rrdcached documentation\n");
3630   if (journal_dir == NULL)
3631     config_flush_at_shutdown = 1;
3633   return (status);
3634 } /* }}} int read_options */
3636 int main (int argc, char **argv)
3638   int status;
3640   status = read_options (argc, argv);
3641   if (status != 0)
3642   {
3643     if (status < 0)
3644       status = 0;
3645     return (status);
3646   }
3648   status = daemonize ();
3649   if (status != 0)
3650   {
3651     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3652     return (1);
3653   }
3655   journal_init();
3657   /* start the queue threads */
3658   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3659   if (queue_threads == NULL)
3660   {
3661     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3662     cleanup();
3663     return (1);
3664   }
3665   for (int i = 0; i < config_queue_threads; i++)
3666   {
3667     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3668     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3669     if (status != 0)
3670     {
3671       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3672       cleanup();
3673       return (1);
3674     }
3675   }
3677   /* start the flush thread */
3678   memset(&flush_thread, 0, sizeof(flush_thread));
3679   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3680   if (status != 0)
3681   {
3682     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3683     cleanup();
3684     return (1);
3685   }
3687   listen_thread_main (NULL);
3688   cleanup ();
3690   return (0);
3691 } /* int main */
3693 /*
3694  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3695  */