Code

16b5df59ebf769d8d4884d5a885feb339347558d
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
109 #include <glib-2.0/glib.h>
110 /* }}} */
112 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
114 #ifndef __GNUC__
115 # define __attribute__(x) /**/
116 #endif
118 /*
119  * Types
120  */
121 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
123 struct listen_socket_s
125   int fd;
126   char addr[PATH_MAX + 1];
127   int family;
129   /* state for BATCH processing */
130   time_t batch_start;
131   int batch_cmd;
133   /* buffered IO */
134   char *rbuf;
135   off_t next_cmd;
136   off_t next_read;
138   char *wbuf;
139   ssize_t wbuf_len;
141   uint32_t permissions;
142 };
143 typedef struct listen_socket_s listen_socket_t;
145 struct command_s;
146 typedef struct command_s command_t;
147 /* note: guard against "unused" warnings in the handlers */
148 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
149                         time_t now              __attribute__((unused)),\
150                         char  *buffer           __attribute__((unused)),\
151                         size_t buffer_size      __attribute__((unused))
153 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
154                         DISPATCH_PROTO
156 struct command_s {
157   char   *cmd;
158   int (*handler)(HANDLER_PROTO);
160   char  context;                /* where we expect to see it */
161 #define CMD_CONTEXT_CLIENT      (1<<0)
162 #define CMD_CONTEXT_BATCH       (1<<1)
163 #define CMD_CONTEXT_JOURNAL     (1<<2)
164 #define CMD_CONTEXT_ANY         (0x7f)
166   char *syntax;
167   char *help;
168 };
170 struct cache_item_s;
171 typedef struct cache_item_s cache_item_t;
172 struct cache_item_s
174   char *file;
175   char **values;
176   size_t values_num;
177   time_t last_flush_time;
178   time_t last_update_stamp;
179 #define CI_FLAGS_IN_TREE  (1<<0)
180 #define CI_FLAGS_IN_QUEUE (1<<1)
181   int flags;
182   pthread_cond_t  flushed;
183   cache_item_t *prev;
184   cache_item_t *next;
185 };
187 struct callback_flush_data_s
189   time_t now;
190   time_t abs_timeout;
191   char **keys;
192   size_t keys_num;
193 };
194 typedef struct callback_flush_data_s callback_flush_data_t;
196 enum queue_side_e
198   HEAD,
199   TAIL
200 };
201 typedef enum queue_side_e queue_side_t;
203 /* describe a set of journal files */
204 typedef struct {
205   char **files;
206   size_t files_num;
207 } journal_set;
209 /* max length of socket command or response */
210 #define CMD_MAX 4096
211 #define RBUF_SIZE (CMD_MAX*2)
213 /*
214  * Variables
215  */
216 static int stay_foreground = 0;
217 static uid_t daemon_uid;
219 static listen_socket_t *listen_fds = NULL;
220 static size_t listen_fds_num = 0;
222 enum {
223   RUNNING,              /* normal operation */
224   FLUSHING,             /* flushing remaining values */
225   SHUTDOWN              /* shutting down */
226 } state = RUNNING;
228 static pthread_t *queue_threads;
229 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
230 static int config_queue_threads = 4;
232 static pthread_t flush_thread;
233 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
235 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
236 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
237 static int connection_threads_num = 0;
239 /* Cache stuff */
240 static GTree          *cache_tree = NULL;
241 static cache_item_t   *cache_queue_head = NULL;
242 static cache_item_t   *cache_queue_tail = NULL;
243 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
245 static int config_write_interval = 300;
246 static int config_write_jitter   = 0;
247 static int config_flush_interval = 3600;
248 static int config_flush_at_shutdown = 0;
249 static char *config_pid_file = NULL;
250 static char *config_base_dir = NULL;
251 static size_t _config_base_dir_len = 0;
252 static int config_write_base_only = 0;
254 static listen_socket_t **config_listen_address_list = NULL;
255 static size_t config_listen_address_list_len = 0;
257 static uint64_t stats_queue_length = 0;
258 static uint64_t stats_updates_received = 0;
259 static uint64_t stats_flush_received = 0;
260 static uint64_t stats_updates_written = 0;
261 static uint64_t stats_data_sets_written = 0;
262 static uint64_t stats_journal_bytes = 0;
263 static uint64_t stats_journal_rotate = 0;
264 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
266 /* Journaled updates */
267 #define JOURNAL_BASE "rrd.journal"
268 static journal_set *journal_cur = NULL;
269 static journal_set *journal_old = NULL;
270 static char *journal_dir = NULL;
271 static FILE *journal_fh = NULL;         /* current journal file handle */
272 static long  journal_size = 0;          /* current journal size */
273 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
274 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
275 static int journal_write(char *cmd, char *args);
276 static void journal_done(void);
277 static void journal_rotate(void);
279 /* prototypes for forward refernces */
280 static int handle_request_help (HANDLER_PROTO);
282 /* 
283  * Functions
284  */
285 static void sig_common (const char *sig) /* {{{ */
287   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
288   state = FLUSHING;
289   pthread_cond_broadcast(&flush_cond);
290   pthread_cond_broadcast(&queue_cond);
291 } /* }}} void sig_common */
293 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
295   sig_common("INT");
296 } /* }}} void sig_int_handler */
298 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
300   sig_common("TERM");
301 } /* }}} void sig_term_handler */
303 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
305   config_flush_at_shutdown = 1;
306   sig_common("USR1");
307 } /* }}} void sig_usr1_handler */
309 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
311   config_flush_at_shutdown = 0;
312   sig_common("USR2");
313 } /* }}} void sig_usr2_handler */
315 static void install_signal_handlers(void) /* {{{ */
317   /* These structures are static, because `sigaction' behaves weird if the are
318    * overwritten.. */
319   static struct sigaction sa_int;
320   static struct sigaction sa_term;
321   static struct sigaction sa_pipe;
322   static struct sigaction sa_usr1;
323   static struct sigaction sa_usr2;
325   /* Install signal handlers */
326   memset (&sa_int, 0, sizeof (sa_int));
327   sa_int.sa_handler = sig_int_handler;
328   sigaction (SIGINT, &sa_int, NULL);
330   memset (&sa_term, 0, sizeof (sa_term));
331   sa_term.sa_handler = sig_term_handler;
332   sigaction (SIGTERM, &sa_term, NULL);
334   memset (&sa_pipe, 0, sizeof (sa_pipe));
335   sa_pipe.sa_handler = SIG_IGN;
336   sigaction (SIGPIPE, &sa_pipe, NULL);
338   memset (&sa_pipe, 0, sizeof (sa_usr1));
339   sa_usr1.sa_handler = sig_usr1_handler;
340   sigaction (SIGUSR1, &sa_usr1, NULL);
342   memset (&sa_usr2, 0, sizeof (sa_usr2));
343   sa_usr2.sa_handler = sig_usr2_handler;
344   sigaction (SIGUSR2, &sa_usr2, NULL);
346 } /* }}} void install_signal_handlers */
348 static int open_pidfile(char *action, int oflag) /* {{{ */
350   int fd;
351   char *file;
353   file = (config_pid_file != NULL)
354     ? config_pid_file
355     : LOCALSTATEDIR "/run/rrdcached.pid";
357   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
358   if (fd < 0)
359     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
360             action, file, rrd_strerror(errno));
362   return(fd);
363 } /* }}} static int open_pidfile */
365 /* check existing pid file to see whether a daemon is running */
366 static int check_pidfile(void)
368   int pid_fd;
369   pid_t pid;
370   char pid_str[16];
372   pid_fd = open_pidfile("open", O_RDWR);
373   if (pid_fd < 0)
374     return pid_fd;
376   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
377     return -1;
379   pid = atoi(pid_str);
380   if (pid <= 0)
381     return -1;
383   /* another running process that we can signal COULD be
384    * a competing rrdcached */
385   if (pid != getpid() && kill(pid, 0) == 0)
386   {
387     fprintf(stderr,
388             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
389     close(pid_fd);
390     return -1;
391   }
393   lseek(pid_fd, 0, SEEK_SET);
394   if (ftruncate(pid_fd, 0) == -1)
395   {
396     fprintf(stderr,
397             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
398     close(pid_fd);
399     return -1;
400   }
402   fprintf(stderr,
403           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
404           "rrdcached: starting normally.\n", pid);
406   return pid_fd;
407 } /* }}} static int check_pidfile */
409 static int write_pidfile (int fd) /* {{{ */
411   pid_t pid;
412   FILE *fh;
414   pid = getpid ();
416   fh = fdopen (fd, "w");
417   if (fh == NULL)
418   {
419     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
420     close(fd);
421     return (-1);
422   }
424   fprintf (fh, "%i\n", (int) pid);
425   fclose (fh);
427   return (0);
428 } /* }}} int write_pidfile */
430 static int remove_pidfile (void) /* {{{ */
432   char *file;
433   int status;
435   file = (config_pid_file != NULL)
436     ? config_pid_file
437     : LOCALSTATEDIR "/run/rrdcached.pid";
439   status = unlink (file);
440   if (status == 0)
441     return (0);
442   return (errno);
443 } /* }}} int remove_pidfile */
445 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
447   char *eol;
449   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
450                sock->next_read - sock->next_cmd);
452   if (eol == NULL)
453   {
454     /* no commands left, move remainder back to front of rbuf */
455     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
456             sock->next_read - sock->next_cmd);
457     sock->next_read -= sock->next_cmd;
458     sock->next_cmd = 0;
459     *len = 0;
460     return NULL;
461   }
462   else
463   {
464     char *cmd = sock->rbuf + sock->next_cmd;
465     *eol = '\0';
467     sock->next_cmd = eol - sock->rbuf + 1;
469     if (eol > sock->rbuf && *(eol-1) == '\r')
470       *(--eol) = '\0'; /* handle "\r\n" EOL */
472     *len = eol - cmd;
474     return cmd;
475   }
477   /* NOTREACHED */
478   assert(1==0);
479 } /* }}} char *next_cmd */
481 /* add the characters directly to the write buffer */
482 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
484   char *new_buf;
486   assert(sock != NULL);
488   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
489   if (new_buf == NULL)
490   {
491     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
492     return -1;
493   }
495   strncpy(new_buf + sock->wbuf_len, str, len + 1);
497   sock->wbuf = new_buf;
498   sock->wbuf_len += len;
500   return 0;
501 } /* }}} static int add_to_wbuf */
503 /* add the text to the "extra" info that's sent after the status line */
504 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
506   va_list argp;
507   char buffer[CMD_MAX];
508   int len;
510   if (sock == NULL) return 0; /* journal replay mode */
511   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
513   va_start(argp, fmt);
514 #ifdef HAVE_VSNPRINTF
515   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
516 #else
517   len = vsprintf(buffer, fmt, argp);
518 #endif
519   va_end(argp);
520   if (len < 0)
521   {
522     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
523     return -1;
524   }
526   return add_to_wbuf(sock, buffer, len);
527 } /* }}} static int add_response_info */
529 static int count_lines(char *str) /* {{{ */
531   int lines = 0;
533   if (str != NULL)
534   {
535     while ((str = strchr(str, '\n')) != NULL)
536     {
537       ++lines;
538       ++str;
539     }
540   }
542   return lines;
543 } /* }}} static int count_lines */
545 /* send the response back to the user.
546  * returns 0 on success, -1 on error
547  * write buffer is always zeroed after this call */
548 static int send_response (listen_socket_t *sock, response_code rc,
549                           char *fmt, ...) /* {{{ */
551   va_list argp;
552   char buffer[CMD_MAX];
553   int lines;
554   ssize_t wrote;
555   int rclen, len;
557   if (sock == NULL) return rc;  /* journal replay mode */
559   if (sock->batch_start)
560   {
561     if (rc == RESP_OK)
562       return rc; /* no response on success during BATCH */
563     lines = sock->batch_cmd;
564   }
565   else if (rc == RESP_OK)
566     lines = count_lines(sock->wbuf);
567   else
568     lines = -1;
570   rclen = sprintf(buffer, "%d ", lines);
571   va_start(argp, fmt);
572 #ifdef HAVE_VSNPRINTF
573   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
574 #else
575   len = vsprintf(buffer+rclen, fmt, argp);
576 #endif
577   va_end(argp);
578   if (len < 0)
579     return -1;
581   len += rclen;
583   /* append the result to the wbuf, don't write to the user */
584   if (sock->batch_start)
585     return add_to_wbuf(sock, buffer, len);
587   /* first write must be complete */
588   if (len != write(sock->fd, buffer, len))
589   {
590     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
591     return -1;
592   }
594   if (sock->wbuf != NULL && rc == RESP_OK)
595   {
596     wrote = 0;
597     while (wrote < sock->wbuf_len)
598     {
599       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
600       if (wb <= 0)
601       {
602         RRDD_LOG(LOG_INFO, "send_response: could not write results");
603         return -1;
604       }
605       wrote += wb;
606     }
607   }
609   free(sock->wbuf); sock->wbuf = NULL;
610   sock->wbuf_len = 0;
612   return 0;
613 } /* }}} */
615 static void wipe_ci_values(cache_item_t *ci, time_t when)
617   ci->values = NULL;
618   ci->values_num = 0;
620   ci->last_flush_time = when;
621   if (config_write_jitter > 0)
622     ci->last_flush_time += (rrd_random() % config_write_jitter);
625 /* remove_from_queue
626  * remove a "cache_item_t" item from the queue.
627  * must hold 'cache_lock' when calling this
628  */
629 static void remove_from_queue(cache_item_t *ci) /* {{{ */
631   if (ci == NULL) return;
632   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
634   if (ci->prev == NULL)
635     cache_queue_head = ci->next; /* reset head */
636   else
637     ci->prev->next = ci->next;
639   if (ci->next == NULL)
640     cache_queue_tail = ci->prev; /* reset the tail */
641   else
642     ci->next->prev = ci->prev;
644   ci->next = ci->prev = NULL;
645   ci->flags &= ~CI_FLAGS_IN_QUEUE;
647   pthread_mutex_lock (&stats_lock);
648   assert (stats_queue_length > 0);
649   stats_queue_length--;
650   pthread_mutex_unlock (&stats_lock);
652 } /* }}} static void remove_from_queue */
654 /* free the resources associated with the cache_item_t
655  * must hold cache_lock when calling this function
656  */
657 static void *free_cache_item(cache_item_t *ci) /* {{{ */
659   if (ci == NULL) return NULL;
661   remove_from_queue(ci);
663   for (size_t i=0; i < ci->values_num; i++)
664     free(ci->values[i]);
666   free (ci->values);
667   free (ci->file);
669   /* in case anyone is waiting */
670   pthread_cond_broadcast(&ci->flushed);
671   pthread_cond_destroy(&ci->flushed);
673   free (ci);
675   return NULL;
676 } /* }}} static void *free_cache_item */
678 /*
679  * enqueue_cache_item:
680  * `cache_lock' must be acquired before calling this function!
681  */
682 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
683     queue_side_t side)
685   if (ci == NULL)
686     return (-1);
688   if (ci->values_num == 0)
689     return (0);
691   if (side == HEAD)
692   {
693     if (cache_queue_head == ci)
694       return 0;
696     /* remove if further down in queue */
697     remove_from_queue(ci);
699     ci->prev = NULL;
700     ci->next = cache_queue_head;
701     if (ci->next != NULL)
702       ci->next->prev = ci;
703     cache_queue_head = ci;
705     if (cache_queue_tail == NULL)
706       cache_queue_tail = cache_queue_head;
707   }
708   else /* (side == TAIL) */
709   {
710     /* We don't move values back in the list.. */
711     if (ci->flags & CI_FLAGS_IN_QUEUE)
712       return (0);
714     assert (ci->next == NULL);
715     assert (ci->prev == NULL);
717     ci->prev = cache_queue_tail;
719     if (cache_queue_tail == NULL)
720       cache_queue_head = ci;
721     else
722       cache_queue_tail->next = ci;
724     cache_queue_tail = ci;
725   }
727   ci->flags |= CI_FLAGS_IN_QUEUE;
729   pthread_cond_signal(&queue_cond);
730   pthread_mutex_lock (&stats_lock);
731   stats_queue_length++;
732   pthread_mutex_unlock (&stats_lock);
734   return (0);
735 } /* }}} int enqueue_cache_item */
737 /*
738  * tree_callback_flush:
739  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
740  * while this is in progress.
741  */
742 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
743     gpointer data)
745   cache_item_t *ci;
746   callback_flush_data_t *cfd;
748   ci = (cache_item_t *) value;
749   cfd = (callback_flush_data_t *) data;
751   if (ci->flags & CI_FLAGS_IN_QUEUE)
752     return FALSE;
754   if (ci->values_num > 0
755       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
756   {
757     enqueue_cache_item (ci, TAIL);
758   }
759   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
760       && (ci->values_num <= 0))
761   {
762     assert ((char *) key == ci->file);
763     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
764     {
765       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
766       return (FALSE);
767     }
768   }
770   return (FALSE);
771 } /* }}} gboolean tree_callback_flush */
773 static int flush_old_values (int max_age)
775   callback_flush_data_t cfd;
776   size_t k;
778   memset (&cfd, 0, sizeof (cfd));
779   /* Pass the current time as user data so that we don't need to call
780    * `time' for each node. */
781   cfd.now = time (NULL);
782   cfd.keys = NULL;
783   cfd.keys_num = 0;
785   if (max_age > 0)
786     cfd.abs_timeout = cfd.now - max_age;
787   else
788     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
790   /* `tree_callback_flush' will return the keys of all values that haven't
791    * been touched in the last `config_flush_interval' seconds in `cfd'.
792    * The char*'s in this array point to the same memory as ci->file, so we
793    * don't need to free them separately. */
794   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
796   for (k = 0; k < cfd.keys_num; k++)
797   {
798     /* should never fail, since we have held the cache_lock
799      * the entire time */
800     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
801   }
803   if (cfd.keys != NULL)
804   {
805     free (cfd.keys);
806     cfd.keys = NULL;
807   }
809   return (0);
810 } /* int flush_old_values */
812 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
814   struct timeval now;
815   struct timespec next_flush;
816   int status;
818   gettimeofday (&now, NULL);
819   next_flush.tv_sec = now.tv_sec + config_flush_interval;
820   next_flush.tv_nsec = 1000 * now.tv_usec;
822   pthread_mutex_lock(&cache_lock);
824   while (state == RUNNING)
825   {
826     gettimeofday (&now, NULL);
827     if ((now.tv_sec > next_flush.tv_sec)
828         || ((now.tv_sec == next_flush.tv_sec)
829           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
830     {
831       RRDD_LOG(LOG_DEBUG, "flushing old values");
833       /* Determine the time of the next cache flush. */
834       next_flush.tv_sec = now.tv_sec + config_flush_interval;
836       /* Flush all values that haven't been written in the last
837        * `config_write_interval' seconds. */
838       flush_old_values (config_write_interval);
840       /* unlock the cache while we rotate so we don't block incoming
841        * updates if the fsync() blocks on disk I/O */
842       pthread_mutex_unlock(&cache_lock);
843       journal_rotate();
844       pthread_mutex_lock(&cache_lock);
845     }
847     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
848     if (status != 0 && status != ETIMEDOUT)
849     {
850       RRDD_LOG (LOG_ERR, "flush_thread_main: "
851                 "pthread_cond_timedwait returned %i.", status);
852     }
853   }
855   if (config_flush_at_shutdown)
856     flush_old_values (-1); /* flush everything */
858   state = SHUTDOWN;
860   pthread_mutex_unlock(&cache_lock);
862   return NULL;
863 } /* void *flush_thread_main */
865 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
867   pthread_mutex_lock (&cache_lock);
869   while (state != SHUTDOWN
870          || (cache_queue_head != NULL && config_flush_at_shutdown))
871   {
872     cache_item_t *ci;
873     char *file;
874     char **values;
875     size_t values_num;
876     int status;
878     /* Now, check if there's something to store away. If not, wait until
879      * something comes in. */
880     if (cache_queue_head == NULL)
881     {
882       status = pthread_cond_wait (&queue_cond, &cache_lock);
883       if ((status != 0) && (status != ETIMEDOUT))
884       {
885         RRDD_LOG (LOG_ERR, "queue_thread_main: "
886             "pthread_cond_wait returned %i.", status);
887       }
888     }
890     /* Check if a value has arrived. This may be NULL if we timed out or there
891      * was an interrupt such as a signal. */
892     if (cache_queue_head == NULL)
893       continue;
895     ci = cache_queue_head;
897     /* copy the relevant parts */
898     file = strdup (ci->file);
899     if (file == NULL)
900     {
901       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
902       continue;
903     }
905     assert(ci->values != NULL);
906     assert(ci->values_num > 0);
908     values = ci->values;
909     values_num = ci->values_num;
911     wipe_ci_values(ci, time(NULL));
912     remove_from_queue(ci);
914     pthread_mutex_unlock (&cache_lock);
916     rrd_clear_error ();
917     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
918     if (status != 0)
919     {
920       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
921           "rrd_update_r (%s) failed with status %i. (%s)",
922           file, status, rrd_get_error());
923     }
925     journal_write("wrote", file);
927     /* Search again in the tree.  It's possible someone issued a "FORGET"
928      * while we were writing the update values. */
929     pthread_mutex_lock(&cache_lock);
930     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
931     if (ci)
932       pthread_cond_broadcast(&ci->flushed);
933     pthread_mutex_unlock(&cache_lock);
935     if (status == 0)
936     {
937       pthread_mutex_lock (&stats_lock);
938       stats_updates_written++;
939       stats_data_sets_written += values_num;
940       pthread_mutex_unlock (&stats_lock);
941     }
943     rrd_free_ptrs((void ***) &values, &values_num);
944     free(file);
946     pthread_mutex_lock (&cache_lock);
947   }
948   pthread_mutex_unlock (&cache_lock);
950   return (NULL);
951 } /* }}} void *queue_thread_main */
953 static int buffer_get_field (char **buffer_ret, /* {{{ */
954     size_t *buffer_size_ret, char **field_ret)
956   char *buffer;
957   size_t buffer_pos;
958   size_t buffer_size;
959   char *field;
960   size_t field_size;
961   int status;
963   buffer = *buffer_ret;
964   buffer_pos = 0;
965   buffer_size = *buffer_size_ret;
966   field = *buffer_ret;
967   field_size = 0;
969   if (buffer_size <= 0)
970     return (-1);
972   /* This is ensured by `handle_request'. */
973   assert (buffer[buffer_size - 1] == '\0');
975   status = -1;
976   while (buffer_pos < buffer_size)
977   {
978     /* Check for end-of-field or end-of-buffer */
979     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
980     {
981       field[field_size] = 0;
982       field_size++;
983       buffer_pos++;
984       status = 0;
985       break;
986     }
987     /* Handle escaped characters. */
988     else if (buffer[buffer_pos] == '\\')
989     {
990       if (buffer_pos >= (buffer_size - 1))
991         break;
992       buffer_pos++;
993       field[field_size] = buffer[buffer_pos];
994       field_size++;
995       buffer_pos++;
996     }
997     /* Normal operation */ 
998     else
999     {
1000       field[field_size] = buffer[buffer_pos];
1001       field_size++;
1002       buffer_pos++;
1003     }
1004   } /* while (buffer_pos < buffer_size) */
1006   if (status != 0)
1007     return (status);
1009   *buffer_ret = buffer + buffer_pos;
1010   *buffer_size_ret = buffer_size - buffer_pos;
1011   *field_ret = field;
1013   return (0);
1014 } /* }}} int buffer_get_field */
1016 /* if we're restricting writes to the base directory,
1017  * check whether the file falls within the dir
1018  * returns 1 if OK, otherwise 0
1019  */
1020 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1022   assert(file != NULL);
1024   if (!config_write_base_only
1025       || sock == NULL /* journal replay */
1026       || config_base_dir == NULL)
1027     return 1;
1029   if (strstr(file, "../") != NULL) goto err;
1031   /* relative paths without "../" are ok */
1032   if (*file != '/') return 1;
1034   /* file must be of the format base + "/" + <1+ char filename> */
1035   if (strlen(file) < _config_base_dir_len + 2) goto err;
1036   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1037   if (*(file + _config_base_dir_len) != '/') goto err;
1039   return 1;
1041 err:
1042   if (sock != NULL && sock->fd >= 0)
1043     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1045   return 0;
1046 } /* }}} static int check_file_access */
1048 /* when using a base dir, convert relative paths to absolute paths.
1049  * if necessary, modifies the "filename" pointer to point
1050  * to the new path created in "tmp".  "tmp" is provided
1051  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1052  *
1053  * this allows us to optimize for the expected case (absolute path)
1054  * with a no-op.
1055  */
1056 static void get_abs_path(char **filename, char *tmp)
1058   assert(tmp != NULL);
1059   assert(filename != NULL && *filename != NULL);
1061   if (config_base_dir == NULL || **filename == '/')
1062     return;
1064   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1065   *filename = tmp;
1066 } /* }}} static int get_abs_path */
1068 static int flush_file (const char *filename) /* {{{ */
1070   cache_item_t *ci;
1072   pthread_mutex_lock (&cache_lock);
1074   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1075   if (ci == NULL)
1076   {
1077     pthread_mutex_unlock (&cache_lock);
1078     return (ENOENT);
1079   }
1081   if (ci->values_num > 0)
1082   {
1083     /* Enqueue at head */
1084     enqueue_cache_item (ci, HEAD);
1085     pthread_cond_wait(&ci->flushed, &cache_lock);
1086   }
1088   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1089    * may have been purged during our cond_wait() */
1091   pthread_mutex_unlock(&cache_lock);
1093   return (0);
1094 } /* }}} int flush_file */
1096 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1098   char *err = "Syntax error.\n";
1100   if (cmd && cmd->syntax)
1101     err = cmd->syntax;
1103   return send_response(sock, RESP_ERR, "Usage: %s", err);
1104 } /* }}} static int syntax_error() */
1106 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1108   uint64_t copy_queue_length;
1109   uint64_t copy_updates_received;
1110   uint64_t copy_flush_received;
1111   uint64_t copy_updates_written;
1112   uint64_t copy_data_sets_written;
1113   uint64_t copy_journal_bytes;
1114   uint64_t copy_journal_rotate;
1116   uint64_t tree_nodes_number;
1117   uint64_t tree_depth;
1119   pthread_mutex_lock (&stats_lock);
1120   copy_queue_length       = stats_queue_length;
1121   copy_updates_received   = stats_updates_received;
1122   copy_flush_received     = stats_flush_received;
1123   copy_updates_written    = stats_updates_written;
1124   copy_data_sets_written  = stats_data_sets_written;
1125   copy_journal_bytes      = stats_journal_bytes;
1126   copy_journal_rotate     = stats_journal_rotate;
1127   pthread_mutex_unlock (&stats_lock);
1129   pthread_mutex_lock (&cache_lock);
1130   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1131   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1132   pthread_mutex_unlock (&cache_lock);
1134   add_response_info(sock,
1135                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1136   add_response_info(sock,
1137                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1138   add_response_info(sock,
1139                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1140   add_response_info(sock,
1141                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1142   add_response_info(sock,
1143                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1144   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1145   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1146   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1147   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1149   send_response(sock, RESP_OK, "Statistics follow\n");
1151   return (0);
1152 } /* }}} int handle_request_stats */
1154 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1156   char *file, file_tmp[PATH_MAX];
1157   int status;
1159   status = buffer_get_field (&buffer, &buffer_size, &file);
1160   if (status != 0)
1161   {
1162     return syntax_error(sock,cmd);
1163   }
1164   else
1165   {
1166     pthread_mutex_lock(&stats_lock);
1167     stats_flush_received++;
1168     pthread_mutex_unlock(&stats_lock);
1170     get_abs_path(&file, file_tmp);
1171     if (!check_file_access(file, sock)) return 0;
1173     status = flush_file (file);
1174     if (status == 0)
1175       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1176     else if (status == ENOENT)
1177     {
1178       /* no file in our tree; see whether it exists at all */
1179       struct stat statbuf;
1181       memset(&statbuf, 0, sizeof(statbuf));
1182       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1183         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1184       else
1185         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1186     }
1187     else if (status < 0)
1188       return send_response(sock, RESP_ERR, "Internal error.\n");
1189     else
1190       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1191   }
1193   /* NOTREACHED */
1194   assert(1==0);
1195 } /* }}} int handle_request_flush */
1197 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1199   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1201   pthread_mutex_lock(&cache_lock);
1202   flush_old_values(-1);
1203   pthread_mutex_unlock(&cache_lock);
1205   return send_response(sock, RESP_OK, "Started flush.\n");
1206 } /* }}} static int handle_request_flushall */
1208 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1210   int status;
1211   char *file, file_tmp[PATH_MAX];
1212   cache_item_t *ci;
1214   status = buffer_get_field(&buffer, &buffer_size, &file);
1215   if (status != 0)
1216     return syntax_error(sock,cmd);
1218   get_abs_path(&file, file_tmp);
1220   pthread_mutex_lock(&cache_lock);
1221   ci = g_tree_lookup(cache_tree, file);
1222   if (ci == NULL)
1223   {
1224     pthread_mutex_unlock(&cache_lock);
1225     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1226   }
1228   for (size_t i=0; i < ci->values_num; i++)
1229     add_response_info(sock, "%s\n", ci->values[i]);
1231   pthread_mutex_unlock(&cache_lock);
1232   return send_response(sock, RESP_OK, "updates pending\n");
1233 } /* }}} static int handle_request_pending */
1235 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1237   int status;
1238   gboolean found;
1239   char *file, file_tmp[PATH_MAX];
1241   status = buffer_get_field(&buffer, &buffer_size, &file);
1242   if (status != 0)
1243     return syntax_error(sock,cmd);
1245   get_abs_path(&file, file_tmp);
1246   if (!check_file_access(file, sock)) return 0;
1248   pthread_mutex_lock(&cache_lock);
1249   found = g_tree_remove(cache_tree, file);
1250   pthread_mutex_unlock(&cache_lock);
1252   if (found == TRUE)
1253   {
1254     if (sock != NULL)
1255       journal_write("forget", file);
1257     return send_response(sock, RESP_OK, "Gone!\n");
1258   }
1259   else
1260     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1262   /* NOTREACHED */
1263   assert(1==0);
1264 } /* }}} static int handle_request_forget */
1266 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1268   cache_item_t *ci;
1270   pthread_mutex_lock(&cache_lock);
1272   ci = cache_queue_head;
1273   while (ci != NULL)
1274   {
1275     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1276     ci = ci->next;
1277   }
1279   pthread_mutex_unlock(&cache_lock);
1281   return send_response(sock, RESP_OK, "in queue.\n");
1282 } /* }}} int handle_request_queue */
1284 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1286   char *file, file_tmp[PATH_MAX];
1287   int values_num = 0;
1288   int status;
1289   char orig_buf[CMD_MAX];
1291   cache_item_t *ci;
1293   /* save it for the journal later */
1294   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1296   status = buffer_get_field (&buffer, &buffer_size, &file);
1297   if (status != 0)
1298     return syntax_error(sock,cmd);
1300   pthread_mutex_lock(&stats_lock);
1301   stats_updates_received++;
1302   pthread_mutex_unlock(&stats_lock);
1304   get_abs_path(&file, file_tmp);
1305   if (!check_file_access(file, sock)) return 0;
1307   pthread_mutex_lock (&cache_lock);
1308   ci = g_tree_lookup (cache_tree, file);
1310   if (ci == NULL) /* {{{ */
1311   {
1312     struct stat statbuf;
1313     cache_item_t *tmp;
1315     /* don't hold the lock while we setup; stat(2) might block */
1316     pthread_mutex_unlock(&cache_lock);
1318     memset (&statbuf, 0, sizeof (statbuf));
1319     status = stat (file, &statbuf);
1320     if (status != 0)
1321     {
1322       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1324       status = errno;
1325       if (status == ENOENT)
1326         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1327       else
1328         return send_response(sock, RESP_ERR,
1329                              "stat failed with error %i.\n", status);
1330     }
1331     if (!S_ISREG (statbuf.st_mode))
1332       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1334     if (access(file, R_OK|W_OK) != 0)
1335       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1336                            file, rrd_strerror(errno));
1338     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1339     if (ci == NULL)
1340     {
1341       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1343       return send_response(sock, RESP_ERR, "malloc failed.\n");
1344     }
1345     memset (ci, 0, sizeof (cache_item_t));
1347     ci->file = strdup (file);
1348     if (ci->file == NULL)
1349     {
1350       free (ci);
1351       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1353       return send_response(sock, RESP_ERR, "strdup failed.\n");
1354     }
1356     wipe_ci_values(ci, now);
1357     ci->flags = CI_FLAGS_IN_TREE;
1358     pthread_cond_init(&ci->flushed, NULL);
1360     pthread_mutex_lock(&cache_lock);
1362     /* another UPDATE might have added this entry in the meantime */
1363     tmp = g_tree_lookup (cache_tree, file);
1364     if (tmp == NULL)
1365       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1366     else
1367     {
1368       free_cache_item (ci);
1369       ci = tmp;
1370     }
1372     /* state may have changed while we were unlocked */
1373     if (state == SHUTDOWN)
1374       return -1;
1375   } /* }}} */
1376   assert (ci != NULL);
1378   /* don't re-write updates in replay mode */
1379   if (sock != NULL)
1380     journal_write("update", orig_buf);
1382   while (buffer_size > 0)
1383   {
1384     char *value;
1385     time_t stamp;
1386     char *eostamp;
1388     status = buffer_get_field (&buffer, &buffer_size, &value);
1389     if (status != 0)
1390     {
1391       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1392       break;
1393     }
1395     /* make sure update time is always moving forward */
1396     stamp = strtol(value, &eostamp, 10);
1397     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1398     {
1399       pthread_mutex_unlock(&cache_lock);
1400       return send_response(sock, RESP_ERR,
1401                            "Cannot find timestamp in '%s'!\n", value);
1402     }
1403     else if (stamp <= ci->last_update_stamp)
1404     {
1405       pthread_mutex_unlock(&cache_lock);
1406       return send_response(sock, RESP_ERR,
1407                            "illegal attempt to update using time %ld when last"
1408                            " update time is %ld (minimum one second step)\n",
1409                            stamp, ci->last_update_stamp);
1410     }
1411     else
1412       ci->last_update_stamp = stamp;
1414     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1415     {
1416       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1417       continue;
1418     }
1420     values_num++;
1421   }
1423   if (((now - ci->last_flush_time) >= config_write_interval)
1424       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1425       && (ci->values_num > 0))
1426   {
1427     enqueue_cache_item (ci, TAIL);
1428   }
1430   pthread_mutex_unlock (&cache_lock);
1432   if (values_num < 1)
1433     return send_response(sock, RESP_ERR, "No values updated.\n");
1434   else
1435     return send_response(sock, RESP_OK,
1436                          "errors, enqueued %i value(s).\n", values_num);
1438   /* NOTREACHED */
1439   assert(1==0);
1441 } /* }}} int handle_request_update */
1443 /* we came across a "WROTE" entry during journal replay.
1444  * throw away any values that we have accumulated for this file
1445  */
1446 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1448   cache_item_t *ci;
1449   const char *file = buffer;
1451   pthread_mutex_lock(&cache_lock);
1453   ci = g_tree_lookup(cache_tree, file);
1454   if (ci == NULL)
1455   {
1456     pthread_mutex_unlock(&cache_lock);
1457     return (0);
1458   }
1460   if (ci->values)
1461     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1463   wipe_ci_values(ci, now);
1464   remove_from_queue(ci);
1466   pthread_mutex_unlock(&cache_lock);
1467   return (0);
1468 } /* }}} int handle_request_wrote */
1470 /* start "BATCH" processing */
1471 static int batch_start (HANDLER_PROTO) /* {{{ */
1473   int status;
1474   if (sock->batch_start)
1475     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1477   status = send_response(sock, RESP_OK,
1478                          "Go ahead.  End with dot '.' on its own line.\n");
1479   sock->batch_start = time(NULL);
1480   sock->batch_cmd = 0;
1482   return status;
1483 } /* }}} static int batch_start */
1485 /* finish "BATCH" processing and return results to the client */
1486 static int batch_done (HANDLER_PROTO) /* {{{ */
1488   assert(sock->batch_start);
1489   sock->batch_start = 0;
1490   sock->batch_cmd  = 0;
1491   return send_response(sock, RESP_OK, "errors\n");
1492 } /* }}} static int batch_done */
1494 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1496   return -1;
1497 } /* }}} static int handle_request_quit */
1499 static command_t list_of_commands[] = { /* {{{ */
1500   {
1501     "UPDATE",
1502     handle_request_update,
1503     CMD_CONTEXT_ANY,
1504     "UPDATE <filename> <values> [<values> ...]\n"
1505     ,
1506     "Adds the given file to the internal cache if it is not yet known and\n"
1507     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1508     "for details.\n"
1509     "\n"
1510     "Each <values> has the following form:\n"
1511     "  <values> = <time>:<value>[:<value>[...]]\n"
1512     "See the rrdupdate(1) manpage for details.\n"
1513   },
1514   {
1515     "WROTE",
1516     handle_request_wrote,
1517     CMD_CONTEXT_JOURNAL,
1518     NULL,
1519     NULL
1520   },
1521   {
1522     "FLUSH",
1523     handle_request_flush,
1524     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1525     "FLUSH <filename>\n"
1526     ,
1527     "Adds the given filename to the head of the update queue and returns\n"
1528     "after it has been dequeued.\n"
1529   },
1530   {
1531     "FLUSHALL",
1532     handle_request_flushall,
1533     CMD_CONTEXT_CLIENT,
1534     "FLUSHALL\n"
1535     ,
1536     "Triggers writing of all pending updates.  Returns immediately.\n"
1537   },
1538   {
1539     "PENDING",
1540     handle_request_pending,
1541     CMD_CONTEXT_CLIENT,
1542     "PENDING <filename>\n"
1543     ,
1544     "Shows any 'pending' updates for a file, in order.\n"
1545     "The updates shown have not yet been written to the underlying RRD file.\n"
1546   },
1547   {
1548     "FORGET",
1549     handle_request_forget,
1550     CMD_CONTEXT_ANY,
1551     "FORGET <filename>\n"
1552     ,
1553     "Removes the file completely from the cache.\n"
1554     "Any pending updates for the file will be lost.\n"
1555   },
1556   {
1557     "QUEUE",
1558     handle_request_queue,
1559     CMD_CONTEXT_CLIENT,
1560     "QUEUE\n"
1561     ,
1562         "Shows all files in the output queue.\n"
1563     "The output is zero or more lines in the following format:\n"
1564     "(where <num_vals> is the number of values to be written)\n"
1565     "\n"
1566     "<num_vals> <filename>\n"
1567   },
1568   {
1569     "STATS",
1570     handle_request_stats,
1571     CMD_CONTEXT_CLIENT,
1572     "STATS\n"
1573     ,
1574     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1575     "a description of the values.\n"
1576   },
1577   {
1578     "HELP",
1579     handle_request_help,
1580     CMD_CONTEXT_CLIENT,
1581     "HELP [<command>]\n",
1582     NULL, /* special! */
1583   },
1584   {
1585     "BATCH",
1586     batch_start,
1587     CMD_CONTEXT_CLIENT,
1588     "BATCH\n"
1589     ,
1590     "The 'BATCH' command permits the client to initiate a bulk load\n"
1591     "   of commands to rrdcached.\n"
1592     "\n"
1593     "Usage:\n"
1594     "\n"
1595     "    client: BATCH\n"
1596     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1597     "    client: command #1\n"
1598     "    client: command #2\n"
1599     "    client: ... and so on\n"
1600     "    client: .\n"
1601     "    server: 2 errors\n"
1602     "    server: 7 message for command #7\n"
1603     "    server: 9 message for command #9\n"
1604     "\n"
1605     "For more information, consult the rrdcached(1) documentation.\n"
1606   },
1607   {
1608     ".",   /* BATCH terminator */
1609     batch_done,
1610     CMD_CONTEXT_BATCH,
1611     NULL,
1612     NULL
1613   },
1614   {
1615     "QUIT",
1616     handle_request_quit,
1617     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1618     "QUIT\n"
1619     ,
1620     "Disconnect from rrdcached.\n"
1621   }
1622 }; /* }}} command_t list_of_commands[] */
1623 static size_t list_of_commands_len = sizeof (list_of_commands)
1624   / sizeof (list_of_commands[0]);
1626 static command_t *find_command(char *cmd)
1628   size_t i;
1630   for (i = 0; i < list_of_commands_len; i++)
1631     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1632       return (&list_of_commands[i]);
1633   return NULL;
1636 /* We currently use the index in the `list_of_commands' array as a bit position
1637  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1638  * outside these functions so that switching to a more elegant storage method
1639  * is easily possible. */
1640 static ssize_t find_command_index (const char *cmd) /* {{{ */
1642   size_t i;
1644   for (i = 0; i < list_of_commands_len; i++)
1645     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1646       return ((ssize_t) i);
1647   return (-1);
1648 } /* }}} ssize_t find_command_index */
1650 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1651     const char *cmd)
1653   ssize_t i;
1655   if (cmd == NULL)
1656     return (-1);
1658   if ((strcasecmp ("QUIT", cmd) == 0)
1659       || (strcasecmp ("HELP", cmd) == 0))
1660     return (1);
1661   else if (strcmp (".", cmd) == 0)
1662     cmd = "BATCH";
1664   i = find_command_index (cmd);
1665   if (i < 0)
1666     return (-1);
1667   assert (i < 32);
1669   if ((sock->permissions & (1 << i)) != 0)
1670     return (1);
1671   return (0);
1672 } /* }}} int socket_permission_check */
1674 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1675     const char *cmd)
1677   ssize_t i;
1679   i = find_command_index (cmd);
1680   if (i < 0)
1681     return (-1);
1682   assert (i < 32);
1684   sock->permissions |= (1 << i);
1685   return (0);
1686 } /* }}} int socket_permission_add */
1688 /* check whether commands are received in the expected context */
1689 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1691   if (sock == NULL)
1692     return (cmd->context & CMD_CONTEXT_JOURNAL);
1693   else if (sock->batch_start)
1694     return (cmd->context & CMD_CONTEXT_BATCH);
1695   else
1696     return (cmd->context & CMD_CONTEXT_CLIENT);
1698   /* NOTREACHED */
1699   assert(1==0);
1702 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1704   int status;
1705   char *cmd_str;
1706   char *resp_txt;
1707   command_t *help = NULL;
1709   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1710   if (status == 0)
1711     help = find_command(cmd_str);
1713   if (help && (help->syntax || help->help))
1714   {
1715     char tmp[CMD_MAX];
1717     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1718     resp_txt = tmp;
1720     if (help->syntax)
1721       add_response_info(sock, "Usage: %s\n", help->syntax);
1723     if (help->help)
1724       add_response_info(sock, "%s\n", help->help);
1725   }
1726   else
1727   {
1728     size_t i;
1730     resp_txt = "Command overview\n";
1732     for (i = 0; i < list_of_commands_len; i++)
1733     {
1734       if (list_of_commands[i].syntax == NULL)
1735         continue;
1736       add_response_info (sock, "%s", list_of_commands[i].syntax);
1737     }
1738   }
1740   return send_response(sock, RESP_OK, resp_txt);
1741 } /* }}} int handle_request_help */
1743 /* if sock==NULL, we are in journal replay mode */
1744 static int handle_request (DISPATCH_PROTO) /* {{{ */
1746   char *buffer_ptr = buffer;
1747   char *cmd_str = NULL;
1748   command_t *cmd = NULL;
1749   int status;
1751   assert (buffer[buffer_size - 1] == '\0');
1753   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1754   if (status != 0)
1755   {
1756     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1757     return (-1);
1758   }
1760   if (sock != NULL && sock->batch_start)
1761     sock->batch_cmd++;
1763   cmd = find_command(cmd_str);
1764   if (!cmd)
1765     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1767   if (!socket_permission_check (sock, cmd->cmd))
1768     return send_response(sock, RESP_ERR, "Permission denied.\n");
1770   if (!command_check_context(sock, cmd))
1771     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1773   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1774 } /* }}} int handle_request */
1776 static void journal_set_free (journal_set *js) /* {{{ */
1778   if (js == NULL)
1779     return;
1781   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1783   free(js);
1784 } /* }}} journal_set_free */
1786 static void journal_set_remove (journal_set *js) /* {{{ */
1788   if (js == NULL)
1789     return;
1791   for (uint i=0; i < js->files_num; i++)
1792   {
1793     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1794     unlink(js->files[i]);
1795   }
1796 } /* }}} journal_set_remove */
1798 /* close current journal file handle.
1799  * MUST hold journal_lock before calling */
1800 static void journal_close(void) /* {{{ */
1802   if (journal_fh != NULL)
1803   {
1804     if (fclose(journal_fh) != 0)
1805       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1806   }
1808   journal_fh = NULL;
1809   journal_size = 0;
1810 } /* }}} journal_close */
1812 /* MUST hold journal_lock before calling */
1813 static void journal_new_file(void) /* {{{ */
1815   struct timeval now;
1816   int  new_fd;
1817   char new_file[PATH_MAX + 1];
1819   assert(journal_dir != NULL);
1820   assert(journal_cur != NULL);
1822   journal_close();
1824   gettimeofday(&now, NULL);
1825   /* this format assures that the files sort in strcmp() order */
1826   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1827            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1829   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1830                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1831   if (new_fd < 0)
1832     goto error;
1834   journal_fh = fdopen(new_fd, "a");
1835   if (journal_fh == NULL)
1836     goto error;
1838   journal_size = ftell(journal_fh);
1839   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1841   /* record the file in the journal set */
1842   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1844   return;
1846 error:
1847   RRDD_LOG(LOG_CRIT,
1848            "JOURNALING DISABLED: Error while trying to create %s : %s",
1849            new_file, rrd_strerror(errno));
1850   RRDD_LOG(LOG_CRIT,
1851            "JOURNALING DISABLED: All values will be flushed at shutdown");
1853   close(new_fd);
1854   config_flush_at_shutdown = 1;
1856 } /* }}} journal_new_file */
1858 /* MUST NOT hold journal_lock before calling this */
1859 static void journal_rotate(void) /* {{{ */
1861   journal_set *old_js = NULL;
1863   if (journal_dir == NULL)
1864     return;
1866   RRDD_LOG(LOG_DEBUG, "rotating journals");
1868   pthread_mutex_lock(&stats_lock);
1869   ++stats_journal_rotate;
1870   pthread_mutex_unlock(&stats_lock);
1872   pthread_mutex_lock(&journal_lock);
1874   journal_close();
1876   /* rotate the journal sets */
1877   old_js = journal_old;
1878   journal_old = journal_cur;
1879   journal_cur = calloc(1, sizeof(journal_set));
1881   if (journal_cur != NULL)
1882     journal_new_file();
1883   else
1884     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1886   pthread_mutex_unlock(&journal_lock);
1888   journal_set_remove(old_js);
1889   journal_set_free  (old_js);
1891 } /* }}} static void journal_rotate */
1893 /* MUST hold journal_lock when calling */
1894 static void journal_done(void) /* {{{ */
1896   if (journal_cur == NULL)
1897     return;
1899   journal_close();
1901   if (config_flush_at_shutdown)
1902   {
1903     RRDD_LOG(LOG_INFO, "removing journals");
1904     journal_set_remove(journal_old);
1905     journal_set_remove(journal_cur);
1906   }
1907   else
1908   {
1909     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1910              "journals will be used at next startup");
1911   }
1913   journal_set_free(journal_cur);
1914   journal_set_free(journal_old);
1915   free(journal_dir);
1917 } /* }}} static void journal_done */
1919 static int journal_write(char *cmd, char *args) /* {{{ */
1921   int chars;
1923   if (journal_fh == NULL)
1924     return 0;
1926   pthread_mutex_lock(&journal_lock);
1927   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1928   journal_size += chars;
1930   if (journal_size > JOURNAL_MAX)
1931     journal_new_file();
1933   pthread_mutex_unlock(&journal_lock);
1935   if (chars > 0)
1936   {
1937     pthread_mutex_lock(&stats_lock);
1938     stats_journal_bytes += chars;
1939     pthread_mutex_unlock(&stats_lock);
1940   }
1942   return chars;
1943 } /* }}} static int journal_write */
1945 static int journal_replay (const char *file) /* {{{ */
1947   FILE *fh;
1948   int entry_cnt = 0;
1949   int fail_cnt = 0;
1950   uint64_t line = 0;
1951   char entry[CMD_MAX];
1952   time_t now;
1954   if (file == NULL) return 0;
1956   {
1957     char *reason = "unknown error";
1958     int status = 0;
1959     struct stat statbuf;
1961     memset(&statbuf, 0, sizeof(statbuf));
1962     if (stat(file, &statbuf) != 0)
1963     {
1964       reason = "stat error";
1965       status = errno;
1966     }
1967     else if (!S_ISREG(statbuf.st_mode))
1968     {
1969       reason = "not a regular file";
1970       status = EPERM;
1971     }
1972     if (statbuf.st_uid != daemon_uid)
1973     {
1974       reason = "not owned by daemon user";
1975       status = EACCES;
1976     }
1977     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1978     {
1979       reason = "must not be user/group writable";
1980       status = EACCES;
1981     }
1983     if (status != 0)
1984     {
1985       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1986                file, rrd_strerror(status), reason);
1987       return 0;
1988     }
1989   }
1991   fh = fopen(file, "r");
1992   if (fh == NULL)
1993   {
1994     if (errno != ENOENT)
1995       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1996                file, rrd_strerror(errno));
1997     return 0;
1998   }
1999   else
2000     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2002   now = time(NULL);
2004   while(!feof(fh))
2005   {
2006     size_t entry_len;
2008     ++line;
2009     if (fgets(entry, sizeof(entry), fh) == NULL)
2010       break;
2011     entry_len = strlen(entry);
2013     /* check \n termination in case journal writing crashed mid-line */
2014     if (entry_len == 0)
2015       continue;
2016     else if (entry[entry_len - 1] != '\n')
2017     {
2018       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2019       ++fail_cnt;
2020       continue;
2021     }
2023     entry[entry_len - 1] = '\0';
2025     if (handle_request(NULL, now, entry, entry_len) == 0)
2026       ++entry_cnt;
2027     else
2028       ++fail_cnt;
2029   }
2031   fclose(fh);
2033   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2034            entry_cnt, fail_cnt);
2036   return entry_cnt > 0 ? 1 : 0;
2037 } /* }}} static int journal_replay */
2039 static int journal_sort(const void *v1, const void *v2)
2041   char **jn1 = (char **) v1;
2042   char **jn2 = (char **) v2;
2044   return strcmp(*jn1,*jn2);
2047 static void journal_init(void) /* {{{ */
2049   int had_journal = 0;
2050   DIR *dir;
2051   struct dirent *dent;
2052   char path[PATH_MAX+1];
2054   if (journal_dir == NULL) return;
2056   pthread_mutex_lock(&journal_lock);
2058   journal_cur = calloc(1, sizeof(journal_set));
2059   if (journal_cur == NULL)
2060   {
2061     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2062     return;
2063   }
2065   RRDD_LOG(LOG_INFO, "checking for journal files");
2067   /* Handle old journal files during transition.  This gives them the
2068    * correct sort order.  TODO: remove after first release
2069    */
2070   {
2071     char old_path[PATH_MAX+1];
2072     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2073     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2074     rename(old_path, path);
2076     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2077     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2078     rename(old_path, path);
2079   }
2081   dir = opendir(journal_dir);
2082   while ((dent = readdir(dir)) != NULL)
2083   {
2084     /* looks like a journal file? */
2085     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2086       continue;
2088     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2090     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2091     {
2092       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2093                dent->d_name);
2094       break;
2095     }
2096   }
2097   closedir(dir);
2099   qsort(journal_cur->files, journal_cur->files_num,
2100         sizeof(journal_cur->files[0]), journal_sort);
2102   for (uint i=0; i < journal_cur->files_num; i++)
2103     had_journal += journal_replay(journal_cur->files[i]);
2105   journal_new_file();
2107   /* it must have been a crash.  start a flush */
2108   if (had_journal && config_flush_at_shutdown)
2109     flush_old_values(-1);
2111   pthread_mutex_unlock(&journal_lock);
2113   RRDD_LOG(LOG_INFO, "journal processing complete");
2115 } /* }}} static void journal_init */
2117 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2119   assert(sock != NULL);
2121   free(sock->rbuf);  sock->rbuf = NULL;
2122   free(sock->wbuf);  sock->wbuf = NULL;
2123   free(sock);
2124 } /* }}} void free_listen_socket */
2126 static void close_connection(listen_socket_t *sock) /* {{{ */
2128   if (sock->fd >= 0)
2129   {
2130     close(sock->fd);
2131     sock->fd = -1;
2132   }
2134   free_listen_socket(sock);
2136 } /* }}} void close_connection */
2138 static void *connection_thread_main (void *args) /* {{{ */
2140   listen_socket_t *sock;
2141   int fd;
2143   sock = (listen_socket_t *) args;
2144   fd = sock->fd;
2146   /* init read buffers */
2147   sock->next_read = sock->next_cmd = 0;
2148   sock->rbuf = malloc(RBUF_SIZE);
2149   if (sock->rbuf == NULL)
2150   {
2151     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2152     close_connection(sock);
2153     return NULL;
2154   }
2156   pthread_mutex_lock (&connection_threads_lock);
2157   connection_threads_num++;
2158   pthread_mutex_unlock (&connection_threads_lock);
2160   while (state == RUNNING)
2161   {
2162     char *cmd;
2163     ssize_t cmd_len;
2164     ssize_t rbytes;
2165     time_t now;
2167     struct pollfd pollfd;
2168     int status;
2170     pollfd.fd = fd;
2171     pollfd.events = POLLIN | POLLPRI;
2172     pollfd.revents = 0;
2174     status = poll (&pollfd, 1, /* timeout = */ 500);
2175     if (state != RUNNING)
2176       break;
2177     else if (status == 0) /* timeout */
2178       continue;
2179     else if (status < 0) /* error */
2180     {
2181       status = errno;
2182       if (status != EINTR)
2183         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2184       continue;
2185     }
2187     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2188       break;
2189     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2190     {
2191       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2192           "poll(2) returned something unexpected: %#04hx",
2193           pollfd.revents);
2194       break;
2195     }
2197     rbytes = read(fd, sock->rbuf + sock->next_read,
2198                   RBUF_SIZE - sock->next_read);
2199     if (rbytes < 0)
2200     {
2201       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2202       break;
2203     }
2204     else if (rbytes == 0)
2205       break; /* eof */
2207     sock->next_read += rbytes;
2209     if (sock->batch_start)
2210       now = sock->batch_start;
2211     else
2212       now = time(NULL);
2214     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2215     {
2216       status = handle_request (sock, now, cmd, cmd_len+1);
2217       if (status != 0)
2218         goto out_close;
2219     }
2220   }
2222 out_close:
2223   close_connection(sock);
2225   /* Remove this thread from the connection threads list */
2226   pthread_mutex_lock (&connection_threads_lock);
2227   connection_threads_num--;
2228   if (connection_threads_num <= 0)
2229     pthread_cond_broadcast(&connection_threads_done);
2230   pthread_mutex_unlock (&connection_threads_lock);
2232   return (NULL);
2233 } /* }}} void *connection_thread_main */
2235 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2237   int fd;
2238   struct sockaddr_un sa;
2239   listen_socket_t *temp;
2240   int status;
2241   const char *path;
2243   path = sock->addr;
2244   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2245     path += strlen("unix:");
2247   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2248       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2249   if (temp == NULL)
2250   {
2251     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2252     return (-1);
2253   }
2254   listen_fds = temp;
2255   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2257   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2258   if (fd < 0)
2259   {
2260     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2261              rrd_strerror(errno));
2262     return (-1);
2263   }
2265   memset (&sa, 0, sizeof (sa));
2266   sa.sun_family = AF_UNIX;
2267   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2269   /* if we've gotten this far, we own the pid file.  any daemon started
2270    * with the same args must not be alive.  therefore, ensure that we can
2271    * create the socket...
2272    */
2273   unlink(path);
2275   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2276   if (status != 0)
2277   {
2278     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2279              path, rrd_strerror(errno));
2280     close (fd);
2281     return (-1);
2282   }
2284   status = listen (fd, /* backlog = */ 10);
2285   if (status != 0)
2286   {
2287     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2288              path, rrd_strerror(errno));
2289     close (fd);
2290     unlink (path);
2291     return (-1);
2292   }
2294   listen_fds[listen_fds_num].fd = fd;
2295   listen_fds[listen_fds_num].family = PF_UNIX;
2296   strncpy(listen_fds[listen_fds_num].addr, path,
2297           sizeof (listen_fds[listen_fds_num].addr) - 1);
2298   listen_fds_num++;
2300   return (0);
2301 } /* }}} int open_listen_socket_unix */
2303 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2305   struct addrinfo ai_hints;
2306   struct addrinfo *ai_res;
2307   struct addrinfo *ai_ptr;
2308   char addr_copy[NI_MAXHOST];
2309   char *addr;
2310   char *port;
2311   int status;
2313   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2314   addr_copy[sizeof (addr_copy) - 1] = 0;
2315   addr = addr_copy;
2317   memset (&ai_hints, 0, sizeof (ai_hints));
2318   ai_hints.ai_flags = 0;
2319 #ifdef AI_ADDRCONFIG
2320   ai_hints.ai_flags |= AI_ADDRCONFIG;
2321 #endif
2322   ai_hints.ai_family = AF_UNSPEC;
2323   ai_hints.ai_socktype = SOCK_STREAM;
2325   port = NULL;
2326   if (*addr == '[') /* IPv6+port format */
2327   {
2328     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2329     addr++;
2331     port = strchr (addr, ']');
2332     if (port == NULL)
2333     {
2334       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2335       return (-1);
2336     }
2337     *port = 0;
2338     port++;
2340     if (*port == ':')
2341       port++;
2342     else if (*port == 0)
2343       port = NULL;
2344     else
2345     {
2346       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2347       return (-1);
2348     }
2349   } /* if (*addr = ']') */
2350   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2351   {
2352     port = rindex(addr, ':');
2353     if (port != NULL)
2354     {
2355       *port = 0;
2356       port++;
2357     }
2358   }
2359   ai_res = NULL;
2360   status = getaddrinfo (addr,
2361                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2362                         &ai_hints, &ai_res);
2363   if (status != 0)
2364   {
2365     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2366              addr, gai_strerror (status));
2367     return (-1);
2368   }
2370   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2371   {
2372     int fd;
2373     listen_socket_t *temp;
2374     int one = 1;
2376     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2377         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2378     if (temp == NULL)
2379     {
2380       fprintf (stderr,
2381                "rrdcached: open_listen_socket_network: realloc failed.\n");
2382       continue;
2383     }
2384     listen_fds = temp;
2385     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2387     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2388     if (fd < 0)
2389     {
2390       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2391                rrd_strerror(errno));
2392       continue;
2393     }
2395     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2397     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2398     if (status != 0)
2399     {
2400       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2401                sock->addr, rrd_strerror(errno));
2402       close (fd);
2403       continue;
2404     }
2406     status = listen (fd, /* backlog = */ 10);
2407     if (status != 0)
2408     {
2409       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2410                sock->addr, rrd_strerror(errno));
2411       close (fd);
2412       freeaddrinfo(ai_res);
2413       return (-1);
2414     }
2416     listen_fds[listen_fds_num].fd = fd;
2417     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2418     listen_fds_num++;
2419   } /* for (ai_ptr) */
2421   freeaddrinfo(ai_res);
2422   return (0);
2423 } /* }}} static int open_listen_socket_network */
2425 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2427   assert(sock != NULL);
2428   assert(sock->addr != NULL);
2430   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2431       || sock->addr[0] == '/')
2432     return (open_listen_socket_unix(sock));
2433   else
2434     return (open_listen_socket_network(sock));
2435 } /* }}} int open_listen_socket */
2437 static int close_listen_sockets (void) /* {{{ */
2439   size_t i;
2441   for (i = 0; i < listen_fds_num; i++)
2442   {
2443     close (listen_fds[i].fd);
2445     if (listen_fds[i].family == PF_UNIX)
2446       unlink(listen_fds[i].addr);
2447   }
2449   free (listen_fds);
2450   listen_fds = NULL;
2451   listen_fds_num = 0;
2453   return (0);
2454 } /* }}} int close_listen_sockets */
2456 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2458   struct pollfd *pollfds;
2459   int pollfds_num;
2460   int status;
2461   int i;
2463   if (listen_fds_num < 1)
2464   {
2465     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2466     return (NULL);
2467   }
2469   pollfds_num = listen_fds_num;
2470   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2471   if (pollfds == NULL)
2472   {
2473     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2474     return (NULL);
2475   }
2476   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2478   RRDD_LOG(LOG_INFO, "listening for connections");
2480   while (state == RUNNING)
2481   {
2482     for (i = 0; i < pollfds_num; i++)
2483     {
2484       pollfds[i].fd = listen_fds[i].fd;
2485       pollfds[i].events = POLLIN | POLLPRI;
2486       pollfds[i].revents = 0;
2487     }
2489     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2490     if (state != RUNNING)
2491       break;
2492     else if (status == 0) /* timeout */
2493       continue;
2494     else if (status < 0) /* error */
2495     {
2496       status = errno;
2497       if (status != EINTR)
2498       {
2499         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2500       }
2501       continue;
2502     }
2504     for (i = 0; i < pollfds_num; i++)
2505     {
2506       listen_socket_t *client_sock;
2507       struct sockaddr_storage client_sa;
2508       socklen_t client_sa_size;
2509       pthread_t tid;
2510       pthread_attr_t attr;
2512       if (pollfds[i].revents == 0)
2513         continue;
2515       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2516       {
2517         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2518             "poll(2) returned something unexpected for listen FD #%i.",
2519             pollfds[i].fd);
2520         continue;
2521       }
2523       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2524       if (client_sock == NULL)
2525       {
2526         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2527         continue;
2528       }
2529       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2531       client_sa_size = sizeof (client_sa);
2532       client_sock->fd = accept (pollfds[i].fd,
2533           (struct sockaddr *) &client_sa, &client_sa_size);
2534       if (client_sock->fd < 0)
2535       {
2536         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2537         free(client_sock);
2538         continue;
2539       }
2541       pthread_attr_init (&attr);
2542       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2544       status = pthread_create (&tid, &attr, connection_thread_main,
2545                                client_sock);
2546       if (status != 0)
2547       {
2548         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2549         close_connection(client_sock);
2550         continue;
2551       }
2552     } /* for (pollfds_num) */
2553   } /* while (state == RUNNING) */
2555   RRDD_LOG(LOG_INFO, "starting shutdown");
2557   close_listen_sockets ();
2559   pthread_mutex_lock (&connection_threads_lock);
2560   while (connection_threads_num > 0)
2561     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2562   pthread_mutex_unlock (&connection_threads_lock);
2564   free(pollfds);
2566   return (NULL);
2567 } /* }}} void *listen_thread_main */
2569 static int daemonize (void) /* {{{ */
2571   int pid_fd;
2572   char *base_dir;
2574   daemon_uid = geteuid();
2576   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2577   if (pid_fd < 0)
2578     pid_fd = check_pidfile();
2579   if (pid_fd < 0)
2580     return pid_fd;
2582   /* open all the listen sockets */
2583   if (config_listen_address_list_len > 0)
2584   {
2585     for (size_t i = 0; i < config_listen_address_list_len; i++)
2586       open_listen_socket (config_listen_address_list[i]);
2588     rrd_free_ptrs((void ***) &config_listen_address_list,
2589                   &config_listen_address_list_len);
2590   }
2591   else
2592   {
2593     listen_socket_t sock;
2594     memset(&sock, 0, sizeof(sock));
2595     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2596     open_listen_socket (&sock);
2597   }
2599   if (listen_fds_num < 1)
2600   {
2601     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2602     goto error;
2603   }
2605   if (!stay_foreground)
2606   {
2607     pid_t child;
2609     child = fork ();
2610     if (child < 0)
2611     {
2612       fprintf (stderr, "daemonize: fork(2) failed.\n");
2613       goto error;
2614     }
2615     else if (child > 0)
2616       exit(0);
2618     /* Become session leader */
2619     setsid ();
2621     /* Open the first three file descriptors to /dev/null */
2622     close (2);
2623     close (1);
2624     close (0);
2626     open ("/dev/null", O_RDWR);
2627     if (dup(0) == -1 || dup(0) == -1){
2628         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2629     }
2630   } /* if (!stay_foreground) */
2632   /* Change into the /tmp directory. */
2633   base_dir = (config_base_dir != NULL)
2634     ? config_base_dir
2635     : "/tmp";
2637   if (chdir (base_dir) != 0)
2638   {
2639     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2640     goto error;
2641   }
2643   install_signal_handlers();
2645   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2646   RRDD_LOG(LOG_INFO, "starting up");
2648   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2649                                 (GDestroyNotify) free_cache_item);
2650   if (cache_tree == NULL)
2651   {
2652     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2653     goto error;
2654   }
2656   return write_pidfile (pid_fd);
2658 error:
2659   remove_pidfile();
2660   return -1;
2661 } /* }}} int daemonize */
2663 static int cleanup (void) /* {{{ */
2665   pthread_cond_broadcast (&flush_cond);
2666   pthread_join (flush_thread, NULL);
2668   pthread_cond_broadcast (&queue_cond);
2669   for (int i = 0; i < config_queue_threads; i++)
2670     pthread_join (queue_threads[i], NULL);
2672   if (config_flush_at_shutdown)
2673   {
2674     assert(cache_queue_head == NULL);
2675     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2676   }
2678   free(queue_threads);
2679   free(config_base_dir);
2680   free(config_pid_file);
2682   pthread_mutex_lock(&cache_lock);
2683   g_tree_destroy(cache_tree);
2685   pthread_mutex_lock(&journal_lock);
2686   journal_done();
2688   RRDD_LOG(LOG_INFO, "goodbye");
2689   closelog ();
2691   remove_pidfile ();
2693   return (0);
2694 } /* }}} int cleanup */
2696 static int read_options (int argc, char **argv) /* {{{ */
2698   int option;
2699   int status = 0;
2701   char **permissions = NULL;
2702   size_t permissions_len = 0;
2704   while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2705   {
2706     switch (option)
2707     {
2708       case 'g':
2709         stay_foreground=1;
2710         break;
2712       case 'l':
2713       {
2714         listen_socket_t *new;
2716         new = malloc(sizeof(listen_socket_t));
2717         if (new == NULL)
2718         {
2719           fprintf(stderr, "read_options: malloc failed.\n");
2720           return(2);
2721         }
2722         memset(new, 0, sizeof(listen_socket_t));
2724         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2726         /* Add permissions to the socket {{{ */
2727         if (permissions_len != 0)
2728         {
2729           size_t i;
2730           for (i = 0; i < permissions_len; i++)
2731           {
2732             status = socket_permission_add (new, permissions[i]);
2733             if (status != 0)
2734             {
2735               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2736                   "socket failed. Most likely, this permission doesn't "
2737                   "exist. Check your command line.\n", permissions[i]);
2738               status = 4;
2739             }
2740           }
2741         }
2742         else /* if (permissions_len == 0) */
2743         {
2744           /* Add permission for ALL commands to the socket. */
2745           size_t i;
2746           for (i = 0; i < list_of_commands_len; i++)
2747           {
2748             status = socket_permission_add (new, list_of_commands[i].cmd);
2749             if (status != 0)
2750             {
2751               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2752                   "socket failed. This should never happen, ever! Sorry.\n",
2753                   permissions[i]);
2754               status = 4;
2755             }
2756           }
2757         }
2758         /* }}} Done adding permissions. */
2760         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2761                          &config_listen_address_list_len, new))
2762         {
2763           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2764           return (2);
2765         }
2766       }
2767       break;
2769       case 'P':
2770       {
2771         char *optcopy;
2772         char *saveptr;
2773         char *dummy;
2774         char *ptr;
2776         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2778         optcopy = strdup (optarg);
2779         dummy = optcopy;
2780         saveptr = NULL;
2781         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2782         {
2783           dummy = NULL;
2784           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2785         }
2787         free (optcopy);
2788       }
2789       break;
2791       case 'f':
2792       {
2793         int temp;
2795         temp = atoi (optarg);
2796         if (temp > 0)
2797           config_flush_interval = temp;
2798         else
2799         {
2800           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2801           status = 3;
2802         }
2803       }
2804       break;
2806       case 'w':
2807       {
2808         int temp;
2810         temp = atoi (optarg);
2811         if (temp > 0)
2812           config_write_interval = temp;
2813         else
2814         {
2815           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2816           status = 2;
2817         }
2818       }
2819       break;
2821       case 'z':
2822       {
2823         int temp;
2825         temp = atoi(optarg);
2826         if (temp > 0)
2827           config_write_jitter = temp;
2828         else
2829         {
2830           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2831           status = 2;
2832         }
2834         break;
2835       }
2837       case 't':
2838       {
2839         int threads;
2840         threads = atoi(optarg);
2841         if (threads >= 1)
2842           config_queue_threads = threads;
2843         else
2844         {
2845           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2846           return 1;
2847         }
2848       }
2849       break;
2851       case 'B':
2852         config_write_base_only = 1;
2853         break;
2855       case 'b':
2856       {
2857         size_t len;
2858         char base_realpath[PATH_MAX];
2860         if (config_base_dir != NULL)
2861           free (config_base_dir);
2862         config_base_dir = strdup (optarg);
2863         if (config_base_dir == NULL)
2864         {
2865           fprintf (stderr, "read_options: strdup failed.\n");
2866           return (3);
2867         }
2869         /* make sure that the base directory is not resolved via
2870          * symbolic links.  this makes some performance-enhancing
2871          * assumptions possible (we don't have to resolve paths
2872          * that start with a "/")
2873          */
2874         if (realpath(config_base_dir, base_realpath) == NULL)
2875         {
2876           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2877           return 5;
2878         }
2879         else if (strncmp(config_base_dir,
2880                          base_realpath, sizeof(base_realpath)) != 0)
2881         {
2882           fprintf(stderr,
2883                   "Base directory (-b) resolved via file system links!\n"
2884                   "Please consult rrdcached '-b' documentation!\n"
2885                   "Consider specifying the real directory (%s)\n",
2886                   base_realpath);
2887           return 5;
2888         }
2890         len = strlen (config_base_dir);
2891         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2892         {
2893           config_base_dir[len - 1] = 0;
2894           len--;
2895         }
2897         if (len < 1)
2898         {
2899           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2900           return (4);
2901         }
2903         _config_base_dir_len = len;
2904       }
2905       break;
2907       case 'p':
2908       {
2909         if (config_pid_file != NULL)
2910           free (config_pid_file);
2911         config_pid_file = strdup (optarg);
2912         if (config_pid_file == NULL)
2913         {
2914           fprintf (stderr, "read_options: strdup failed.\n");
2915           return (3);
2916         }
2917       }
2918       break;
2920       case 'F':
2921         config_flush_at_shutdown = 1;
2922         break;
2924       case 'j':
2925       {
2926         struct stat statbuf;
2927         const char *dir = journal_dir = strdup(optarg);
2929         status = stat(dir, &statbuf);
2930         if (status != 0)
2931         {
2932           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2933           return 6;
2934         }
2936         if (!S_ISDIR(statbuf.st_mode)
2937             || access(dir, R_OK|W_OK|X_OK) != 0)
2938         {
2939           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2940                   errno ? rrd_strerror(errno) : "");
2941           return 6;
2942         }
2943       }
2944       break;
2946       case 'h':
2947       case '?':
2948         printf ("RRDCacheD %s\n"
2949             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
2950             "\n"
2951             "Usage: rrdcached [options]\n"
2952             "\n"
2953             "Valid options are:\n"
2954             "  -l <address>  Socket address to listen to.\n"
2955             "  -P <perms>    Sets the permissions to assign to all following "
2956                             "sockets\n"
2957             "  -w <seconds>  Interval in which to write data.\n"
2958             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2959             "  -t <threads>  Number of write threads.\n"
2960             "  -f <seconds>  Interval in which to flush dead data.\n"
2961             "  -p <file>     Location of the PID-file.\n"
2962             "  -b <dir>      Base directory to change to.\n"
2963             "  -B            Restrict file access to paths within -b <dir>\n"
2964             "  -g            Do not fork and run in the foreground.\n"
2965             "  -j <dir>      Directory in which to create the journal files.\n"
2966             "  -F            Always flush all updates at shutdown\n"
2967             "\n"
2968             "For more information and a detailed description of all options "
2969             "please refer\n"
2970             "to the rrdcached(1) manual page.\n",
2971             VERSION);
2972         status = -1;
2973         break;
2974     } /* switch (option) */
2975   } /* while (getopt) */
2977   /* advise the user when values are not sane */
2978   if (config_flush_interval < 2 * config_write_interval)
2979     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2980             " 2x write interval (-w) !\n");
2981   if (config_write_jitter > config_write_interval)
2982     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2983             " write interval (-w) !\n");
2985   if (config_write_base_only && config_base_dir == NULL)
2986     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2987             "  Consult the rrdcached documentation\n");
2989   if (journal_dir == NULL)
2990     config_flush_at_shutdown = 1;
2992   rrd_free_ptrs ((void *) &permissions, &permissions_len);
2994   return (status);
2995 } /* }}} int read_options */
2997 int main (int argc, char **argv)
2999   int status;
3001   status = read_options (argc, argv);
3002   if (status != 0)
3003   {
3004     if (status < 0)
3005       status = 0;
3006     return (status);
3007   }
3009   status = daemonize ();
3010   if (status != 0)
3011   {
3012     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3013     return (1);
3014   }
3016   journal_init();
3018   /* start the queue threads */
3019   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3020   if (queue_threads == NULL)
3021   {
3022     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3023     cleanup();
3024     return (1);
3025   }
3026   for (int i = 0; i < config_queue_threads; i++)
3027   {
3028     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3029     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3030     if (status != 0)
3031     {
3032       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3033       cleanup();
3034       return (1);
3035     }
3036   }
3038   /* start the flush thread */
3039   memset(&flush_thread, 0, sizeof(flush_thread));
3040   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3041   if (status != 0)
3042   {
3043     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3044     cleanup();
3045     return (1);
3046   }
3048   listen_thread_main (NULL);
3049   cleanup ();
3051   return (0);
3052 } /* int main */
3054 /*
3055  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3056  */