Code

The buffer length for command buffers should be controlled by a single
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
147   char *wbuf;
148   ssize_t wbuf_len;
150   uint32_t permissions;
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
178   char *syntax;
179   char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
186   char *file;
187   char **values;
188   size_t values_num;            /* number of valid pointers */
189   size_t values_alloc;          /* number of allocated pointers */
190   time_t last_flush_time;
191   time_t last_update_stamp;
192 #define CI_FLAGS_IN_TREE  (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194   int flags;
195   pthread_cond_t  flushed;
196   cache_item_t *prev;
197   cache_item_t *next;
198 };
200 struct callback_flush_data_s
202   time_t now;
203   time_t abs_timeout;
204   char **keys;
205   size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
209 enum queue_side_e
211   HEAD,
212   TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
216 /* describe a set of journal files */
217 typedef struct {
218   char **files;
219   size_t files_num;
220 } journal_set;
222 #define RBUF_SIZE (RRD_CMD_MAX*2)
224 /*
225  * Variables
226  */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
233 static listen_socket_t default_socket;
235 enum {
236   RUNNING,              /* normal operation */
237   FLUSHING,             /* flushing remaining values */
238   SHUTDOWN              /* shutting down */
239 } state = RUNNING;
241 static pthread_t *queue_threads;
242 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
243 static int config_queue_threads = 4;
245 static pthread_t flush_thread;
246 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
248 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
249 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
250 static int connection_threads_num = 0;
252 /* Cache stuff */
253 static GTree          *cache_tree = NULL;
254 static cache_item_t   *cache_queue_head = NULL;
255 static cache_item_t   *cache_queue_tail = NULL;
256 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
258 static int config_write_interval = 300;
259 static int config_write_jitter   = 0;
260 static int config_flush_interval = 3600;
261 static int config_flush_at_shutdown = 0;
262 static char *config_pid_file = NULL;
263 static char *config_base_dir = NULL;
264 static size_t _config_base_dir_len = 0;
265 static int config_write_base_only = 0;
266 static size_t config_alloc_chunk = 1;
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
280 static int opt_no_overwrite = 0; /* default for the daemon */
282 /* Journaled updates */
283 #define JOURNAL_REPLAY(s) ((s) == NULL)
284 #define JOURNAL_BASE "rrd.journal"
285 static journal_set *journal_cur = NULL;
286 static journal_set *journal_old = NULL;
287 static char *journal_dir = NULL;
288 static FILE *journal_fh = NULL;         /* current journal file handle */
289 static long  journal_size = 0;          /* current journal size */
290 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
291 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
292 static int journal_write(char *cmd, char *args);
293 static void journal_done(void);
294 static void journal_rotate(void);
296 /* prototypes for forward refernces */
297 static int handle_request_help (HANDLER_PROTO);
299 /* 
300  * Functions
301  */
302 static void sig_common (const char *sig) /* {{{ */
304   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
305   state = FLUSHING;
306   pthread_cond_broadcast(&flush_cond);
307   pthread_cond_broadcast(&queue_cond);
308 } /* }}} void sig_common */
310 static void sig_int_handler (int UNUSED(s)) /* {{{ */
312   sig_common("INT");
313 } /* }}} void sig_int_handler */
315 static void sig_term_handler (int UNUSED(s)) /* {{{ */
317   sig_common("TERM");
318 } /* }}} void sig_term_handler */
320 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
322   config_flush_at_shutdown = 1;
323   sig_common("USR1");
324 } /* }}} void sig_usr1_handler */
326 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
328   config_flush_at_shutdown = 0;
329   sig_common("USR2");
330 } /* }}} void sig_usr2_handler */
332 static void install_signal_handlers(void) /* {{{ */
334   /* These structures are static, because `sigaction' behaves weird if the are
335    * overwritten.. */
336   static struct sigaction sa_int;
337   static struct sigaction sa_term;
338   static struct sigaction sa_pipe;
339   static struct sigaction sa_usr1;
340   static struct sigaction sa_usr2;
342   /* Install signal handlers */
343   memset (&sa_int, 0, sizeof (sa_int));
344   sa_int.sa_handler = sig_int_handler;
345   sigaction (SIGINT, &sa_int, NULL);
347   memset (&sa_term, 0, sizeof (sa_term));
348   sa_term.sa_handler = sig_term_handler;
349   sigaction (SIGTERM, &sa_term, NULL);
351   memset (&sa_pipe, 0, sizeof (sa_pipe));
352   sa_pipe.sa_handler = SIG_IGN;
353   sigaction (SIGPIPE, &sa_pipe, NULL);
355   memset (&sa_pipe, 0, sizeof (sa_usr1));
356   sa_usr1.sa_handler = sig_usr1_handler;
357   sigaction (SIGUSR1, &sa_usr1, NULL);
359   memset (&sa_usr2, 0, sizeof (sa_usr2));
360   sa_usr2.sa_handler = sig_usr2_handler;
361   sigaction (SIGUSR2, &sa_usr2, NULL);
363 } /* }}} void install_signal_handlers */
365 static int open_pidfile(char *action, int oflag) /* {{{ */
367   int fd;
368   const char *file;
369   char *file_copy, *dir;
371   file = (config_pid_file != NULL)
372     ? config_pid_file
373     : LOCALSTATEDIR "/run/rrdcached.pid";
375   /* dirname may modify its argument */
376   file_copy = strdup(file);
377   if (file_copy == NULL)
378   {
379     fprintf(stderr, "rrdcached: strdup(): %s\n",
380         rrd_strerror(errno));
381     return -1;
382   }
384   dir = dirname(file_copy);
385   if (rrd_mkdir_p(dir, 0777) != 0)
386   {
387     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
388         dir, rrd_strerror(errno));
389     return -1;
390   }
392   free(file_copy);
394   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
395   if (fd < 0)
396     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
397             action, file, rrd_strerror(errno));
399   return(fd);
400 } /* }}} static int open_pidfile */
402 /* check existing pid file to see whether a daemon is running */
403 static int check_pidfile(void)
405   int pid_fd;
406   pid_t pid;
407   char pid_str[16];
409   pid_fd = open_pidfile("open", O_RDWR);
410   if (pid_fd < 0)
411     return pid_fd;
413   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
414     return -1;
416   pid = atoi(pid_str);
417   if (pid <= 0)
418     return -1;
420   /* another running process that we can signal COULD be
421    * a competing rrdcached */
422   if (pid != getpid() && kill(pid, 0) == 0)
423   {
424     fprintf(stderr,
425             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
426     close(pid_fd);
427     return -1;
428   }
430   lseek(pid_fd, 0, SEEK_SET);
431   if (ftruncate(pid_fd, 0) == -1)
432   {
433     fprintf(stderr,
434             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
435     close(pid_fd);
436     return -1;
437   }
439   fprintf(stderr,
440           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
441           "rrdcached: starting normally.\n", pid);
443   return pid_fd;
444 } /* }}} static int check_pidfile */
446 static int write_pidfile (int fd) /* {{{ */
448   pid_t pid;
449   FILE *fh;
451   pid = getpid ();
453   fh = fdopen (fd, "w");
454   if (fh == NULL)
455   {
456     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
457     close(fd);
458     return (-1);
459   }
461   fprintf (fh, "%i\n", (int) pid);
462   fclose (fh);
464   return (0);
465 } /* }}} int write_pidfile */
467 static int remove_pidfile (void) /* {{{ */
469   char *file;
470   int status;
472   file = (config_pid_file != NULL)
473     ? config_pid_file
474     : LOCALSTATEDIR "/run/rrdcached.pid";
476   status = unlink (file);
477   if (status == 0)
478     return (0);
479   return (errno);
480 } /* }}} int remove_pidfile */
482 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
484   char *eol;
486   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
487                sock->next_read - sock->next_cmd);
489   if (eol == NULL)
490   {
491     /* no commands left, move remainder back to front of rbuf */
492     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
493             sock->next_read - sock->next_cmd);
494     sock->next_read -= sock->next_cmd;
495     sock->next_cmd = 0;
496     *len = 0;
497     return NULL;
498   }
499   else
500   {
501     char *cmd = sock->rbuf + sock->next_cmd;
502     *eol = '\0';
504     sock->next_cmd = eol - sock->rbuf + 1;
506     if (eol > sock->rbuf && *(eol-1) == '\r')
507       *(--eol) = '\0'; /* handle "\r\n" EOL */
509     *len = eol - cmd;
511     return cmd;
512   }
514   /* NOTREACHED */
515   assert(1==0);
516 } /* }}} char *next_cmd */
518 /* add the characters directly to the write buffer */
519 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
521   char *new_buf;
523   assert(sock != NULL);
525   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
526   if (new_buf == NULL)
527   {
528     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
529     return -1;
530   }
532   strncpy(new_buf + sock->wbuf_len, str, len + 1);
534   sock->wbuf = new_buf;
535   sock->wbuf_len += len;
537   return 0;
538 } /* }}} static int add_to_wbuf */
540 /* add the text to the "extra" info that's sent after the status line */
541 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
543   va_list argp;
544   char buffer[RRD_CMD_MAX];
545   int len;
547   if (JOURNAL_REPLAY(sock)) return 0;
548   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
550   va_start(argp, fmt);
551 #ifdef HAVE_VSNPRINTF
552   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
553 #else
554   len = vsprintf(buffer, fmt, argp);
555 #endif
556   va_end(argp);
557   if (len < 0)
558   {
559     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
560     return -1;
561   }
563   return add_to_wbuf(sock, buffer, len);
564 } /* }}} static int add_response_info */
566 static int count_lines(char *str) /* {{{ */
568   int lines = 0;
570   if (str != NULL)
571   {
572     while ((str = strchr(str, '\n')) != NULL)
573     {
574       ++lines;
575       ++str;
576     }
577   }
579   return lines;
580 } /* }}} static int count_lines */
582 /* send the response back to the user.
583  * returns 0 on success, -1 on error
584  * write buffer is always zeroed after this call */
585 static int send_response (listen_socket_t *sock, response_code rc,
586                           char *fmt, ...) /* {{{ */
588   va_list argp;
589   char buffer[RRD_CMD_MAX];
590   int lines;
591   ssize_t wrote;
592   int rclen, len;
594   if (JOURNAL_REPLAY(sock)) return rc;
596   if (sock->batch_start)
597   {
598     if (rc == RESP_OK)
599       return rc; /* no response on success during BATCH */
600     lines = sock->batch_cmd;
601   }
602   else if (rc == RESP_OK)
603     lines = count_lines(sock->wbuf);
604   else
605     lines = -1;
607   rclen = sprintf(buffer, "%d ", lines);
608   va_start(argp, fmt);
609 #ifdef HAVE_VSNPRINTF
610   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
611 #else
612   len = vsprintf(buffer+rclen, fmt, argp);
613 #endif
614   va_end(argp);
615   if (len < 0)
616     return -1;
618   len += rclen;
620   /* append the result to the wbuf, don't write to the user */
621   if (sock->batch_start)
622     return add_to_wbuf(sock, buffer, len);
624   /* first write must be complete */
625   if (len != write(sock->fd, buffer, len))
626   {
627     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
628     return -1;
629   }
631   if (sock->wbuf != NULL && rc == RESP_OK)
632   {
633     wrote = 0;
634     while (wrote < sock->wbuf_len)
635     {
636       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
637       if (wb <= 0)
638       {
639         RRDD_LOG(LOG_INFO, "send_response: could not write results");
640         return -1;
641       }
642       wrote += wb;
643     }
644   }
646   free(sock->wbuf); sock->wbuf = NULL;
647   sock->wbuf_len = 0;
649   return 0;
650 } /* }}} */
652 static void wipe_ci_values(cache_item_t *ci, time_t when)
654   ci->values = NULL;
655   ci->values_num = 0;
656   ci->values_alloc = 0;
658   ci->last_flush_time = when;
659   if (config_write_jitter > 0)
660     ci->last_flush_time += (rrd_random() % config_write_jitter);
663 /* remove_from_queue
664  * remove a "cache_item_t" item from the queue.
665  * must hold 'cache_lock' when calling this
666  */
667 static void remove_from_queue(cache_item_t *ci) /* {{{ */
669   if (ci == NULL) return;
670   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
672   if (ci->prev == NULL)
673     cache_queue_head = ci->next; /* reset head */
674   else
675     ci->prev->next = ci->next;
677   if (ci->next == NULL)
678     cache_queue_tail = ci->prev; /* reset the tail */
679   else
680     ci->next->prev = ci->prev;
682   ci->next = ci->prev = NULL;
683   ci->flags &= ~CI_FLAGS_IN_QUEUE;
685   pthread_mutex_lock (&stats_lock);
686   assert (stats_queue_length > 0);
687   stats_queue_length--;
688   pthread_mutex_unlock (&stats_lock);
690 } /* }}} static void remove_from_queue */
692 /* free the resources associated with the cache_item_t
693  * must hold cache_lock when calling this function
694  */
695 static void *free_cache_item(cache_item_t *ci) /* {{{ */
697   if (ci == NULL) return NULL;
699   remove_from_queue(ci);
701   for (size_t i=0; i < ci->values_num; i++)
702     free(ci->values[i]);
704   free (ci->values);
705   free (ci->file);
707   /* in case anyone is waiting */
708   pthread_cond_broadcast(&ci->flushed);
709   pthread_cond_destroy(&ci->flushed);
711   free (ci);
713   return NULL;
714 } /* }}} static void *free_cache_item */
716 /*
717  * enqueue_cache_item:
718  * `cache_lock' must be acquired before calling this function!
719  */
720 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
721     queue_side_t side)
723   if (ci == NULL)
724     return (-1);
726   if (ci->values_num == 0)
727     return (0);
729   if (side == HEAD)
730   {
731     if (cache_queue_head == ci)
732       return 0;
734     /* remove if further down in queue */
735     remove_from_queue(ci);
737     ci->prev = NULL;
738     ci->next = cache_queue_head;
739     if (ci->next != NULL)
740       ci->next->prev = ci;
741     cache_queue_head = ci;
743     if (cache_queue_tail == NULL)
744       cache_queue_tail = cache_queue_head;
745   }
746   else /* (side == TAIL) */
747   {
748     /* We don't move values back in the list.. */
749     if (ci->flags & CI_FLAGS_IN_QUEUE)
750       return (0);
752     assert (ci->next == NULL);
753     assert (ci->prev == NULL);
755     ci->prev = cache_queue_tail;
757     if (cache_queue_tail == NULL)
758       cache_queue_head = ci;
759     else
760       cache_queue_tail->next = ci;
762     cache_queue_tail = ci;
763   }
765   ci->flags |= CI_FLAGS_IN_QUEUE;
767   pthread_cond_signal(&queue_cond);
768   pthread_mutex_lock (&stats_lock);
769   stats_queue_length++;
770   pthread_mutex_unlock (&stats_lock);
772   return (0);
773 } /* }}} int enqueue_cache_item */
775 /*
776  * tree_callback_flush:
777  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
778  * while this is in progress.
779  */
780 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
781     gpointer data)
783   cache_item_t *ci;
784   callback_flush_data_t *cfd;
786   ci = (cache_item_t *) value;
787   cfd = (callback_flush_data_t *) data;
789   if (ci->flags & CI_FLAGS_IN_QUEUE)
790     return FALSE;
792   if (ci->values_num > 0
793       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
794   {
795     enqueue_cache_item (ci, TAIL);
796   }
797   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
798       && (ci->values_num <= 0))
799   {
800     assert ((char *) key == ci->file);
801     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
802     {
803       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
804       return (FALSE);
805     }
806   }
808   return (FALSE);
809 } /* }}} gboolean tree_callback_flush */
811 static int flush_old_values (int max_age)
813   callback_flush_data_t cfd;
814   size_t k;
816   memset (&cfd, 0, sizeof (cfd));
817   /* Pass the current time as user data so that we don't need to call
818    * `time' for each node. */
819   cfd.now = time (NULL);
820   cfd.keys = NULL;
821   cfd.keys_num = 0;
823   if (max_age > 0)
824     cfd.abs_timeout = cfd.now - max_age;
825   else
826     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
828   /* `tree_callback_flush' will return the keys of all values that haven't
829    * been touched in the last `config_flush_interval' seconds in `cfd'.
830    * The char*'s in this array point to the same memory as ci->file, so we
831    * don't need to free them separately. */
832   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
834   for (k = 0; k < cfd.keys_num; k++)
835   {
836     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
837     /* should never fail, since we have held the cache_lock
838      * the entire time */
839     assert(status == TRUE);
840   }
842   if (cfd.keys != NULL)
843   {
844     free (cfd.keys);
845     cfd.keys = NULL;
846   }
848   return (0);
849 } /* int flush_old_values */
851 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
853   struct timeval now;
854   struct timespec next_flush;
855   int status;
857   gettimeofday (&now, NULL);
858   next_flush.tv_sec = now.tv_sec + config_flush_interval;
859   next_flush.tv_nsec = 1000 * now.tv_usec;
861   pthread_mutex_lock(&cache_lock);
863   while (state == RUNNING)
864   {
865     gettimeofday (&now, NULL);
866     if ((now.tv_sec > next_flush.tv_sec)
867         || ((now.tv_sec == next_flush.tv_sec)
868           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
869     {
870       RRDD_LOG(LOG_DEBUG, "flushing old values");
872       /* Determine the time of the next cache flush. */
873       next_flush.tv_sec = now.tv_sec + config_flush_interval;
875       /* Flush all values that haven't been written in the last
876        * `config_write_interval' seconds. */
877       flush_old_values (config_write_interval);
879       /* unlock the cache while we rotate so we don't block incoming
880        * updates if the fsync() blocks on disk I/O */
881       pthread_mutex_unlock(&cache_lock);
882       journal_rotate();
883       pthread_mutex_lock(&cache_lock);
884     }
886     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
887     if (status != 0 && status != ETIMEDOUT)
888     {
889       RRDD_LOG (LOG_ERR, "flush_thread_main: "
890                 "pthread_cond_timedwait returned %i.", status);
891     }
892   }
894   if (config_flush_at_shutdown)
895     flush_old_values (-1); /* flush everything */
897   state = SHUTDOWN;
899   pthread_mutex_unlock(&cache_lock);
901   return NULL;
902 } /* void *flush_thread_main */
904 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
906   pthread_mutex_lock (&cache_lock);
908   while (state != SHUTDOWN
909          || (cache_queue_head != NULL && config_flush_at_shutdown))
910   {
911     cache_item_t *ci;
912     char *file;
913     char **values;
914     size_t values_num;
915     int status;
917     /* Now, check if there's something to store away. If not, wait until
918      * something comes in. */
919     if (cache_queue_head == NULL)
920     {
921       status = pthread_cond_wait (&queue_cond, &cache_lock);
922       if ((status != 0) && (status != ETIMEDOUT))
923       {
924         RRDD_LOG (LOG_ERR, "queue_thread_main: "
925             "pthread_cond_wait returned %i.", status);
926       }
927     }
929     /* Check if a value has arrived. This may be NULL if we timed out or there
930      * was an interrupt such as a signal. */
931     if (cache_queue_head == NULL)
932       continue;
934     ci = cache_queue_head;
936     /* copy the relevant parts */
937     file = strdup (ci->file);
938     if (file == NULL)
939     {
940       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
941       continue;
942     }
944     assert(ci->values != NULL);
945     assert(ci->values_num > 0);
947     values = ci->values;
948     values_num = ci->values_num;
950     wipe_ci_values(ci, time(NULL));
951     remove_from_queue(ci);
953     pthread_mutex_unlock (&cache_lock);
955     rrd_clear_error ();
956     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
957     if (status != 0)
958     {
959       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
960           "rrd_update_r (%s) failed with status %i. (%s)",
961           file, status, rrd_get_error());
962     }
964     journal_write("wrote", file);
966     /* Search again in the tree.  It's possible someone issued a "FORGET"
967      * while we were writing the update values. */
968     pthread_mutex_lock(&cache_lock);
969     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
970     if (ci)
971       pthread_cond_broadcast(&ci->flushed);
972     pthread_mutex_unlock(&cache_lock);
974     if (status == 0)
975     {
976       pthread_mutex_lock (&stats_lock);
977       stats_updates_written++;
978       stats_data_sets_written += values_num;
979       pthread_mutex_unlock (&stats_lock);
980     }
982     rrd_free_ptrs((void ***) &values, &values_num);
983     free(file);
985     pthread_mutex_lock (&cache_lock);
986   }
987   pthread_mutex_unlock (&cache_lock);
989   return (NULL);
990 } /* }}} void *queue_thread_main */
992 static int buffer_get_field (char **buffer_ret, /* {{{ */
993     size_t *buffer_size_ret, char **field_ret)
995   char *buffer;
996   size_t buffer_pos;
997   size_t buffer_size;
998   char *field;
999   size_t field_size;
1000   int status;
1002   buffer = *buffer_ret;
1003   buffer_pos = 0;
1004   buffer_size = *buffer_size_ret;
1005   field = *buffer_ret;
1006   field_size = 0;
1008   if (buffer_size <= 0)
1009     return (-1);
1011   /* This is ensured by `handle_request'. */
1012   assert (buffer[buffer_size - 1] == '\0');
1014   status = -1;
1015   while (buffer_pos < buffer_size)
1016   {
1017     /* Check for end-of-field or end-of-buffer */
1018     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1019     {
1020       field[field_size] = 0;
1021       field_size++;
1022       buffer_pos++;
1023       status = 0;
1024       break;
1025     }
1026     /* Handle escaped characters. */
1027     else if (buffer[buffer_pos] == '\\')
1028     {
1029       if (buffer_pos >= (buffer_size - 1))
1030         break;
1031       buffer_pos++;
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036     /* Normal operation */ 
1037     else
1038     {
1039       field[field_size] = buffer[buffer_pos];
1040       field_size++;
1041       buffer_pos++;
1042     }
1043   } /* while (buffer_pos < buffer_size) */
1045   if (status != 0)
1046     return (status);
1048   *buffer_ret = buffer + buffer_pos;
1049   *buffer_size_ret = buffer_size - buffer_pos;
1050   *field_ret = field;
1052   return (0);
1053 } /* }}} int buffer_get_field */
1055 /* if we're restricting writes to the base directory,
1056  * check whether the file falls within the dir
1057  * returns 1 if OK, otherwise 0
1058  */
1059 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1061   assert(file != NULL);
1063   if (!config_write_base_only
1064       || JOURNAL_REPLAY(sock)
1065       || config_base_dir == NULL)
1066     return 1;
1068   if (strstr(file, "../") != NULL) goto err;
1070   /* relative paths without "../" are ok */
1071   if (*file != '/') return 1;
1073   /* file must be of the format base + "/" + <1+ char filename> */
1074   if (strlen(file) < _config_base_dir_len + 2) goto err;
1075   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1076   if (*(file + _config_base_dir_len) != '/') goto err;
1078   return 1;
1080 err:
1081   if (sock != NULL && sock->fd >= 0)
1082     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1084   return 0;
1085 } /* }}} static int check_file_access */
1087 /* when using a base dir, convert relative paths to absolute paths.
1088  * if necessary, modifies the "filename" pointer to point
1089  * to the new path created in "tmp".  "tmp" is provided
1090  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1091  *
1092  * this allows us to optimize for the expected case (absolute path)
1093  * with a no-op.
1094  */
1095 static void get_abs_path(char **filename, char *tmp)
1097   assert(tmp != NULL);
1098   assert(filename != NULL && *filename != NULL);
1100   if (config_base_dir == NULL || **filename == '/')
1101     return;
1103   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1104   *filename = tmp;
1105 } /* }}} static int get_abs_path */
1107 static int flush_file (const char *filename) /* {{{ */
1109   cache_item_t *ci;
1111   pthread_mutex_lock (&cache_lock);
1113   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1114   if (ci == NULL)
1115   {
1116     pthread_mutex_unlock (&cache_lock);
1117     return (ENOENT);
1118   }
1120   if (ci->values_num > 0)
1121   {
1122     /* Enqueue at head */
1123     enqueue_cache_item (ci, HEAD);
1124     pthread_cond_wait(&ci->flushed, &cache_lock);
1125   }
1127   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1128    * may have been purged during our cond_wait() */
1130   pthread_mutex_unlock(&cache_lock);
1132   return (0);
1133 } /* }}} int flush_file */
1135 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1137   char *err = "Syntax error.\n";
1139   if (cmd && cmd->syntax)
1140     err = cmd->syntax;
1142   return send_response(sock, RESP_ERR, "Usage: %s", err);
1143 } /* }}} static int syntax_error() */
1145 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1147   uint64_t copy_queue_length;
1148   uint64_t copy_updates_received;
1149   uint64_t copy_flush_received;
1150   uint64_t copy_updates_written;
1151   uint64_t copy_data_sets_written;
1152   uint64_t copy_journal_bytes;
1153   uint64_t copy_journal_rotate;
1155   uint64_t tree_nodes_number;
1156   uint64_t tree_depth;
1158   pthread_mutex_lock (&stats_lock);
1159   copy_queue_length       = stats_queue_length;
1160   copy_updates_received   = stats_updates_received;
1161   copy_flush_received     = stats_flush_received;
1162   copy_updates_written    = stats_updates_written;
1163   copy_data_sets_written  = stats_data_sets_written;
1164   copy_journal_bytes      = stats_journal_bytes;
1165   copy_journal_rotate     = stats_journal_rotate;
1166   pthread_mutex_unlock (&stats_lock);
1168   pthread_mutex_lock (&cache_lock);
1169   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1170   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1171   pthread_mutex_unlock (&cache_lock);
1173   add_response_info(sock,
1174                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1175   add_response_info(sock,
1176                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1177   add_response_info(sock,
1178                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1179   add_response_info(sock,
1180                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1181   add_response_info(sock,
1182                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1183   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1184   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1185   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1186   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1188   send_response(sock, RESP_OK, "Statistics follow\n");
1190   return (0);
1191 } /* }}} int handle_request_stats */
1193 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1195   char *file, file_tmp[PATH_MAX];
1196   int status;
1198   status = buffer_get_field (&buffer, &buffer_size, &file);
1199   if (status != 0)
1200   {
1201     return syntax_error(sock,cmd);
1202   }
1203   else
1204   {
1205     pthread_mutex_lock(&stats_lock);
1206     stats_flush_received++;
1207     pthread_mutex_unlock(&stats_lock);
1209     get_abs_path(&file, file_tmp);
1210     if (!check_file_access(file, sock)) return 0;
1212     status = flush_file (file);
1213     if (status == 0)
1214       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1215     else if (status == ENOENT)
1216     {
1217       /* no file in our tree; see whether it exists at all */
1218       struct stat statbuf;
1220       memset(&statbuf, 0, sizeof(statbuf));
1221       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1222         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1223       else
1224         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1225     }
1226     else if (status < 0)
1227       return send_response(sock, RESP_ERR, "Internal error.\n");
1228     else
1229       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1230   }
1232   /* NOTREACHED */
1233   assert(1==0);
1234 } /* }}} int handle_request_flush */
1236 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1238   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1240   pthread_mutex_lock(&cache_lock);
1241   flush_old_values(-1);
1242   pthread_mutex_unlock(&cache_lock);
1244   return send_response(sock, RESP_OK, "Started flush.\n");
1245 } /* }}} static int handle_request_flushall */
1247 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1249   int status;
1250   char *file, file_tmp[PATH_MAX];
1251   cache_item_t *ci;
1253   status = buffer_get_field(&buffer, &buffer_size, &file);
1254   if (status != 0)
1255     return syntax_error(sock,cmd);
1257   get_abs_path(&file, file_tmp);
1259   pthread_mutex_lock(&cache_lock);
1260   ci = g_tree_lookup(cache_tree, file);
1261   if (ci == NULL)
1262   {
1263     pthread_mutex_unlock(&cache_lock);
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265   }
1267   for (size_t i=0; i < ci->values_num; i++)
1268     add_response_info(sock, "%s\n", ci->values[i]);
1270   pthread_mutex_unlock(&cache_lock);
1271   return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1274 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1276   int status;
1277   gboolean found;
1278   char *file, file_tmp[PATH_MAX];
1280   status = buffer_get_field(&buffer, &buffer_size, &file);
1281   if (status != 0)
1282     return syntax_error(sock,cmd);
1284   get_abs_path(&file, file_tmp);
1285   if (!check_file_access(file, sock)) return 0;
1287   pthread_mutex_lock(&cache_lock);
1288   found = g_tree_remove(cache_tree, file);
1289   pthread_mutex_unlock(&cache_lock);
1291   if (found == TRUE)
1292   {
1293     if (!JOURNAL_REPLAY(sock))
1294       journal_write("forget", file);
1296     return send_response(sock, RESP_OK, "Gone!\n");
1297   }
1298   else
1299     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1301   /* NOTREACHED */
1302   assert(1==0);
1303 } /* }}} static int handle_request_forget */
1305 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1307   cache_item_t *ci;
1309   pthread_mutex_lock(&cache_lock);
1311   ci = cache_queue_head;
1312   while (ci != NULL)
1313   {
1314     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1315     ci = ci->next;
1316   }
1318   pthread_mutex_unlock(&cache_lock);
1320   return send_response(sock, RESP_OK, "in queue.\n");
1321 } /* }}} int handle_request_queue */
1323 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1325   char *file, file_tmp[PATH_MAX];
1326   int values_num = 0;
1327   int status;
1328   char orig_buf[RRD_CMD_MAX];
1330   cache_item_t *ci;
1332   /* save it for the journal later */
1333   if (!JOURNAL_REPLAY(sock))
1334     strncpy(orig_buf, buffer, buffer_size);
1336   status = buffer_get_field (&buffer, &buffer_size, &file);
1337   if (status != 0)
1338     return syntax_error(sock,cmd);
1340   pthread_mutex_lock(&stats_lock);
1341   stats_updates_received++;
1342   pthread_mutex_unlock(&stats_lock);
1344   get_abs_path(&file, file_tmp);
1345   if (!check_file_access(file, sock)) return 0;
1347   pthread_mutex_lock (&cache_lock);
1348   ci = g_tree_lookup (cache_tree, file);
1350   if (ci == NULL) /* {{{ */
1351   {
1352     struct stat statbuf;
1353     cache_item_t *tmp;
1355     /* don't hold the lock while we setup; stat(2) might block */
1356     pthread_mutex_unlock(&cache_lock);
1358     memset (&statbuf, 0, sizeof (statbuf));
1359     status = stat (file, &statbuf);
1360     if (status != 0)
1361     {
1362       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1364       status = errno;
1365       if (status == ENOENT)
1366         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1367       else
1368         return send_response(sock, RESP_ERR,
1369                              "stat failed with error %i.\n", status);
1370     }
1371     if (!S_ISREG (statbuf.st_mode))
1372       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1374     if (access(file, R_OK|W_OK) != 0)
1375       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1376                            file, rrd_strerror(errno));
1378     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1379     if (ci == NULL)
1380     {
1381       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1383       return send_response(sock, RESP_ERR, "malloc failed.\n");
1384     }
1385     memset (ci, 0, sizeof (cache_item_t));
1387     ci->file = strdup (file);
1388     if (ci->file == NULL)
1389     {
1390       free (ci);
1391       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1393       return send_response(sock, RESP_ERR, "strdup failed.\n");
1394     }
1396     wipe_ci_values(ci, now);
1397     ci->flags = CI_FLAGS_IN_TREE;
1398     pthread_cond_init(&ci->flushed, NULL);
1400     pthread_mutex_lock(&cache_lock);
1402     /* another UPDATE might have added this entry in the meantime */
1403     tmp = g_tree_lookup (cache_tree, file);
1404     if (tmp == NULL)
1405       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1406     else
1407     {
1408       free_cache_item (ci);
1409       ci = tmp;
1410     }
1412     /* state may have changed while we were unlocked */
1413     if (state == SHUTDOWN)
1414       return -1;
1415   } /* }}} */
1416   assert (ci != NULL);
1418   /* don't re-write updates in replay mode */
1419   if (!JOURNAL_REPLAY(sock))
1420     journal_write("update", orig_buf);
1422   while (buffer_size > 0)
1423   {
1424     char *value;
1425     time_t stamp;
1426     char *eostamp;
1428     status = buffer_get_field (&buffer, &buffer_size, &value);
1429     if (status != 0)
1430     {
1431       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1432       break;
1433     }
1435     /* make sure update time is always moving forward */
1436     stamp = strtol(value, &eostamp, 10);
1437     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1438     {
1439       pthread_mutex_unlock(&cache_lock);
1440       return send_response(sock, RESP_ERR,
1441                            "Cannot find timestamp in '%s'!\n", value);
1442     }
1443     else if (stamp <= ci->last_update_stamp)
1444     {
1445       pthread_mutex_unlock(&cache_lock);
1446       return send_response(sock, RESP_ERR,
1447                            "illegal attempt to update using time %ld when last"
1448                            " update time is %ld (minimum one second step)\n",
1449                            stamp, ci->last_update_stamp);
1450     }
1451     else
1452       ci->last_update_stamp = stamp;
1454     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1455                               &ci->values_alloc, config_alloc_chunk))
1456     {
1457       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1458       continue;
1459     }
1461     values_num++;
1462   }
1464   if (((now - ci->last_flush_time) >= config_write_interval)
1465       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1466       && (ci->values_num > 0))
1467   {
1468     enqueue_cache_item (ci, TAIL);
1469   }
1471   pthread_mutex_unlock (&cache_lock);
1473   if (values_num < 1)
1474     return send_response(sock, RESP_ERR, "No values updated.\n");
1475   else
1476     return send_response(sock, RESP_OK,
1477                          "errors, enqueued %i value(s).\n", values_num);
1479   /* NOTREACHED */
1480   assert(1==0);
1482 } /* }}} int handle_request_update */
1484 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1486   char *file, file_tmp[PATH_MAX];
1487   char *cf;
1489   char *start_str;
1490   char *end_str;
1491   time_t start_tm;
1492   time_t end_tm;
1494   unsigned long step;
1495   unsigned long ds_cnt;
1496   char **ds_namv;
1497   rrd_value_t *data;
1499   int status;
1500   unsigned long i;
1501   time_t t;
1502   rrd_value_t *data_ptr;
1504   file = NULL;
1505   cf = NULL;
1506   start_str = NULL;
1507   end_str = NULL;
1509   /* Read the arguments */
1510   do /* while (0) */
1511   {
1512     status = buffer_get_field (&buffer, &buffer_size, &file);
1513     if (status != 0)
1514       break;
1516     status = buffer_get_field (&buffer, &buffer_size, &cf);
1517     if (status != 0)
1518       break;
1520     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1521     if (status != 0)
1522     {
1523       start_str = NULL;
1524       status = 0;
1525       break;
1526     }
1528     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1529     if (status != 0)
1530     {
1531       end_str = NULL;
1532       status = 0;
1533       break;
1534     }
1535   } while (0);
1537   if (status != 0)
1538     return (syntax_error(sock,cmd));
1540   get_abs_path(&file, file_tmp);
1541   if (!check_file_access(file, sock)) return 0;
1543   status = flush_file (file);
1544   if ((status != 0) && (status != ENOENT))
1545     return (send_response (sock, RESP_ERR,
1546           "flush_file (%s) failed with status %i.\n", file, status));
1548   t = time (NULL); /* "now" */
1550   /* Parse start time */
1551   if (start_str != NULL)
1552   {
1553     char *endptr;
1554     long value;
1556     endptr = NULL;
1557     errno = 0;
1558     value = strtol (start_str, &endptr, /* base = */ 0);
1559     if ((endptr == start_str) || (errno != 0))
1560       return (send_response(sock, RESP_ERR,
1561             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1562             start_str));
1564     if (value > 0)
1565       start_tm = (time_t) value;
1566     else
1567       start_tm = (time_t) (t + value);
1568   }
1569   else
1570   {
1571     start_tm = t - 86400;
1572   }
1574   /* Parse end time */
1575   if (end_str != NULL)
1576   {
1577     char *endptr;
1578     long value;
1580     endptr = NULL;
1581     errno = 0;
1582     value = strtol (end_str, &endptr, /* base = */ 0);
1583     if ((endptr == end_str) || (errno != 0))
1584       return (send_response(sock, RESP_ERR,
1585             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1586             end_str));
1588     if (value > 0)
1589       end_tm = (time_t) value;
1590     else
1591       end_tm = (time_t) (t + value);
1592   }
1593   else
1594   {
1595     end_tm = t;
1596   }
1598   step = -1;
1599   ds_cnt = 0;
1600   ds_namv = NULL;
1601   data = NULL;
1603   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1604       &ds_cnt, &ds_namv, &data);
1605   if (status != 0)
1606     return (send_response(sock, RESP_ERR,
1607           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1609   add_response_info (sock, "FlushVersion: %lu\n", 1);
1610   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1611   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1612   add_response_info (sock, "Step: %lu\n", step);
1613   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1615 #define SSTRCAT(buffer,str,buffer_fill) do { \
1616     size_t str_len = strlen (str); \
1617     if ((buffer_fill + str_len) > sizeof (buffer)) \
1618       str_len = sizeof (buffer) - buffer_fill; \
1619     if (str_len > 0) { \
1620       strncpy (buffer + buffer_fill, str, str_len); \
1621       buffer_fill += str_len; \
1622       assert (buffer_fill <= sizeof (buffer)); \
1623       if (buffer_fill == sizeof (buffer)) \
1624         buffer[buffer_fill - 1] = 0; \
1625       else \
1626         buffer[buffer_fill] = 0; \
1627     } \
1628   } while (0)
1630   { /* Add list of DS names */
1631     char linebuf[1024];
1632     size_t linebuf_fill;
1634     memset (linebuf, 0, sizeof (linebuf));
1635     linebuf_fill = 0;
1636     for (i = 0; i < ds_cnt; i++)
1637     {
1638       if (i > 0)
1639         SSTRCAT (linebuf, " ", linebuf_fill);
1640       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1641       rrd_freemem(ds_namv[i]);
1642     }
1643     rrd_freemem(ds_namv);
1644     add_response_info (sock, "DSName: %s\n", linebuf);
1645   }
1647   /* Add the actual data */
1648   assert (step > 0);
1649   data_ptr = data;
1650   for (t = start_tm + step; t <= end_tm; t += step)
1651   {
1652     char linebuf[1024];
1653     size_t linebuf_fill;
1654     char tmp[128];
1656     memset (linebuf, 0, sizeof (linebuf));
1657     linebuf_fill = 0;
1658     for (i = 0; i < ds_cnt; i++)
1659     {
1660       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1661       tmp[sizeof (tmp) - 1] = 0;
1662       SSTRCAT (linebuf, tmp, linebuf_fill);
1664       data_ptr++;
1665     }
1667     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1668   } /* for (t) */
1669   rrd_freemem(data);
1671   return (send_response (sock, RESP_OK, "Success\n"));
1672 #undef SSTRCAT
1673 } /* }}} int handle_request_fetch */
1675 /* we came across a "WROTE" entry during journal replay.
1676  * throw away any values that we have accumulated for this file
1677  */
1678 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1680   cache_item_t *ci;
1681   const char *file = buffer;
1683   pthread_mutex_lock(&cache_lock);
1685   ci = g_tree_lookup(cache_tree, file);
1686   if (ci == NULL)
1687   {
1688     pthread_mutex_unlock(&cache_lock);
1689     return (0);
1690   }
1692   if (ci->values)
1693     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1695   wipe_ci_values(ci, now);
1696   remove_from_queue(ci);
1698   pthread_mutex_unlock(&cache_lock);
1699   return (0);
1700 } /* }}} int handle_request_wrote */
1702 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1704   char *file, file_tmp[PATH_MAX];
1705   int status;
1706   rrd_info_t *info;
1708   /* obtain filename */
1709   status = buffer_get_field(&buffer, &buffer_size, &file);
1710   if (status != 0)
1711     return syntax_error(sock,cmd);
1712   /* get full pathname */
1713   get_abs_path(&file, file_tmp);
1714   if (!check_file_access(file, sock)) {
1715     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1716   }
1717   /* get data */
1718   rrd_clear_error ();
1719   info = rrd_info_r(file);
1720   if(!info) {
1721     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1722   }
1723   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1724       switch (data->type) {
1725       case RD_I_VAL:
1726           if (isnan(data->value.u_val))
1727               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1728           else
1729               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1730           break;
1731       case RD_I_CNT:
1732           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1733           break;
1734       case RD_I_INT:
1735           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1736           break;
1737       case RD_I_STR:
1738           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1739           break;
1740       case RD_I_BLO:
1741           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1742           break;
1743       }
1744   }
1746   rrd_info_free(info);
1748   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1749 } /* }}} static int handle_request_info  */
1751 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1753   char *i, *file, file_tmp[PATH_MAX];
1754   int status;
1755   int idx;
1756   time_t t;
1758   /* obtain filename */
1759   status = buffer_get_field(&buffer, &buffer_size, &file);
1760   if (status != 0)
1761     return syntax_error(sock,cmd);
1762   /* get full pathname */
1763   get_abs_path(&file, file_tmp);
1764   if (!check_file_access(file, sock)) {
1765     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1766   }
1768   status = buffer_get_field(&buffer, &buffer_size, &i);
1769   if (status != 0)
1770     return syntax_error(sock,cmd);
1771   idx = atoi(i);
1772   if(idx<0) { 
1773     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1774   }
1776   /* get data */
1777   rrd_clear_error ();
1778   t = rrd_first_r(file,idx);
1779   if(t<1) {
1780     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1781   }
1782   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1783 } /* }}} static int handle_request_first  */
1786 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1788   char *file, file_tmp[PATH_MAX];
1789   int status;
1790   time_t t, from_file, step;
1791   rrd_file_t * rrd_file;
1792   cache_item_t * ci;
1793   rrd_t rrd; 
1795   /* obtain filename */
1796   status = buffer_get_field(&buffer, &buffer_size, &file);
1797   if (status != 0)
1798     return syntax_error(sock,cmd);
1799   /* get full pathname */
1800   get_abs_path(&file, file_tmp);
1801   if (!check_file_access(file, sock)) {
1802     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1803   }
1804   rrd_clear_error();
1805   rrd_init(&rrd);
1806   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1807   if(!rrd_file) {
1808     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1809   }
1810   from_file = rrd.live_head->last_up;
1811   step = rrd.stat_head->pdp_step;
1812   rrd_close(rrd_file);
1813   pthread_mutex_lock(&cache_lock);
1814   ci = g_tree_lookup(cache_tree, file);
1815   if (ci)
1816     t = ci->last_update_stamp;
1817   else
1818     t = from_file;
1819   pthread_mutex_unlock(&cache_lock);
1820   t -= t % step;
1821   rrd_free(&rrd);
1822   if(t<1) {
1823     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1824   }
1825   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1826 } /* }}} static int handle_request_last  */
1828 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1830   char *file, file_tmp[PATH_MAX];
1831   char *tok;
1832   int ac = 0;
1833   char *av[128];
1834   int status;
1835   unsigned long step = 300;
1836   time_t last_up = time(NULL)-10;
1837   int no_overwrite = opt_no_overwrite;
1840   /* obtain filename */
1841   status = buffer_get_field(&buffer, &buffer_size, &file);
1842   if (status != 0)
1843     return syntax_error(sock,cmd);
1844   /* get full pathname */
1845   get_abs_path(&file, file_tmp);
1846   if (!check_file_access(file, sock)) {
1847     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1848   }
1849   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1851   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1852     if( ! strncmp(tok,"-b",2) ) {
1853       status = buffer_get_field(&buffer, &buffer_size, &tok );
1854       if (status != 0) return syntax_error(sock,cmd);
1855       last_up = (time_t) atol(tok);
1856       continue;
1857     }
1858     if( ! strncmp(tok,"-s",2) ) {
1859       status = buffer_get_field(&buffer, &buffer_size, &tok );
1860       if (status != 0) return syntax_error(sock,cmd);
1861       step = atol(tok);
1862       continue;
1863     }
1864     if( ! strncmp(tok,"-O",2) ) {
1865       no_overwrite = 1;
1866       continue;
1867     }
1868     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1869     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1870     return syntax_error(sock,cmd);
1871   }
1872   if(step<1) {
1873     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1874   }
1875   if (last_up < 3600 * 24 * 365 * 10) {
1876     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1877   }
1879   rrd_clear_error ();
1880   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1882   if(!status) {
1883     return send_response(sock, RESP_OK, "RRD created OK\n");
1884   }
1885   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1886 } /* }}} static int handle_request_create  */
1888 /* start "BATCH" processing */
1889 static int batch_start (HANDLER_PROTO) /* {{{ */
1891   int status;
1892   if (sock->batch_start)
1893     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1895   status = send_response(sock, RESP_OK,
1896                          "Go ahead.  End with dot '.' on its own line.\n");
1897   sock->batch_start = time(NULL);
1898   sock->batch_cmd = 0;
1900   return status;
1901 } /* }}} static int batch_start */
1903 /* finish "BATCH" processing and return results to the client */
1904 static int batch_done (HANDLER_PROTO) /* {{{ */
1906   assert(sock->batch_start);
1907   sock->batch_start = 0;
1908   sock->batch_cmd  = 0;
1909   return send_response(sock, RESP_OK, "errors\n");
1910 } /* }}} static int batch_done */
1912 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1914   return -1;
1915 } /* }}} static int handle_request_quit */
1917 static command_t list_of_commands[] = { /* {{{ */
1918   {
1919     "UPDATE",
1920     handle_request_update,
1921     CMD_CONTEXT_ANY,
1922     "UPDATE <filename> <values> [<values> ...]\n"
1923     ,
1924     "Adds the given file to the internal cache if it is not yet known and\n"
1925     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1926     "for details.\n"
1927     "\n"
1928     "Each <values> has the following form:\n"
1929     "  <values> = <time>:<value>[:<value>[...]]\n"
1930     "See the rrdupdate(1) manpage for details.\n"
1931   },
1932   {
1933     "WROTE",
1934     handle_request_wrote,
1935     CMD_CONTEXT_JOURNAL,
1936     NULL,
1937     NULL
1938   },
1939   {
1940     "FLUSH",
1941     handle_request_flush,
1942     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1943     "FLUSH <filename>\n"
1944     ,
1945     "Adds the given filename to the head of the update queue and returns\n"
1946     "after it has been dequeued.\n"
1947   },
1948   {
1949     "FLUSHALL",
1950     handle_request_flushall,
1951     CMD_CONTEXT_CLIENT,
1952     "FLUSHALL\n"
1953     ,
1954     "Triggers writing of all pending updates.  Returns immediately.\n"
1955   },
1956   {
1957     "PENDING",
1958     handle_request_pending,
1959     CMD_CONTEXT_CLIENT,
1960     "PENDING <filename>\n"
1961     ,
1962     "Shows any 'pending' updates for a file, in order.\n"
1963     "The updates shown have not yet been written to the underlying RRD file.\n"
1964   },
1965   {
1966     "FORGET",
1967     handle_request_forget,
1968     CMD_CONTEXT_ANY,
1969     "FORGET <filename>\n"
1970     ,
1971     "Removes the file completely from the cache.\n"
1972     "Any pending updates for the file will be lost.\n"
1973   },
1974   {
1975     "QUEUE",
1976     handle_request_queue,
1977     CMD_CONTEXT_CLIENT,
1978     "QUEUE\n"
1979     ,
1980         "Shows all files in the output queue.\n"
1981     "The output is zero or more lines in the following format:\n"
1982     "(where <num_vals> is the number of values to be written)\n"
1983     "\n"
1984     "<num_vals> <filename>\n"
1985   },
1986   {
1987     "STATS",
1988     handle_request_stats,
1989     CMD_CONTEXT_CLIENT,
1990     "STATS\n"
1991     ,
1992     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1993     "a description of the values.\n"
1994   },
1995   {
1996     "HELP",
1997     handle_request_help,
1998     CMD_CONTEXT_CLIENT,
1999     "HELP [<command>]\n",
2000     NULL, /* special! */
2001   },
2002   {
2003     "BATCH",
2004     batch_start,
2005     CMD_CONTEXT_CLIENT,
2006     "BATCH\n"
2007     ,
2008     "The 'BATCH' command permits the client to initiate a bulk load\n"
2009     "   of commands to rrdcached.\n"
2010     "\n"
2011     "Usage:\n"
2012     "\n"
2013     "    client: BATCH\n"
2014     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2015     "    client: command #1\n"
2016     "    client: command #2\n"
2017     "    client: ... and so on\n"
2018     "    client: .\n"
2019     "    server: 2 errors\n"
2020     "    server: 7 message for command #7\n"
2021     "    server: 9 message for command #9\n"
2022     "\n"
2023     "For more information, consult the rrdcached(1) documentation.\n"
2024   },
2025   {
2026     ".",   /* BATCH terminator */
2027     batch_done,
2028     CMD_CONTEXT_BATCH,
2029     NULL,
2030     NULL
2031   },
2032   {
2033     "FETCH",
2034     handle_request_fetch,
2035     CMD_CONTEXT_CLIENT,
2036     "FETCH <file> <CF> [<start> [<end>]]\n"
2037     ,
2038     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2039   },
2040   {
2041     "INFO",
2042     handle_request_info,
2043     CMD_CONTEXT_CLIENT,
2044     "INFO <filename>\n",
2045     "The INFO command retrieves information about a specified RRD file.\n"
2046     "This is returned in standard rrdinfo format, a sequence of lines\n"
2047     "with the format <keyname> = <value>\n"
2048     "Note that this is the data as of the last update of the RRD file itself,\n"
2049     "not the last time data was received via rrdcached, so there may be pending\n"
2050     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2051   },
2052   {
2053     "FIRST",
2054     handle_request_first,
2055     CMD_CONTEXT_CLIENT,
2056     "FIRST <filename> <rra index>\n",
2057     "The FIRST command retrieves the first data time for a specified RRA in\n"
2058     "an RRD file.\n"
2059   },
2060   {
2061     "LAST",
2062     handle_request_last,
2063     CMD_CONTEXT_CLIENT,
2064     "LAST <filename>\n",
2065     "The LAST command retrieves the last update time for a specified RRD file.\n"
2066     "Note that this is the time of the last update of the RRD file itself, not\n"
2067     "the last time data was received via rrdcached, so there may be pending\n"
2068     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2069   },
2070   {
2071     "CREATE",
2072     handle_request_create,
2073     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2074     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2075     "The CREATE command will create an RRD file, overwriting any existing file\n"
2076     "unless the -O option is given or rrdcached was started with the -O option.\n"
2077     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2078     "not acceptable) and the step is in seconds (default is 300).\n"
2079     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2080   },
2081   {
2082     "QUIT",
2083     handle_request_quit,
2084     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2085     "QUIT\n"
2086     ,
2087     "Disconnect from rrdcached.\n"
2088   }
2089 }; /* }}} command_t list_of_commands[] */
2090 static size_t list_of_commands_len = sizeof (list_of_commands)
2091   / sizeof (list_of_commands[0]);
2093 static command_t *find_command(char *cmd)
2095   size_t i;
2097   for (i = 0; i < list_of_commands_len; i++)
2098     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2099       return (&list_of_commands[i]);
2100   return NULL;
2103 /* We currently use the index in the `list_of_commands' array as a bit position
2104  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2105  * outside these functions so that switching to a more elegant storage method
2106  * is easily possible. */
2107 static ssize_t find_command_index (const char *cmd) /* {{{ */
2109   size_t i;
2111   for (i = 0; i < list_of_commands_len; i++)
2112     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2113       return ((ssize_t) i);
2114   return (-1);
2115 } /* }}} ssize_t find_command_index */
2117 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2118     const char *cmd)
2120   ssize_t i;
2122   if (JOURNAL_REPLAY(sock))
2123     return (1);
2125   if (cmd == NULL)
2126     return (-1);
2128   if ((strcasecmp ("QUIT", cmd) == 0)
2129       || (strcasecmp ("HELP", cmd) == 0))
2130     return (1);
2131   else if (strcmp (".", cmd) == 0)
2132     cmd = "BATCH";
2134   i = find_command_index (cmd);
2135   if (i < 0)
2136     return (-1);
2137   assert (i < 32);
2139   if ((sock->permissions & (1 << i)) != 0)
2140     return (1);
2141   return (0);
2142 } /* }}} int socket_permission_check */
2144 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2145     const char *cmd)
2147   ssize_t i;
2149   i = find_command_index (cmd);
2150   if (i < 0)
2151     return (-1);
2152   assert (i < 32);
2154   sock->permissions |= (1 << i);
2155   return (0);
2156 } /* }}} int socket_permission_add */
2158 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2160   sock->permissions = 0;
2161 } /* }}} socket_permission_clear */
2163 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2164     listen_socket_t *src)
2166   dest->permissions = src->permissions;
2167 } /* }}} socket_permission_copy */
2169 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2171   size_t i;
2173   sock->permissions = 0;
2174   for (i = 0; i < list_of_commands_len; i++)
2175     sock->permissions |= (1 << i);
2176 } /* }}} void socket_permission_set_all */
2178 /* check whether commands are received in the expected context */
2179 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2181   if (JOURNAL_REPLAY(sock))
2182     return (cmd->context & CMD_CONTEXT_JOURNAL);
2183   else if (sock->batch_start)
2184     return (cmd->context & CMD_CONTEXT_BATCH);
2185   else
2186     return (cmd->context & CMD_CONTEXT_CLIENT);
2188   /* NOTREACHED */
2189   assert(1==0);
2192 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2194   int status;
2195   char *cmd_str;
2196   char *resp_txt;
2197   command_t *help = NULL;
2199   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2200   if (status == 0)
2201     help = find_command(cmd_str);
2203   if (help && (help->syntax || help->help))
2204   {
2205     char tmp[RRD_CMD_MAX];
2207     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2208     resp_txt = tmp;
2210     if (help->syntax)
2211       add_response_info(sock, "Usage: %s\n", help->syntax);
2213     if (help->help)
2214       add_response_info(sock, "%s\n", help->help);
2215   }
2216   else
2217   {
2218     size_t i;
2220     resp_txt = "Command overview\n";
2222     for (i = 0; i < list_of_commands_len; i++)
2223     {
2224       if (list_of_commands[i].syntax == NULL)
2225         continue;
2226       add_response_info (sock, "%s", list_of_commands[i].syntax);
2227     }
2228   }
2230   return send_response(sock, RESP_OK, resp_txt);
2231 } /* }}} int handle_request_help */
2233 static int handle_request (DISPATCH_PROTO) /* {{{ */
2235   char *buffer_ptr = buffer;
2236   char *cmd_str = NULL;
2237   command_t *cmd = NULL;
2238   int status;
2240   assert (buffer[buffer_size - 1] == '\0');
2242   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2243   if (status != 0)
2244   {
2245     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2246     return (-1);
2247   }
2249   if (sock != NULL && sock->batch_start)
2250     sock->batch_cmd++;
2252   cmd = find_command(cmd_str);
2253   if (!cmd)
2254     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2256   if (!socket_permission_check (sock, cmd->cmd))
2257     return send_response(sock, RESP_ERR, "Permission denied.\n");
2259   if (!command_check_context(sock, cmd))
2260     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2262   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2263 } /* }}} int handle_request */
2265 static void journal_set_free (journal_set *js) /* {{{ */
2267   if (js == NULL)
2268     return;
2270   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2272   free(js);
2273 } /* }}} journal_set_free */
2275 static void journal_set_remove (journal_set *js) /* {{{ */
2277   if (js == NULL)
2278     return;
2280   for (uint i=0; i < js->files_num; i++)
2281   {
2282     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2283     unlink(js->files[i]);
2284   }
2285 } /* }}} journal_set_remove */
2287 /* close current journal file handle.
2288  * MUST hold journal_lock before calling */
2289 static void journal_close(void) /* {{{ */
2291   if (journal_fh != NULL)
2292   {
2293     if (fclose(journal_fh) != 0)
2294       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2295   }
2297   journal_fh = NULL;
2298   journal_size = 0;
2299 } /* }}} journal_close */
2301 /* MUST hold journal_lock before calling */
2302 static void journal_new_file(void) /* {{{ */
2304   struct timeval now;
2305   int  new_fd;
2306   char new_file[PATH_MAX + 1];
2308   assert(journal_dir != NULL);
2309   assert(journal_cur != NULL);
2311   journal_close();
2313   gettimeofday(&now, NULL);
2314   /* this format assures that the files sort in strcmp() order */
2315   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2316            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2318   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2319                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2320   if (new_fd < 0)
2321     goto error;
2323   journal_fh = fdopen(new_fd, "a");
2324   if (journal_fh == NULL)
2325     goto error;
2327   journal_size = ftell(journal_fh);
2328   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2330   /* record the file in the journal set */
2331   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2333   return;
2335 error:
2336   RRDD_LOG(LOG_CRIT,
2337            "JOURNALING DISABLED: Error while trying to create %s : %s",
2338            new_file, rrd_strerror(errno));
2339   RRDD_LOG(LOG_CRIT,
2340            "JOURNALING DISABLED: All values will be flushed at shutdown");
2342   close(new_fd);
2343   config_flush_at_shutdown = 1;
2345 } /* }}} journal_new_file */
2347 /* MUST NOT hold journal_lock before calling this */
2348 static void journal_rotate(void) /* {{{ */
2350   journal_set *old_js = NULL;
2352   if (journal_dir == NULL)
2353     return;
2355   RRDD_LOG(LOG_DEBUG, "rotating journals");
2357   pthread_mutex_lock(&stats_lock);
2358   ++stats_journal_rotate;
2359   pthread_mutex_unlock(&stats_lock);
2361   pthread_mutex_lock(&journal_lock);
2363   journal_close();
2365   /* rotate the journal sets */
2366   old_js = journal_old;
2367   journal_old = journal_cur;
2368   journal_cur = calloc(1, sizeof(journal_set));
2370   if (journal_cur != NULL)
2371     journal_new_file();
2372   else
2373     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2375   pthread_mutex_unlock(&journal_lock);
2377   journal_set_remove(old_js);
2378   journal_set_free  (old_js);
2380 } /* }}} static void journal_rotate */
2382 /* MUST hold journal_lock when calling */
2383 static void journal_done(void) /* {{{ */
2385   if (journal_cur == NULL)
2386     return;
2388   journal_close();
2390   if (config_flush_at_shutdown)
2391   {
2392     RRDD_LOG(LOG_INFO, "removing journals");
2393     journal_set_remove(journal_old);
2394     journal_set_remove(journal_cur);
2395   }
2396   else
2397   {
2398     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2399              "journals will be used at next startup");
2400   }
2402   journal_set_free(journal_cur);
2403   journal_set_free(journal_old);
2404   free(journal_dir);
2406 } /* }}} static void journal_done */
2408 static int journal_write(char *cmd, char *args) /* {{{ */
2410   int chars;
2412   if (journal_fh == NULL)
2413     return 0;
2415   pthread_mutex_lock(&journal_lock);
2416   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2417   journal_size += chars;
2419   if (journal_size > JOURNAL_MAX)
2420     journal_new_file();
2422   pthread_mutex_unlock(&journal_lock);
2424   if (chars > 0)
2425   {
2426     pthread_mutex_lock(&stats_lock);
2427     stats_journal_bytes += chars;
2428     pthread_mutex_unlock(&stats_lock);
2429   }
2431   return chars;
2432 } /* }}} static int journal_write */
2434 static int journal_replay (const char *file) /* {{{ */
2436   FILE *fh;
2437   int entry_cnt = 0;
2438   int fail_cnt = 0;
2439   uint64_t line = 0;
2440   char entry[RRD_CMD_MAX];
2441   time_t now;
2443   if (file == NULL) return 0;
2445   {
2446     char *reason = "unknown error";
2447     int status = 0;
2448     struct stat statbuf;
2450     memset(&statbuf, 0, sizeof(statbuf));
2451     if (stat(file, &statbuf) != 0)
2452     {
2453       reason = "stat error";
2454       status = errno;
2455     }
2456     else if (!S_ISREG(statbuf.st_mode))
2457     {
2458       reason = "not a regular file";
2459       status = EPERM;
2460     }
2461     if (statbuf.st_uid != daemon_uid)
2462     {
2463       reason = "not owned by daemon user";
2464       status = EACCES;
2465     }
2466     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2467     {
2468       reason = "must not be user/group writable";
2469       status = EACCES;
2470     }
2472     if (status != 0)
2473     {
2474       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2475                file, rrd_strerror(status), reason);
2476       return 0;
2477     }
2478   }
2480   fh = fopen(file, "r");
2481   if (fh == NULL)
2482   {
2483     if (errno != ENOENT)
2484       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2485                file, rrd_strerror(errno));
2486     return 0;
2487   }
2488   else
2489     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2491   now = time(NULL);
2493   while(!feof(fh))
2494   {
2495     size_t entry_len;
2497     ++line;
2498     if (fgets(entry, sizeof(entry), fh) == NULL)
2499       break;
2500     entry_len = strlen(entry);
2502     /* check \n termination in case journal writing crashed mid-line */
2503     if (entry_len == 0)
2504       continue;
2505     else if (entry[entry_len - 1] != '\n')
2506     {
2507       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2508       ++fail_cnt;
2509       continue;
2510     }
2512     entry[entry_len - 1] = '\0';
2514     if (handle_request(NULL, now, entry, entry_len) == 0)
2515       ++entry_cnt;
2516     else
2517       ++fail_cnt;
2518   }
2520   fclose(fh);
2522   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2523            entry_cnt, fail_cnt);
2525   return entry_cnt > 0 ? 1 : 0;
2526 } /* }}} static int journal_replay */
2528 static int journal_sort(const void *v1, const void *v2)
2530   char **jn1 = (char **) v1;
2531   char **jn2 = (char **) v2;
2533   return strcmp(*jn1,*jn2);
2536 static void journal_init(void) /* {{{ */
2538   int had_journal = 0;
2539   DIR *dir;
2540   struct dirent *dent;
2541   char path[PATH_MAX+1];
2543   if (journal_dir == NULL) return;
2545   pthread_mutex_lock(&journal_lock);
2547   journal_cur = calloc(1, sizeof(journal_set));
2548   if (journal_cur == NULL)
2549   {
2550     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2551     return;
2552   }
2554   RRDD_LOG(LOG_INFO, "checking for journal files");
2556   /* Handle old journal files during transition.  This gives them the
2557    * correct sort order.  TODO: remove after first release
2558    */
2559   {
2560     char old_path[PATH_MAX+1];
2561     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2562     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2563     rename(old_path, path);
2565     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2566     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2567     rename(old_path, path);
2568   }
2570   dir = opendir(journal_dir);
2571   if (!dir) {
2572     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2573     return;
2574   }
2575   while ((dent = readdir(dir)) != NULL)
2576   {
2577     /* looks like a journal file? */
2578     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2579       continue;
2581     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2583     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2584     {
2585       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2586                dent->d_name);
2587       break;
2588     }
2589   }
2590   closedir(dir);
2592   qsort(journal_cur->files, journal_cur->files_num,
2593         sizeof(journal_cur->files[0]), journal_sort);
2595   for (uint i=0; i < journal_cur->files_num; i++)
2596     had_journal += journal_replay(journal_cur->files[i]);
2598   journal_new_file();
2600   /* it must have been a crash.  start a flush */
2601   if (had_journal && config_flush_at_shutdown)
2602     flush_old_values(-1);
2604   pthread_mutex_unlock(&journal_lock);
2606   RRDD_LOG(LOG_INFO, "journal processing complete");
2608 } /* }}} static void journal_init */
2610 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2612   assert(sock != NULL);
2614   free(sock->rbuf);  sock->rbuf = NULL;
2615   free(sock->wbuf);  sock->wbuf = NULL;
2616   free(sock);
2617 } /* }}} void free_listen_socket */
2619 static void close_connection(listen_socket_t *sock) /* {{{ */
2621   if (sock->fd >= 0)
2622   {
2623     close(sock->fd);
2624     sock->fd = -1;
2625   }
2627   free_listen_socket(sock);
2629 } /* }}} void close_connection */
2631 static void *connection_thread_main (void *args) /* {{{ */
2633   listen_socket_t *sock;
2634   int fd;
2636   sock = (listen_socket_t *) args;
2637   fd = sock->fd;
2639   /* init read buffers */
2640   sock->next_read = sock->next_cmd = 0;
2641   sock->rbuf = malloc(RBUF_SIZE);
2642   if (sock->rbuf == NULL)
2643   {
2644     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2645     close_connection(sock);
2646     return NULL;
2647   }
2649   pthread_mutex_lock (&connection_threads_lock);
2650 #ifdef HAVE_LIBWRAP
2651   /* LIBWRAP does not support multiple threads! By putting this code
2652      inside pthread_mutex_lock we do not have to worry about request_info
2653      getting overwritten by another thread.
2654   */
2655   struct request_info req;
2656   request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2657   fromhost(&req);
2658   if(!hosts_access(&req)) {
2659     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2660     pthread_mutex_unlock (&connection_threads_lock);
2661     close_connection(sock);
2662     return NULL;
2663   }
2664 #endif /* HAVE_LIBWRAP */
2665   connection_threads_num++;
2666   pthread_mutex_unlock (&connection_threads_lock);
2668   while (state == RUNNING)
2669   {
2670     char *cmd;
2671     ssize_t cmd_len;
2672     ssize_t rbytes;
2673     time_t now;
2675     struct pollfd pollfd;
2676     int status;
2678     pollfd.fd = fd;
2679     pollfd.events = POLLIN | POLLPRI;
2680     pollfd.revents = 0;
2682     status = poll (&pollfd, 1, /* timeout = */ 500);
2683     if (state != RUNNING)
2684       break;
2685     else if (status == 0) /* timeout */
2686       continue;
2687     else if (status < 0) /* error */
2688     {
2689       status = errno;
2690       if (status != EINTR)
2691         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2692       continue;
2693     }
2695     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2696       break;
2697     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2698     {
2699       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2700           "poll(2) returned something unexpected: %#04hx",
2701           pollfd.revents);
2702       break;
2703     }
2705     rbytes = read(fd, sock->rbuf + sock->next_read,
2706                   RBUF_SIZE - sock->next_read);
2707     if (rbytes < 0)
2708     {
2709       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2710       break;
2711     }
2712     else if (rbytes == 0)
2713       break; /* eof */
2715     sock->next_read += rbytes;
2717     if (sock->batch_start)
2718       now = sock->batch_start;
2719     else
2720       now = time(NULL);
2722     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2723     {
2724       status = handle_request (sock, now, cmd, cmd_len+1);
2725       if (status != 0)
2726         goto out_close;
2727     }
2728   }
2730 out_close:
2731   close_connection(sock);
2733   /* Remove this thread from the connection threads list */
2734   pthread_mutex_lock (&connection_threads_lock);
2735   connection_threads_num--;
2736   if (connection_threads_num <= 0)
2737     pthread_cond_broadcast(&connection_threads_done);
2738   pthread_mutex_unlock (&connection_threads_lock);
2740   return (NULL);
2741 } /* }}} void *connection_thread_main */
2743 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2745   int fd;
2746   struct sockaddr_un sa;
2747   listen_socket_t *temp;
2748   int status;
2749   const char *path;
2750   char *path_copy, *dir;
2752   path = sock->addr;
2753   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2754     path += strlen("unix:");
2756   /* dirname may modify its argument */
2757   path_copy = strdup(path);
2758   if (path_copy == NULL)
2759   {
2760     fprintf(stderr, "rrdcached: strdup(): %s\n",
2761         rrd_strerror(errno));
2762     return (-1);
2763   }
2765   dir = dirname(path_copy);
2766   if (rrd_mkdir_p(dir, 0777) != 0)
2767   {
2768     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2769         dir, rrd_strerror(errno));
2770     return (-1);
2771   }
2773   free(path_copy);
2775   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2776       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2777   if (temp == NULL)
2778   {
2779     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2780     return (-1);
2781   }
2782   listen_fds = temp;
2783   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2785   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2786   if (fd < 0)
2787   {
2788     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2789              rrd_strerror(errno));
2790     return (-1);
2791   }
2793   memset (&sa, 0, sizeof (sa));
2794   sa.sun_family = AF_UNIX;
2795   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2797   /* if we've gotten this far, we own the pid file.  any daemon started
2798    * with the same args must not be alive.  therefore, ensure that we can
2799    * create the socket...
2800    */
2801   unlink(path);
2803   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2804   if (status != 0)
2805   {
2806     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2807              path, rrd_strerror(errno));
2808     close (fd);
2809     return (-1);
2810   }
2812   /* tweak the sockets group ownership */
2813   if (sock->socket_group != (gid_t)-1)
2814   {
2815     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2816          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2817     {
2818       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2819     }
2820   }
2822   if (sock->socket_permissions != (mode_t)-1)
2823   {
2824     if (chmod(path, sock->socket_permissions) != 0)
2825       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2826           (unsigned int)sock->socket_permissions, strerror(errno));
2827   }
2829   status = listen (fd, /* backlog = */ 10);
2830   if (status != 0)
2831   {
2832     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2833              path, rrd_strerror(errno));
2834     close (fd);
2835     unlink (path);
2836     return (-1);
2837   }
2839   listen_fds[listen_fds_num].fd = fd;
2840   listen_fds[listen_fds_num].family = PF_UNIX;
2841   strncpy(listen_fds[listen_fds_num].addr, path,
2842           sizeof (listen_fds[listen_fds_num].addr) - 1);
2843   listen_fds_num++;
2845   return (0);
2846 } /* }}} int open_listen_socket_unix */
2848 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2850   struct addrinfo ai_hints;
2851   struct addrinfo *ai_res;
2852   struct addrinfo *ai_ptr;
2853   char addr_copy[NI_MAXHOST];
2854   char *addr;
2855   char *port;
2856   int status;
2858   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2859   addr_copy[sizeof (addr_copy) - 1] = 0;
2860   addr = addr_copy;
2862   memset (&ai_hints, 0, sizeof (ai_hints));
2863   ai_hints.ai_flags = 0;
2864 #ifdef AI_ADDRCONFIG
2865   ai_hints.ai_flags |= AI_ADDRCONFIG;
2866 #endif
2867   ai_hints.ai_family = AF_UNSPEC;
2868   ai_hints.ai_socktype = SOCK_STREAM;
2870   port = NULL;
2871   if (*addr == '[') /* IPv6+port format */
2872   {
2873     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2874     addr++;
2876     port = strchr (addr, ']');
2877     if (port == NULL)
2878     {
2879       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2880       return (-1);
2881     }
2882     *port = 0;
2883     port++;
2885     if (*port == ':')
2886       port++;
2887     else if (*port == 0)
2888       port = NULL;
2889     else
2890     {
2891       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2892       return (-1);
2893     }
2894   } /* if (*addr == '[') */
2895   else
2896   {
2897     port = rindex(addr, ':');
2898     if (port != NULL)
2899     {
2900       *port = 0;
2901       port++;
2902     }
2903   }
2904   ai_res = NULL;
2905   status = getaddrinfo (addr,
2906                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2907                         &ai_hints, &ai_res);
2908   if (status != 0)
2909   {
2910     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2911              addr, gai_strerror (status));
2912     return (-1);
2913   }
2915   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2916   {
2917     int fd;
2918     listen_socket_t *temp;
2919     int one = 1;
2921     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2922         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2923     if (temp == NULL)
2924     {
2925       fprintf (stderr,
2926                "rrdcached: open_listen_socket_network: realloc failed.\n");
2927       continue;
2928     }
2929     listen_fds = temp;
2930     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2932     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2933     if (fd < 0)
2934     {
2935       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2936                rrd_strerror(errno));
2937       continue;
2938     }
2940     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2942     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2943     if (status != 0)
2944     {
2945       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2946                sock->addr, rrd_strerror(errno));
2947       close (fd);
2948       continue;
2949     }
2951     status = listen (fd, /* backlog = */ 10);
2952     if (status != 0)
2953     {
2954       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2955                sock->addr, rrd_strerror(errno));
2956       close (fd);
2957       freeaddrinfo(ai_res);
2958       return (-1);
2959     }
2961     listen_fds[listen_fds_num].fd = fd;
2962     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2963     listen_fds_num++;
2964   } /* for (ai_ptr) */
2966   freeaddrinfo(ai_res);
2967   return (0);
2968 } /* }}} static int open_listen_socket_network */
2970 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2972   assert(sock != NULL);
2973   assert(sock->addr != NULL);
2975   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2976       || sock->addr[0] == '/')
2977     return (open_listen_socket_unix(sock));
2978   else
2979     return (open_listen_socket_network(sock));
2980 } /* }}} int open_listen_socket */
2982 static int close_listen_sockets (void) /* {{{ */
2984   size_t i;
2986   for (i = 0; i < listen_fds_num; i++)
2987   {
2988     close (listen_fds[i].fd);
2990     if (listen_fds[i].family == PF_UNIX)
2991       unlink(listen_fds[i].addr);
2992   }
2994   free (listen_fds);
2995   listen_fds = NULL;
2996   listen_fds_num = 0;
2998   return (0);
2999 } /* }}} int close_listen_sockets */
3001 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3003   struct pollfd *pollfds;
3004   int pollfds_num;
3005   int status;
3006   int i;
3008   if (listen_fds_num < 1)
3009   {
3010     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3011     return (NULL);
3012   }
3014   pollfds_num = listen_fds_num;
3015   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3016   if (pollfds == NULL)
3017   {
3018     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3019     return (NULL);
3020   }
3021   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3023   RRDD_LOG(LOG_INFO, "listening for connections");
3025   while (state == RUNNING)
3026   {
3027     for (i = 0; i < pollfds_num; i++)
3028     {
3029       pollfds[i].fd = listen_fds[i].fd;
3030       pollfds[i].events = POLLIN | POLLPRI;
3031       pollfds[i].revents = 0;
3032     }
3034     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3035     if (state != RUNNING)
3036       break;
3037     else if (status == 0) /* timeout */
3038       continue;
3039     else if (status < 0) /* error */
3040     {
3041       status = errno;
3042       if (status != EINTR)
3043       {
3044         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3045       }
3046       continue;
3047     }
3049     for (i = 0; i < pollfds_num; i++)
3050     {
3051       listen_socket_t *client_sock;
3052       struct sockaddr_storage client_sa;
3053       socklen_t client_sa_size;
3054       pthread_t tid;
3055       pthread_attr_t attr;
3057       if (pollfds[i].revents == 0)
3058         continue;
3060       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3061       {
3062         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3063             "poll(2) returned something unexpected for listen FD #%i.",
3064             pollfds[i].fd);
3065         continue;
3066       }
3068       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3069       if (client_sock == NULL)
3070       {
3071         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3072         continue;
3073       }
3074       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3076       client_sa_size = sizeof (client_sa);
3077       client_sock->fd = accept (pollfds[i].fd,
3078           (struct sockaddr *) &client_sa, &client_sa_size);
3079       if (client_sock->fd < 0)
3080       {
3081         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3082         free(client_sock);
3083         continue;
3084       }
3086       pthread_attr_init (&attr);
3087       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3089       status = pthread_create (&tid, &attr, connection_thread_main,
3090                                client_sock);
3091       if (status != 0)
3092       {
3093         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3094         close_connection(client_sock);
3095         continue;
3096       }
3097     } /* for (pollfds_num) */
3098   } /* while (state == RUNNING) */
3100   RRDD_LOG(LOG_INFO, "starting shutdown");
3102   close_listen_sockets ();
3104   pthread_mutex_lock (&connection_threads_lock);
3105   while (connection_threads_num > 0)
3106     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3107   pthread_mutex_unlock (&connection_threads_lock);
3109   free(pollfds);
3111   return (NULL);
3112 } /* }}} void *listen_thread_main */
3114 static int daemonize (void) /* {{{ */
3116   int pid_fd;
3117   char *base_dir;
3119   daemon_uid = geteuid();
3121   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3122   if (pid_fd < 0)
3123     pid_fd = check_pidfile();
3124   if (pid_fd < 0)
3125     return pid_fd;
3127   /* open all the listen sockets */
3128   if (config_listen_address_list_len > 0)
3129   {
3130     for (size_t i = 0; i < config_listen_address_list_len; i++)
3131       open_listen_socket (config_listen_address_list[i]);
3133     rrd_free_ptrs((void ***) &config_listen_address_list,
3134                   &config_listen_address_list_len);
3135   }
3136   else
3137   {
3138     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3139         sizeof(default_socket.addr) - 1);
3140     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3142     if (default_socket.permissions == 0)
3143       socket_permission_set_all (&default_socket);
3145     open_listen_socket (&default_socket);
3146   }
3148   if (listen_fds_num < 1)
3149   {
3150     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3151     goto error;
3152   }
3154   if (!stay_foreground)
3155   {
3156     pid_t child;
3158     child = fork ();
3159     if (child < 0)
3160     {
3161       fprintf (stderr, "daemonize: fork(2) failed.\n");
3162       goto error;
3163     }
3164     else if (child > 0)
3165       exit(0);
3167     /* Become session leader */
3168     setsid ();
3170     /* Open the first three file descriptors to /dev/null */
3171     close (2);
3172     close (1);
3173     close (0);
3175     open ("/dev/null", O_RDWR);
3176     if (dup(0) == -1 || dup(0) == -1){
3177         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3178     }
3179   } /* if (!stay_foreground) */
3181   /* Change into the /tmp directory. */
3182   base_dir = (config_base_dir != NULL)
3183     ? config_base_dir
3184     : "/tmp";
3186   if (chdir (base_dir) != 0)
3187   {
3188     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3189     goto error;
3190   }
3192   install_signal_handlers();
3194   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3195   RRDD_LOG(LOG_INFO, "starting up");
3197   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3198                                 (GDestroyNotify) free_cache_item);
3199   if (cache_tree == NULL)
3200   {
3201     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3202     goto error;
3203   }
3205   return write_pidfile (pid_fd);
3207 error:
3208   remove_pidfile();
3209   return -1;
3210 } /* }}} int daemonize */
3212 static int cleanup (void) /* {{{ */
3214   pthread_cond_broadcast (&flush_cond);
3215   pthread_join (flush_thread, NULL);
3217   pthread_cond_broadcast (&queue_cond);
3218   for (int i = 0; i < config_queue_threads; i++)
3219     pthread_join (queue_threads[i], NULL);
3221   if (config_flush_at_shutdown)
3222   {
3223     assert(cache_queue_head == NULL);
3224     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3225   }
3227   free(queue_threads);
3228   free(config_base_dir);
3230   pthread_mutex_lock(&cache_lock);
3231   g_tree_destroy(cache_tree);
3233   pthread_mutex_lock(&journal_lock);
3234   journal_done();
3236   RRDD_LOG(LOG_INFO, "goodbye");
3237   closelog ();
3239   remove_pidfile ();
3240   free(config_pid_file);
3242   return (0);
3243 } /* }}} int cleanup */
3245 static int read_options (int argc, char **argv) /* {{{ */
3247   int option;
3248   int status = 0;
3250   socket_permission_clear (&default_socket);
3252   default_socket.socket_group = (gid_t)-1;
3253   default_socket.socket_permissions = (mode_t)-1;
3255   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3256   {
3257     switch (option)
3258     {
3259       case 'O':
3260         opt_no_overwrite = 1;
3261         break;
3263       case 'g':
3264         stay_foreground=1;
3265         break;
3267       case 'l':
3268       {
3269         listen_socket_t *new;
3271         new = malloc(sizeof(listen_socket_t));
3272         if (new == NULL)
3273         {
3274           fprintf(stderr, "read_options: malloc failed.\n");
3275           return(2);
3276         }
3277         memset(new, 0, sizeof(listen_socket_t));
3279         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3281         /* Add permissions to the socket {{{ */
3282         if (default_socket.permissions != 0)
3283         {
3284           socket_permission_copy (new, &default_socket);
3285         }
3286         else /* if (default_socket.permissions == 0) */
3287         {
3288           /* Add permission for ALL commands to the socket. */
3289           socket_permission_set_all (new);
3290         }
3291         /* }}} Done adding permissions. */
3293         new->socket_group = default_socket.socket_group;
3294         new->socket_permissions = default_socket.socket_permissions;
3296         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3297                          &config_listen_address_list_len, new))
3298         {
3299           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3300           return (2);
3301         }
3302       }
3303       break;
3305       /* set socket group permissions */
3306       case 's':
3307       {
3308         gid_t group_gid;
3309         struct group *grp;
3311         group_gid = strtoul(optarg, NULL, 10);
3312         if (errno != EINVAL && group_gid>0)
3313         {
3314           /* we were passed a number */
3315           grp = getgrgid(group_gid);
3316         }
3317         else
3318         {
3319           grp = getgrnam(optarg);
3320         }
3322         if (grp)
3323         {
3324           default_socket.socket_group = grp->gr_gid;
3325         }
3326         else
3327         {
3328           /* no idea what the user wanted... */
3329           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3330           return (5);
3331         }
3332       }
3333       break;
3335       /* set socket file permissions */
3336       case 'm':
3337       {
3338         long  tmp;
3339         char *endptr = NULL;
3341         tmp = strtol (optarg, &endptr, 8);
3342         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3343             || (tmp > 07777) || (tmp < 0)) {
3344           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3345               optarg);
3346           return (5);
3347         }
3349         default_socket.socket_permissions = (mode_t)tmp;
3350       }
3351       break;
3353       case 'P':
3354       {
3355         char *optcopy;
3356         char *saveptr;
3357         char *dummy;
3358         char *ptr;
3360         socket_permission_clear (&default_socket);
3362         optcopy = strdup (optarg);
3363         dummy = optcopy;
3364         saveptr = NULL;
3365         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3366         {
3367           dummy = NULL;
3368           status = socket_permission_add (&default_socket, ptr);
3369           if (status != 0)
3370           {
3371             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3372                 "socket failed. Most likely, this permission doesn't "
3373                 "exist. Check your command line.\n", ptr);
3374             status = 4;
3375           }
3376         }
3378         free (optcopy);
3379       }
3380       break;
3382       case 'f':
3383       {
3384         int temp;
3386         temp = atoi (optarg);
3387         if (temp > 0)
3388           config_flush_interval = temp;
3389         else
3390         {
3391           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3392           status = 3;
3393         }
3394       }
3395       break;
3397       case 'w':
3398       {
3399         int temp;
3401         temp = atoi (optarg);
3402         if (temp > 0)
3403           config_write_interval = temp;
3404         else
3405         {
3406           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3407           status = 2;
3408         }
3409       }
3410       break;
3412       case 'z':
3413       {
3414         int temp;
3416         temp = atoi(optarg);
3417         if (temp > 0)
3418           config_write_jitter = temp;
3419         else
3420         {
3421           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3422           status = 2;
3423         }
3425         break;
3426       }
3428       case 't':
3429       {
3430         int threads;
3431         threads = atoi(optarg);
3432         if (threads >= 1)
3433           config_queue_threads = threads;
3434         else
3435         {
3436           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3437           return 1;
3438         }
3439       }
3440       break;
3442       case 'B':
3443         config_write_base_only = 1;
3444         break;
3446       case 'b':
3447       {
3448         size_t len;
3449         char base_realpath[PATH_MAX];
3451         if (config_base_dir != NULL)
3452           free (config_base_dir);
3453         config_base_dir = strdup (optarg);
3454         if (config_base_dir == NULL)
3455         {
3456           fprintf (stderr, "read_options: strdup failed.\n");
3457           return (3);
3458         }
3460         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3461         {
3462           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3463               config_base_dir, rrd_strerror (errno));
3464           return (3);
3465         }
3467         /* make sure that the base directory is not resolved via
3468          * symbolic links.  this makes some performance-enhancing
3469          * assumptions possible (we don't have to resolve paths
3470          * that start with a "/")
3471          */
3472         if (realpath(config_base_dir, base_realpath) == NULL)
3473         {
3474           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3475               "%s\n", config_base_dir, rrd_strerror(errno));
3476           return 5;
3477         }
3479         len = strlen (config_base_dir);
3480         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3481         {
3482           config_base_dir[len - 1] = 0;
3483           len--;
3484         }
3486         if (len < 1)
3487         {
3488           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3489           return (4);
3490         }
3492         _config_base_dir_len = len;
3494         len = strlen (base_realpath);
3495         while ((len > 0) && (base_realpath[len - 1] == '/'))
3496         {
3497           base_realpath[len - 1] = '\0';
3498           len--;
3499         }
3501         if (strncmp(config_base_dir,
3502                          base_realpath, sizeof(base_realpath)) != 0)
3503         {
3504           fprintf(stderr,
3505                   "Base directory (-b) resolved via file system links!\n"
3506                   "Please consult rrdcached '-b' documentation!\n"
3507                   "Consider specifying the real directory (%s)\n",
3508                   base_realpath);
3509           return 5;
3510         }
3511       }
3512       break;
3514       case 'p':
3515       {
3516         if (config_pid_file != NULL)
3517           free (config_pid_file);
3518         config_pid_file = strdup (optarg);
3519         if (config_pid_file == NULL)
3520         {
3521           fprintf (stderr, "read_options: strdup failed.\n");
3522           return (3);
3523         }
3524       }
3525       break;
3527       case 'F':
3528         config_flush_at_shutdown = 1;
3529         break;
3531       case 'j':
3532       {
3533         char journal_dir_actual[PATH_MAX];
3534         journal_dir = realpath((const char *)optarg, journal_dir_actual);
3535         if (journal_dir)
3536         {
3537           // if we were able to properly resolve the path, lets have a copy
3538           // for use outside this block.
3539           journal_dir = strdup(journal_dir);           
3540           status = rrd_mkdir_p(journal_dir, 0777);
3541           if (status != 0)
3542           {
3543             fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3544                     journal_dir, rrd_strerror(errno));
3545             return 6;
3546           }
3547           if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3548           {
3549             fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3550                     errno ? rrd_strerror(errno) : "");
3551             return 6;
3552           }
3553         } else {
3554           fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3555                   errno ? rrd_strerror(errno) : "");
3556           return 6;
3557         }
3558       }
3559       break;
3561       case 'a':
3562       {
3563         int temp = atoi(optarg);
3564         if (temp > 0)
3565           config_alloc_chunk = temp;
3566         else
3567         {
3568           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3569           return 10;
3570         }
3571       }
3572       break;
3574       case 'h':
3575       case '?':
3576         printf ("RRDCacheD %s\n"
3577             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3578             "\n"
3579             "Usage: rrdcached [options]\n"
3580             "\n"
3581             "Valid options are:\n"
3582             "  -l <address>  Socket address to listen to.\n"
3583             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3584             "  -P <perms>    Sets the permissions to assign to all following "
3585                             "sockets\n"
3586             "  -w <seconds>  Interval in which to write data.\n"
3587             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3588             "  -t <threads>  Number of write threads.\n"
3589             "  -f <seconds>  Interval in which to flush dead data.\n"
3590             "  -p <file>     Location of the PID-file.\n"
3591             "  -b <dir>      Base directory to change to.\n"
3592             "  -B            Restrict file access to paths within -b <dir>\n"
3593             "  -g            Do not fork and run in the foreground.\n"
3594             "  -j <dir>      Directory in which to create the journal files.\n"
3595             "  -F            Always flush all updates at shutdown\n"
3596             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3597             "                (the socket will also have read/write permissions "
3598                             "for that group)\n"
3599             "  -m <mode>     File permissions (octal) of all following UNIX "
3600                             "sockets\n"
3601             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3602             "  -O            Do not allow CREATE commands to overwrite existing\n"
3603             "                files, even if asked to.\n"
3604             "\n"
3605             "For more information and a detailed description of all options "
3606             "please refer\n"
3607             "to the rrdcached(1) manual page.\n",
3608             VERSION);
3609         if (option == 'h')
3610           status = -1;
3611         else
3612           status = 1;
3613         break;
3614     } /* switch (option) */
3615   } /* while (getopt) */
3617   /* advise the user when values are not sane */
3618   if (config_flush_interval < 2 * config_write_interval)
3619     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3620             " 2x write interval (-w) !\n");
3621   if (config_write_jitter > config_write_interval)
3622     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3623             " write interval (-w) !\n");
3625   if (config_write_base_only && config_base_dir == NULL)
3626     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3627             "  Consult the rrdcached documentation\n");
3629   if (journal_dir == NULL)
3630     config_flush_at_shutdown = 1;
3632   return (status);
3633 } /* }}} int read_options */
3635 int main (int argc, char **argv)
3637   int status;
3639   status = read_options (argc, argv);
3640   if (status != 0)
3641   {
3642     if (status < 0)
3643       status = 0;
3644     return (status);
3645   }
3647   status = daemonize ();
3648   if (status != 0)
3649   {
3650     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3651     return (1);
3652   }
3654   journal_init();
3656   /* start the queue threads */
3657   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3658   if (queue_threads == NULL)
3659   {
3660     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3661     cleanup();
3662     return (1);
3663   }
3664   for (int i = 0; i < config_queue_threads; i++)
3665   {
3666     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3667     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3668     if (status != 0)
3669     {
3670       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3671       cleanup();
3672       return (1);
3673     }
3674   }
3676   /* start the flush thread */
3677   memset(&flush_thread, 0, sizeof(flush_thread));
3678   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3679   if (status != 0)
3680   {
3681     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3682     cleanup();
3683     return (1);
3684   }
3686   listen_thread_main (NULL);
3687   cleanup ();
3689   return (0);
3690 } /* int main */
3692 /*
3693  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3694  */