Code

0ca1818d462b40e0bdd766c2de45eda6003178ef
[pkg-rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
88 #else
90 #endif
91 #include <stdio.h>
92 #include <string.h>
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
110 #include <glib-2.0/glib.h>
111 /* }}} */
113 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
115 #ifndef __GNUC__
116 # define __attribute__(x) /**/
117 #endif
119 /*
120  * Types
121  */
122 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124 struct listen_socket_s
126   int fd;
127   char addr[PATH_MAX + 1];
128   int family;
130   /* state for BATCH processing */
131   time_t batch_start;
132   int batch_cmd;
134   /* buffered IO */
135   char *rbuf;
136   off_t next_cmd;
137   off_t next_read;
139   char *wbuf;
140   ssize_t wbuf_len;
142   uint32_t permissions;
143 };
144 typedef struct listen_socket_s listen_socket_t;
146 struct command_s;
147 typedef struct command_s command_t;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
154 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
155                         DISPATCH_PROTO
157 struct command_s {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
161   char  context;                /* where we expect to see it */
162 #define CMD_CONTEXT_CLIENT      (1<<0)
163 #define CMD_CONTEXT_BATCH       (1<<1)
164 #define CMD_CONTEXT_JOURNAL     (1<<2)
165 #define CMD_CONTEXT_ANY         (0x7f)
167   char *syntax;
168   char *help;
169 };
171 struct cache_item_s;
172 typedef struct cache_item_s cache_item_t;
173 struct cache_item_s
175   char *file;
176   char **values;
177   size_t values_num;
178   time_t last_flush_time;
179   time_t last_update_stamp;
180 #define CI_FLAGS_IN_TREE  (1<<0)
181 #define CI_FLAGS_IN_QUEUE (1<<1)
182   int flags;
183   pthread_cond_t  flushed;
184   cache_item_t *prev;
185   cache_item_t *next;
186 };
188 struct callback_flush_data_s
190   time_t now;
191   time_t abs_timeout;
192   char **keys;
193   size_t keys_num;
194 };
195 typedef struct callback_flush_data_s callback_flush_data_t;
197 enum queue_side_e
199   HEAD,
200   TAIL
201 };
202 typedef enum queue_side_e queue_side_t;
204 /* describe a set of journal files */
205 typedef struct {
206   char **files;
207   size_t files_num;
208 } journal_set;
210 /* max length of socket command or response */
211 #define CMD_MAX 4096
212 #define RBUF_SIZE (CMD_MAX*2)
214 /*
215  * Variables
216  */
217 static int stay_foreground = 0;
218 static uid_t daemon_uid;
220 static listen_socket_t *listen_fds = NULL;
221 static size_t listen_fds_num = 0;
223 enum {
224   RUNNING,              /* normal operation */
225   FLUSHING,             /* flushing remaining values */
226   SHUTDOWN              /* shutting down */
227 } state = RUNNING;
229 static pthread_t *queue_threads;
230 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
231 static int config_queue_threads = 4;
233 static pthread_t flush_thread;
234 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
236 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
237 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
238 static int connection_threads_num = 0;
240 /* Cache stuff */
241 static GTree          *cache_tree = NULL;
242 static cache_item_t   *cache_queue_head = NULL;
243 static cache_item_t   *cache_queue_tail = NULL;
244 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
246 static int config_write_interval = 300;
247 static int config_write_jitter   = 0;
248 static int config_flush_interval = 3600;
249 static int config_flush_at_shutdown = 0;
250 static char *config_pid_file = NULL;
251 static char *config_base_dir = NULL;
252 static size_t _config_base_dir_len = 0;
253 static int config_write_base_only = 0;
255 static listen_socket_t **config_listen_address_list = NULL;
256 static size_t config_listen_address_list_len = 0;
258 static uint64_t stats_queue_length = 0;
259 static uint64_t stats_updates_received = 0;
260 static uint64_t stats_flush_received = 0;
261 static uint64_t stats_updates_written = 0;
262 static uint64_t stats_data_sets_written = 0;
263 static uint64_t stats_journal_bytes = 0;
264 static uint64_t stats_journal_rotate = 0;
265 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
267 /* Journaled updates */
268 #define JOURNAL_BASE "rrd.journal"
269 static journal_set *journal_cur = NULL;
270 static journal_set *journal_old = NULL;
271 static char *journal_dir = NULL;
272 static FILE *journal_fh = NULL;         /* current journal file handle */
273 static long  journal_size = 0;          /* current journal size */
274 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
275 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
276 static int journal_write(char *cmd, char *args);
277 static void journal_done(void);
278 static void journal_rotate(void);
280 /* prototypes for forward refernces */
281 static int handle_request_help (HANDLER_PROTO);
283 /* 
284  * Functions
285  */
286 static void sig_common (const char *sig) /* {{{ */
288   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
289   state = FLUSHING;
290   pthread_cond_broadcast(&flush_cond);
291   pthread_cond_broadcast(&queue_cond);
292 } /* }}} void sig_common */
294 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
296   sig_common("INT");
297 } /* }}} void sig_int_handler */
299 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
301   sig_common("TERM");
302 } /* }}} void sig_term_handler */
304 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
306   config_flush_at_shutdown = 1;
307   sig_common("USR1");
308 } /* }}} void sig_usr1_handler */
310 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
312   config_flush_at_shutdown = 0;
313   sig_common("USR2");
314 } /* }}} void sig_usr2_handler */
316 static void install_signal_handlers(void) /* {{{ */
318   /* These structures are static, because `sigaction' behaves weird if the are
319    * overwritten.. */
320   static struct sigaction sa_int;
321   static struct sigaction sa_term;
322   static struct sigaction sa_pipe;
323   static struct sigaction sa_usr1;
324   static struct sigaction sa_usr2;
326   /* Install signal handlers */
327   memset (&sa_int, 0, sizeof (sa_int));
328   sa_int.sa_handler = sig_int_handler;
329   sigaction (SIGINT, &sa_int, NULL);
331   memset (&sa_term, 0, sizeof (sa_term));
332   sa_term.sa_handler = sig_term_handler;
333   sigaction (SIGTERM, &sa_term, NULL);
335   memset (&sa_pipe, 0, sizeof (sa_pipe));
336   sa_pipe.sa_handler = SIG_IGN;
337   sigaction (SIGPIPE, &sa_pipe, NULL);
339   memset (&sa_pipe, 0, sizeof (sa_usr1));
340   sa_usr1.sa_handler = sig_usr1_handler;
341   sigaction (SIGUSR1, &sa_usr1, NULL);
343   memset (&sa_usr2, 0, sizeof (sa_usr2));
344   sa_usr2.sa_handler = sig_usr2_handler;
345   sigaction (SIGUSR2, &sa_usr2, NULL);
347 } /* }}} void install_signal_handlers */
349 static int open_pidfile(char *action, int oflag) /* {{{ */
351   int fd;
352   const char *file;
353   char *file_copy, *dir;
355   file = (config_pid_file != NULL)
356     ? config_pid_file
357     : LOCALSTATEDIR "/run/rrdcached.pid";
359   /* dirname may modify its argument */
360   file_copy = strdup(file);
361   if (file_copy == NULL)
362   {
363     fprintf(stderr, "rrdcached: strdup(): %s\n",
364         rrd_strerror(errno));
365     return -1;
366   }
368   dir = dirname(file_copy);
369   if (rrd_mkdir_p(dir, 0777) != 0)
370   {
371     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
372         dir, rrd_strerror(errno));
373     return -1;
374   }
376   free(file_copy);
378   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
379   if (fd < 0)
380     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
381             action, file, rrd_strerror(errno));
383   return(fd);
384 } /* }}} static int open_pidfile */
386 /* check existing pid file to see whether a daemon is running */
387 static int check_pidfile(void)
389   int pid_fd;
390   pid_t pid;
391   char pid_str[16];
393   pid_fd = open_pidfile("open", O_RDWR);
394   if (pid_fd < 0)
395     return pid_fd;
397   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
398     return -1;
400   pid = atoi(pid_str);
401   if (pid <= 0)
402     return -1;
404   /* another running process that we can signal COULD be
405    * a competing rrdcached */
406   if (pid != getpid() && kill(pid, 0) == 0)
407   {
408     fprintf(stderr,
409             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
410     close(pid_fd);
411     return -1;
412   }
414   lseek(pid_fd, 0, SEEK_SET);
415   if (ftruncate(pid_fd, 0) == -1)
416   {
417     fprintf(stderr,
418             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
419     close(pid_fd);
420     return -1;
421   }
423   fprintf(stderr,
424           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
425           "rrdcached: starting normally.\n", pid);
427   return pid_fd;
428 } /* }}} static int check_pidfile */
430 static int write_pidfile (int fd) /* {{{ */
432   pid_t pid;
433   FILE *fh;
435   pid = getpid ();
437   fh = fdopen (fd, "w");
438   if (fh == NULL)
439   {
440     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
441     close(fd);
442     return (-1);
443   }
445   fprintf (fh, "%i\n", (int) pid);
446   fclose (fh);
448   return (0);
449 } /* }}} int write_pidfile */
451 static int remove_pidfile (void) /* {{{ */
453   char *file;
454   int status;
456   file = (config_pid_file != NULL)
457     ? config_pid_file
458     : LOCALSTATEDIR "/run/rrdcached.pid";
460   status = unlink (file);
461   if (status == 0)
462     return (0);
463   return (errno);
464 } /* }}} int remove_pidfile */
466 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
468   char *eol;
470   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
471                sock->next_read - sock->next_cmd);
473   if (eol == NULL)
474   {
475     /* no commands left, move remainder back to front of rbuf */
476     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
477             sock->next_read - sock->next_cmd);
478     sock->next_read -= sock->next_cmd;
479     sock->next_cmd = 0;
480     *len = 0;
481     return NULL;
482   }
483   else
484   {
485     char *cmd = sock->rbuf + sock->next_cmd;
486     *eol = '\0';
488     sock->next_cmd = eol - sock->rbuf + 1;
490     if (eol > sock->rbuf && *(eol-1) == '\r')
491       *(--eol) = '\0'; /* handle "\r\n" EOL */
493     *len = eol - cmd;
495     return cmd;
496   }
498   /* NOTREACHED */
499   assert(1==0);
500 } /* }}} char *next_cmd */
502 /* add the characters directly to the write buffer */
503 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
505   char *new_buf;
507   assert(sock != NULL);
509   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
510   if (new_buf == NULL)
511   {
512     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
513     return -1;
514   }
516   strncpy(new_buf + sock->wbuf_len, str, len + 1);
518   sock->wbuf = new_buf;
519   sock->wbuf_len += len;
521   return 0;
522 } /* }}} static int add_to_wbuf */
524 /* add the text to the "extra" info that's sent after the status line */
525 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
527   va_list argp;
528   char buffer[CMD_MAX];
529   int len;
531   if (sock == NULL) return 0; /* journal replay mode */
532   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
534   va_start(argp, fmt);
535 #ifdef HAVE_VSNPRINTF
536   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
537 #else
538   len = vsprintf(buffer, fmt, argp);
539 #endif
540   va_end(argp);
541   if (len < 0)
542   {
543     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
544     return -1;
545   }
547   return add_to_wbuf(sock, buffer, len);
548 } /* }}} static int add_response_info */
550 static int count_lines(char *str) /* {{{ */
552   int lines = 0;
554   if (str != NULL)
555   {
556     while ((str = strchr(str, '\n')) != NULL)
557     {
558       ++lines;
559       ++str;
560     }
561   }
563   return lines;
564 } /* }}} static int count_lines */
566 /* send the response back to the user.
567  * returns 0 on success, -1 on error
568  * write buffer is always zeroed after this call */
569 static int send_response (listen_socket_t *sock, response_code rc,
570                           char *fmt, ...) /* {{{ */
572   va_list argp;
573   char buffer[CMD_MAX];
574   int lines;
575   ssize_t wrote;
576   int rclen, len;
578   if (sock == NULL) return rc;  /* journal replay mode */
580   if (sock->batch_start)
581   {
582     if (rc == RESP_OK)
583       return rc; /* no response on success during BATCH */
584     lines = sock->batch_cmd;
585   }
586   else if (rc == RESP_OK)
587     lines = count_lines(sock->wbuf);
588   else
589     lines = -1;
591   rclen = sprintf(buffer, "%d ", lines);
592   va_start(argp, fmt);
593 #ifdef HAVE_VSNPRINTF
594   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
595 #else
596   len = vsprintf(buffer+rclen, fmt, argp);
597 #endif
598   va_end(argp);
599   if (len < 0)
600     return -1;
602   len += rclen;
604   /* append the result to the wbuf, don't write to the user */
605   if (sock->batch_start)
606     return add_to_wbuf(sock, buffer, len);
608   /* first write must be complete */
609   if (len != write(sock->fd, buffer, len))
610   {
611     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
612     return -1;
613   }
615   if (sock->wbuf != NULL && rc == RESP_OK)
616   {
617     wrote = 0;
618     while (wrote < sock->wbuf_len)
619     {
620       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
621       if (wb <= 0)
622       {
623         RRDD_LOG(LOG_INFO, "send_response: could not write results");
624         return -1;
625       }
626       wrote += wb;
627     }
628   }
630   free(sock->wbuf); sock->wbuf = NULL;
631   sock->wbuf_len = 0;
633   return 0;
634 } /* }}} */
636 static void wipe_ci_values(cache_item_t *ci, time_t when)
638   ci->values = NULL;
639   ci->values_num = 0;
641   ci->last_flush_time = when;
642   if (config_write_jitter > 0)
643     ci->last_flush_time += (rrd_random() % config_write_jitter);
646 /* remove_from_queue
647  * remove a "cache_item_t" item from the queue.
648  * must hold 'cache_lock' when calling this
649  */
650 static void remove_from_queue(cache_item_t *ci) /* {{{ */
652   if (ci == NULL) return;
653   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
655   if (ci->prev == NULL)
656     cache_queue_head = ci->next; /* reset head */
657   else
658     ci->prev->next = ci->next;
660   if (ci->next == NULL)
661     cache_queue_tail = ci->prev; /* reset the tail */
662   else
663     ci->next->prev = ci->prev;
665   ci->next = ci->prev = NULL;
666   ci->flags &= ~CI_FLAGS_IN_QUEUE;
668   pthread_mutex_lock (&stats_lock);
669   assert (stats_queue_length > 0);
670   stats_queue_length--;
671   pthread_mutex_unlock (&stats_lock);
673 } /* }}} static void remove_from_queue */
675 /* free the resources associated with the cache_item_t
676  * must hold cache_lock when calling this function
677  */
678 static void *free_cache_item(cache_item_t *ci) /* {{{ */
680   if (ci == NULL) return NULL;
682   remove_from_queue(ci);
684   for (size_t i=0; i < ci->values_num; i++)
685     free(ci->values[i]);
687   free (ci->values);
688   free (ci->file);
690   /* in case anyone is waiting */
691   pthread_cond_broadcast(&ci->flushed);
692   pthread_cond_destroy(&ci->flushed);
694   free (ci);
696   return NULL;
697 } /* }}} static void *free_cache_item */
699 /*
700  * enqueue_cache_item:
701  * `cache_lock' must be acquired before calling this function!
702  */
703 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
704     queue_side_t side)
706   if (ci == NULL)
707     return (-1);
709   if (ci->values_num == 0)
710     return (0);
712   if (side == HEAD)
713   {
714     if (cache_queue_head == ci)
715       return 0;
717     /* remove if further down in queue */
718     remove_from_queue(ci);
720     ci->prev = NULL;
721     ci->next = cache_queue_head;
722     if (ci->next != NULL)
723       ci->next->prev = ci;
724     cache_queue_head = ci;
726     if (cache_queue_tail == NULL)
727       cache_queue_tail = cache_queue_head;
728   }
729   else /* (side == TAIL) */
730   {
731     /* We don't move values back in the list.. */
732     if (ci->flags & CI_FLAGS_IN_QUEUE)
733       return (0);
735     assert (ci->next == NULL);
736     assert (ci->prev == NULL);
738     ci->prev = cache_queue_tail;
740     if (cache_queue_tail == NULL)
741       cache_queue_head = ci;
742     else
743       cache_queue_tail->next = ci;
745     cache_queue_tail = ci;
746   }
748   ci->flags |= CI_FLAGS_IN_QUEUE;
750   pthread_cond_signal(&queue_cond);
751   pthread_mutex_lock (&stats_lock);
752   stats_queue_length++;
753   pthread_mutex_unlock (&stats_lock);
755   return (0);
756 } /* }}} int enqueue_cache_item */
758 /*
759  * tree_callback_flush:
760  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
761  * while this is in progress.
762  */
763 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
764     gpointer data)
766   cache_item_t *ci;
767   callback_flush_data_t *cfd;
769   ci = (cache_item_t *) value;
770   cfd = (callback_flush_data_t *) data;
772   if (ci->flags & CI_FLAGS_IN_QUEUE)
773     return FALSE;
775   if (ci->values_num > 0
776       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
777   {
778     enqueue_cache_item (ci, TAIL);
779   }
780   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
781       && (ci->values_num <= 0))
782   {
783     assert ((char *) key == ci->file);
784     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
785     {
786       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
787       return (FALSE);
788     }
789   }
791   return (FALSE);
792 } /* }}} gboolean tree_callback_flush */
794 static int flush_old_values (int max_age)
796   callback_flush_data_t cfd;
797   size_t k;
799   memset (&cfd, 0, sizeof (cfd));
800   /* Pass the current time as user data so that we don't need to call
801    * `time' for each node. */
802   cfd.now = time (NULL);
803   cfd.keys = NULL;
804   cfd.keys_num = 0;
806   if (max_age > 0)
807     cfd.abs_timeout = cfd.now - max_age;
808   else
809     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
811   /* `tree_callback_flush' will return the keys of all values that haven't
812    * been touched in the last `config_flush_interval' seconds in `cfd'.
813    * The char*'s in this array point to the same memory as ci->file, so we
814    * don't need to free them separately. */
815   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
817   for (k = 0; k < cfd.keys_num; k++)
818   {
819     /* should never fail, since we have held the cache_lock
820      * the entire time */
821     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
822   }
824   if (cfd.keys != NULL)
825   {
826     free (cfd.keys);
827     cfd.keys = NULL;
828   }
830   return (0);
831 } /* int flush_old_values */
833 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
835   struct timeval now;
836   struct timespec next_flush;
837   int status;
839   gettimeofday (&now, NULL);
840   next_flush.tv_sec = now.tv_sec + config_flush_interval;
841   next_flush.tv_nsec = 1000 * now.tv_usec;
843   pthread_mutex_lock(&cache_lock);
845   while (state == RUNNING)
846   {
847     gettimeofday (&now, NULL);
848     if ((now.tv_sec > next_flush.tv_sec)
849         || ((now.tv_sec == next_flush.tv_sec)
850           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
851     {
852       RRDD_LOG(LOG_DEBUG, "flushing old values");
854       /* Determine the time of the next cache flush. */
855       next_flush.tv_sec = now.tv_sec + config_flush_interval;
857       /* Flush all values that haven't been written in the last
858        * `config_write_interval' seconds. */
859       flush_old_values (config_write_interval);
861       /* unlock the cache while we rotate so we don't block incoming
862        * updates if the fsync() blocks on disk I/O */
863       pthread_mutex_unlock(&cache_lock);
864       journal_rotate();
865       pthread_mutex_lock(&cache_lock);
866     }
868     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
869     if (status != 0 && status != ETIMEDOUT)
870     {
871       RRDD_LOG (LOG_ERR, "flush_thread_main: "
872                 "pthread_cond_timedwait returned %i.", status);
873     }
874   }
876   if (config_flush_at_shutdown)
877     flush_old_values (-1); /* flush everything */
879   state = SHUTDOWN;
881   pthread_mutex_unlock(&cache_lock);
883   return NULL;
884 } /* void *flush_thread_main */
886 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
888   pthread_mutex_lock (&cache_lock);
890   while (state != SHUTDOWN
891          || (cache_queue_head != NULL && config_flush_at_shutdown))
892   {
893     cache_item_t *ci;
894     char *file;
895     char **values;
896     size_t values_num;
897     int status;
899     /* Now, check if there's something to store away. If not, wait until
900      * something comes in. */
901     if (cache_queue_head == NULL)
902     {
903       status = pthread_cond_wait (&queue_cond, &cache_lock);
904       if ((status != 0) && (status != ETIMEDOUT))
905       {
906         RRDD_LOG (LOG_ERR, "queue_thread_main: "
907             "pthread_cond_wait returned %i.", status);
908       }
909     }
911     /* Check if a value has arrived. This may be NULL if we timed out or there
912      * was an interrupt such as a signal. */
913     if (cache_queue_head == NULL)
914       continue;
916     ci = cache_queue_head;
918     /* copy the relevant parts */
919     file = strdup (ci->file);
920     if (file == NULL)
921     {
922       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
923       continue;
924     }
926     assert(ci->values != NULL);
927     assert(ci->values_num > 0);
929     values = ci->values;
930     values_num = ci->values_num;
932     wipe_ci_values(ci, time(NULL));
933     remove_from_queue(ci);
935     pthread_mutex_unlock (&cache_lock);
937     rrd_clear_error ();
938     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
939     if (status != 0)
940     {
941       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
942           "rrd_update_r (%s) failed with status %i. (%s)",
943           file, status, rrd_get_error());
944     }
946     journal_write("wrote", file);
948     /* Search again in the tree.  It's possible someone issued a "FORGET"
949      * while we were writing the update values. */
950     pthread_mutex_lock(&cache_lock);
951     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
952     if (ci)
953       pthread_cond_broadcast(&ci->flushed);
954     pthread_mutex_unlock(&cache_lock);
956     if (status == 0)
957     {
958       pthread_mutex_lock (&stats_lock);
959       stats_updates_written++;
960       stats_data_sets_written += values_num;
961       pthread_mutex_unlock (&stats_lock);
962     }
964     rrd_free_ptrs((void ***) &values, &values_num);
965     free(file);
967     pthread_mutex_lock (&cache_lock);
968   }
969   pthread_mutex_unlock (&cache_lock);
971   return (NULL);
972 } /* }}} void *queue_thread_main */
974 static int buffer_get_field (char **buffer_ret, /* {{{ */
975     size_t *buffer_size_ret, char **field_ret)
977   char *buffer;
978   size_t buffer_pos;
979   size_t buffer_size;
980   char *field;
981   size_t field_size;
982   int status;
984   buffer = *buffer_ret;
985   buffer_pos = 0;
986   buffer_size = *buffer_size_ret;
987   field = *buffer_ret;
988   field_size = 0;
990   if (buffer_size <= 0)
991     return (-1);
993   /* This is ensured by `handle_request'. */
994   assert (buffer[buffer_size - 1] == '\0');
996   status = -1;
997   while (buffer_pos < buffer_size)
998   {
999     /* Check for end-of-field or end-of-buffer */
1000     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1001     {
1002       field[field_size] = 0;
1003       field_size++;
1004       buffer_pos++;
1005       status = 0;
1006       break;
1007     }
1008     /* Handle escaped characters. */
1009     else if (buffer[buffer_pos] == '\\')
1010     {
1011       if (buffer_pos >= (buffer_size - 1))
1012         break;
1013       buffer_pos++;
1014       field[field_size] = buffer[buffer_pos];
1015       field_size++;
1016       buffer_pos++;
1017     }
1018     /* Normal operation */ 
1019     else
1020     {
1021       field[field_size] = buffer[buffer_pos];
1022       field_size++;
1023       buffer_pos++;
1024     }
1025   } /* while (buffer_pos < buffer_size) */
1027   if (status != 0)
1028     return (status);
1030   *buffer_ret = buffer + buffer_pos;
1031   *buffer_size_ret = buffer_size - buffer_pos;
1032   *field_ret = field;
1034   return (0);
1035 } /* }}} int buffer_get_field */
1037 /* if we're restricting writes to the base directory,
1038  * check whether the file falls within the dir
1039  * returns 1 if OK, otherwise 0
1040  */
1041 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1043   assert(file != NULL);
1045   if (!config_write_base_only
1046       || sock == NULL /* journal replay */
1047       || config_base_dir == NULL)
1048     return 1;
1050   if (strstr(file, "../") != NULL) goto err;
1052   /* relative paths without "../" are ok */
1053   if (*file != '/') return 1;
1055   /* file must be of the format base + "/" + <1+ char filename> */
1056   if (strlen(file) < _config_base_dir_len + 2) goto err;
1057   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1058   if (*(file + _config_base_dir_len) != '/') goto err;
1060   return 1;
1062 err:
1063   if (sock != NULL && sock->fd >= 0)
1064     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1066   return 0;
1067 } /* }}} static int check_file_access */
1069 /* when using a base dir, convert relative paths to absolute paths.
1070  * if necessary, modifies the "filename" pointer to point
1071  * to the new path created in "tmp".  "tmp" is provided
1072  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1073  *
1074  * this allows us to optimize for the expected case (absolute path)
1075  * with a no-op.
1076  */
1077 static void get_abs_path(char **filename, char *tmp)
1079   assert(tmp != NULL);
1080   assert(filename != NULL && *filename != NULL);
1082   if (config_base_dir == NULL || **filename == '/')
1083     return;
1085   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1086   *filename = tmp;
1087 } /* }}} static int get_abs_path */
1089 static int flush_file (const char *filename) /* {{{ */
1091   cache_item_t *ci;
1093   pthread_mutex_lock (&cache_lock);
1095   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1096   if (ci == NULL)
1097   {
1098     pthread_mutex_unlock (&cache_lock);
1099     return (ENOENT);
1100   }
1102   if (ci->values_num > 0)
1103   {
1104     /* Enqueue at head */
1105     enqueue_cache_item (ci, HEAD);
1106     pthread_cond_wait(&ci->flushed, &cache_lock);
1107   }
1109   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1110    * may have been purged during our cond_wait() */
1112   pthread_mutex_unlock(&cache_lock);
1114   return (0);
1115 } /* }}} int flush_file */
1117 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1119   char *err = "Syntax error.\n";
1121   if (cmd && cmd->syntax)
1122     err = cmd->syntax;
1124   return send_response(sock, RESP_ERR, "Usage: %s", err);
1125 } /* }}} static int syntax_error() */
1127 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1129   uint64_t copy_queue_length;
1130   uint64_t copy_updates_received;
1131   uint64_t copy_flush_received;
1132   uint64_t copy_updates_written;
1133   uint64_t copy_data_sets_written;
1134   uint64_t copy_journal_bytes;
1135   uint64_t copy_journal_rotate;
1137   uint64_t tree_nodes_number;
1138   uint64_t tree_depth;
1140   pthread_mutex_lock (&stats_lock);
1141   copy_queue_length       = stats_queue_length;
1142   copy_updates_received   = stats_updates_received;
1143   copy_flush_received     = stats_flush_received;
1144   copy_updates_written    = stats_updates_written;
1145   copy_data_sets_written  = stats_data_sets_written;
1146   copy_journal_bytes      = stats_journal_bytes;
1147   copy_journal_rotate     = stats_journal_rotate;
1148   pthread_mutex_unlock (&stats_lock);
1150   pthread_mutex_lock (&cache_lock);
1151   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1152   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1153   pthread_mutex_unlock (&cache_lock);
1155   add_response_info(sock,
1156                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1157   add_response_info(sock,
1158                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1159   add_response_info(sock,
1160                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1161   add_response_info(sock,
1162                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1163   add_response_info(sock,
1164                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1165   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1166   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1167   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1168   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1170   send_response(sock, RESP_OK, "Statistics follow\n");
1172   return (0);
1173 } /* }}} int handle_request_stats */
1175 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1177   char *file, file_tmp[PATH_MAX];
1178   int status;
1180   status = buffer_get_field (&buffer, &buffer_size, &file);
1181   if (status != 0)
1182   {
1183     return syntax_error(sock,cmd);
1184   }
1185   else
1186   {
1187     pthread_mutex_lock(&stats_lock);
1188     stats_flush_received++;
1189     pthread_mutex_unlock(&stats_lock);
1191     get_abs_path(&file, file_tmp);
1192     if (!check_file_access(file, sock)) return 0;
1194     status = flush_file (file);
1195     if (status == 0)
1196       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1197     else if (status == ENOENT)
1198     {
1199       /* no file in our tree; see whether it exists at all */
1200       struct stat statbuf;
1202       memset(&statbuf, 0, sizeof(statbuf));
1203       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1204         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1205       else
1206         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1207     }
1208     else if (status < 0)
1209       return send_response(sock, RESP_ERR, "Internal error.\n");
1210     else
1211       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1212   }
1214   /* NOTREACHED */
1215   assert(1==0);
1216 } /* }}} int handle_request_flush */
1218 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1220   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1222   pthread_mutex_lock(&cache_lock);
1223   flush_old_values(-1);
1224   pthread_mutex_unlock(&cache_lock);
1226   return send_response(sock, RESP_OK, "Started flush.\n");
1227 } /* }}} static int handle_request_flushall */
1229 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1231   int status;
1232   char *file, file_tmp[PATH_MAX];
1233   cache_item_t *ci;
1235   status = buffer_get_field(&buffer, &buffer_size, &file);
1236   if (status != 0)
1237     return syntax_error(sock,cmd);
1239   get_abs_path(&file, file_tmp);
1241   pthread_mutex_lock(&cache_lock);
1242   ci = g_tree_lookup(cache_tree, file);
1243   if (ci == NULL)
1244   {
1245     pthread_mutex_unlock(&cache_lock);
1246     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1247   }
1249   for (size_t i=0; i < ci->values_num; i++)
1250     add_response_info(sock, "%s\n", ci->values[i]);
1252   pthread_mutex_unlock(&cache_lock);
1253   return send_response(sock, RESP_OK, "updates pending\n");
1254 } /* }}} static int handle_request_pending */
1256 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1258   int status;
1259   gboolean found;
1260   char *file, file_tmp[PATH_MAX];
1262   status = buffer_get_field(&buffer, &buffer_size, &file);
1263   if (status != 0)
1264     return syntax_error(sock,cmd);
1266   get_abs_path(&file, file_tmp);
1267   if (!check_file_access(file, sock)) return 0;
1269   pthread_mutex_lock(&cache_lock);
1270   found = g_tree_remove(cache_tree, file);
1271   pthread_mutex_unlock(&cache_lock);
1273   if (found == TRUE)
1274   {
1275     if (sock != NULL)
1276       journal_write("forget", file);
1278     return send_response(sock, RESP_OK, "Gone!\n");
1279   }
1280   else
1281     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1283   /* NOTREACHED */
1284   assert(1==0);
1285 } /* }}} static int handle_request_forget */
1287 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1289   cache_item_t *ci;
1291   pthread_mutex_lock(&cache_lock);
1293   ci = cache_queue_head;
1294   while (ci != NULL)
1295   {
1296     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1297     ci = ci->next;
1298   }
1300   pthread_mutex_unlock(&cache_lock);
1302   return send_response(sock, RESP_OK, "in queue.\n");
1303 } /* }}} int handle_request_queue */
1305 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1307   char *file, file_tmp[PATH_MAX];
1308   int values_num = 0;
1309   int status;
1310   char orig_buf[CMD_MAX];
1312   cache_item_t *ci;
1314   /* save it for the journal later */
1315   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1317   status = buffer_get_field (&buffer, &buffer_size, &file);
1318   if (status != 0)
1319     return syntax_error(sock,cmd);
1321   pthread_mutex_lock(&stats_lock);
1322   stats_updates_received++;
1323   pthread_mutex_unlock(&stats_lock);
1325   get_abs_path(&file, file_tmp);
1326   if (!check_file_access(file, sock)) return 0;
1328   pthread_mutex_lock (&cache_lock);
1329   ci = g_tree_lookup (cache_tree, file);
1331   if (ci == NULL) /* {{{ */
1332   {
1333     struct stat statbuf;
1334     cache_item_t *tmp;
1336     /* don't hold the lock while we setup; stat(2) might block */
1337     pthread_mutex_unlock(&cache_lock);
1339     memset (&statbuf, 0, sizeof (statbuf));
1340     status = stat (file, &statbuf);
1341     if (status != 0)
1342     {
1343       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1345       status = errno;
1346       if (status == ENOENT)
1347         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1348       else
1349         return send_response(sock, RESP_ERR,
1350                              "stat failed with error %i.\n", status);
1351     }
1352     if (!S_ISREG (statbuf.st_mode))
1353       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1355     if (access(file, R_OK|W_OK) != 0)
1356       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1357                            file, rrd_strerror(errno));
1359     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1360     if (ci == NULL)
1361     {
1362       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1364       return send_response(sock, RESP_ERR, "malloc failed.\n");
1365     }
1366     memset (ci, 0, sizeof (cache_item_t));
1368     ci->file = strdup (file);
1369     if (ci->file == NULL)
1370     {
1371       free (ci);
1372       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1374       return send_response(sock, RESP_ERR, "strdup failed.\n");
1375     }
1377     wipe_ci_values(ci, now);
1378     ci->flags = CI_FLAGS_IN_TREE;
1379     pthread_cond_init(&ci->flushed, NULL);
1381     pthread_mutex_lock(&cache_lock);
1383     /* another UPDATE might have added this entry in the meantime */
1384     tmp = g_tree_lookup (cache_tree, file);
1385     if (tmp == NULL)
1386       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1387     else
1388     {
1389       free_cache_item (ci);
1390       ci = tmp;
1391     }
1393     /* state may have changed while we were unlocked */
1394     if (state == SHUTDOWN)
1395       return -1;
1396   } /* }}} */
1397   assert (ci != NULL);
1399   /* don't re-write updates in replay mode */
1400   if (sock != NULL)
1401     journal_write("update", orig_buf);
1403   while (buffer_size > 0)
1404   {
1405     char *value;
1406     time_t stamp;
1407     char *eostamp;
1409     status = buffer_get_field (&buffer, &buffer_size, &value);
1410     if (status != 0)
1411     {
1412       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1413       break;
1414     }
1416     /* make sure update time is always moving forward */
1417     stamp = strtol(value, &eostamp, 10);
1418     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1419     {
1420       pthread_mutex_unlock(&cache_lock);
1421       return send_response(sock, RESP_ERR,
1422                            "Cannot find timestamp in '%s'!\n", value);
1423     }
1424     else if (stamp <= ci->last_update_stamp)
1425     {
1426       pthread_mutex_unlock(&cache_lock);
1427       return send_response(sock, RESP_ERR,
1428                            "illegal attempt to update using time %ld when last"
1429                            " update time is %ld (minimum one second step)\n",
1430                            stamp, ci->last_update_stamp);
1431     }
1432     else
1433       ci->last_update_stamp = stamp;
1435     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1436     {
1437       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1438       continue;
1439     }
1441     values_num++;
1442   }
1444   if (((now - ci->last_flush_time) >= config_write_interval)
1445       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1446       && (ci->values_num > 0))
1447   {
1448     enqueue_cache_item (ci, TAIL);
1449   }
1451   pthread_mutex_unlock (&cache_lock);
1453   if (values_num < 1)
1454     return send_response(sock, RESP_ERR, "No values updated.\n");
1455   else
1456     return send_response(sock, RESP_OK,
1457                          "errors, enqueued %i value(s).\n", values_num);
1459   /* NOTREACHED */
1460   assert(1==0);
1462 } /* }}} int handle_request_update */
1464 /* we came across a "WROTE" entry during journal replay.
1465  * throw away any values that we have accumulated for this file
1466  */
1467 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1469   cache_item_t *ci;
1470   const char *file = buffer;
1472   pthread_mutex_lock(&cache_lock);
1474   ci = g_tree_lookup(cache_tree, file);
1475   if (ci == NULL)
1476   {
1477     pthread_mutex_unlock(&cache_lock);
1478     return (0);
1479   }
1481   if (ci->values)
1482     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1484   wipe_ci_values(ci, now);
1485   remove_from_queue(ci);
1487   pthread_mutex_unlock(&cache_lock);
1488   return (0);
1489 } /* }}} int handle_request_wrote */
1491 /* start "BATCH" processing */
1492 static int batch_start (HANDLER_PROTO) /* {{{ */
1494   int status;
1495   if (sock->batch_start)
1496     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1498   status = send_response(sock, RESP_OK,
1499                          "Go ahead.  End with dot '.' on its own line.\n");
1500   sock->batch_start = time(NULL);
1501   sock->batch_cmd = 0;
1503   return status;
1504 } /* }}} static int batch_start */
1506 /* finish "BATCH" processing and return results to the client */
1507 static int batch_done (HANDLER_PROTO) /* {{{ */
1509   assert(sock->batch_start);
1510   sock->batch_start = 0;
1511   sock->batch_cmd  = 0;
1512   return send_response(sock, RESP_OK, "errors\n");
1513 } /* }}} static int batch_done */
1515 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1517   return -1;
1518 } /* }}} static int handle_request_quit */
1520 static command_t list_of_commands[] = { /* {{{ */
1521   {
1522     "UPDATE",
1523     handle_request_update,
1524     CMD_CONTEXT_ANY,
1525     "UPDATE <filename> <values> [<values> ...]\n"
1526     ,
1527     "Adds the given file to the internal cache if it is not yet known and\n"
1528     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1529     "for details.\n"
1530     "\n"
1531     "Each <values> has the following form:\n"
1532     "  <values> = <time>:<value>[:<value>[...]]\n"
1533     "See the rrdupdate(1) manpage for details.\n"
1534   },
1535   {
1536     "WROTE",
1537     handle_request_wrote,
1538     CMD_CONTEXT_JOURNAL,
1539     NULL,
1540     NULL
1541   },
1542   {
1543     "FLUSH",
1544     handle_request_flush,
1545     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1546     "FLUSH <filename>\n"
1547     ,
1548     "Adds the given filename to the head of the update queue and returns\n"
1549     "after it has been dequeued.\n"
1550   },
1551   {
1552     "FLUSHALL",
1553     handle_request_flushall,
1554     CMD_CONTEXT_CLIENT,
1555     "FLUSHALL\n"
1556     ,
1557     "Triggers writing of all pending updates.  Returns immediately.\n"
1558   },
1559   {
1560     "PENDING",
1561     handle_request_pending,
1562     CMD_CONTEXT_CLIENT,
1563     "PENDING <filename>\n"
1564     ,
1565     "Shows any 'pending' updates for a file, in order.\n"
1566     "The updates shown have not yet been written to the underlying RRD file.\n"
1567   },
1568   {
1569     "FORGET",
1570     handle_request_forget,
1571     CMD_CONTEXT_ANY,
1572     "FORGET <filename>\n"
1573     ,
1574     "Removes the file completely from the cache.\n"
1575     "Any pending updates for the file will be lost.\n"
1576   },
1577   {
1578     "QUEUE",
1579     handle_request_queue,
1580     CMD_CONTEXT_CLIENT,
1581     "QUEUE\n"
1582     ,
1583         "Shows all files in the output queue.\n"
1584     "The output is zero or more lines in the following format:\n"
1585     "(where <num_vals> is the number of values to be written)\n"
1586     "\n"
1587     "<num_vals> <filename>\n"
1588   },
1589   {
1590     "STATS",
1591     handle_request_stats,
1592     CMD_CONTEXT_CLIENT,
1593     "STATS\n"
1594     ,
1595     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1596     "a description of the values.\n"
1597   },
1598   {
1599     "HELP",
1600     handle_request_help,
1601     CMD_CONTEXT_CLIENT,
1602     "HELP [<command>]\n",
1603     NULL, /* special! */
1604   },
1605   {
1606     "BATCH",
1607     batch_start,
1608     CMD_CONTEXT_CLIENT,
1609     "BATCH\n"
1610     ,
1611     "The 'BATCH' command permits the client to initiate a bulk load\n"
1612     "   of commands to rrdcached.\n"
1613     "\n"
1614     "Usage:\n"
1615     "\n"
1616     "    client: BATCH\n"
1617     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1618     "    client: command #1\n"
1619     "    client: command #2\n"
1620     "    client: ... and so on\n"
1621     "    client: .\n"
1622     "    server: 2 errors\n"
1623     "    server: 7 message for command #7\n"
1624     "    server: 9 message for command #9\n"
1625     "\n"
1626     "For more information, consult the rrdcached(1) documentation.\n"
1627   },
1628   {
1629     ".",   /* BATCH terminator */
1630     batch_done,
1631     CMD_CONTEXT_BATCH,
1632     NULL,
1633     NULL
1634   },
1635   {
1636     "QUIT",
1637     handle_request_quit,
1638     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1639     "QUIT\n"
1640     ,
1641     "Disconnect from rrdcached.\n"
1642   }
1643 }; /* }}} command_t list_of_commands[] */
1644 static size_t list_of_commands_len = sizeof (list_of_commands)
1645   / sizeof (list_of_commands[0]);
1647 static command_t *find_command(char *cmd)
1649   size_t i;
1651   for (i = 0; i < list_of_commands_len; i++)
1652     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1653       return (&list_of_commands[i]);
1654   return NULL;
1657 /* We currently use the index in the `list_of_commands' array as a bit position
1658  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1659  * outside these functions so that switching to a more elegant storage method
1660  * is easily possible. */
1661 static ssize_t find_command_index (const char *cmd) /* {{{ */
1663   size_t i;
1665   for (i = 0; i < list_of_commands_len; i++)
1666     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1667       return ((ssize_t) i);
1668   return (-1);
1669 } /* }}} ssize_t find_command_index */
1671 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1672     const char *cmd)
1674   ssize_t i;
1676   if (sock == NULL) /* journal replay */
1677     return (1);
1679   if (cmd == NULL)
1680     return (-1);
1682   if ((strcasecmp ("QUIT", cmd) == 0)
1683       || (strcasecmp ("HELP", cmd) == 0))
1684     return (1);
1685   else if (strcmp (".", cmd) == 0)
1686     cmd = "BATCH";
1688   i = find_command_index (cmd);
1689   if (i < 0)
1690     return (-1);
1691   assert (i < 32);
1693   if ((sock->permissions & (1 << i)) != 0)
1694     return (1);
1695   return (0);
1696 } /* }}} int socket_permission_check */
1698 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1699     const char *cmd)
1701   ssize_t i;
1703   i = find_command_index (cmd);
1704   if (i < 0)
1705     return (-1);
1706   assert (i < 32);
1708   sock->permissions |= (1 << i);
1709   return (0);
1710 } /* }}} int socket_permission_add */
1712 /* check whether commands are received in the expected context */
1713 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1715   if (sock == NULL)
1716     return (cmd->context & CMD_CONTEXT_JOURNAL);
1717   else if (sock->batch_start)
1718     return (cmd->context & CMD_CONTEXT_BATCH);
1719   else
1720     return (cmd->context & CMD_CONTEXT_CLIENT);
1722   /* NOTREACHED */
1723   assert(1==0);
1726 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1728   int status;
1729   char *cmd_str;
1730   char *resp_txt;
1731   command_t *help = NULL;
1733   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1734   if (status == 0)
1735     help = find_command(cmd_str);
1737   if (help && (help->syntax || help->help))
1738   {
1739     char tmp[CMD_MAX];
1741     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1742     resp_txt = tmp;
1744     if (help->syntax)
1745       add_response_info(sock, "Usage: %s\n", help->syntax);
1747     if (help->help)
1748       add_response_info(sock, "%s\n", help->help);
1749   }
1750   else
1751   {
1752     size_t i;
1754     resp_txt = "Command overview\n";
1756     for (i = 0; i < list_of_commands_len; i++)
1757     {
1758       if (list_of_commands[i].syntax == NULL)
1759         continue;
1760       add_response_info (sock, "%s", list_of_commands[i].syntax);
1761     }
1762   }
1764   return send_response(sock, RESP_OK, resp_txt);
1765 } /* }}} int handle_request_help */
1767 /* if sock==NULL, we are in journal replay mode */
1768 static int handle_request (DISPATCH_PROTO) /* {{{ */
1770   char *buffer_ptr = buffer;
1771   char *cmd_str = NULL;
1772   command_t *cmd = NULL;
1773   int status;
1775   assert (buffer[buffer_size - 1] == '\0');
1777   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1778   if (status != 0)
1779   {
1780     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1781     return (-1);
1782   }
1784   if (sock != NULL && sock->batch_start)
1785     sock->batch_cmd++;
1787   cmd = find_command(cmd_str);
1788   if (!cmd)
1789     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1791   if (!socket_permission_check (sock, cmd->cmd))
1792     return send_response(sock, RESP_ERR, "Permission denied.\n");
1794   if (!command_check_context(sock, cmd))
1795     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1797   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1798 } /* }}} int handle_request */
1800 static void journal_set_free (journal_set *js) /* {{{ */
1802   if (js == NULL)
1803     return;
1805   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1807   free(js);
1808 } /* }}} journal_set_free */
1810 static void journal_set_remove (journal_set *js) /* {{{ */
1812   if (js == NULL)
1813     return;
1815   for (uint i=0; i < js->files_num; i++)
1816   {
1817     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1818     unlink(js->files[i]);
1819   }
1820 } /* }}} journal_set_remove */
1822 /* close current journal file handle.
1823  * MUST hold journal_lock before calling */
1824 static void journal_close(void) /* {{{ */
1826   if (journal_fh != NULL)
1827   {
1828     if (fclose(journal_fh) != 0)
1829       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1830   }
1832   journal_fh = NULL;
1833   journal_size = 0;
1834 } /* }}} journal_close */
1836 /* MUST hold journal_lock before calling */
1837 static void journal_new_file(void) /* {{{ */
1839   struct timeval now;
1840   int  new_fd;
1841   char new_file[PATH_MAX + 1];
1843   assert(journal_dir != NULL);
1844   assert(journal_cur != NULL);
1846   journal_close();
1848   gettimeofday(&now, NULL);
1849   /* this format assures that the files sort in strcmp() order */
1850   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1851            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1853   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1854                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1855   if (new_fd < 0)
1856     goto error;
1858   journal_fh = fdopen(new_fd, "a");
1859   if (journal_fh == NULL)
1860     goto error;
1862   journal_size = ftell(journal_fh);
1863   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1865   /* record the file in the journal set */
1866   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1868   return;
1870 error:
1871   RRDD_LOG(LOG_CRIT,
1872            "JOURNALING DISABLED: Error while trying to create %s : %s",
1873            new_file, rrd_strerror(errno));
1874   RRDD_LOG(LOG_CRIT,
1875            "JOURNALING DISABLED: All values will be flushed at shutdown");
1877   close(new_fd);
1878   config_flush_at_shutdown = 1;
1880 } /* }}} journal_new_file */
1882 /* MUST NOT hold journal_lock before calling this */
1883 static void journal_rotate(void) /* {{{ */
1885   journal_set *old_js = NULL;
1887   if (journal_dir == NULL)
1888     return;
1890   RRDD_LOG(LOG_DEBUG, "rotating journals");
1892   pthread_mutex_lock(&stats_lock);
1893   ++stats_journal_rotate;
1894   pthread_mutex_unlock(&stats_lock);
1896   pthread_mutex_lock(&journal_lock);
1898   journal_close();
1900   /* rotate the journal sets */
1901   old_js = journal_old;
1902   journal_old = journal_cur;
1903   journal_cur = calloc(1, sizeof(journal_set));
1905   if (journal_cur != NULL)
1906     journal_new_file();
1907   else
1908     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1910   pthread_mutex_unlock(&journal_lock);
1912   journal_set_remove(old_js);
1913   journal_set_free  (old_js);
1915 } /* }}} static void journal_rotate */
1917 /* MUST hold journal_lock when calling */
1918 static void journal_done(void) /* {{{ */
1920   if (journal_cur == NULL)
1921     return;
1923   journal_close();
1925   if (config_flush_at_shutdown)
1926   {
1927     RRDD_LOG(LOG_INFO, "removing journals");
1928     journal_set_remove(journal_old);
1929     journal_set_remove(journal_cur);
1930   }
1931   else
1932   {
1933     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1934              "journals will be used at next startup");
1935   }
1937   journal_set_free(journal_cur);
1938   journal_set_free(journal_old);
1939   free(journal_dir);
1941 } /* }}} static void journal_done */
1943 static int journal_write(char *cmd, char *args) /* {{{ */
1945   int chars;
1947   if (journal_fh == NULL)
1948     return 0;
1950   pthread_mutex_lock(&journal_lock);
1951   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1952   journal_size += chars;
1954   if (journal_size > JOURNAL_MAX)
1955     journal_new_file();
1957   pthread_mutex_unlock(&journal_lock);
1959   if (chars > 0)
1960   {
1961     pthread_mutex_lock(&stats_lock);
1962     stats_journal_bytes += chars;
1963     pthread_mutex_unlock(&stats_lock);
1964   }
1966   return chars;
1967 } /* }}} static int journal_write */
1969 static int journal_replay (const char *file) /* {{{ */
1971   FILE *fh;
1972   int entry_cnt = 0;
1973   int fail_cnt = 0;
1974   uint64_t line = 0;
1975   char entry[CMD_MAX];
1976   time_t now;
1978   if (file == NULL) return 0;
1980   {
1981     char *reason = "unknown error";
1982     int status = 0;
1983     struct stat statbuf;
1985     memset(&statbuf, 0, sizeof(statbuf));
1986     if (stat(file, &statbuf) != 0)
1987     {
1988       reason = "stat error";
1989       status = errno;
1990     }
1991     else if (!S_ISREG(statbuf.st_mode))
1992     {
1993       reason = "not a regular file";
1994       status = EPERM;
1995     }
1996     if (statbuf.st_uid != daemon_uid)
1997     {
1998       reason = "not owned by daemon user";
1999       status = EACCES;
2000     }
2001     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2002     {
2003       reason = "must not be user/group writable";
2004       status = EACCES;
2005     }
2007     if (status != 0)
2008     {
2009       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2010                file, rrd_strerror(status), reason);
2011       return 0;
2012     }
2013   }
2015   fh = fopen(file, "r");
2016   if (fh == NULL)
2017   {
2018     if (errno != ENOENT)
2019       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2020                file, rrd_strerror(errno));
2021     return 0;
2022   }
2023   else
2024     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2026   now = time(NULL);
2028   while(!feof(fh))
2029   {
2030     size_t entry_len;
2032     ++line;
2033     if (fgets(entry, sizeof(entry), fh) == NULL)
2034       break;
2035     entry_len = strlen(entry);
2037     /* check \n termination in case journal writing crashed mid-line */
2038     if (entry_len == 0)
2039       continue;
2040     else if (entry[entry_len - 1] != '\n')
2041     {
2042       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2043       ++fail_cnt;
2044       continue;
2045     }
2047     entry[entry_len - 1] = '\0';
2049     if (handle_request(NULL, now, entry, entry_len) == 0)
2050       ++entry_cnt;
2051     else
2052       ++fail_cnt;
2053   }
2055   fclose(fh);
2057   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2058            entry_cnt, fail_cnt);
2060   return entry_cnt > 0 ? 1 : 0;
2061 } /* }}} static int journal_replay */
2063 static int journal_sort(const void *v1, const void *v2)
2065   char **jn1 = (char **) v1;
2066   char **jn2 = (char **) v2;
2068   return strcmp(*jn1,*jn2);
2071 static void journal_init(void) /* {{{ */
2073   int had_journal = 0;
2074   DIR *dir;
2075   struct dirent *dent;
2076   char path[PATH_MAX+1];
2078   if (journal_dir == NULL) return;
2080   pthread_mutex_lock(&journal_lock);
2082   journal_cur = calloc(1, sizeof(journal_set));
2083   if (journal_cur == NULL)
2084   {
2085     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2086     return;
2087   }
2089   RRDD_LOG(LOG_INFO, "checking for journal files");
2091   /* Handle old journal files during transition.  This gives them the
2092    * correct sort order.  TODO: remove after first release
2093    */
2094   {
2095     char old_path[PATH_MAX+1];
2096     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2097     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2098     rename(old_path, path);
2100     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2101     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2102     rename(old_path, path);
2103   }
2105   dir = opendir(journal_dir);
2106   while ((dent = readdir(dir)) != NULL)
2107   {
2108     /* looks like a journal file? */
2109     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2110       continue;
2112     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2114     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2115     {
2116       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2117                dent->d_name);
2118       break;
2119     }
2120   }
2121   closedir(dir);
2123   qsort(journal_cur->files, journal_cur->files_num,
2124         sizeof(journal_cur->files[0]), journal_sort);
2126   for (uint i=0; i < journal_cur->files_num; i++)
2127     had_journal += journal_replay(journal_cur->files[i]);
2129   journal_new_file();
2131   /* it must have been a crash.  start a flush */
2132   if (had_journal && config_flush_at_shutdown)
2133     flush_old_values(-1);
2135   pthread_mutex_unlock(&journal_lock);
2137   RRDD_LOG(LOG_INFO, "journal processing complete");
2139 } /* }}} static void journal_init */
2141 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2143   assert(sock != NULL);
2145   free(sock->rbuf);  sock->rbuf = NULL;
2146   free(sock->wbuf);  sock->wbuf = NULL;
2147   free(sock);
2148 } /* }}} void free_listen_socket */
2150 static void close_connection(listen_socket_t *sock) /* {{{ */
2152   if (sock->fd >= 0)
2153   {
2154     close(sock->fd);
2155     sock->fd = -1;
2156   }
2158   free_listen_socket(sock);
2160 } /* }}} void close_connection */
2162 static void *connection_thread_main (void *args) /* {{{ */
2164   listen_socket_t *sock;
2165   int fd;
2167   sock = (listen_socket_t *) args;
2168   fd = sock->fd;
2170   /* init read buffers */
2171   sock->next_read = sock->next_cmd = 0;
2172   sock->rbuf = malloc(RBUF_SIZE);
2173   if (sock->rbuf == NULL)
2174   {
2175     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2176     close_connection(sock);
2177     return NULL;
2178   }
2180   pthread_mutex_lock (&connection_threads_lock);
2181   connection_threads_num++;
2182   pthread_mutex_unlock (&connection_threads_lock);
2184   while (state == RUNNING)
2185   {
2186     char *cmd;
2187     ssize_t cmd_len;
2188     ssize_t rbytes;
2189     time_t now;
2191     struct pollfd pollfd;
2192     int status;
2194     pollfd.fd = fd;
2195     pollfd.events = POLLIN | POLLPRI;
2196     pollfd.revents = 0;
2198     status = poll (&pollfd, 1, /* timeout = */ 500);
2199     if (state != RUNNING)
2200       break;
2201     else if (status == 0) /* timeout */
2202       continue;
2203     else if (status < 0) /* error */
2204     {
2205       status = errno;
2206       if (status != EINTR)
2207         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2208       continue;
2209     }
2211     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2212       break;
2213     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2214     {
2215       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2216           "poll(2) returned something unexpected: %#04hx",
2217           pollfd.revents);
2218       break;
2219     }
2221     rbytes = read(fd, sock->rbuf + sock->next_read,
2222                   RBUF_SIZE - sock->next_read);
2223     if (rbytes < 0)
2224     {
2225       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2226       break;
2227     }
2228     else if (rbytes == 0)
2229       break; /* eof */
2231     sock->next_read += rbytes;
2233     if (sock->batch_start)
2234       now = sock->batch_start;
2235     else
2236       now = time(NULL);
2238     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2239     {
2240       status = handle_request (sock, now, cmd, cmd_len+1);
2241       if (status != 0)
2242         goto out_close;
2243     }
2244   }
2246 out_close:
2247   close_connection(sock);
2249   /* Remove this thread from the connection threads list */
2250   pthread_mutex_lock (&connection_threads_lock);
2251   connection_threads_num--;
2252   if (connection_threads_num <= 0)
2253     pthread_cond_broadcast(&connection_threads_done);
2254   pthread_mutex_unlock (&connection_threads_lock);
2256   return (NULL);
2257 } /* }}} void *connection_thread_main */
2259 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2261   int fd;
2262   struct sockaddr_un sa;
2263   listen_socket_t *temp;
2264   int status;
2265   const char *path;
2266   char *path_copy, *dir;
2268   path = sock->addr;
2269   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2270     path += strlen("unix:");
2272   /* dirname may modify its argument */
2273   path_copy = strdup(path);
2274   if (path_copy == NULL)
2275   {
2276     fprintf(stderr, "rrdcached: strdup(): %s\n",
2277         rrd_strerror(errno));
2278     return (-1);
2279   }
2281   dir = dirname(path_copy);
2282   if (rrd_mkdir_p(dir, 0777) != 0)
2283   {
2284     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2285         dir, rrd_strerror(errno));
2286     return (-1);
2287   }
2289   free(path_copy);
2291   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2292       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2293   if (temp == NULL)
2294   {
2295     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2296     return (-1);
2297   }
2298   listen_fds = temp;
2299   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2301   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2302   if (fd < 0)
2303   {
2304     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2305              rrd_strerror(errno));
2306     return (-1);
2307   }
2309   memset (&sa, 0, sizeof (sa));
2310   sa.sun_family = AF_UNIX;
2311   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2313   /* if we've gotten this far, we own the pid file.  any daemon started
2314    * with the same args must not be alive.  therefore, ensure that we can
2315    * create the socket...
2316    */
2317   unlink(path);
2319   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2320   if (status != 0)
2321   {
2322     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2323              path, rrd_strerror(errno));
2324     close (fd);
2325     return (-1);
2326   }
2328   status = listen (fd, /* backlog = */ 10);
2329   if (status != 0)
2330   {
2331     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2332              path, rrd_strerror(errno));
2333     close (fd);
2334     unlink (path);
2335     return (-1);
2336   }
2338   listen_fds[listen_fds_num].fd = fd;
2339   listen_fds[listen_fds_num].family = PF_UNIX;
2340   strncpy(listen_fds[listen_fds_num].addr, path,
2341           sizeof (listen_fds[listen_fds_num].addr) - 1);
2342   listen_fds_num++;
2344   return (0);
2345 } /* }}} int open_listen_socket_unix */
2347 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2349   struct addrinfo ai_hints;
2350   struct addrinfo *ai_res;
2351   struct addrinfo *ai_ptr;
2352   char addr_copy[NI_MAXHOST];
2353   char *addr;
2354   char *port;
2355   int status;
2357   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2358   addr_copy[sizeof (addr_copy) - 1] = 0;
2359   addr = addr_copy;
2361   memset (&ai_hints, 0, sizeof (ai_hints));
2362   ai_hints.ai_flags = 0;
2363 #ifdef AI_ADDRCONFIG
2364   ai_hints.ai_flags |= AI_ADDRCONFIG;
2365 #endif
2366   ai_hints.ai_family = AF_UNSPEC;
2367   ai_hints.ai_socktype = SOCK_STREAM;
2369   port = NULL;
2370   if (*addr == '[') /* IPv6+port format */
2371   {
2372     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2373     addr++;
2375     port = strchr (addr, ']');
2376     if (port == NULL)
2377     {
2378       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2379       return (-1);
2380     }
2381     *port = 0;
2382     port++;
2384     if (*port == ':')
2385       port++;
2386     else if (*port == 0)
2387       port = NULL;
2388     else
2389     {
2390       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2391       return (-1);
2392     }
2393   } /* if (*addr == '[') */
2394   else
2395   {
2396     port = rindex(addr, ':');
2397     if (port != NULL)
2398     {
2399       *port = 0;
2400       port++;
2401     }
2402   }
2403   ai_res = NULL;
2404   status = getaddrinfo (addr,
2405                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2406                         &ai_hints, &ai_res);
2407   if (status != 0)
2408   {
2409     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2410              addr, gai_strerror (status));
2411     return (-1);
2412   }
2414   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2415   {
2416     int fd;
2417     listen_socket_t *temp;
2418     int one = 1;
2420     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2421         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2422     if (temp == NULL)
2423     {
2424       fprintf (stderr,
2425                "rrdcached: open_listen_socket_network: realloc failed.\n");
2426       continue;
2427     }
2428     listen_fds = temp;
2429     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2431     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2432     if (fd < 0)
2433     {
2434       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2435                rrd_strerror(errno));
2436       continue;
2437     }
2439     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2441     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2442     if (status != 0)
2443     {
2444       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2445                sock->addr, rrd_strerror(errno));
2446       close (fd);
2447       continue;
2448     }
2450     status = listen (fd, /* backlog = */ 10);
2451     if (status != 0)
2452     {
2453       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2454                sock->addr, rrd_strerror(errno));
2455       close (fd);
2456       freeaddrinfo(ai_res);
2457       return (-1);
2458     }
2460     listen_fds[listen_fds_num].fd = fd;
2461     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2462     listen_fds_num++;
2463   } /* for (ai_ptr) */
2465   freeaddrinfo(ai_res);
2466   return (0);
2467 } /* }}} static int open_listen_socket_network */
2469 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2471   assert(sock != NULL);
2472   assert(sock->addr != NULL);
2474   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2475       || sock->addr[0] == '/')
2476     return (open_listen_socket_unix(sock));
2477   else
2478     return (open_listen_socket_network(sock));
2479 } /* }}} int open_listen_socket */
2481 static int close_listen_sockets (void) /* {{{ */
2483   size_t i;
2485   for (i = 0; i < listen_fds_num; i++)
2486   {
2487     close (listen_fds[i].fd);
2489     if (listen_fds[i].family == PF_UNIX)
2490       unlink(listen_fds[i].addr);
2491   }
2493   free (listen_fds);
2494   listen_fds = NULL;
2495   listen_fds_num = 0;
2497   return (0);
2498 } /* }}} int close_listen_sockets */
2500 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2502   struct pollfd *pollfds;
2503   int pollfds_num;
2504   int status;
2505   int i;
2507   if (listen_fds_num < 1)
2508   {
2509     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2510     return (NULL);
2511   }
2513   pollfds_num = listen_fds_num;
2514   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2515   if (pollfds == NULL)
2516   {
2517     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2518     return (NULL);
2519   }
2520   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2522   RRDD_LOG(LOG_INFO, "listening for connections");
2524   while (state == RUNNING)
2525   {
2526     for (i = 0; i < pollfds_num; i++)
2527     {
2528       pollfds[i].fd = listen_fds[i].fd;
2529       pollfds[i].events = POLLIN | POLLPRI;
2530       pollfds[i].revents = 0;
2531     }
2533     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2534     if (state != RUNNING)
2535       break;
2536     else if (status == 0) /* timeout */
2537       continue;
2538     else if (status < 0) /* error */
2539     {
2540       status = errno;
2541       if (status != EINTR)
2542       {
2543         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2544       }
2545       continue;
2546     }
2548     for (i = 0; i < pollfds_num; i++)
2549     {
2550       listen_socket_t *client_sock;
2551       struct sockaddr_storage client_sa;
2552       socklen_t client_sa_size;
2553       pthread_t tid;
2554       pthread_attr_t attr;
2556       if (pollfds[i].revents == 0)
2557         continue;
2559       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2560       {
2561         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2562             "poll(2) returned something unexpected for listen FD #%i.",
2563             pollfds[i].fd);
2564         continue;
2565       }
2567       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2568       if (client_sock == NULL)
2569       {
2570         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2571         continue;
2572       }
2573       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2575       client_sa_size = sizeof (client_sa);
2576       client_sock->fd = accept (pollfds[i].fd,
2577           (struct sockaddr *) &client_sa, &client_sa_size);
2578       if (client_sock->fd < 0)
2579       {
2580         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2581         free(client_sock);
2582         continue;
2583       }
2585       pthread_attr_init (&attr);
2586       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2588       status = pthread_create (&tid, &attr, connection_thread_main,
2589                                client_sock);
2590       if (status != 0)
2591       {
2592         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2593         close_connection(client_sock);
2594         continue;
2595       }
2596     } /* for (pollfds_num) */
2597   } /* while (state == RUNNING) */
2599   RRDD_LOG(LOG_INFO, "starting shutdown");
2601   close_listen_sockets ();
2603   pthread_mutex_lock (&connection_threads_lock);
2604   while (connection_threads_num > 0)
2605     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2606   pthread_mutex_unlock (&connection_threads_lock);
2608   free(pollfds);
2610   return (NULL);
2611 } /* }}} void *listen_thread_main */
2613 static int daemonize (void) /* {{{ */
2615   int pid_fd;
2616   char *base_dir;
2618   daemon_uid = geteuid();
2620   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2621   if (pid_fd < 0)
2622     pid_fd = check_pidfile();
2623   if (pid_fd < 0)
2624     return pid_fd;
2626   /* open all the listen sockets */
2627   if (config_listen_address_list_len > 0)
2628   {
2629     for (size_t i = 0; i < config_listen_address_list_len; i++)
2630       open_listen_socket (config_listen_address_list[i]);
2632     rrd_free_ptrs((void ***) &config_listen_address_list,
2633                   &config_listen_address_list_len);
2634   }
2635   else
2636   {
2637     listen_socket_t sock;
2638     memset(&sock, 0, sizeof(sock));
2639     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2640     open_listen_socket (&sock);
2641   }
2643   if (listen_fds_num < 1)
2644   {
2645     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2646     goto error;
2647   }
2649   if (!stay_foreground)
2650   {
2651     pid_t child;
2653     child = fork ();
2654     if (child < 0)
2655     {
2656       fprintf (stderr, "daemonize: fork(2) failed.\n");
2657       goto error;
2658     }
2659     else if (child > 0)
2660       exit(0);
2662     /* Become session leader */
2663     setsid ();
2665     /* Open the first three file descriptors to /dev/null */
2666     close (2);
2667     close (1);
2668     close (0);
2670     open ("/dev/null", O_RDWR);
2671     if (dup(0) == -1 || dup(0) == -1){
2672         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2673     }
2674   } /* if (!stay_foreground) */
2676   /* Change into the /tmp directory. */
2677   base_dir = (config_base_dir != NULL)
2678     ? config_base_dir
2679     : "/tmp";
2681   if (chdir (base_dir) != 0)
2682   {
2683     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2684     goto error;
2685   }
2687   install_signal_handlers();
2689   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2690   RRDD_LOG(LOG_INFO, "starting up");
2692   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2693                                 (GDestroyNotify) free_cache_item);
2694   if (cache_tree == NULL)
2695   {
2696     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2697     goto error;
2698   }
2700   return write_pidfile (pid_fd);
2702 error:
2703   remove_pidfile();
2704   return -1;
2705 } /* }}} int daemonize */
2707 static int cleanup (void) /* {{{ */
2709   pthread_cond_broadcast (&flush_cond);
2710   pthread_join (flush_thread, NULL);
2712   pthread_cond_broadcast (&queue_cond);
2713   for (int i = 0; i < config_queue_threads; i++)
2714     pthread_join (queue_threads[i], NULL);
2716   if (config_flush_at_shutdown)
2717   {
2718     assert(cache_queue_head == NULL);
2719     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2720   }
2722   free(queue_threads);
2723   free(config_base_dir);
2725   pthread_mutex_lock(&cache_lock);
2726   g_tree_destroy(cache_tree);
2728   pthread_mutex_lock(&journal_lock);
2729   journal_done();
2731   RRDD_LOG(LOG_INFO, "goodbye");
2732   closelog ();
2734   remove_pidfile ();
2735   free(config_pid_file);
2737   return (0);
2738 } /* }}} int cleanup */
2740 static int read_options (int argc, char **argv) /* {{{ */
2742   int option;
2743   int status = 0;
2745   char **permissions = NULL;
2746   size_t permissions_len = 0;
2748   while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2749   {
2750     switch (option)
2751     {
2752       case 'g':
2753         stay_foreground=1;
2754         break;
2756       case 'l':
2757       {
2758         listen_socket_t *new;
2760         new = malloc(sizeof(listen_socket_t));
2761         if (new == NULL)
2762         {
2763           fprintf(stderr, "read_options: malloc failed.\n");
2764           return(2);
2765         }
2766         memset(new, 0, sizeof(listen_socket_t));
2768         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2770         /* Add permissions to the socket {{{ */
2771         if (permissions_len != 0)
2772         {
2773           size_t i;
2774           for (i = 0; i < permissions_len; i++)
2775           {
2776             status = socket_permission_add (new, permissions[i]);
2777             if (status != 0)
2778             {
2779               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2780                   "socket failed. Most likely, this permission doesn't "
2781                   "exist. Check your command line.\n", permissions[i]);
2782               status = 4;
2783             }
2784           }
2785         }
2786         else /* if (permissions_len == 0) */
2787         {
2788           /* Add permission for ALL commands to the socket. */
2789           size_t i;
2790           for (i = 0; i < list_of_commands_len; i++)
2791           {
2792             status = socket_permission_add (new, list_of_commands[i].cmd);
2793             if (status != 0)
2794             {
2795               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2796                   "socket failed. This should never happen, ever! Sorry.\n",
2797                   permissions[i]);
2798               status = 4;
2799             }
2800           }
2801         }
2802         /* }}} Done adding permissions. */
2804         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2805                          &config_listen_address_list_len, new))
2806         {
2807           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2808           return (2);
2809         }
2810       }
2811       break;
2813       case 'P':
2814       {
2815         char *optcopy;
2816         char *saveptr;
2817         char *dummy;
2818         char *ptr;
2820         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2822         optcopy = strdup (optarg);
2823         dummy = optcopy;
2824         saveptr = NULL;
2825         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2826         {
2827           dummy = NULL;
2828           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2829         }
2831         free (optcopy);
2832       }
2833       break;
2835       case 'f':
2836       {
2837         int temp;
2839         temp = atoi (optarg);
2840         if (temp > 0)
2841           config_flush_interval = temp;
2842         else
2843         {
2844           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2845           status = 3;
2846         }
2847       }
2848       break;
2850       case 'w':
2851       {
2852         int temp;
2854         temp = atoi (optarg);
2855         if (temp > 0)
2856           config_write_interval = temp;
2857         else
2858         {
2859           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2860           status = 2;
2861         }
2862       }
2863       break;
2865       case 'z':
2866       {
2867         int temp;
2869         temp = atoi(optarg);
2870         if (temp > 0)
2871           config_write_jitter = temp;
2872         else
2873         {
2874           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2875           status = 2;
2876         }
2878         break;
2879       }
2881       case 't':
2882       {
2883         int threads;
2884         threads = atoi(optarg);
2885         if (threads >= 1)
2886           config_queue_threads = threads;
2887         else
2888         {
2889           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2890           return 1;
2891         }
2892       }
2893       break;
2895       case 'B':
2896         config_write_base_only = 1;
2897         break;
2899       case 'b':
2900       {
2901         size_t len;
2902         char base_realpath[PATH_MAX];
2904         if (config_base_dir != NULL)
2905           free (config_base_dir);
2906         config_base_dir = strdup (optarg);
2907         if (config_base_dir == NULL)
2908         {
2909           fprintf (stderr, "read_options: strdup failed.\n");
2910           return (3);
2911         }
2913         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2914         {
2915           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2916               config_base_dir, rrd_strerror (errno));
2917           return (3);
2918         }
2920         /* make sure that the base directory is not resolved via
2921          * symbolic links.  this makes some performance-enhancing
2922          * assumptions possible (we don't have to resolve paths
2923          * that start with a "/")
2924          */
2925         if (realpath(config_base_dir, base_realpath) == NULL)
2926         {
2927           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2928               "%s\n", config_base_dir, rrd_strerror(errno));
2929           return 5;
2930         }
2932         len = strlen (config_base_dir);
2933         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2934         {
2935           config_base_dir[len - 1] = 0;
2936           len--;
2937         }
2939         if (len < 1)
2940         {
2941           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2942           return (4);
2943         }
2945         _config_base_dir_len = len;
2947         len = strlen (base_realpath);
2948         while ((len > 0) && (base_realpath[len - 1] == '/'))
2949         {
2950           base_realpath[len - 1] = '\0';
2951           len--;
2952         }
2954         if (strncmp(config_base_dir,
2955                          base_realpath, sizeof(base_realpath)) != 0)
2956         {
2957           fprintf(stderr,
2958                   "Base directory (-b) resolved via file system links!\n"
2959                   "Please consult rrdcached '-b' documentation!\n"
2960                   "Consider specifying the real directory (%s)\n",
2961                   base_realpath);
2962           return 5;
2963         }
2964       }
2965       break;
2967       case 'p':
2968       {
2969         if (config_pid_file != NULL)
2970           free (config_pid_file);
2971         config_pid_file = strdup (optarg);
2972         if (config_pid_file == NULL)
2973         {
2974           fprintf (stderr, "read_options: strdup failed.\n");
2975           return (3);
2976         }
2977       }
2978       break;
2980       case 'F':
2981         config_flush_at_shutdown = 1;
2982         break;
2984       case 'j':
2985       {
2986         const char *dir = journal_dir = strdup(optarg);
2988         status = rrd_mkdir_p(dir, 0777);
2989         if (status != 0)
2990         {
2991           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
2992               dir, rrd_strerror(errno));
2993           return 6;
2994         }
2996         if (access(dir, R_OK|W_OK|X_OK) != 0)
2997         {
2998           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2999                   errno ? rrd_strerror(errno) : "");
3000           return 6;
3001         }
3002       }
3003       break;
3005       case 'h':
3006       case '?':
3007         printf ("RRDCacheD %s\n"
3008             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3009             "\n"
3010             "Usage: rrdcached [options]\n"
3011             "\n"
3012             "Valid options are:\n"
3013             "  -l <address>  Socket address to listen to.\n"
3014             "  -P <perms>    Sets the permissions to assign to all following "
3015                             "sockets\n"
3016             "  -w <seconds>  Interval in which to write data.\n"
3017             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3018             "  -t <threads>  Number of write threads.\n"
3019             "  -f <seconds>  Interval in which to flush dead data.\n"
3020             "  -p <file>     Location of the PID-file.\n"
3021             "  -b <dir>      Base directory to change to.\n"
3022             "  -B            Restrict file access to paths within -b <dir>\n"
3023             "  -g            Do not fork and run in the foreground.\n"
3024             "  -j <dir>      Directory in which to create the journal files.\n"
3025             "  -F            Always flush all updates at shutdown\n"
3026             "\n"
3027             "For more information and a detailed description of all options "
3028             "please refer\n"
3029             "to the rrdcached(1) manual page.\n",
3030             VERSION);
3031         status = -1;
3032         break;
3033     } /* switch (option) */
3034   } /* while (getopt) */
3036   /* advise the user when values are not sane */
3037   if (config_flush_interval < 2 * config_write_interval)
3038     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3039             " 2x write interval (-w) !\n");
3040   if (config_write_jitter > config_write_interval)
3041     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3042             " write interval (-w) !\n");
3044   if (config_write_base_only && config_base_dir == NULL)
3045     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3046             "  Consult the rrdcached documentation\n");
3048   if (journal_dir == NULL)
3049     config_flush_at_shutdown = 1;
3051   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3053   return (status);
3054 } /* }}} int read_options */
3056 int main (int argc, char **argv)
3058   int status;
3060   status = read_options (argc, argv);
3061   if (status != 0)
3062   {
3063     if (status < 0)
3064       status = 0;
3065     return (status);
3066   }
3068   status = daemonize ();
3069   if (status != 0)
3070   {
3071     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3072     return (1);
3073   }
3075   journal_init();
3077   /* start the queue threads */
3078   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3079   if (queue_threads == NULL)
3080   {
3081     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3082     cleanup();
3083     return (1);
3084   }
3085   for (int i = 0; i < config_queue_threads; i++)
3086   {
3087     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3088     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3089     if (status != 0)
3090     {
3091       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3092       cleanup();
3093       return (1);
3094     }
3095   }
3097   /* start the flush thread */
3098   memset(&flush_thread, 0, sizeof(flush_thread));
3099   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3100   if (status != 0)
3101   {
3102     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3103     cleanup();
3104     return (1);
3105   }
3107   listen_thread_main (NULL);
3108   cleanup ();
3110   return (0);
3111 } /* int main */
3113 /*
3114  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3115  */