Code

a2001359e158bb1e92852d53369163f2fc571f41
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
78 #include <stdlib.h>
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
89 #else
91 #endif
92 #include <stdio.h>
93 #include <string.h>
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
117 /* }}} */
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
147   char *wbuf;
148   ssize_t wbuf_len;
150   uint32_t permissions;
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
178   char *syntax;
179   char *help;
180 };
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
186   char *file;
187   char **values;
188   size_t values_num;            /* number of valid pointers */
189   size_t values_alloc;          /* number of allocated pointers */
190   time_t last_flush_time;
191   time_t last_update_stamp;
192 #define CI_FLAGS_IN_TREE  (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194   int flags;
195   pthread_cond_t  flushed;
196   cache_item_t *prev;
197   cache_item_t *next;
198 };
200 struct callback_flush_data_s
202   time_t now;
203   time_t abs_timeout;
204   char **keys;
205   size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
209 enum queue_side_e
211   HEAD,
212   TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
216 /* describe a set of journal files */
217 typedef struct {
218   char **files;
219   size_t files_num;
220 } journal_set;
222 /* max length of socket command or response */
223 #define CMD_MAX 4096
224 #define RBUF_SIZE (CMD_MAX*2)
226 /*
227  * Variables
228  */
229 static int stay_foreground = 0;
230 static uid_t daemon_uid;
232 static listen_socket_t *listen_fds = NULL;
233 static size_t listen_fds_num = 0;
235 static listen_socket_t default_socket;
237 enum {
238   RUNNING,              /* normal operation */
239   FLUSHING,             /* flushing remaining values */
240   SHUTDOWN              /* shutting down */
241 } state = RUNNING;
243 static pthread_t *queue_threads;
244 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
245 static int config_queue_threads = 4;
247 static pthread_t flush_thread;
248 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
250 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
251 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
252 static int connection_threads_num = 0;
254 /* Cache stuff */
255 static GTree          *cache_tree = NULL;
256 static cache_item_t   *cache_queue_head = NULL;
257 static cache_item_t   *cache_queue_tail = NULL;
258 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
260 static int config_write_interval = 300;
261 static int config_write_jitter   = 0;
262 static int config_flush_interval = 3600;
263 static int config_flush_at_shutdown = 0;
264 static char *config_pid_file = NULL;
265 static char *config_base_dir = NULL;
266 static size_t _config_base_dir_len = 0;
267 static int config_write_base_only = 0;
268 static size_t config_alloc_chunk = 1;
270 static listen_socket_t **config_listen_address_list = NULL;
271 static size_t config_listen_address_list_len = 0;
273 static uint64_t stats_queue_length = 0;
274 static uint64_t stats_updates_received = 0;
275 static uint64_t stats_flush_received = 0;
276 static uint64_t stats_updates_written = 0;
277 static uint64_t stats_data_sets_written = 0;
278 static uint64_t stats_journal_bytes = 0;
279 static uint64_t stats_journal_rotate = 0;
280 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
282 static int opt_no_overwrite = 0; /* default for the daemon */
284 /* Journaled updates */
285 #define JOURNAL_REPLAY(s) ((s) == NULL)
286 #define JOURNAL_BASE "rrd.journal"
287 static journal_set *journal_cur = NULL;
288 static journal_set *journal_old = NULL;
289 static char *journal_dir = NULL;
290 static FILE *journal_fh = NULL;         /* current journal file handle */
291 static long  journal_size = 0;          /* current journal size */
292 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
293 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
294 static int journal_write(char *cmd, char *args);
295 static void journal_done(void);
296 static void journal_rotate(void);
298 /* prototypes for forward refernces */
299 static int handle_request_help (HANDLER_PROTO);
301 /* 
302  * Functions
303  */
304 static void sig_common (const char *sig) /* {{{ */
306   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
307   state = FLUSHING;
308   pthread_cond_broadcast(&flush_cond);
309   pthread_cond_broadcast(&queue_cond);
310 } /* }}} void sig_common */
312 static void sig_int_handler (int UNUSED(s)) /* {{{ */
314   sig_common("INT");
315 } /* }}} void sig_int_handler */
317 static void sig_term_handler (int UNUSED(s)) /* {{{ */
319   sig_common("TERM");
320 } /* }}} void sig_term_handler */
322 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
324   config_flush_at_shutdown = 1;
325   sig_common("USR1");
326 } /* }}} void sig_usr1_handler */
328 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
330   config_flush_at_shutdown = 0;
331   sig_common("USR2");
332 } /* }}} void sig_usr2_handler */
334 static void install_signal_handlers(void) /* {{{ */
336   /* These structures are static, because `sigaction' behaves weird if the are
337    * overwritten.. */
338   static struct sigaction sa_int;
339   static struct sigaction sa_term;
340   static struct sigaction sa_pipe;
341   static struct sigaction sa_usr1;
342   static struct sigaction sa_usr2;
344   /* Install signal handlers */
345   memset (&sa_int, 0, sizeof (sa_int));
346   sa_int.sa_handler = sig_int_handler;
347   sigaction (SIGINT, &sa_int, NULL);
349   memset (&sa_term, 0, sizeof (sa_term));
350   sa_term.sa_handler = sig_term_handler;
351   sigaction (SIGTERM, &sa_term, NULL);
353   memset (&sa_pipe, 0, sizeof (sa_pipe));
354   sa_pipe.sa_handler = SIG_IGN;
355   sigaction (SIGPIPE, &sa_pipe, NULL);
357   memset (&sa_pipe, 0, sizeof (sa_usr1));
358   sa_usr1.sa_handler = sig_usr1_handler;
359   sigaction (SIGUSR1, &sa_usr1, NULL);
361   memset (&sa_usr2, 0, sizeof (sa_usr2));
362   sa_usr2.sa_handler = sig_usr2_handler;
363   sigaction (SIGUSR2, &sa_usr2, NULL);
365 } /* }}} void install_signal_handlers */
367 static int open_pidfile(char *action, int oflag) /* {{{ */
369   int fd;
370   const char *file;
371   char *file_copy, *dir;
373   file = (config_pid_file != NULL)
374     ? config_pid_file
375     : LOCALSTATEDIR "/run/rrdcached.pid";
377   /* dirname may modify its argument */
378   file_copy = strdup(file);
379   if (file_copy == NULL)
380   {
381     fprintf(stderr, "rrdcached: strdup(): %s\n",
382         rrd_strerror(errno));
383     return -1;
384   }
386   dir = dirname(file_copy);
387   if (rrd_mkdir_p(dir, 0777) != 0)
388   {
389     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
390         dir, rrd_strerror(errno));
391     return -1;
392   }
394   free(file_copy);
396   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
397   if (fd < 0)
398     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
399             action, file, rrd_strerror(errno));
401   return(fd);
402 } /* }}} static int open_pidfile */
404 /* check existing pid file to see whether a daemon is running */
405 static int check_pidfile(void)
407   int pid_fd;
408   pid_t pid;
409   char pid_str[16];
411   pid_fd = open_pidfile("open", O_RDWR);
412   if (pid_fd < 0)
413     return pid_fd;
415   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
416     return -1;
418   pid = atoi(pid_str);
419   if (pid <= 0)
420     return -1;
422   /* another running process that we can signal COULD be
423    * a competing rrdcached */
424   if (pid != getpid() && kill(pid, 0) == 0)
425   {
426     fprintf(stderr,
427             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
428     close(pid_fd);
429     return -1;
430   }
432   lseek(pid_fd, 0, SEEK_SET);
433   if (ftruncate(pid_fd, 0) == -1)
434   {
435     fprintf(stderr,
436             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
437     close(pid_fd);
438     return -1;
439   }
441   fprintf(stderr,
442           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
443           "rrdcached: starting normally.\n", pid);
445   return pid_fd;
446 } /* }}} static int check_pidfile */
448 static int write_pidfile (int fd) /* {{{ */
450   pid_t pid;
451   FILE *fh;
453   pid = getpid ();
455   fh = fdopen (fd, "w");
456   if (fh == NULL)
457   {
458     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
459     close(fd);
460     return (-1);
461   }
463   fprintf (fh, "%i\n", (int) pid);
464   fclose (fh);
466   return (0);
467 } /* }}} int write_pidfile */
469 static int remove_pidfile (void) /* {{{ */
471   char *file;
472   int status;
474   file = (config_pid_file != NULL)
475     ? config_pid_file
476     : LOCALSTATEDIR "/run/rrdcached.pid";
478   status = unlink (file);
479   if (status == 0)
480     return (0);
481   return (errno);
482 } /* }}} int remove_pidfile */
484 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
486   char *eol;
488   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
489                sock->next_read - sock->next_cmd);
491   if (eol == NULL)
492   {
493     /* no commands left, move remainder back to front of rbuf */
494     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
495             sock->next_read - sock->next_cmd);
496     sock->next_read -= sock->next_cmd;
497     sock->next_cmd = 0;
498     *len = 0;
499     return NULL;
500   }
501   else
502   {
503     char *cmd = sock->rbuf + sock->next_cmd;
504     *eol = '\0';
506     sock->next_cmd = eol - sock->rbuf + 1;
508     if (eol > sock->rbuf && *(eol-1) == '\r')
509       *(--eol) = '\0'; /* handle "\r\n" EOL */
511     *len = eol - cmd;
513     return cmd;
514   }
516   /* NOTREACHED */
517   assert(1==0);
518 } /* }}} char *next_cmd */
520 /* add the characters directly to the write buffer */
521 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
523   char *new_buf;
525   assert(sock != NULL);
527   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
528   if (new_buf == NULL)
529   {
530     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
531     return -1;
532   }
534   strncpy(new_buf + sock->wbuf_len, str, len + 1);
536   sock->wbuf = new_buf;
537   sock->wbuf_len += len;
539   return 0;
540 } /* }}} static int add_to_wbuf */
542 /* add the text to the "extra" info that's sent after the status line */
543 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
545   va_list argp;
546   char buffer[CMD_MAX];
547   int len;
549   if (JOURNAL_REPLAY(sock)) return 0;
550   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
552   va_start(argp, fmt);
553 #ifdef HAVE_VSNPRINTF
554   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
555 #else
556   len = vsprintf(buffer, fmt, argp);
557 #endif
558   va_end(argp);
559   if (len < 0)
560   {
561     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
562     return -1;
563   }
565   return add_to_wbuf(sock, buffer, len);
566 } /* }}} static int add_response_info */
568 static int count_lines(char *str) /* {{{ */
570   int lines = 0;
572   if (str != NULL)
573   {
574     while ((str = strchr(str, '\n')) != NULL)
575     {
576       ++lines;
577       ++str;
578     }
579   }
581   return lines;
582 } /* }}} static int count_lines */
584 /* send the response back to the user.
585  * returns 0 on success, -1 on error
586  * write buffer is always zeroed after this call */
587 static int send_response (listen_socket_t *sock, response_code rc,
588                           char *fmt, ...) /* {{{ */
590   va_list argp;
591   char buffer[CMD_MAX];
592   int lines;
593   ssize_t wrote;
594   int rclen, len;
596   if (JOURNAL_REPLAY(sock)) return rc;
598   if (sock->batch_start)
599   {
600     if (rc == RESP_OK)
601       return rc; /* no response on success during BATCH */
602     lines = sock->batch_cmd;
603   }
604   else if (rc == RESP_OK)
605     lines = count_lines(sock->wbuf);
606   else
607     lines = -1;
609   rclen = sprintf(buffer, "%d ", lines);
610   va_start(argp, fmt);
611 #ifdef HAVE_VSNPRINTF
612   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
613 #else
614   len = vsprintf(buffer+rclen, fmt, argp);
615 #endif
616   va_end(argp);
617   if (len < 0)
618     return -1;
620   len += rclen;
622   /* append the result to the wbuf, don't write to the user */
623   if (sock->batch_start)
624     return add_to_wbuf(sock, buffer, len);
626   /* first write must be complete */
627   if (len != write(sock->fd, buffer, len))
628   {
629     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
630     return -1;
631   }
633   if (sock->wbuf != NULL && rc == RESP_OK)
634   {
635     wrote = 0;
636     while (wrote < sock->wbuf_len)
637     {
638       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
639       if (wb <= 0)
640       {
641         RRDD_LOG(LOG_INFO, "send_response: could not write results");
642         return -1;
643       }
644       wrote += wb;
645     }
646   }
648   free(sock->wbuf); sock->wbuf = NULL;
649   sock->wbuf_len = 0;
651   return 0;
652 } /* }}} */
654 static void wipe_ci_values(cache_item_t *ci, time_t when)
656   ci->values = NULL;
657   ci->values_num = 0;
658   ci->values_alloc = 0;
660   ci->last_flush_time = when;
661   if (config_write_jitter > 0)
662     ci->last_flush_time += (rrd_random() % config_write_jitter);
665 /* remove_from_queue
666  * remove a "cache_item_t" item from the queue.
667  * must hold 'cache_lock' when calling this
668  */
669 static void remove_from_queue(cache_item_t *ci) /* {{{ */
671   if (ci == NULL) return;
672   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
674   if (ci->prev == NULL)
675     cache_queue_head = ci->next; /* reset head */
676   else
677     ci->prev->next = ci->next;
679   if (ci->next == NULL)
680     cache_queue_tail = ci->prev; /* reset the tail */
681   else
682     ci->next->prev = ci->prev;
684   ci->next = ci->prev = NULL;
685   ci->flags &= ~CI_FLAGS_IN_QUEUE;
687   pthread_mutex_lock (&stats_lock);
688   assert (stats_queue_length > 0);
689   stats_queue_length--;
690   pthread_mutex_unlock (&stats_lock);
692 } /* }}} static void remove_from_queue */
694 /* free the resources associated with the cache_item_t
695  * must hold cache_lock when calling this function
696  */
697 static void *free_cache_item(cache_item_t *ci) /* {{{ */
699   if (ci == NULL) return NULL;
701   remove_from_queue(ci);
703   for (size_t i=0; i < ci->values_num; i++)
704     free(ci->values[i]);
706   free (ci->values);
707   free (ci->file);
709   /* in case anyone is waiting */
710   pthread_cond_broadcast(&ci->flushed);
711   pthread_cond_destroy(&ci->flushed);
713   free (ci);
715   return NULL;
716 } /* }}} static void *free_cache_item */
718 /*
719  * enqueue_cache_item:
720  * `cache_lock' must be acquired before calling this function!
721  */
722 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
723     queue_side_t side)
725   if (ci == NULL)
726     return (-1);
728   if (ci->values_num == 0)
729     return (0);
731   if (side == HEAD)
732   {
733     if (cache_queue_head == ci)
734       return 0;
736     /* remove if further down in queue */
737     remove_from_queue(ci);
739     ci->prev = NULL;
740     ci->next = cache_queue_head;
741     if (ci->next != NULL)
742       ci->next->prev = ci;
743     cache_queue_head = ci;
745     if (cache_queue_tail == NULL)
746       cache_queue_tail = cache_queue_head;
747   }
748   else /* (side == TAIL) */
749   {
750     /* We don't move values back in the list.. */
751     if (ci->flags & CI_FLAGS_IN_QUEUE)
752       return (0);
754     assert (ci->next == NULL);
755     assert (ci->prev == NULL);
757     ci->prev = cache_queue_tail;
759     if (cache_queue_tail == NULL)
760       cache_queue_head = ci;
761     else
762       cache_queue_tail->next = ci;
764     cache_queue_tail = ci;
765   }
767   ci->flags |= CI_FLAGS_IN_QUEUE;
769   pthread_cond_signal(&queue_cond);
770   pthread_mutex_lock (&stats_lock);
771   stats_queue_length++;
772   pthread_mutex_unlock (&stats_lock);
774   return (0);
775 } /* }}} int enqueue_cache_item */
777 /*
778  * tree_callback_flush:
779  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
780  * while this is in progress.
781  */
782 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
783     gpointer data)
785   cache_item_t *ci;
786   callback_flush_data_t *cfd;
788   ci = (cache_item_t *) value;
789   cfd = (callback_flush_data_t *) data;
791   if (ci->flags & CI_FLAGS_IN_QUEUE)
792     return FALSE;
794   if (ci->values_num > 0
795       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
796   {
797     enqueue_cache_item (ci, TAIL);
798   }
799   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
800       && (ci->values_num <= 0))
801   {
802     assert ((char *) key == ci->file);
803     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
804     {
805       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
806       return (FALSE);
807     }
808   }
810   return (FALSE);
811 } /* }}} gboolean tree_callback_flush */
813 static int flush_old_values (int max_age)
815   callback_flush_data_t cfd;
816   size_t k;
818   memset (&cfd, 0, sizeof (cfd));
819   /* Pass the current time as user data so that we don't need to call
820    * `time' for each node. */
821   cfd.now = time (NULL);
822   cfd.keys = NULL;
823   cfd.keys_num = 0;
825   if (max_age > 0)
826     cfd.abs_timeout = cfd.now - max_age;
827   else
828     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
830   /* `tree_callback_flush' will return the keys of all values that haven't
831    * been touched in the last `config_flush_interval' seconds in `cfd'.
832    * The char*'s in this array point to the same memory as ci->file, so we
833    * don't need to free them separately. */
834   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
836   for (k = 0; k < cfd.keys_num; k++)
837   {
838     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
839     /* should never fail, since we have held the cache_lock
840      * the entire time */
841     assert(status == TRUE);
842   }
844   if (cfd.keys != NULL)
845   {
846     free (cfd.keys);
847     cfd.keys = NULL;
848   }
850   return (0);
851 } /* int flush_old_values */
853 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
855   struct timeval now;
856   struct timespec next_flush;
857   int status;
859   gettimeofday (&now, NULL);
860   next_flush.tv_sec = now.tv_sec + config_flush_interval;
861   next_flush.tv_nsec = 1000 * now.tv_usec;
863   pthread_mutex_lock(&cache_lock);
865   while (state == RUNNING)
866   {
867     gettimeofday (&now, NULL);
868     if ((now.tv_sec > next_flush.tv_sec)
869         || ((now.tv_sec == next_flush.tv_sec)
870           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
871     {
872       RRDD_LOG(LOG_DEBUG, "flushing old values");
874       /* Determine the time of the next cache flush. */
875       next_flush.tv_sec = now.tv_sec + config_flush_interval;
877       /* Flush all values that haven't been written in the last
878        * `config_write_interval' seconds. */
879       flush_old_values (config_write_interval);
881       /* unlock the cache while we rotate so we don't block incoming
882        * updates if the fsync() blocks on disk I/O */
883       pthread_mutex_unlock(&cache_lock);
884       journal_rotate();
885       pthread_mutex_lock(&cache_lock);
886     }
888     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
889     if (status != 0 && status != ETIMEDOUT)
890     {
891       RRDD_LOG (LOG_ERR, "flush_thread_main: "
892                 "pthread_cond_timedwait returned %i.", status);
893     }
894   }
896   if (config_flush_at_shutdown)
897     flush_old_values (-1); /* flush everything */
899   state = SHUTDOWN;
901   pthread_mutex_unlock(&cache_lock);
903   return NULL;
904 } /* void *flush_thread_main */
906 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
908   pthread_mutex_lock (&cache_lock);
910   while (state != SHUTDOWN
911          || (cache_queue_head != NULL && config_flush_at_shutdown))
912   {
913     cache_item_t *ci;
914     char *file;
915     char **values;
916     size_t values_num;
917     int status;
919     /* Now, check if there's something to store away. If not, wait until
920      * something comes in. */
921     if (cache_queue_head == NULL)
922     {
923       status = pthread_cond_wait (&queue_cond, &cache_lock);
924       if ((status != 0) && (status != ETIMEDOUT))
925       {
926         RRDD_LOG (LOG_ERR, "queue_thread_main: "
927             "pthread_cond_wait returned %i.", status);
928       }
929     }
931     /* Check if a value has arrived. This may be NULL if we timed out or there
932      * was an interrupt such as a signal. */
933     if (cache_queue_head == NULL)
934       continue;
936     ci = cache_queue_head;
938     /* copy the relevant parts */
939     file = strdup (ci->file);
940     if (file == NULL)
941     {
942       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
943       continue;
944     }
946     assert(ci->values != NULL);
947     assert(ci->values_num > 0);
949     values = ci->values;
950     values_num = ci->values_num;
952     wipe_ci_values(ci, time(NULL));
953     remove_from_queue(ci);
955     pthread_mutex_unlock (&cache_lock);
957     rrd_clear_error ();
958     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
959     if (status != 0)
960     {
961       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
962           "rrd_update_r (%s) failed with status %i. (%s)",
963           file, status, rrd_get_error());
964     }
966     journal_write("wrote", file);
968     /* Search again in the tree.  It's possible someone issued a "FORGET"
969      * while we were writing the update values. */
970     pthread_mutex_lock(&cache_lock);
971     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
972     if (ci)
973       pthread_cond_broadcast(&ci->flushed);
974     pthread_mutex_unlock(&cache_lock);
976     if (status == 0)
977     {
978       pthread_mutex_lock (&stats_lock);
979       stats_updates_written++;
980       stats_data_sets_written += values_num;
981       pthread_mutex_unlock (&stats_lock);
982     }
984     rrd_free_ptrs((void ***) &values, &values_num);
985     free(file);
987     pthread_mutex_lock (&cache_lock);
988   }
989   pthread_mutex_unlock (&cache_lock);
991   return (NULL);
992 } /* }}} void *queue_thread_main */
994 static int buffer_get_field (char **buffer_ret, /* {{{ */
995     size_t *buffer_size_ret, char **field_ret)
997   char *buffer;
998   size_t buffer_pos;
999   size_t buffer_size;
1000   char *field;
1001   size_t field_size;
1002   int status;
1004   buffer = *buffer_ret;
1005   buffer_pos = 0;
1006   buffer_size = *buffer_size_ret;
1007   field = *buffer_ret;
1008   field_size = 0;
1010   if (buffer_size <= 0)
1011     return (-1);
1013   /* This is ensured by `handle_request'. */
1014   assert (buffer[buffer_size - 1] == '\0');
1016   status = -1;
1017   while (buffer_pos < buffer_size)
1018   {
1019     /* Check for end-of-field or end-of-buffer */
1020     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1021     {
1022       field[field_size] = 0;
1023       field_size++;
1024       buffer_pos++;
1025       status = 0;
1026       break;
1027     }
1028     /* Handle escaped characters. */
1029     else if (buffer[buffer_pos] == '\\')
1030     {
1031       if (buffer_pos >= (buffer_size - 1))
1032         break;
1033       buffer_pos++;
1034       field[field_size] = buffer[buffer_pos];
1035       field_size++;
1036       buffer_pos++;
1037     }
1038     /* Normal operation */ 
1039     else
1040     {
1041       field[field_size] = buffer[buffer_pos];
1042       field_size++;
1043       buffer_pos++;
1044     }
1045   } /* while (buffer_pos < buffer_size) */
1047   if (status != 0)
1048     return (status);
1050   *buffer_ret = buffer + buffer_pos;
1051   *buffer_size_ret = buffer_size - buffer_pos;
1052   *field_ret = field;
1054   return (0);
1055 } /* }}} int buffer_get_field */
1057 /* if we're restricting writes to the base directory,
1058  * check whether the file falls within the dir
1059  * returns 1 if OK, otherwise 0
1060  */
1061 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1063   assert(file != NULL);
1065   if (!config_write_base_only
1066       || JOURNAL_REPLAY(sock)
1067       || config_base_dir == NULL)
1068     return 1;
1070   if (strstr(file, "../") != NULL) goto err;
1072   /* relative paths without "../" are ok */
1073   if (*file != '/') return 1;
1075   /* file must be of the format base + "/" + <1+ char filename> */
1076   if (strlen(file) < _config_base_dir_len + 2) goto err;
1077   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1078   if (*(file + _config_base_dir_len) != '/') goto err;
1080   return 1;
1082 err:
1083   if (sock != NULL && sock->fd >= 0)
1084     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1086   return 0;
1087 } /* }}} static int check_file_access */
1089 /* when using a base dir, convert relative paths to absolute paths.
1090  * if necessary, modifies the "filename" pointer to point
1091  * to the new path created in "tmp".  "tmp" is provided
1092  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1093  *
1094  * this allows us to optimize for the expected case (absolute path)
1095  * with a no-op.
1096  */
1097 static void get_abs_path(char **filename, char *tmp)
1099   assert(tmp != NULL);
1100   assert(filename != NULL && *filename != NULL);
1102   if (config_base_dir == NULL || **filename == '/')
1103     return;
1105   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1106   *filename = tmp;
1107 } /* }}} static int get_abs_path */
1109 static int flush_file (const char *filename) /* {{{ */
1111   cache_item_t *ci;
1113   pthread_mutex_lock (&cache_lock);
1115   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1116   if (ci == NULL)
1117   {
1118     pthread_mutex_unlock (&cache_lock);
1119     return (ENOENT);
1120   }
1122   if (ci->values_num > 0)
1123   {
1124     /* Enqueue at head */
1125     enqueue_cache_item (ci, HEAD);
1126     pthread_cond_wait(&ci->flushed, &cache_lock);
1127   }
1129   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1130    * may have been purged during our cond_wait() */
1132   pthread_mutex_unlock(&cache_lock);
1134   return (0);
1135 } /* }}} int flush_file */
1137 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1139   char *err = "Syntax error.\n";
1141   if (cmd && cmd->syntax)
1142     err = cmd->syntax;
1144   return send_response(sock, RESP_ERR, "Usage: %s", err);
1145 } /* }}} static int syntax_error() */
1147 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1149   uint64_t copy_queue_length;
1150   uint64_t copy_updates_received;
1151   uint64_t copy_flush_received;
1152   uint64_t copy_updates_written;
1153   uint64_t copy_data_sets_written;
1154   uint64_t copy_journal_bytes;
1155   uint64_t copy_journal_rotate;
1157   uint64_t tree_nodes_number;
1158   uint64_t tree_depth;
1160   pthread_mutex_lock (&stats_lock);
1161   copy_queue_length       = stats_queue_length;
1162   copy_updates_received   = stats_updates_received;
1163   copy_flush_received     = stats_flush_received;
1164   copy_updates_written    = stats_updates_written;
1165   copy_data_sets_written  = stats_data_sets_written;
1166   copy_journal_bytes      = stats_journal_bytes;
1167   copy_journal_rotate     = stats_journal_rotate;
1168   pthread_mutex_unlock (&stats_lock);
1170   pthread_mutex_lock (&cache_lock);
1171   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1172   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1173   pthread_mutex_unlock (&cache_lock);
1175   add_response_info(sock,
1176                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1177   add_response_info(sock,
1178                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1179   add_response_info(sock,
1180                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1181   add_response_info(sock,
1182                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1183   add_response_info(sock,
1184                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1185   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1186   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1187   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1188   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1190   send_response(sock, RESP_OK, "Statistics follow\n");
1192   return (0);
1193 } /* }}} int handle_request_stats */
1195 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1197   char *file, file_tmp[PATH_MAX];
1198   int status;
1200   status = buffer_get_field (&buffer, &buffer_size, &file);
1201   if (status != 0)
1202   {
1203     return syntax_error(sock,cmd);
1204   }
1205   else
1206   {
1207     pthread_mutex_lock(&stats_lock);
1208     stats_flush_received++;
1209     pthread_mutex_unlock(&stats_lock);
1211     get_abs_path(&file, file_tmp);
1212     if (!check_file_access(file, sock)) return 0;
1214     status = flush_file (file);
1215     if (status == 0)
1216       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1217     else if (status == ENOENT)
1218     {
1219       /* no file in our tree; see whether it exists at all */
1220       struct stat statbuf;
1222       memset(&statbuf, 0, sizeof(statbuf));
1223       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1224         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1225       else
1226         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1227     }
1228     else if (status < 0)
1229       return send_response(sock, RESP_ERR, "Internal error.\n");
1230     else
1231       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1232   }
1234   /* NOTREACHED */
1235   assert(1==0);
1236 } /* }}} int handle_request_flush */
1238 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1240   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1242   pthread_mutex_lock(&cache_lock);
1243   flush_old_values(-1);
1244   pthread_mutex_unlock(&cache_lock);
1246   return send_response(sock, RESP_OK, "Started flush.\n");
1247 } /* }}} static int handle_request_flushall */
1249 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1251   int status;
1252   char *file, file_tmp[PATH_MAX];
1253   cache_item_t *ci;
1255   status = buffer_get_field(&buffer, &buffer_size, &file);
1256   if (status != 0)
1257     return syntax_error(sock,cmd);
1259   get_abs_path(&file, file_tmp);
1261   pthread_mutex_lock(&cache_lock);
1262   ci = g_tree_lookup(cache_tree, file);
1263   if (ci == NULL)
1264   {
1265     pthread_mutex_unlock(&cache_lock);
1266     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267   }
1269   for (size_t i=0; i < ci->values_num; i++)
1270     add_response_info(sock, "%s\n", ci->values[i]);
1272   pthread_mutex_unlock(&cache_lock);
1273   return send_response(sock, RESP_OK, "updates pending\n");
1274 } /* }}} static int handle_request_pending */
1276 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1278   int status;
1279   gboolean found;
1280   char *file, file_tmp[PATH_MAX];
1282   status = buffer_get_field(&buffer, &buffer_size, &file);
1283   if (status != 0)
1284     return syntax_error(sock,cmd);
1286   get_abs_path(&file, file_tmp);
1287   if (!check_file_access(file, sock)) return 0;
1289   pthread_mutex_lock(&cache_lock);
1290   found = g_tree_remove(cache_tree, file);
1291   pthread_mutex_unlock(&cache_lock);
1293   if (found == TRUE)
1294   {
1295     if (!JOURNAL_REPLAY(sock))
1296       journal_write("forget", file);
1298     return send_response(sock, RESP_OK, "Gone!\n");
1299   }
1300   else
1301     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1303   /* NOTREACHED */
1304   assert(1==0);
1305 } /* }}} static int handle_request_forget */
1307 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1309   cache_item_t *ci;
1311   pthread_mutex_lock(&cache_lock);
1313   ci = cache_queue_head;
1314   while (ci != NULL)
1315   {
1316     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1317     ci = ci->next;
1318   }
1320   pthread_mutex_unlock(&cache_lock);
1322   return send_response(sock, RESP_OK, "in queue.\n");
1323 } /* }}} int handle_request_queue */
1325 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1327   char *file, file_tmp[PATH_MAX];
1328   int values_num = 0;
1329   int status;
1330   char orig_buf[CMD_MAX];
1332   cache_item_t *ci;
1334   /* save it for the journal later */
1335   if (!JOURNAL_REPLAY(sock))
1336     strncpy(orig_buf, buffer, buffer_size);
1338   status = buffer_get_field (&buffer, &buffer_size, &file);
1339   if (status != 0)
1340     return syntax_error(sock,cmd);
1342   pthread_mutex_lock(&stats_lock);
1343   stats_updates_received++;
1344   pthread_mutex_unlock(&stats_lock);
1346   get_abs_path(&file, file_tmp);
1347   if (!check_file_access(file, sock)) return 0;
1349   pthread_mutex_lock (&cache_lock);
1350   ci = g_tree_lookup (cache_tree, file);
1352   if (ci == NULL) /* {{{ */
1353   {
1354     struct stat statbuf;
1355     cache_item_t *tmp;
1357     /* don't hold the lock while we setup; stat(2) might block */
1358     pthread_mutex_unlock(&cache_lock);
1360     memset (&statbuf, 0, sizeof (statbuf));
1361     status = stat (file, &statbuf);
1362     if (status != 0)
1363     {
1364       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1366       status = errno;
1367       if (status == ENOENT)
1368         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1369       else
1370         return send_response(sock, RESP_ERR,
1371                              "stat failed with error %i.\n", status);
1372     }
1373     if (!S_ISREG (statbuf.st_mode))
1374       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1376     if (access(file, R_OK|W_OK) != 0)
1377       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1378                            file, rrd_strerror(errno));
1380     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1381     if (ci == NULL)
1382     {
1383       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1385       return send_response(sock, RESP_ERR, "malloc failed.\n");
1386     }
1387     memset (ci, 0, sizeof (cache_item_t));
1389     ci->file = strdup (file);
1390     if (ci->file == NULL)
1391     {
1392       free (ci);
1393       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1395       return send_response(sock, RESP_ERR, "strdup failed.\n");
1396     }
1398     wipe_ci_values(ci, now);
1399     ci->flags = CI_FLAGS_IN_TREE;
1400     pthread_cond_init(&ci->flushed, NULL);
1402     pthread_mutex_lock(&cache_lock);
1404     /* another UPDATE might have added this entry in the meantime */
1405     tmp = g_tree_lookup (cache_tree, file);
1406     if (tmp == NULL)
1407       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1408     else
1409     {
1410       free_cache_item (ci);
1411       ci = tmp;
1412     }
1414     /* state may have changed while we were unlocked */
1415     if (state == SHUTDOWN)
1416       return -1;
1417   } /* }}} */
1418   assert (ci != NULL);
1420   /* don't re-write updates in replay mode */
1421   if (!JOURNAL_REPLAY(sock))
1422     journal_write("update", orig_buf);
1424   while (buffer_size > 0)
1425   {
1426     char *value;
1427     time_t stamp;
1428     char *eostamp;
1430     status = buffer_get_field (&buffer, &buffer_size, &value);
1431     if (status != 0)
1432     {
1433       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1434       break;
1435     }
1437     /* make sure update time is always moving forward */
1438     stamp = strtol(value, &eostamp, 10);
1439     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "Cannot find timestamp in '%s'!\n", value);
1444     }
1445     else if (stamp <= ci->last_update_stamp)
1446     {
1447       pthread_mutex_unlock(&cache_lock);
1448       return send_response(sock, RESP_ERR,
1449                            "illegal attempt to update using time %ld when last"
1450                            " update time is %ld (minimum one second step)\n",
1451                            stamp, ci->last_update_stamp);
1452     }
1453     else
1454       ci->last_update_stamp = stamp;
1456     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1457                               &ci->values_alloc, config_alloc_chunk))
1458     {
1459       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1460       continue;
1461     }
1463     values_num++;
1464   }
1466   if (((now - ci->last_flush_time) >= config_write_interval)
1467       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1468       && (ci->values_num > 0))
1469   {
1470     enqueue_cache_item (ci, TAIL);
1471   }
1473   pthread_mutex_unlock (&cache_lock);
1475   if (values_num < 1)
1476     return send_response(sock, RESP_ERR, "No values updated.\n");
1477   else
1478     return send_response(sock, RESP_OK,
1479                          "errors, enqueued %i value(s).\n", values_num);
1481   /* NOTREACHED */
1482   assert(1==0);
1484 } /* }}} int handle_request_update */
1486 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1488   char *file, file_tmp[PATH_MAX];
1489   char *cf;
1491   char *start_str;
1492   char *end_str;
1493   time_t start_tm;
1494   time_t end_tm;
1496   unsigned long step;
1497   unsigned long ds_cnt;
1498   char **ds_namv;
1499   rrd_value_t *data;
1501   int status;
1502   unsigned long i;
1503   time_t t;
1504   rrd_value_t *data_ptr;
1506   file = NULL;
1507   cf = NULL;
1508   start_str = NULL;
1509   end_str = NULL;
1511   /* Read the arguments */
1512   do /* while (0) */
1513   {
1514     status = buffer_get_field (&buffer, &buffer_size, &file);
1515     if (status != 0)
1516       break;
1518     status = buffer_get_field (&buffer, &buffer_size, &cf);
1519     if (status != 0)
1520       break;
1522     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1523     if (status != 0)
1524     {
1525       start_str = NULL;
1526       status = 0;
1527       break;
1528     }
1530     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1531     if (status != 0)
1532     {
1533       end_str = NULL;
1534       status = 0;
1535       break;
1536     }
1537   } while (0);
1539   if (status != 0)
1540     return (syntax_error(sock,cmd));
1542   get_abs_path(&file, file_tmp);
1543   if (!check_file_access(file, sock)) return 0;
1545   status = flush_file (file);
1546   if ((status != 0) && (status != ENOENT))
1547     return (send_response (sock, RESP_ERR,
1548           "flush_file (%s) failed with status %i.\n", file, status));
1550   t = time (NULL); /* "now" */
1552   /* Parse start time */
1553   if (start_str != NULL)
1554   {
1555     char *endptr;
1556     long value;
1558     endptr = NULL;
1559     errno = 0;
1560     value = strtol (start_str, &endptr, /* base = */ 0);
1561     if ((endptr == start_str) || (errno != 0))
1562       return (send_response(sock, RESP_ERR,
1563             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1564             start_str));
1566     if (value > 0)
1567       start_tm = (time_t) value;
1568     else
1569       start_tm = (time_t) (t + value);
1570   }
1571   else
1572   {
1573     start_tm = t - 86400;
1574   }
1576   /* Parse end time */
1577   if (end_str != NULL)
1578   {
1579     char *endptr;
1580     long value;
1582     endptr = NULL;
1583     errno = 0;
1584     value = strtol (end_str, &endptr, /* base = */ 0);
1585     if ((endptr == end_str) || (errno != 0))
1586       return (send_response(sock, RESP_ERR,
1587             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1588             end_str));
1590     if (value > 0)
1591       end_tm = (time_t) value;
1592     else
1593       end_tm = (time_t) (t + value);
1594   }
1595   else
1596   {
1597     end_tm = t;
1598   }
1600   step = -1;
1601   ds_cnt = 0;
1602   ds_namv = NULL;
1603   data = NULL;
1605   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1606       &ds_cnt, &ds_namv, &data);
1607   if (status != 0)
1608     return (send_response(sock, RESP_ERR,
1609           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1611   add_response_info (sock, "FlushVersion: %lu\n", 1);
1612   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1613   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1614   add_response_info (sock, "Step: %lu\n", step);
1615   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1617 #define SSTRCAT(buffer,str,buffer_fill) do { \
1618     size_t str_len = strlen (str); \
1619     if ((buffer_fill + str_len) > sizeof (buffer)) \
1620       str_len = sizeof (buffer) - buffer_fill; \
1621     if (str_len > 0) { \
1622       strncpy (buffer + buffer_fill, str, str_len); \
1623       buffer_fill += str_len; \
1624       assert (buffer_fill <= sizeof (buffer)); \
1625       if (buffer_fill == sizeof (buffer)) \
1626         buffer[buffer_fill - 1] = 0; \
1627       else \
1628         buffer[buffer_fill] = 0; \
1629     } \
1630   } while (0)
1632   { /* Add list of DS names */
1633     char linebuf[1024];
1634     size_t linebuf_fill;
1636     memset (linebuf, 0, sizeof (linebuf));
1637     linebuf_fill = 0;
1638     for (i = 0; i < ds_cnt; i++)
1639     {
1640       if (i > 0)
1641         SSTRCAT (linebuf, " ", linebuf_fill);
1642       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1643       rrd_freemem(ds_namv[i]);
1644     }
1645     rrd_freemem(ds_namv);
1646     add_response_info (sock, "DSName: %s\n", linebuf);
1647   }
1649   /* Add the actual data */
1650   assert (step > 0);
1651   data_ptr = data;
1652   for (t = start_tm + step; t <= end_tm; t += step)
1653   {
1654     char linebuf[1024];
1655     size_t linebuf_fill;
1656     char tmp[128];
1658     memset (linebuf, 0, sizeof (linebuf));
1659     linebuf_fill = 0;
1660     for (i = 0; i < ds_cnt; i++)
1661     {
1662       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1663       tmp[sizeof (tmp) - 1] = 0;
1664       SSTRCAT (linebuf, tmp, linebuf_fill);
1666       data_ptr++;
1667     }
1669     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1670   } /* for (t) */
1671   rrd_freemem(data);
1673   return (send_response (sock, RESP_OK, "Success\n"));
1674 #undef SSTRCAT
1675 } /* }}} int handle_request_fetch */
1677 /* we came across a "WROTE" entry during journal replay.
1678  * throw away any values that we have accumulated for this file
1679  */
1680 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1682   cache_item_t *ci;
1683   const char *file = buffer;
1685   pthread_mutex_lock(&cache_lock);
1687   ci = g_tree_lookup(cache_tree, file);
1688   if (ci == NULL)
1689   {
1690     pthread_mutex_unlock(&cache_lock);
1691     return (0);
1692   }
1694   if (ci->values)
1695     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1697   wipe_ci_values(ci, now);
1698   remove_from_queue(ci);
1700   pthread_mutex_unlock(&cache_lock);
1701   return (0);
1702 } /* }}} int handle_request_wrote */
1704 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1706   char *file, file_tmp[PATH_MAX];
1707   int status;
1708   rrd_info_t *info;
1710   /* obtain filename */
1711   status = buffer_get_field(&buffer, &buffer_size, &file);
1712   if (status != 0)
1713     return syntax_error(sock,cmd);
1714   /* get full pathname */
1715   get_abs_path(&file, file_tmp);
1716   if (!check_file_access(file, sock)) {
1717     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1718   }
1719   /* get data */
1720   rrd_clear_error ();
1721   info = rrd_info_r(file);
1722   if(!info) {
1723     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1724   }
1725   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1726       switch (data->type) {
1727       case RD_I_VAL:
1728           if (isnan(data->value.u_val))
1729               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1730           else
1731               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1732           break;
1733       case RD_I_CNT:
1734           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1735           break;
1736       case RD_I_INT:
1737           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1738           break;
1739       case RD_I_STR:
1740           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1741           break;
1742       case RD_I_BLO:
1743           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1744           break;
1745       }
1746   }
1748   rrd_info_free(info);
1750   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1751 } /* }}} static int handle_request_info  */
1753 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1755   char *i, *file, file_tmp[PATH_MAX];
1756   int status;
1757   int idx;
1758   time_t t;
1760   /* obtain filename */
1761   status = buffer_get_field(&buffer, &buffer_size, &file);
1762   if (status != 0)
1763     return syntax_error(sock,cmd);
1764   /* get full pathname */
1765   get_abs_path(&file, file_tmp);
1766   if (!check_file_access(file, sock)) {
1767     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1768   }
1770   status = buffer_get_field(&buffer, &buffer_size, &i);
1771   if (status != 0)
1772     return syntax_error(sock,cmd);
1773   idx = atoi(i);
1774   if(idx<0) { 
1775     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1776   }
1778   /* get data */
1779   rrd_clear_error ();
1780   t = rrd_first_r(file,idx);
1781   if(t<1) {
1782     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1783   }
1784   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1785 } /* }}} static int handle_request_first  */
1788 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1790   char *file, file_tmp[PATH_MAX];
1791   int status;
1792   time_t t, from_file, step;
1793   rrd_file_t * rrd_file;
1794   cache_item_t * ci;
1795   rrd_t rrd; 
1797   /* obtain filename */
1798   status = buffer_get_field(&buffer, &buffer_size, &file);
1799   if (status != 0)
1800     return syntax_error(sock,cmd);
1801   /* get full pathname */
1802   get_abs_path(&file, file_tmp);
1803   if (!check_file_access(file, sock)) {
1804     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1805   }
1806   rrd_clear_error();
1807   rrd_init(&rrd);
1808   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1809   if(!rrd_file) {
1810     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1811   }
1812   from_file = rrd.live_head->last_up;
1813   step = rrd.stat_head->pdp_step;
1814   rrd_close(rrd_file);
1815   pthread_mutex_lock(&cache_lock);
1816   ci = g_tree_lookup(cache_tree, file);
1817   if (ci)
1818     t = ci->last_update_stamp;
1819   else
1820     t = from_file;
1821   pthread_mutex_unlock(&cache_lock);
1822   t -= t % step;
1823   rrd_free(&rrd);
1824   if(t<1) {
1825     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1826   }
1827   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1828 } /* }}} static int handle_request_last  */
1830 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1832   char *file, file_tmp[PATH_MAX];
1833   char *tok;
1834   int ac = 0;
1835   char *av[128];
1836   int status;
1837   unsigned long step = 300;
1838   time_t last_up = time(NULL)-10;
1839   int no_overwrite = opt_no_overwrite;
1842   /* obtain filename */
1843   status = buffer_get_field(&buffer, &buffer_size, &file);
1844   if (status != 0)
1845     return syntax_error(sock,cmd);
1846   /* get full pathname */
1847   get_abs_path(&file, file_tmp);
1848   if (!check_file_access(file, sock)) {
1849     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1850   }
1851   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1853   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1854     if( ! strncmp(tok,"-b",2) ) {
1855       status = buffer_get_field(&buffer, &buffer_size, &tok );
1856       if (status != 0) return syntax_error(sock,cmd);
1857       last_up = (time_t) atol(tok);
1858       continue;
1859     }
1860     if( ! strncmp(tok,"-s",2) ) {
1861       status = buffer_get_field(&buffer, &buffer_size, &tok );
1862       if (status != 0) return syntax_error(sock,cmd);
1863       step = atol(tok);
1864       continue;
1865     }
1866     if( ! strncmp(tok,"-O",2) ) {
1867       no_overwrite = 1;
1868       continue;
1869     }
1870     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1871     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1872     return syntax_error(sock,cmd);
1873   }
1874   if(step<1) {
1875     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1876   }
1877   if (last_up < 3600 * 24 * 365 * 10) {
1878     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1879   }
1881   rrd_clear_error ();
1882   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1884   if(!status) {
1885     return send_response(sock, RESP_OK, "RRD created OK\n");
1886   }
1887   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1888 } /* }}} static int handle_request_create  */
1890 /* start "BATCH" processing */
1891 static int batch_start (HANDLER_PROTO) /* {{{ */
1893   int status;
1894   if (sock->batch_start)
1895     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1897   status = send_response(sock, RESP_OK,
1898                          "Go ahead.  End with dot '.' on its own line.\n");
1899   sock->batch_start = time(NULL);
1900   sock->batch_cmd = 0;
1902   return status;
1903 } /* }}} static int batch_start */
1905 /* finish "BATCH" processing and return results to the client */
1906 static int batch_done (HANDLER_PROTO) /* {{{ */
1908   assert(sock->batch_start);
1909   sock->batch_start = 0;
1910   sock->batch_cmd  = 0;
1911   return send_response(sock, RESP_OK, "errors\n");
1912 } /* }}} static int batch_done */
1914 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1916   return -1;
1917 } /* }}} static int handle_request_quit */
1919 static command_t list_of_commands[] = { /* {{{ */
1920   {
1921     "UPDATE",
1922     handle_request_update,
1923     CMD_CONTEXT_ANY,
1924     "UPDATE <filename> <values> [<values> ...]\n"
1925     ,
1926     "Adds the given file to the internal cache if it is not yet known and\n"
1927     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1928     "for details.\n"
1929     "\n"
1930     "Each <values> has the following form:\n"
1931     "  <values> = <time>:<value>[:<value>[...]]\n"
1932     "See the rrdupdate(1) manpage for details.\n"
1933   },
1934   {
1935     "WROTE",
1936     handle_request_wrote,
1937     CMD_CONTEXT_JOURNAL,
1938     NULL,
1939     NULL
1940   },
1941   {
1942     "FLUSH",
1943     handle_request_flush,
1944     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1945     "FLUSH <filename>\n"
1946     ,
1947     "Adds the given filename to the head of the update queue and returns\n"
1948     "after it has been dequeued.\n"
1949   },
1950   {
1951     "FLUSHALL",
1952     handle_request_flushall,
1953     CMD_CONTEXT_CLIENT,
1954     "FLUSHALL\n"
1955     ,
1956     "Triggers writing of all pending updates.  Returns immediately.\n"
1957   },
1958   {
1959     "PENDING",
1960     handle_request_pending,
1961     CMD_CONTEXT_CLIENT,
1962     "PENDING <filename>\n"
1963     ,
1964     "Shows any 'pending' updates for a file, in order.\n"
1965     "The updates shown have not yet been written to the underlying RRD file.\n"
1966   },
1967   {
1968     "FORGET",
1969     handle_request_forget,
1970     CMD_CONTEXT_ANY,
1971     "FORGET <filename>\n"
1972     ,
1973     "Removes the file completely from the cache.\n"
1974     "Any pending updates for the file will be lost.\n"
1975   },
1976   {
1977     "QUEUE",
1978     handle_request_queue,
1979     CMD_CONTEXT_CLIENT,
1980     "QUEUE\n"
1981     ,
1982         "Shows all files in the output queue.\n"
1983     "The output is zero or more lines in the following format:\n"
1984     "(where <num_vals> is the number of values to be written)\n"
1985     "\n"
1986     "<num_vals> <filename>\n"
1987   },
1988   {
1989     "STATS",
1990     handle_request_stats,
1991     CMD_CONTEXT_CLIENT,
1992     "STATS\n"
1993     ,
1994     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1995     "a description of the values.\n"
1996   },
1997   {
1998     "HELP",
1999     handle_request_help,
2000     CMD_CONTEXT_CLIENT,
2001     "HELP [<command>]\n",
2002     NULL, /* special! */
2003   },
2004   {
2005     "BATCH",
2006     batch_start,
2007     CMD_CONTEXT_CLIENT,
2008     "BATCH\n"
2009     ,
2010     "The 'BATCH' command permits the client to initiate a bulk load\n"
2011     "   of commands to rrdcached.\n"
2012     "\n"
2013     "Usage:\n"
2014     "\n"
2015     "    client: BATCH\n"
2016     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2017     "    client: command #1\n"
2018     "    client: command #2\n"
2019     "    client: ... and so on\n"
2020     "    client: .\n"
2021     "    server: 2 errors\n"
2022     "    server: 7 message for command #7\n"
2023     "    server: 9 message for command #9\n"
2024     "\n"
2025     "For more information, consult the rrdcached(1) documentation.\n"
2026   },
2027   {
2028     ".",   /* BATCH terminator */
2029     batch_done,
2030     CMD_CONTEXT_BATCH,
2031     NULL,
2032     NULL
2033   },
2034   {
2035     "FETCH",
2036     handle_request_fetch,
2037     CMD_CONTEXT_CLIENT,
2038     "FETCH <file> <CF> [<start> [<end>]]\n"
2039     ,
2040     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2041   },
2042   {
2043     "INFO",
2044     handle_request_info,
2045     CMD_CONTEXT_CLIENT,
2046     "INFO <filename>\n",
2047     "The INFO command retrieves information about a specified RRD file.\n"
2048     "This is returned in standard rrdinfo format, a sequence of lines\n"
2049     "with the format <keyname> = <value>\n"
2050     "Note that this is the data as of the last update of the RRD file itself,\n"
2051     "not the last time data was received via rrdcached, so there may be pending\n"
2052     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2053   },
2054   {
2055     "FIRST",
2056     handle_request_first,
2057     CMD_CONTEXT_CLIENT,
2058     "FIRST <filename> <rra index>\n",
2059     "The FIRST command retrieves the first data time for a specified RRA in\n"
2060     "an RRD file.\n"
2061   },
2062   {
2063     "LAST",
2064     handle_request_last,
2065     CMD_CONTEXT_CLIENT,
2066     "LAST <filename>\n",
2067     "The LAST command retrieves the last update time for a specified RRD file.\n"
2068     "Note that this is the time of the last update of the RRD file itself, not\n"
2069     "the last time data was received via rrdcached, so there may be pending\n"
2070     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2071   },
2072   {
2073     "CREATE",
2074     handle_request_create,
2075     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2076     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2077     "The CREATE command will create an RRD file, overwriting any existing file\n"
2078     "unless the -O option is given or rrdcached was started with the -O option.\n"
2079     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2080     "not acceptable) and the step is in seconds (default is 300).\n"
2081     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2082   },
2083   {
2084     "QUIT",
2085     handle_request_quit,
2086     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2087     "QUIT\n"
2088     ,
2089     "Disconnect from rrdcached.\n"
2090   }
2091 }; /* }}} command_t list_of_commands[] */
2092 static size_t list_of_commands_len = sizeof (list_of_commands)
2093   / sizeof (list_of_commands[0]);
2095 static command_t *find_command(char *cmd)
2097   size_t i;
2099   for (i = 0; i < list_of_commands_len; i++)
2100     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2101       return (&list_of_commands[i]);
2102   return NULL;
2105 /* We currently use the index in the `list_of_commands' array as a bit position
2106  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2107  * outside these functions so that switching to a more elegant storage method
2108  * is easily possible. */
2109 static ssize_t find_command_index (const char *cmd) /* {{{ */
2111   size_t i;
2113   for (i = 0; i < list_of_commands_len; i++)
2114     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2115       return ((ssize_t) i);
2116   return (-1);
2117 } /* }}} ssize_t find_command_index */
2119 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2120     const char *cmd)
2122   ssize_t i;
2124   if (JOURNAL_REPLAY(sock))
2125     return (1);
2127   if (cmd == NULL)
2128     return (-1);
2130   if ((strcasecmp ("QUIT", cmd) == 0)
2131       || (strcasecmp ("HELP", cmd) == 0))
2132     return (1);
2133   else if (strcmp (".", cmd) == 0)
2134     cmd = "BATCH";
2136   i = find_command_index (cmd);
2137   if (i < 0)
2138     return (-1);
2139   assert (i < 32);
2141   if ((sock->permissions & (1 << i)) != 0)
2142     return (1);
2143   return (0);
2144 } /* }}} int socket_permission_check */
2146 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2147     const char *cmd)
2149   ssize_t i;
2151   i = find_command_index (cmd);
2152   if (i < 0)
2153     return (-1);
2154   assert (i < 32);
2156   sock->permissions |= (1 << i);
2157   return (0);
2158 } /* }}} int socket_permission_add */
2160 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2162   sock->permissions = 0;
2163 } /* }}} socket_permission_clear */
2165 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2166     listen_socket_t *src)
2168   dest->permissions = src->permissions;
2169 } /* }}} socket_permission_copy */
2171 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2173   size_t i;
2175   sock->permissions = 0;
2176   for (i = 0; i < list_of_commands_len; i++)
2177     sock->permissions |= (1 << i);
2178 } /* }}} void socket_permission_set_all */
2180 /* check whether commands are received in the expected context */
2181 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2183   if (JOURNAL_REPLAY(sock))
2184     return (cmd->context & CMD_CONTEXT_JOURNAL);
2185   else if (sock->batch_start)
2186     return (cmd->context & CMD_CONTEXT_BATCH);
2187   else
2188     return (cmd->context & CMD_CONTEXT_CLIENT);
2190   /* NOTREACHED */
2191   assert(1==0);
2194 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2196   int status;
2197   char *cmd_str;
2198   char *resp_txt;
2199   command_t *help = NULL;
2201   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2202   if (status == 0)
2203     help = find_command(cmd_str);
2205   if (help && (help->syntax || help->help))
2206   {
2207     char tmp[CMD_MAX];
2209     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2210     resp_txt = tmp;
2212     if (help->syntax)
2213       add_response_info(sock, "Usage: %s\n", help->syntax);
2215     if (help->help)
2216       add_response_info(sock, "%s\n", help->help);
2217   }
2218   else
2219   {
2220     size_t i;
2222     resp_txt = "Command overview\n";
2224     for (i = 0; i < list_of_commands_len; i++)
2225     {
2226       if (list_of_commands[i].syntax == NULL)
2227         continue;
2228       add_response_info (sock, "%s", list_of_commands[i].syntax);
2229     }
2230   }
2232   return send_response(sock, RESP_OK, resp_txt);
2233 } /* }}} int handle_request_help */
2235 static int handle_request (DISPATCH_PROTO) /* {{{ */
2237   char *buffer_ptr = buffer;
2238   char *cmd_str = NULL;
2239   command_t *cmd = NULL;
2240   int status;
2242   assert (buffer[buffer_size - 1] == '\0');
2244   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2245   if (status != 0)
2246   {
2247     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2248     return (-1);
2249   }
2251   if (sock != NULL && sock->batch_start)
2252     sock->batch_cmd++;
2254   cmd = find_command(cmd_str);
2255   if (!cmd)
2256     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2258   if (!socket_permission_check (sock, cmd->cmd))
2259     return send_response(sock, RESP_ERR, "Permission denied.\n");
2261   if (!command_check_context(sock, cmd))
2262     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2264   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2265 } /* }}} int handle_request */
2267 static void journal_set_free (journal_set *js) /* {{{ */
2269   if (js == NULL)
2270     return;
2272   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2274   free(js);
2275 } /* }}} journal_set_free */
2277 static void journal_set_remove (journal_set *js) /* {{{ */
2279   if (js == NULL)
2280     return;
2282   for (uint i=0; i < js->files_num; i++)
2283   {
2284     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2285     unlink(js->files[i]);
2286   }
2287 } /* }}} journal_set_remove */
2289 /* close current journal file handle.
2290  * MUST hold journal_lock before calling */
2291 static void journal_close(void) /* {{{ */
2293   if (journal_fh != NULL)
2294   {
2295     if (fclose(journal_fh) != 0)
2296       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2297   }
2299   journal_fh = NULL;
2300   journal_size = 0;
2301 } /* }}} journal_close */
2303 /* MUST hold journal_lock before calling */
2304 static void journal_new_file(void) /* {{{ */
2306   struct timeval now;
2307   int  new_fd;
2308   char new_file[PATH_MAX + 1];
2310   assert(journal_dir != NULL);
2311   assert(journal_cur != NULL);
2313   journal_close();
2315   gettimeofday(&now, NULL);
2316   /* this format assures that the files sort in strcmp() order */
2317   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2318            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2320   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2321                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2322   if (new_fd < 0)
2323     goto error;
2325   journal_fh = fdopen(new_fd, "a");
2326   if (journal_fh == NULL)
2327     goto error;
2329   journal_size = ftell(journal_fh);
2330   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2332   /* record the file in the journal set */
2333   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2335   return;
2337 error:
2338   RRDD_LOG(LOG_CRIT,
2339            "JOURNALING DISABLED: Error while trying to create %s : %s",
2340            new_file, rrd_strerror(errno));
2341   RRDD_LOG(LOG_CRIT,
2342            "JOURNALING DISABLED: All values will be flushed at shutdown");
2344   close(new_fd);
2345   config_flush_at_shutdown = 1;
2347 } /* }}} journal_new_file */
2349 /* MUST NOT hold journal_lock before calling this */
2350 static void journal_rotate(void) /* {{{ */
2352   journal_set *old_js = NULL;
2354   if (journal_dir == NULL)
2355     return;
2357   RRDD_LOG(LOG_DEBUG, "rotating journals");
2359   pthread_mutex_lock(&stats_lock);
2360   ++stats_journal_rotate;
2361   pthread_mutex_unlock(&stats_lock);
2363   pthread_mutex_lock(&journal_lock);
2365   journal_close();
2367   /* rotate the journal sets */
2368   old_js = journal_old;
2369   journal_old = journal_cur;
2370   journal_cur = calloc(1, sizeof(journal_set));
2372   if (journal_cur != NULL)
2373     journal_new_file();
2374   else
2375     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2377   pthread_mutex_unlock(&journal_lock);
2379   journal_set_remove(old_js);
2380   journal_set_free  (old_js);
2382 } /* }}} static void journal_rotate */
2384 /* MUST hold journal_lock when calling */
2385 static void journal_done(void) /* {{{ */
2387   if (journal_cur == NULL)
2388     return;
2390   journal_close();
2392   if (config_flush_at_shutdown)
2393   {
2394     RRDD_LOG(LOG_INFO, "removing journals");
2395     journal_set_remove(journal_old);
2396     journal_set_remove(journal_cur);
2397   }
2398   else
2399   {
2400     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2401              "journals will be used at next startup");
2402   }
2404   journal_set_free(journal_cur);
2405   journal_set_free(journal_old);
2406   free(journal_dir);
2408 } /* }}} static void journal_done */
2410 static int journal_write(char *cmd, char *args) /* {{{ */
2412   int chars;
2414   if (journal_fh == NULL)
2415     return 0;
2417   pthread_mutex_lock(&journal_lock);
2418   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2419   journal_size += chars;
2421   if (journal_size > JOURNAL_MAX)
2422     journal_new_file();
2424   pthread_mutex_unlock(&journal_lock);
2426   if (chars > 0)
2427   {
2428     pthread_mutex_lock(&stats_lock);
2429     stats_journal_bytes += chars;
2430     pthread_mutex_unlock(&stats_lock);
2431   }
2433   return chars;
2434 } /* }}} static int journal_write */
2436 static int journal_replay (const char *file) /* {{{ */
2438   FILE *fh;
2439   int entry_cnt = 0;
2440   int fail_cnt = 0;
2441   uint64_t line = 0;
2442   char entry[CMD_MAX];
2443   time_t now;
2445   if (file == NULL) return 0;
2447   {
2448     char *reason = "unknown error";
2449     int status = 0;
2450     struct stat statbuf;
2452     memset(&statbuf, 0, sizeof(statbuf));
2453     if (stat(file, &statbuf) != 0)
2454     {
2455       reason = "stat error";
2456       status = errno;
2457     }
2458     else if (!S_ISREG(statbuf.st_mode))
2459     {
2460       reason = "not a regular file";
2461       status = EPERM;
2462     }
2463     if (statbuf.st_uid != daemon_uid)
2464     {
2465       reason = "not owned by daemon user";
2466       status = EACCES;
2467     }
2468     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2469     {
2470       reason = "must not be user/group writable";
2471       status = EACCES;
2472     }
2474     if (status != 0)
2475     {
2476       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2477                file, rrd_strerror(status), reason);
2478       return 0;
2479     }
2480   }
2482   fh = fopen(file, "r");
2483   if (fh == NULL)
2484   {
2485     if (errno != ENOENT)
2486       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2487                file, rrd_strerror(errno));
2488     return 0;
2489   }
2490   else
2491     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2493   now = time(NULL);
2495   while(!feof(fh))
2496   {
2497     size_t entry_len;
2499     ++line;
2500     if (fgets(entry, sizeof(entry), fh) == NULL)
2501       break;
2502     entry_len = strlen(entry);
2504     /* check \n termination in case journal writing crashed mid-line */
2505     if (entry_len == 0)
2506       continue;
2507     else if (entry[entry_len - 1] != '\n')
2508     {
2509       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2510       ++fail_cnt;
2511       continue;
2512     }
2514     entry[entry_len - 1] = '\0';
2516     if (handle_request(NULL, now, entry, entry_len) == 0)
2517       ++entry_cnt;
2518     else
2519       ++fail_cnt;
2520   }
2522   fclose(fh);
2524   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2525            entry_cnt, fail_cnt);
2527   return entry_cnt > 0 ? 1 : 0;
2528 } /* }}} static int journal_replay */
2530 static int journal_sort(const void *v1, const void *v2)
2532   char **jn1 = (char **) v1;
2533   char **jn2 = (char **) v2;
2535   return strcmp(*jn1,*jn2);
2538 static void journal_init(void) /* {{{ */
2540   int had_journal = 0;
2541   DIR *dir;
2542   struct dirent *dent;
2543   char path[PATH_MAX+1];
2545   if (journal_dir == NULL) return;
2547   pthread_mutex_lock(&journal_lock);
2549   journal_cur = calloc(1, sizeof(journal_set));
2550   if (journal_cur == NULL)
2551   {
2552     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2553     return;
2554   }
2556   RRDD_LOG(LOG_INFO, "checking for journal files");
2558   /* Handle old journal files during transition.  This gives them the
2559    * correct sort order.  TODO: remove after first release
2560    */
2561   {
2562     char old_path[PATH_MAX+1];
2563     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2564     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2565     rename(old_path, path);
2567     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2568     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2569     rename(old_path, path);
2570   }
2572   dir = opendir(journal_dir);
2573   if (!dir) {
2574     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2575     return;
2576   }
2577   while ((dent = readdir(dir)) != NULL)
2578   {
2579     /* looks like a journal file? */
2580     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2581       continue;
2583     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2585     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2586     {
2587       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2588                dent->d_name);
2589       break;
2590     }
2591   }
2592   closedir(dir);
2594   qsort(journal_cur->files, journal_cur->files_num,
2595         sizeof(journal_cur->files[0]), journal_sort);
2597   for (uint i=0; i < journal_cur->files_num; i++)
2598     had_journal += journal_replay(journal_cur->files[i]);
2600   journal_new_file();
2602   /* it must have been a crash.  start a flush */
2603   if (had_journal && config_flush_at_shutdown)
2604     flush_old_values(-1);
2606   pthread_mutex_unlock(&journal_lock);
2608   RRDD_LOG(LOG_INFO, "journal processing complete");
2610 } /* }}} static void journal_init */
2612 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2614   assert(sock != NULL);
2616   free(sock->rbuf);  sock->rbuf = NULL;
2617   free(sock->wbuf);  sock->wbuf = NULL;
2618   free(sock);
2619 } /* }}} void free_listen_socket */
2621 static void close_connection(listen_socket_t *sock) /* {{{ */
2623   if (sock->fd >= 0)
2624   {
2625     close(sock->fd);
2626     sock->fd = -1;
2627   }
2629   free_listen_socket(sock);
2631 } /* }}} void close_connection */
2633 static void *connection_thread_main (void *args) /* {{{ */
2635   listen_socket_t *sock;
2636   int fd;
2638   sock = (listen_socket_t *) args;
2639   fd = sock->fd;
2641   /* init read buffers */
2642   sock->next_read = sock->next_cmd = 0;
2643   sock->rbuf = malloc(RBUF_SIZE);
2644   if (sock->rbuf == NULL)
2645   {
2646     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2647     close_connection(sock);
2648     return NULL;
2649   }
2651   pthread_mutex_lock (&connection_threads_lock);
2652 #ifdef HAVE_LIBWRAP
2653   /* LIBWRAP does not support multiple threads! By putting this code
2654      inside pthread_mutex_lock we do not have to worry about request_info
2655      getting overwritten by another thread.
2656   */
2657   struct request_info req;
2658   request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2659   fromhost(&req);
2660   if(!hosts_access(&req)) {
2661     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2662     pthread_mutex_unlock (&connection_threads_lock);
2663     close_connection(sock);
2664     return NULL;
2665   }
2666 #endif /* HAVE_LIBWRAP */
2667   connection_threads_num++;
2668   pthread_mutex_unlock (&connection_threads_lock);
2670   while (state == RUNNING)
2671   {
2672     char *cmd;
2673     ssize_t cmd_len;
2674     ssize_t rbytes;
2675     time_t now;
2677     struct pollfd pollfd;
2678     int status;
2680     pollfd.fd = fd;
2681     pollfd.events = POLLIN | POLLPRI;
2682     pollfd.revents = 0;
2684     status = poll (&pollfd, 1, /* timeout = */ 500);
2685     if (state != RUNNING)
2686       break;
2687     else if (status == 0) /* timeout */
2688       continue;
2689     else if (status < 0) /* error */
2690     {
2691       status = errno;
2692       if (status != EINTR)
2693         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2694       continue;
2695     }
2697     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2698       break;
2699     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2700     {
2701       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2702           "poll(2) returned something unexpected: %#04hx",
2703           pollfd.revents);
2704       break;
2705     }
2707     rbytes = read(fd, sock->rbuf + sock->next_read,
2708                   RBUF_SIZE - sock->next_read);
2709     if (rbytes < 0)
2710     {
2711       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2712       break;
2713     }
2714     else if (rbytes == 0)
2715       break; /* eof */
2717     sock->next_read += rbytes;
2719     if (sock->batch_start)
2720       now = sock->batch_start;
2721     else
2722       now = time(NULL);
2724     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2725     {
2726       status = handle_request (sock, now, cmd, cmd_len+1);
2727       if (status != 0)
2728         goto out_close;
2729     }
2730   }
2732 out_close:
2733   close_connection(sock);
2735   /* Remove this thread from the connection threads list */
2736   pthread_mutex_lock (&connection_threads_lock);
2737   connection_threads_num--;
2738   if (connection_threads_num <= 0)
2739     pthread_cond_broadcast(&connection_threads_done);
2740   pthread_mutex_unlock (&connection_threads_lock);
2742   return (NULL);
2743 } /* }}} void *connection_thread_main */
2745 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2747   int fd;
2748   struct sockaddr_un sa;
2749   listen_socket_t *temp;
2750   int status;
2751   const char *path;
2752   char *path_copy, *dir;
2754   path = sock->addr;
2755   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2756     path += strlen("unix:");
2758   /* dirname may modify its argument */
2759   path_copy = strdup(path);
2760   if (path_copy == NULL)
2761   {
2762     fprintf(stderr, "rrdcached: strdup(): %s\n",
2763         rrd_strerror(errno));
2764     return (-1);
2765   }
2767   dir = dirname(path_copy);
2768   if (rrd_mkdir_p(dir, 0777) != 0)
2769   {
2770     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2771         dir, rrd_strerror(errno));
2772     return (-1);
2773   }
2775   free(path_copy);
2777   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2778       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2779   if (temp == NULL)
2780   {
2781     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2782     return (-1);
2783   }
2784   listen_fds = temp;
2785   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2787   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2788   if (fd < 0)
2789   {
2790     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2791              rrd_strerror(errno));
2792     return (-1);
2793   }
2795   memset (&sa, 0, sizeof (sa));
2796   sa.sun_family = AF_UNIX;
2797   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2799   /* if we've gotten this far, we own the pid file.  any daemon started
2800    * with the same args must not be alive.  therefore, ensure that we can
2801    * create the socket...
2802    */
2803   unlink(path);
2805   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2806   if (status != 0)
2807   {
2808     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2809              path, rrd_strerror(errno));
2810     close (fd);
2811     return (-1);
2812   }
2814   /* tweak the sockets group ownership */
2815   if (sock->socket_group != (gid_t)-1)
2816   {
2817     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2818          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2819     {
2820       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2821     }
2822   }
2824   if (sock->socket_permissions != (mode_t)-1)
2825   {
2826     if (chmod(path, sock->socket_permissions) != 0)
2827       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2828           (unsigned int)sock->socket_permissions, strerror(errno));
2829   }
2831   status = listen (fd, /* backlog = */ 10);
2832   if (status != 0)
2833   {
2834     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2835              path, rrd_strerror(errno));
2836     close (fd);
2837     unlink (path);
2838     return (-1);
2839   }
2841   listen_fds[listen_fds_num].fd = fd;
2842   listen_fds[listen_fds_num].family = PF_UNIX;
2843   strncpy(listen_fds[listen_fds_num].addr, path,
2844           sizeof (listen_fds[listen_fds_num].addr) - 1);
2845   listen_fds_num++;
2847   return (0);
2848 } /* }}} int open_listen_socket_unix */
2850 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2852   struct addrinfo ai_hints;
2853   struct addrinfo *ai_res;
2854   struct addrinfo *ai_ptr;
2855   char addr_copy[NI_MAXHOST];
2856   char *addr;
2857   char *port;
2858   int status;
2860   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2861   addr_copy[sizeof (addr_copy) - 1] = 0;
2862   addr = addr_copy;
2864   memset (&ai_hints, 0, sizeof (ai_hints));
2865   ai_hints.ai_flags = 0;
2866 #ifdef AI_ADDRCONFIG
2867   ai_hints.ai_flags |= AI_ADDRCONFIG;
2868 #endif
2869   ai_hints.ai_family = AF_UNSPEC;
2870   ai_hints.ai_socktype = SOCK_STREAM;
2872   port = NULL;
2873   if (*addr == '[') /* IPv6+port format */
2874   {
2875     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2876     addr++;
2878     port = strchr (addr, ']');
2879     if (port == NULL)
2880     {
2881       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2882       return (-1);
2883     }
2884     *port = 0;
2885     port++;
2887     if (*port == ':')
2888       port++;
2889     else if (*port == 0)
2890       port = NULL;
2891     else
2892     {
2893       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2894       return (-1);
2895     }
2896   } /* if (*addr == '[') */
2897   else
2898   {
2899     port = rindex(addr, ':');
2900     if (port != NULL)
2901     {
2902       *port = 0;
2903       port++;
2904     }
2905   }
2906   ai_res = NULL;
2907   status = getaddrinfo (addr,
2908                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2909                         &ai_hints, &ai_res);
2910   if (status != 0)
2911   {
2912     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2913              addr, gai_strerror (status));
2914     return (-1);
2915   }
2917   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2918   {
2919     int fd;
2920     listen_socket_t *temp;
2921     int one = 1;
2923     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2924         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2925     if (temp == NULL)
2926     {
2927       fprintf (stderr,
2928                "rrdcached: open_listen_socket_network: realloc failed.\n");
2929       continue;
2930     }
2931     listen_fds = temp;
2932     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2934     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2935     if (fd < 0)
2936     {
2937       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2938                rrd_strerror(errno));
2939       continue;
2940     }
2942     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2944     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2945     if (status != 0)
2946     {
2947       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2948                sock->addr, rrd_strerror(errno));
2949       close (fd);
2950       continue;
2951     }
2953     status = listen (fd, /* backlog = */ 10);
2954     if (status != 0)
2955     {
2956       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2957                sock->addr, rrd_strerror(errno));
2958       close (fd);
2959       freeaddrinfo(ai_res);
2960       return (-1);
2961     }
2963     listen_fds[listen_fds_num].fd = fd;
2964     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2965     listen_fds_num++;
2966   } /* for (ai_ptr) */
2968   freeaddrinfo(ai_res);
2969   return (0);
2970 } /* }}} static int open_listen_socket_network */
2972 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2974   assert(sock != NULL);
2975   assert(sock->addr != NULL);
2977   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2978       || sock->addr[0] == '/')
2979     return (open_listen_socket_unix(sock));
2980   else
2981     return (open_listen_socket_network(sock));
2982 } /* }}} int open_listen_socket */
2984 static int close_listen_sockets (void) /* {{{ */
2986   size_t i;
2988   for (i = 0; i < listen_fds_num; i++)
2989   {
2990     close (listen_fds[i].fd);
2992     if (listen_fds[i].family == PF_UNIX)
2993       unlink(listen_fds[i].addr);
2994   }
2996   free (listen_fds);
2997   listen_fds = NULL;
2998   listen_fds_num = 0;
3000   return (0);
3001 } /* }}} int close_listen_sockets */
3003 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3005   struct pollfd *pollfds;
3006   int pollfds_num;
3007   int status;
3008   int i;
3010   if (listen_fds_num < 1)
3011   {
3012     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3013     return (NULL);
3014   }
3016   pollfds_num = listen_fds_num;
3017   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3018   if (pollfds == NULL)
3019   {
3020     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3021     return (NULL);
3022   }
3023   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3025   RRDD_LOG(LOG_INFO, "listening for connections");
3027   while (state == RUNNING)
3028   {
3029     for (i = 0; i < pollfds_num; i++)
3030     {
3031       pollfds[i].fd = listen_fds[i].fd;
3032       pollfds[i].events = POLLIN | POLLPRI;
3033       pollfds[i].revents = 0;
3034     }
3036     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3037     if (state != RUNNING)
3038       break;
3039     else if (status == 0) /* timeout */
3040       continue;
3041     else if (status < 0) /* error */
3042     {
3043       status = errno;
3044       if (status != EINTR)
3045       {
3046         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3047       }
3048       continue;
3049     }
3051     for (i = 0; i < pollfds_num; i++)
3052     {
3053       listen_socket_t *client_sock;
3054       struct sockaddr_storage client_sa;
3055       socklen_t client_sa_size;
3056       pthread_t tid;
3057       pthread_attr_t attr;
3059       if (pollfds[i].revents == 0)
3060         continue;
3062       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3063       {
3064         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3065             "poll(2) returned something unexpected for listen FD #%i.",
3066             pollfds[i].fd);
3067         continue;
3068       }
3070       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3071       if (client_sock == NULL)
3072       {
3073         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3074         continue;
3075       }
3076       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3078       client_sa_size = sizeof (client_sa);
3079       client_sock->fd = accept (pollfds[i].fd,
3080           (struct sockaddr *) &client_sa, &client_sa_size);
3081       if (client_sock->fd < 0)
3082       {
3083         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3084         free(client_sock);
3085         continue;
3086       }
3088       pthread_attr_init (&attr);
3089       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3091       status = pthread_create (&tid, &attr, connection_thread_main,
3092                                client_sock);
3093       if (status != 0)
3094       {
3095         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3096         close_connection(client_sock);
3097         continue;
3098       }
3099     } /* for (pollfds_num) */
3100   } /* while (state == RUNNING) */
3102   RRDD_LOG(LOG_INFO, "starting shutdown");
3104   close_listen_sockets ();
3106   pthread_mutex_lock (&connection_threads_lock);
3107   while (connection_threads_num > 0)
3108     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3109   pthread_mutex_unlock (&connection_threads_lock);
3111   free(pollfds);
3113   return (NULL);
3114 } /* }}} void *listen_thread_main */
3116 static int daemonize (void) /* {{{ */
3118   int pid_fd;
3119   char *base_dir;
3121   daemon_uid = geteuid();
3123   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3124   if (pid_fd < 0)
3125     pid_fd = check_pidfile();
3126   if (pid_fd < 0)
3127     return pid_fd;
3129   /* open all the listen sockets */
3130   if (config_listen_address_list_len > 0)
3131   {
3132     for (size_t i = 0; i < config_listen_address_list_len; i++)
3133       open_listen_socket (config_listen_address_list[i]);
3135     rrd_free_ptrs((void ***) &config_listen_address_list,
3136                   &config_listen_address_list_len);
3137   }
3138   else
3139   {
3140     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3141         sizeof(default_socket.addr) - 1);
3142     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3144     if (default_socket.permissions == 0)
3145       socket_permission_set_all (&default_socket);
3147     open_listen_socket (&default_socket);
3148   }
3150   if (listen_fds_num < 1)
3151   {
3152     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3153     goto error;
3154   }
3156   if (!stay_foreground)
3157   {
3158     pid_t child;
3160     child = fork ();
3161     if (child < 0)
3162     {
3163       fprintf (stderr, "daemonize: fork(2) failed.\n");
3164       goto error;
3165     }
3166     else if (child > 0)
3167       exit(0);
3169     /* Become session leader */
3170     setsid ();
3172     /* Open the first three file descriptors to /dev/null */
3173     close (2);
3174     close (1);
3175     close (0);
3177     open ("/dev/null", O_RDWR);
3178     if (dup(0) == -1 || dup(0) == -1){
3179         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3180     }
3181   } /* if (!stay_foreground) */
3183   /* Change into the /tmp directory. */
3184   base_dir = (config_base_dir != NULL)
3185     ? config_base_dir
3186     : "/tmp";
3188   if (chdir (base_dir) != 0)
3189   {
3190     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3191     goto error;
3192   }
3194   install_signal_handlers();
3196   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3197   RRDD_LOG(LOG_INFO, "starting up");
3199   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3200                                 (GDestroyNotify) free_cache_item);
3201   if (cache_tree == NULL)
3202   {
3203     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3204     goto error;
3205   }
3207   return write_pidfile (pid_fd);
3209 error:
3210   remove_pidfile();
3211   return -1;
3212 } /* }}} int daemonize */
3214 static int cleanup (void) /* {{{ */
3216   pthread_cond_broadcast (&flush_cond);
3217   pthread_join (flush_thread, NULL);
3219   pthread_cond_broadcast (&queue_cond);
3220   for (int i = 0; i < config_queue_threads; i++)
3221     pthread_join (queue_threads[i], NULL);
3223   if (config_flush_at_shutdown)
3224   {
3225     assert(cache_queue_head == NULL);
3226     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3227   }
3229   free(queue_threads);
3230   free(config_base_dir);
3232   pthread_mutex_lock(&cache_lock);
3233   g_tree_destroy(cache_tree);
3235   pthread_mutex_lock(&journal_lock);
3236   journal_done();
3238   RRDD_LOG(LOG_INFO, "goodbye");
3239   closelog ();
3241   remove_pidfile ();
3242   free(config_pid_file);
3244   return (0);
3245 } /* }}} int cleanup */
3247 static int read_options (int argc, char **argv) /* {{{ */
3249   int option;
3250   int status = 0;
3252   socket_permission_clear (&default_socket);
3254   default_socket.socket_group = (gid_t)-1;
3255   default_socket.socket_permissions = (mode_t)-1;
3257   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3258   {
3259     switch (option)
3260     {
3261       case 'O':
3262         opt_no_overwrite = 1;
3263         break;
3265       case 'g':
3266         stay_foreground=1;
3267         break;
3269       case 'l':
3270       {
3271         listen_socket_t *new;
3273         new = malloc(sizeof(listen_socket_t));
3274         if (new == NULL)
3275         {
3276           fprintf(stderr, "read_options: malloc failed.\n");
3277           return(2);
3278         }
3279         memset(new, 0, sizeof(listen_socket_t));
3281         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3283         /* Add permissions to the socket {{{ */
3284         if (default_socket.permissions != 0)
3285         {
3286           socket_permission_copy (new, &default_socket);
3287         }
3288         else /* if (default_socket.permissions == 0) */
3289         {
3290           /* Add permission for ALL commands to the socket. */
3291           socket_permission_set_all (new);
3292         }
3293         /* }}} Done adding permissions. */
3295         new->socket_group = default_socket.socket_group;
3296         new->socket_permissions = default_socket.socket_permissions;
3298         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3299                          &config_listen_address_list_len, new))
3300         {
3301           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3302           return (2);
3303         }
3304       }
3305       break;
3307       /* set socket group permissions */
3308       case 's':
3309       {
3310         gid_t group_gid;
3311         struct group *grp;
3313         group_gid = strtoul(optarg, NULL, 10);
3314         if (errno != EINVAL && group_gid>0)
3315         {
3316           /* we were passed a number */
3317           grp = getgrgid(group_gid);
3318         }
3319         else
3320         {
3321           grp = getgrnam(optarg);
3322         }
3324         if (grp)
3325         {
3326           default_socket.socket_group = grp->gr_gid;
3327         }
3328         else
3329         {
3330           /* no idea what the user wanted... */
3331           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3332           return (5);
3333         }
3334       }
3335       break;
3337       /* set socket file permissions */
3338       case 'm':
3339       {
3340         long  tmp;
3341         char *endptr = NULL;
3343         tmp = strtol (optarg, &endptr, 8);
3344         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3345             || (tmp > 07777) || (tmp < 0)) {
3346           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3347               optarg);
3348           return (5);
3349         }
3351         default_socket.socket_permissions = (mode_t)tmp;
3352       }
3353       break;
3355       case 'P':
3356       {
3357         char *optcopy;
3358         char *saveptr;
3359         char *dummy;
3360         char *ptr;
3362         socket_permission_clear (&default_socket);
3364         optcopy = strdup (optarg);
3365         dummy = optcopy;
3366         saveptr = NULL;
3367         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3368         {
3369           dummy = NULL;
3370           status = socket_permission_add (&default_socket, ptr);
3371           if (status != 0)
3372           {
3373             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3374                 "socket failed. Most likely, this permission doesn't "
3375                 "exist. Check your command line.\n", ptr);
3376             status = 4;
3377           }
3378         }
3380         free (optcopy);
3381       }
3382       break;
3384       case 'f':
3385       {
3386         int temp;
3388         temp = atoi (optarg);
3389         if (temp > 0)
3390           config_flush_interval = temp;
3391         else
3392         {
3393           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3394           status = 3;
3395         }
3396       }
3397       break;
3399       case 'w':
3400       {
3401         int temp;
3403         temp = atoi (optarg);
3404         if (temp > 0)
3405           config_write_interval = temp;
3406         else
3407         {
3408           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3409           status = 2;
3410         }
3411       }
3412       break;
3414       case 'z':
3415       {
3416         int temp;
3418         temp = atoi(optarg);
3419         if (temp > 0)
3420           config_write_jitter = temp;
3421         else
3422         {
3423           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3424           status = 2;
3425         }
3427         break;
3428       }
3430       case 't':
3431       {
3432         int threads;
3433         threads = atoi(optarg);
3434         if (threads >= 1)
3435           config_queue_threads = threads;
3436         else
3437         {
3438           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3439           return 1;
3440         }
3441       }
3442       break;
3444       case 'B':
3445         config_write_base_only = 1;
3446         break;
3448       case 'b':
3449       {
3450         size_t len;
3451         char base_realpath[PATH_MAX];
3453         if (config_base_dir != NULL)
3454           free (config_base_dir);
3455         config_base_dir = strdup (optarg);
3456         if (config_base_dir == NULL)
3457         {
3458           fprintf (stderr, "read_options: strdup failed.\n");
3459           return (3);
3460         }
3462         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3463         {
3464           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3465               config_base_dir, rrd_strerror (errno));
3466           return (3);
3467         }
3469         /* make sure that the base directory is not resolved via
3470          * symbolic links.  this makes some performance-enhancing
3471          * assumptions possible (we don't have to resolve paths
3472          * that start with a "/")
3473          */
3474         if (realpath(config_base_dir, base_realpath) == NULL)
3475         {
3476           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3477               "%s\n", config_base_dir, rrd_strerror(errno));
3478           return 5;
3479         }
3481         len = strlen (config_base_dir);
3482         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3483         {
3484           config_base_dir[len - 1] = 0;
3485           len--;
3486         }
3488         if (len < 1)
3489         {
3490           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3491           return (4);
3492         }
3494         _config_base_dir_len = len;
3496         len = strlen (base_realpath);
3497         while ((len > 0) && (base_realpath[len - 1] == '/'))
3498         {
3499           base_realpath[len - 1] = '\0';
3500           len--;
3501         }
3503         if (strncmp(config_base_dir,
3504                          base_realpath, sizeof(base_realpath)) != 0)
3505         {
3506           fprintf(stderr,
3507                   "Base directory (-b) resolved via file system links!\n"
3508                   "Please consult rrdcached '-b' documentation!\n"
3509                   "Consider specifying the real directory (%s)\n",
3510                   base_realpath);
3511           return 5;
3512         }
3513       }
3514       break;
3516       case 'p':
3517       {
3518         if (config_pid_file != NULL)
3519           free (config_pid_file);
3520         config_pid_file = strdup (optarg);
3521         if (config_pid_file == NULL)
3522         {
3523           fprintf (stderr, "read_options: strdup failed.\n");
3524           return (3);
3525         }
3526       }
3527       break;
3529       case 'F':
3530         config_flush_at_shutdown = 1;
3531         break;
3533       case 'j':
3534       {
3535         char journal_dir_actual[PATH_MAX];
3536         journal_dir = realpath((const char *)optarg, journal_dir_actual);
3537         if (journal_dir)
3538         {
3539           // if we were able to properly resolve the path, lets have a copy
3540           // for use outside this block.
3541           journal_dir = strdup(journal_dir);           
3542           status = rrd_mkdir_p(journal_dir, 0777);
3543           if (status != 0)
3544           {
3545             fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3546                     journal_dir, rrd_strerror(errno));
3547             return 6;
3548           }
3549           if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3550           {
3551             fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3552                     errno ? rrd_strerror(errno) : "");
3553             return 6;
3554           }
3555         } else {
3556           fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3557                   errno ? rrd_strerror(errno) : "");
3558           return 6;
3559         }
3560       }
3561       break;
3563       case 'a':
3564       {
3565         int temp = atoi(optarg);
3566         if (temp > 0)
3567           config_alloc_chunk = temp;
3568         else
3569         {
3570           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3571           return 10;
3572         }
3573       }
3574       break;
3576       case 'h':
3577       case '?':
3578         printf ("RRDCacheD %s\n"
3579             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3580             "\n"
3581             "Usage: rrdcached [options]\n"
3582             "\n"
3583             "Valid options are:\n"
3584             "  -l <address>  Socket address to listen to.\n"
3585             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3586             "  -P <perms>    Sets the permissions to assign to all following "
3587                             "sockets\n"
3588             "  -w <seconds>  Interval in which to write data.\n"
3589             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3590             "  -t <threads>  Number of write threads.\n"
3591             "  -f <seconds>  Interval in which to flush dead data.\n"
3592             "  -p <file>     Location of the PID-file.\n"
3593             "  -b <dir>      Base directory to change to.\n"
3594             "  -B            Restrict file access to paths within -b <dir>\n"
3595             "  -g            Do not fork and run in the foreground.\n"
3596             "  -j <dir>      Directory in which to create the journal files.\n"
3597             "  -F            Always flush all updates at shutdown\n"
3598             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3599             "                (the socket will also have read/write permissions "
3600                             "for that group)\n"
3601             "  -m <mode>     File permissions (octal) of all following UNIX "
3602                             "sockets\n"
3603             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3604             "  -O            Do not allow CREATE commands to overwrite existing\n"
3605             "                files, even if asked to.\n"
3606             "\n"
3607             "For more information and a detailed description of all options "
3608             "please refer\n"
3609             "to the rrdcached(1) manual page.\n",
3610             VERSION);
3611         if (option == 'h')
3612           status = -1;
3613         else
3614           status = 1;
3615         break;
3616     } /* switch (option) */
3617   } /* while (getopt) */
3619   /* advise the user when values are not sane */
3620   if (config_flush_interval < 2 * config_write_interval)
3621     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3622             " 2x write interval (-w) !\n");
3623   if (config_write_jitter > config_write_interval)
3624     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3625             " write interval (-w) !\n");
3627   if (config_write_base_only && config_base_dir == NULL)
3628     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3629             "  Consult the rrdcached documentation\n");
3631   if (journal_dir == NULL)
3632     config_flush_at_shutdown = 1;
3634   return (status);
3635 } /* }}} int read_options */
3637 int main (int argc, char **argv)
3639   int status;
3641   status = read_options (argc, argv);
3642   if (status != 0)
3643   {
3644     if (status < 0)
3645       status = 0;
3646     return (status);
3647   }
3649   status = daemonize ();
3650   if (status != 0)
3651   {
3652     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3653     return (1);
3654   }
3656   journal_init();
3658   /* start the queue threads */
3659   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3660   if (queue_threads == NULL)
3661   {
3662     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3663     cleanup();
3664     return (1);
3665   }
3666   for (int i = 0; i < config_queue_threads; i++)
3667   {
3668     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3669     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3670     if (status != 0)
3671     {
3672       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3673       cleanup();
3674       return (1);
3675     }
3676   }
3678   /* start the flush thread */
3679   memset(&flush_thread, 0, sizeof(flush_thread));
3680   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3681   if (status != 0)
3682   {
3683     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3684     cleanup();
3685     return (1);
3686   }
3688   listen_thread_main (NULL);
3689   cleanup ();
3691   return (0);
3692 } /* int main */
3694 /*
3695  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3696  */