Code

Two-phase shutdown for rrdcached ensures that values are flushed.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
115 /*
116  * Types
117  */
118 typedef enum
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
126 struct listen_socket_s
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
168   char *syntax;
169   char *help;
170 };
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
189 struct callback_flush_data_s
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
198 enum queue_side_e
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
218 enum {
219   RUNNING,              /* normal operation */
220   FLUSHING,             /* flushing remaining values */
221   SHUTDOWN              /* shutting down */
222 } state = RUNNING;
224 static pthread_t *queue_threads;
225 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
226 static int config_queue_threads = 4;
228 static pthread_t flush_thread;
229 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
231 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
232 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
233 static int connection_threads_num = 0;
235 /* Cache stuff */
236 static GTree          *cache_tree = NULL;
237 static cache_item_t   *cache_queue_head = NULL;
238 static cache_item_t   *cache_queue_tail = NULL;
239 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
241 static int config_write_interval = 300;
242 static int config_write_jitter   = 0;
243 static int config_flush_interval = 3600;
244 static int config_flush_at_shutdown = 0;
245 static char *config_pid_file = NULL;
246 static char *config_base_dir = NULL;
247 static size_t _config_base_dir_len = 0;
248 static int config_write_base_only = 0;
250 static listen_socket_t **config_listen_address_list = NULL;
251 static size_t config_listen_address_list_len = 0;
253 static uint64_t stats_queue_length = 0;
254 static uint64_t stats_updates_received = 0;
255 static uint64_t stats_flush_received = 0;
256 static uint64_t stats_updates_written = 0;
257 static uint64_t stats_data_sets_written = 0;
258 static uint64_t stats_journal_bytes = 0;
259 static uint64_t stats_journal_rotate = 0;
260 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
262 /* Journaled updates */
263 static char *journal_cur = NULL;
264 static char *journal_old = NULL;
265 static FILE *journal_fh = NULL;
266 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
267 static int journal_write(char *cmd, char *args);
268 static void journal_done(void);
269 static void journal_rotate(void);
271 /* prototypes for forward refernces */
272 static int handle_request_help (HANDLER_PROTO);
274 /* 
275  * Functions
276  */
277 static void sig_common (const char *sig) /* {{{ */
279   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
280   state = FLUSHING;
281   pthread_cond_broadcast(&flush_cond);
282   pthread_cond_broadcast(&queue_cond);
283 } /* }}} void sig_common */
285 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
287   sig_common("INT");
288 } /* }}} void sig_int_handler */
290 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
292   sig_common("TERM");
293 } /* }}} void sig_term_handler */
295 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
297   config_flush_at_shutdown = 1;
298   sig_common("USR1");
299 } /* }}} void sig_usr1_handler */
301 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
303   config_flush_at_shutdown = 0;
304   sig_common("USR2");
305 } /* }}} void sig_usr2_handler */
307 static void install_signal_handlers(void) /* {{{ */
309   /* These structures are static, because `sigaction' behaves weird if the are
310    * overwritten.. */
311   static struct sigaction sa_int;
312   static struct sigaction sa_term;
313   static struct sigaction sa_pipe;
314   static struct sigaction sa_usr1;
315   static struct sigaction sa_usr2;
317   /* Install signal handlers */
318   memset (&sa_int, 0, sizeof (sa_int));
319   sa_int.sa_handler = sig_int_handler;
320   sigaction (SIGINT, &sa_int, NULL);
322   memset (&sa_term, 0, sizeof (sa_term));
323   sa_term.sa_handler = sig_term_handler;
324   sigaction (SIGTERM, &sa_term, NULL);
326   memset (&sa_pipe, 0, sizeof (sa_pipe));
327   sa_pipe.sa_handler = SIG_IGN;
328   sigaction (SIGPIPE, &sa_pipe, NULL);
330   memset (&sa_pipe, 0, sizeof (sa_usr1));
331   sa_usr1.sa_handler = sig_usr1_handler;
332   sigaction (SIGUSR1, &sa_usr1, NULL);
334   memset (&sa_usr2, 0, sizeof (sa_usr2));
335   sa_usr2.sa_handler = sig_usr2_handler;
336   sigaction (SIGUSR2, &sa_usr2, NULL);
338 } /* }}} void install_signal_handlers */
340 static int open_pidfile(char *action, int oflag) /* {{{ */
342   int fd;
343   char *file;
345   file = (config_pid_file != NULL)
346     ? config_pid_file
347     : LOCALSTATEDIR "/run/rrdcached.pid";
349   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
350   if (fd < 0)
351     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
352             action, file, rrd_strerror(errno));
354   return(fd);
355 } /* }}} static int open_pidfile */
357 /* check existing pid file to see whether a daemon is running */
358 static int check_pidfile(void)
360   int pid_fd;
361   pid_t pid;
362   char pid_str[16];
364   pid_fd = open_pidfile("open", O_RDWR);
365   if (pid_fd < 0)
366     return pid_fd;
368   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
369     return -1;
371   pid = atoi(pid_str);
372   if (pid <= 0)
373     return -1;
375   /* another running process that we can signal COULD be
376    * a competing rrdcached */
377   if (pid != getpid() && kill(pid, 0) == 0)
378   {
379     fprintf(stderr,
380             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
381     close(pid_fd);
382     return -1;
383   }
385   lseek(pid_fd, 0, SEEK_SET);
386   if (ftruncate(pid_fd, 0) == -1)
387   {
388     fprintf(stderr,
389             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
390     close(pid_fd);
391     return -1;
392   }
394   fprintf(stderr,
395           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
396           "rrdcached: starting normally.\n", pid);
398   return pid_fd;
399 } /* }}} static int check_pidfile */
401 static int write_pidfile (int fd) /* {{{ */
403   pid_t pid;
404   FILE *fh;
406   pid = getpid ();
408   fh = fdopen (fd, "w");
409   if (fh == NULL)
410   {
411     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
412     close(fd);
413     return (-1);
414   }
416   fprintf (fh, "%i\n", (int) pid);
417   fclose (fh);
419   return (0);
420 } /* }}} int write_pidfile */
422 static int remove_pidfile (void) /* {{{ */
424   char *file;
425   int status;
427   file = (config_pid_file != NULL)
428     ? config_pid_file
429     : LOCALSTATEDIR "/run/rrdcached.pid";
431   status = unlink (file);
432   if (status == 0)
433     return (0);
434   return (errno);
435 } /* }}} int remove_pidfile */
437 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
439   char *eol;
441   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
442                sock->next_read - sock->next_cmd);
444   if (eol == NULL)
445   {
446     /* no commands left, move remainder back to front of rbuf */
447     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
448             sock->next_read - sock->next_cmd);
449     sock->next_read -= sock->next_cmd;
450     sock->next_cmd = 0;
451     *len = 0;
452     return NULL;
453   }
454   else
455   {
456     char *cmd = sock->rbuf + sock->next_cmd;
457     *eol = '\0';
459     sock->next_cmd = eol - sock->rbuf + 1;
461     if (eol > sock->rbuf && *(eol-1) == '\r')
462       *(--eol) = '\0'; /* handle "\r\n" EOL */
464     *len = eol - cmd;
466     return cmd;
467   }
469   /* NOTREACHED */
470   assert(1==0);
473 /* add the characters directly to the write buffer */
474 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
476   char *new_buf;
478   assert(sock != NULL);
480   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
481   if (new_buf == NULL)
482   {
483     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
484     return -1;
485   }
487   strncpy(new_buf + sock->wbuf_len, str, len + 1);
489   sock->wbuf = new_buf;
490   sock->wbuf_len += len;
492   return 0;
493 } /* }}} static int add_to_wbuf */
495 /* add the text to the "extra" info that's sent after the status line */
496 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
498   va_list argp;
499   char buffer[CMD_MAX];
500   int len;
502   if (sock == NULL) return 0; /* journal replay mode */
503   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
505   va_start(argp, fmt);
506 #ifdef HAVE_VSNPRINTF
507   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
508 #else
509   len = vsprintf(buffer, fmt, argp);
510 #endif
511   va_end(argp);
512   if (len < 0)
513   {
514     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
515     return -1;
516   }
518   return add_to_wbuf(sock, buffer, len);
519 } /* }}} static int add_response_info */
521 static int count_lines(char *str) /* {{{ */
523   int lines = 0;
525   if (str != NULL)
526   {
527     while ((str = strchr(str, '\n')) != NULL)
528     {
529       ++lines;
530       ++str;
531     }
532   }
534   return lines;
535 } /* }}} static int count_lines */
537 /* send the response back to the user.
538  * returns 0 on success, -1 on error
539  * write buffer is always zeroed after this call */
540 static int send_response (listen_socket_t *sock, response_code rc,
541                           char *fmt, ...) /* {{{ */
543   va_list argp;
544   char buffer[CMD_MAX];
545   int lines;
546   ssize_t wrote;
547   int rclen, len;
549   if (sock == NULL) return rc;  /* journal replay mode */
551   if (sock->batch_start)
552   {
553     if (rc == RESP_OK)
554       return rc; /* no response on success during BATCH */
555     lines = sock->batch_cmd;
556   }
557   else if (rc == RESP_OK)
558     lines = count_lines(sock->wbuf);
559   else
560     lines = -1;
562   rclen = sprintf(buffer, "%d ", lines);
563   va_start(argp, fmt);
564 #ifdef HAVE_VSNPRINTF
565   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
566 #else
567   len = vsprintf(buffer+rclen, fmt, argp);
568 #endif
569   va_end(argp);
570   if (len < 0)
571     return -1;
573   len += rclen;
575   /* append the result to the wbuf, don't write to the user */
576   if (sock->batch_start)
577     return add_to_wbuf(sock, buffer, len);
579   /* first write must be complete */
580   if (len != write(sock->fd, buffer, len))
581   {
582     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
583     return -1;
584   }
586   if (sock->wbuf != NULL && rc == RESP_OK)
587   {
588     wrote = 0;
589     while (wrote < sock->wbuf_len)
590     {
591       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
592       if (wb <= 0)
593       {
594         RRDD_LOG(LOG_INFO, "send_response: could not write results");
595         return -1;
596       }
597       wrote += wb;
598     }
599   }
601   free(sock->wbuf); sock->wbuf = NULL;
602   sock->wbuf_len = 0;
604   return 0;
605 } /* }}} */
607 static void wipe_ci_values(cache_item_t *ci, time_t when)
609   ci->values = NULL;
610   ci->values_num = 0;
612   ci->last_flush_time = when;
613   if (config_write_jitter > 0)
614     ci->last_flush_time += (rrd_random() % config_write_jitter);
617 /* remove_from_queue
618  * remove a "cache_item_t" item from the queue.
619  * must hold 'cache_lock' when calling this
620  */
621 static void remove_from_queue(cache_item_t *ci) /* {{{ */
623   if (ci == NULL) return;
624   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
626   if (ci->prev == NULL)
627     cache_queue_head = ci->next; /* reset head */
628   else
629     ci->prev->next = ci->next;
631   if (ci->next == NULL)
632     cache_queue_tail = ci->prev; /* reset the tail */
633   else
634     ci->next->prev = ci->prev;
636   ci->next = ci->prev = NULL;
637   ci->flags &= ~CI_FLAGS_IN_QUEUE;
639   pthread_mutex_lock (&stats_lock);
640   assert (stats_queue_length > 0);
641   stats_queue_length--;
642   pthread_mutex_unlock (&stats_lock);
644 } /* }}} static void remove_from_queue */
646 /* free the resources associated with the cache_item_t
647  * must hold cache_lock when calling this function
648  */
649 static void *free_cache_item(cache_item_t *ci) /* {{{ */
651   if (ci == NULL) return NULL;
653   remove_from_queue(ci);
655   for (size_t i=0; i < ci->values_num; i++)
656     free(ci->values[i]);
658   free (ci->values);
659   free (ci->file);
661   /* in case anyone is waiting */
662   pthread_cond_broadcast(&ci->flushed);
663   pthread_cond_destroy(&ci->flushed);
665   free (ci);
667   return NULL;
668 } /* }}} static void *free_cache_item */
670 /*
671  * enqueue_cache_item:
672  * `cache_lock' must be acquired before calling this function!
673  */
674 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
675     queue_side_t side)
677   if (ci == NULL)
678     return (-1);
680   if (ci->values_num == 0)
681     return (0);
683   if (side == HEAD)
684   {
685     if (cache_queue_head == ci)
686       return 0;
688     /* remove if further down in queue */
689     remove_from_queue(ci);
691     ci->prev = NULL;
692     ci->next = cache_queue_head;
693     if (ci->next != NULL)
694       ci->next->prev = ci;
695     cache_queue_head = ci;
697     if (cache_queue_tail == NULL)
698       cache_queue_tail = cache_queue_head;
699   }
700   else /* (side == TAIL) */
701   {
702     /* We don't move values back in the list.. */
703     if (ci->flags & CI_FLAGS_IN_QUEUE)
704       return (0);
706     assert (ci->next == NULL);
707     assert (ci->prev == NULL);
709     ci->prev = cache_queue_tail;
711     if (cache_queue_tail == NULL)
712       cache_queue_head = ci;
713     else
714       cache_queue_tail->next = ci;
716     cache_queue_tail = ci;
717   }
719   ci->flags |= CI_FLAGS_IN_QUEUE;
721   pthread_cond_signal(&queue_cond);
722   pthread_mutex_lock (&stats_lock);
723   stats_queue_length++;
724   pthread_mutex_unlock (&stats_lock);
726   return (0);
727 } /* }}} int enqueue_cache_item */
729 /*
730  * tree_callback_flush:
731  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
732  * while this is in progress.
733  */
734 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
735     gpointer data)
737   cache_item_t *ci;
738   callback_flush_data_t *cfd;
740   ci = (cache_item_t *) value;
741   cfd = (callback_flush_data_t *) data;
743   if (ci->flags & CI_FLAGS_IN_QUEUE)
744     return FALSE;
746   if (ci->values_num > 0
747       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
748   {
749     enqueue_cache_item (ci, TAIL);
750   }
751   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
752       && (ci->values_num <= 0))
753   {
754     assert ((char *) key == ci->file);
755     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
756     {
757       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
758       return (FALSE);
759     }
760   }
762   return (FALSE);
763 } /* }}} gboolean tree_callback_flush */
765 static int flush_old_values (int max_age)
767   callback_flush_data_t cfd;
768   size_t k;
770   memset (&cfd, 0, sizeof (cfd));
771   /* Pass the current time as user data so that we don't need to call
772    * `time' for each node. */
773   cfd.now = time (NULL);
774   cfd.keys = NULL;
775   cfd.keys_num = 0;
777   if (max_age > 0)
778     cfd.abs_timeout = cfd.now - max_age;
779   else
780     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
782   /* `tree_callback_flush' will return the keys of all values that haven't
783    * been touched in the last `config_flush_interval' seconds in `cfd'.
784    * The char*'s in this array point to the same memory as ci->file, so we
785    * don't need to free them separately. */
786   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
788   for (k = 0; k < cfd.keys_num; k++)
789   {
790     /* should never fail, since we have held the cache_lock
791      * the entire time */
792     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
793   }
795   if (cfd.keys != NULL)
796   {
797     free (cfd.keys);
798     cfd.keys = NULL;
799   }
801   return (0);
802 } /* int flush_old_values */
804 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
806   struct timeval now;
807   struct timespec next_flush;
808   int status;
810   gettimeofday (&now, NULL);
811   next_flush.tv_sec = now.tv_sec + config_flush_interval;
812   next_flush.tv_nsec = 1000 * now.tv_usec;
814   pthread_mutex_lock(&cache_lock);
816   while (state == RUNNING)
817   {
818     gettimeofday (&now, NULL);
819     if ((now.tv_sec > next_flush.tv_sec)
820         || ((now.tv_sec == next_flush.tv_sec)
821           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
822     {
823       /* Flush all values that haven't been written in the last
824        * `config_write_interval' seconds. */
825       flush_old_values (config_write_interval);
827       /* Determine the time of the next cache flush. */
828       next_flush.tv_sec =
829         now.tv_sec + next_flush.tv_sec % config_flush_interval;
831       /* unlock the cache while we rotate so we don't block incoming
832        * updates if the fsync() blocks on disk I/O */
833       pthread_mutex_unlock(&cache_lock);
834       journal_rotate();
835       pthread_mutex_lock(&cache_lock);
836     }
838     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
839     if (status != 0 && status != ETIMEDOUT)
840     {
841       RRDD_LOG (LOG_ERR, "flush_thread_main: "
842                 "pthread_cond_timedwait returned %i.", status);
843     }
844   }
846   if (config_flush_at_shutdown)
847     flush_old_values (-1); /* flush everything */
849   state = SHUTDOWN;
851   pthread_mutex_unlock(&cache_lock);
853   return NULL;
854 } /* void *flush_thread_main */
856 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
858   pthread_mutex_lock (&cache_lock);
860   while (state != SHUTDOWN
861          || (cache_queue_head != NULL && config_flush_at_shutdown))
862   {
863     cache_item_t *ci;
864     char *file;
865     char **values;
866     size_t values_num;
867     int status;
869     /* Now, check if there's something to store away. If not, wait until
870      * something comes in. */
871     if (cache_queue_head == NULL)
872     {
873       status = pthread_cond_wait (&queue_cond, &cache_lock);
874       if ((status != 0) && (status != ETIMEDOUT))
875       {
876         RRDD_LOG (LOG_ERR, "queue_thread_main: "
877             "pthread_cond_wait returned %i.", status);
878       }
879     }
881     /* Check if a value has arrived. This may be NULL if we timed out or there
882      * was an interrupt such as a signal. */
883     if (cache_queue_head == NULL)
884       continue;
886     ci = cache_queue_head;
888     /* copy the relevant parts */
889     file = strdup (ci->file);
890     if (file == NULL)
891     {
892       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
893       continue;
894     }
896     assert(ci->values != NULL);
897     assert(ci->values_num > 0);
899     values = ci->values;
900     values_num = ci->values_num;
902     wipe_ci_values(ci, time(NULL));
903     remove_from_queue(ci);
905     pthread_mutex_unlock (&cache_lock);
907     rrd_clear_error ();
908     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
909     if (status != 0)
910     {
911       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
912           "rrd_update_r (%s) failed with status %i. (%s)",
913           file, status, rrd_get_error());
914     }
916     journal_write("wrote", file);
918     /* Search again in the tree.  It's possible someone issued a "FORGET"
919      * while we were writing the update values. */
920     pthread_mutex_lock(&cache_lock);
921     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
922     if (ci)
923       pthread_cond_broadcast(&ci->flushed);
924     pthread_mutex_unlock(&cache_lock);
926     rrd_free_ptrs((void ***) &values, &values_num);
927     free(file);
929     if (status == 0)
930     {
931       pthread_mutex_lock (&stats_lock);
932       stats_updates_written++;
933       stats_data_sets_written += values_num;
934       pthread_mutex_unlock (&stats_lock);
935     }
937     pthread_mutex_lock (&cache_lock);
938   }
939   pthread_mutex_unlock (&cache_lock);
941   return (NULL);
942 } /* }}} void *queue_thread_main */
944 static int buffer_get_field (char **buffer_ret, /* {{{ */
945     size_t *buffer_size_ret, char **field_ret)
947   char *buffer;
948   size_t buffer_pos;
949   size_t buffer_size;
950   char *field;
951   size_t field_size;
952   int status;
954   buffer = *buffer_ret;
955   buffer_pos = 0;
956   buffer_size = *buffer_size_ret;
957   field = *buffer_ret;
958   field_size = 0;
960   if (buffer_size <= 0)
961     return (-1);
963   /* This is ensured by `handle_request'. */
964   assert (buffer[buffer_size - 1] == '\0');
966   status = -1;
967   while (buffer_pos < buffer_size)
968   {
969     /* Check for end-of-field or end-of-buffer */
970     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
971     {
972       field[field_size] = 0;
973       field_size++;
974       buffer_pos++;
975       status = 0;
976       break;
977     }
978     /* Handle escaped characters. */
979     else if (buffer[buffer_pos] == '\\')
980     {
981       if (buffer_pos >= (buffer_size - 1))
982         break;
983       buffer_pos++;
984       field[field_size] = buffer[buffer_pos];
985       field_size++;
986       buffer_pos++;
987     }
988     /* Normal operation */ 
989     else
990     {
991       field[field_size] = buffer[buffer_pos];
992       field_size++;
993       buffer_pos++;
994     }
995   } /* while (buffer_pos < buffer_size) */
997   if (status != 0)
998     return (status);
1000   *buffer_ret = buffer + buffer_pos;
1001   *buffer_size_ret = buffer_size - buffer_pos;
1002   *field_ret = field;
1004   return (0);
1005 } /* }}} int buffer_get_field */
1007 /* if we're restricting writes to the base directory,
1008  * check whether the file falls within the dir
1009  * returns 1 if OK, otherwise 0
1010  */
1011 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1013   assert(file != NULL);
1015   if (!config_write_base_only
1016       || sock == NULL /* journal replay */
1017       || config_base_dir == NULL)
1018     return 1;
1020   if (strstr(file, "../") != NULL) goto err;
1022   /* relative paths without "../" are ok */
1023   if (*file != '/') return 1;
1025   /* file must be of the format base + "/" + <1+ char filename> */
1026   if (strlen(file) < _config_base_dir_len + 2) goto err;
1027   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1028   if (*(file + _config_base_dir_len) != '/') goto err;
1030   return 1;
1032 err:
1033   if (sock != NULL && sock->fd >= 0)
1034     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1036   return 0;
1037 } /* }}} static int check_file_access */
1039 /* when using a base dir, convert relative paths to absolute paths.
1040  * if necessary, modifies the "filename" pointer to point
1041  * to the new path created in "tmp".  "tmp" is provided
1042  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1043  *
1044  * this allows us to optimize for the expected case (absolute path)
1045  * with a no-op.
1046  */
1047 static void get_abs_path(char **filename, char *tmp)
1049   assert(tmp != NULL);
1050   assert(filename != NULL && *filename != NULL);
1052   if (config_base_dir == NULL || **filename == '/')
1053     return;
1055   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1056   *filename = tmp;
1057 } /* }}} static int get_abs_path */
1059 /* returns 1 if we have the required privilege level,
1060  * otherwise issue an error to the user on sock */
1061 static int has_privilege (listen_socket_t *sock, /* {{{ */
1062                           socket_privilege priv)
1064   if (sock == NULL) /* journal replay */
1065     return 1;
1067   if (sock->privilege >= priv)
1068     return 1;
1070   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1071 } /* }}} static int has_privilege */
1073 static int flush_file (const char *filename) /* {{{ */
1075   cache_item_t *ci;
1077   pthread_mutex_lock (&cache_lock);
1079   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1080   if (ci == NULL)
1081   {
1082     pthread_mutex_unlock (&cache_lock);
1083     return (ENOENT);
1084   }
1086   if (ci->values_num > 0)
1087   {
1088     /* Enqueue at head */
1089     enqueue_cache_item (ci, HEAD);
1090     pthread_cond_wait(&ci->flushed, &cache_lock);
1091   }
1093   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1094    * may have been purged during our cond_wait() */
1096   pthread_mutex_unlock(&cache_lock);
1098   return (0);
1099 } /* }}} int flush_file */
1101 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1103   char *err = "Syntax error.\n";
1105   if (cmd && cmd->syntax)
1106     err = cmd->syntax;
1108   return send_response(sock, RESP_ERR, "Usage: %s", err);
1109 } /* }}} static int syntax_error() */
1111 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1113   uint64_t copy_queue_length;
1114   uint64_t copy_updates_received;
1115   uint64_t copy_flush_received;
1116   uint64_t copy_updates_written;
1117   uint64_t copy_data_sets_written;
1118   uint64_t copy_journal_bytes;
1119   uint64_t copy_journal_rotate;
1121   uint64_t tree_nodes_number;
1122   uint64_t tree_depth;
1124   pthread_mutex_lock (&stats_lock);
1125   copy_queue_length       = stats_queue_length;
1126   copy_updates_received   = stats_updates_received;
1127   copy_flush_received     = stats_flush_received;
1128   copy_updates_written    = stats_updates_written;
1129   copy_data_sets_written  = stats_data_sets_written;
1130   copy_journal_bytes      = stats_journal_bytes;
1131   copy_journal_rotate     = stats_journal_rotate;
1132   pthread_mutex_unlock (&stats_lock);
1134   pthread_mutex_lock (&cache_lock);
1135   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1136   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1137   pthread_mutex_unlock (&cache_lock);
1139   add_response_info(sock,
1140                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1141   add_response_info(sock,
1142                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1143   add_response_info(sock,
1144                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1145   add_response_info(sock,
1146                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1147   add_response_info(sock,
1148                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1149   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1150   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1151   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1152   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1154   send_response(sock, RESP_OK, "Statistics follow\n");
1156   return (0);
1157 } /* }}} int handle_request_stats */
1159 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1161   char *file, file_tmp[PATH_MAX];
1162   int status;
1164   status = buffer_get_field (&buffer, &buffer_size, &file);
1165   if (status != 0)
1166   {
1167     return syntax_error(sock,cmd);
1168   }
1169   else
1170   {
1171     pthread_mutex_lock(&stats_lock);
1172     stats_flush_received++;
1173     pthread_mutex_unlock(&stats_lock);
1175     get_abs_path(&file, file_tmp);
1176     if (!check_file_access(file, sock)) return 0;
1178     status = flush_file (file);
1179     if (status == 0)
1180       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1181     else if (status == ENOENT)
1182     {
1183       /* no file in our tree; see whether it exists at all */
1184       struct stat statbuf;
1186       memset(&statbuf, 0, sizeof(statbuf));
1187       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1188         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1189       else
1190         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1191     }
1192     else if (status < 0)
1193       return send_response(sock, RESP_ERR, "Internal error.\n");
1194     else
1195       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1196   }
1198   /* NOTREACHED */
1199   assert(1==0);
1200 } /* }}} int handle_request_flush */
1202 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1204   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1206   pthread_mutex_lock(&cache_lock);
1207   flush_old_values(-1);
1208   pthread_mutex_unlock(&cache_lock);
1210   return send_response(sock, RESP_OK, "Started flush.\n");
1211 } /* }}} static int handle_request_flushall */
1213 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1215   int status;
1216   char *file, file_tmp[PATH_MAX];
1217   cache_item_t *ci;
1219   status = buffer_get_field(&buffer, &buffer_size, &file);
1220   if (status != 0)
1221     return syntax_error(sock,cmd);
1223   get_abs_path(&file, file_tmp);
1225   pthread_mutex_lock(&cache_lock);
1226   ci = g_tree_lookup(cache_tree, file);
1227   if (ci == NULL)
1228   {
1229     pthread_mutex_unlock(&cache_lock);
1230     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1231   }
1233   for (size_t i=0; i < ci->values_num; i++)
1234     add_response_info(sock, "%s\n", ci->values[i]);
1236   pthread_mutex_unlock(&cache_lock);
1237   return send_response(sock, RESP_OK, "updates pending\n");
1238 } /* }}} static int handle_request_pending */
1240 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1242   int status;
1243   gboolean found;
1244   char *file, file_tmp[PATH_MAX];
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1250   get_abs_path(&file, file_tmp);
1251   if (!check_file_access(file, sock)) return 0;
1253   pthread_mutex_lock(&cache_lock);
1254   found = g_tree_remove(cache_tree, file);
1255   pthread_mutex_unlock(&cache_lock);
1257   if (found == TRUE)
1258   {
1259     if (sock != NULL)
1260       journal_write("forget", file);
1262     return send_response(sock, RESP_OK, "Gone!\n");
1263   }
1264   else
1265     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267   /* NOTREACHED */
1268   assert(1==0);
1269 } /* }}} static int handle_request_forget */
1271 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1273   cache_item_t *ci;
1275   pthread_mutex_lock(&cache_lock);
1277   ci = cache_queue_head;
1278   while (ci != NULL)
1279   {
1280     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1281     ci = ci->next;
1282   }
1284   pthread_mutex_unlock(&cache_lock);
1286   return send_response(sock, RESP_OK, "in queue.\n");
1287 } /* }}} int handle_request_queue */
1289 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1291   char *file, file_tmp[PATH_MAX];
1292   int values_num = 0;
1293   int status;
1294   char orig_buf[CMD_MAX];
1296   cache_item_t *ci;
1298   /* save it for the journal later */
1299   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1301   status = buffer_get_field (&buffer, &buffer_size, &file);
1302   if (status != 0)
1303     return syntax_error(sock,cmd);
1305   pthread_mutex_lock(&stats_lock);
1306   stats_updates_received++;
1307   pthread_mutex_unlock(&stats_lock);
1309   get_abs_path(&file, file_tmp);
1310   if (!check_file_access(file, sock)) return 0;
1312   pthread_mutex_lock (&cache_lock);
1313   ci = g_tree_lookup (cache_tree, file);
1315   if (ci == NULL) /* {{{ */
1316   {
1317     struct stat statbuf;
1318     cache_item_t *tmp;
1320     /* don't hold the lock while we setup; stat(2) might block */
1321     pthread_mutex_unlock(&cache_lock);
1323     memset (&statbuf, 0, sizeof (statbuf));
1324     status = stat (file, &statbuf);
1325     if (status != 0)
1326     {
1327       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1329       status = errno;
1330       if (status == ENOENT)
1331         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1332       else
1333         return send_response(sock, RESP_ERR,
1334                              "stat failed with error %i.\n", status);
1335     }
1336     if (!S_ISREG (statbuf.st_mode))
1337       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1339     if (access(file, R_OK|W_OK) != 0)
1340       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1341                            file, rrd_strerror(errno));
1343     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1344     if (ci == NULL)
1345     {
1346       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1348       return send_response(sock, RESP_ERR, "malloc failed.\n");
1349     }
1350     memset (ci, 0, sizeof (cache_item_t));
1352     ci->file = strdup (file);
1353     if (ci->file == NULL)
1354     {
1355       free (ci);
1356       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1358       return send_response(sock, RESP_ERR, "strdup failed.\n");
1359     }
1361     wipe_ci_values(ci, now);
1362     ci->flags = CI_FLAGS_IN_TREE;
1363     pthread_cond_init(&ci->flushed, NULL);
1365     pthread_mutex_lock(&cache_lock);
1367     /* another UPDATE might have added this entry in the meantime */
1368     tmp = g_tree_lookup (cache_tree, file);
1369     if (tmp == NULL)
1370       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1371     else
1372     {
1373       free_cache_item (ci);
1374       ci = tmp;
1375     }
1377     /* state may have changed while we were unlocked */
1378     if (state == SHUTDOWN)
1379       return -1;
1380   } /* }}} */
1381   assert (ci != NULL);
1383   /* don't re-write updates in replay mode */
1384   if (sock != NULL)
1385     journal_write("update", orig_buf);
1387   while (buffer_size > 0)
1388   {
1389     char *value;
1390     time_t stamp;
1391     char *eostamp;
1393     status = buffer_get_field (&buffer, &buffer_size, &value);
1394     if (status != 0)
1395     {
1396       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1397       break;
1398     }
1400     /* make sure update time is always moving forward */
1401     stamp = strtol(value, &eostamp, 10);
1402     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1403     {
1404       pthread_mutex_unlock(&cache_lock);
1405       return send_response(sock, RESP_ERR,
1406                            "Cannot find timestamp in '%s'!\n", value);
1407     }
1408     else if (stamp <= ci->last_update_stamp)
1409     {
1410       pthread_mutex_unlock(&cache_lock);
1411       return send_response(sock, RESP_ERR,
1412                            "illegal attempt to update using time %ld when last"
1413                            " update time is %ld (minimum one second step)\n",
1414                            stamp, ci->last_update_stamp);
1415     }
1416     else
1417       ci->last_update_stamp = stamp;
1419     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1420     {
1421       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1422       continue;
1423     }
1425     values_num++;
1426   }
1428   if (((now - ci->last_flush_time) >= config_write_interval)
1429       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1430       && (ci->values_num > 0))
1431   {
1432     enqueue_cache_item (ci, TAIL);
1433   }
1435   pthread_mutex_unlock (&cache_lock);
1437   if (values_num < 1)
1438     return send_response(sock, RESP_ERR, "No values updated.\n");
1439   else
1440     return send_response(sock, RESP_OK,
1441                          "errors, enqueued %i value(s).\n", values_num);
1443   /* NOTREACHED */
1444   assert(1==0);
1446 } /* }}} int handle_request_update */
1448 /* we came across a "WROTE" entry during journal replay.
1449  * throw away any values that we have accumulated for this file
1450  */
1451 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1453   cache_item_t *ci;
1454   const char *file = buffer;
1456   pthread_mutex_lock(&cache_lock);
1458   ci = g_tree_lookup(cache_tree, file);
1459   if (ci == NULL)
1460   {
1461     pthread_mutex_unlock(&cache_lock);
1462     return (0);
1463   }
1465   if (ci->values)
1466     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1468   wipe_ci_values(ci, now);
1469   remove_from_queue(ci);
1471   pthread_mutex_unlock(&cache_lock);
1472   return (0);
1473 } /* }}} int handle_request_wrote */
1475 /* start "BATCH" processing */
1476 static int batch_start (HANDLER_PROTO) /* {{{ */
1478   int status;
1479   if (sock->batch_start)
1480     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1482   status = send_response(sock, RESP_OK,
1483                          "Go ahead.  End with dot '.' on its own line.\n");
1484   sock->batch_start = time(NULL);
1485   sock->batch_cmd = 0;
1487   return status;
1488 } /* }}} static int batch_start */
1490 /* finish "BATCH" processing and return results to the client */
1491 static int batch_done (HANDLER_PROTO) /* {{{ */
1493   assert(sock->batch_start);
1494   sock->batch_start = 0;
1495   sock->batch_cmd  = 0;
1496   return send_response(sock, RESP_OK, "errors\n");
1497 } /* }}} static int batch_done */
1499 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1501   return -1;
1502 } /* }}} static int handle_request_quit */
1504 struct command COMMANDS[] = {
1505   {
1506     "UPDATE",
1507     handle_request_update,
1508     PRIV_HIGH,
1509     CMD_CONTEXT_ANY,
1510     "UPDATE <filename> <values> [<values> ...]\n"
1511     ,
1512     "Adds the given file to the internal cache if it is not yet known and\n"
1513     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1514     "for details.\n"
1515     "\n"
1516     "Each <values> has the following form:\n"
1517     "  <values> = <time>:<value>[:<value>[...]]\n"
1518     "See the rrdupdate(1) manpage for details.\n"
1519   },
1520   {
1521     "WROTE",
1522     handle_request_wrote,
1523     PRIV_HIGH,
1524     CMD_CONTEXT_JOURNAL,
1525     NULL,
1526     NULL
1527   },
1528   {
1529     "FLUSH",
1530     handle_request_flush,
1531     PRIV_LOW,
1532     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1533     "FLUSH <filename>\n"
1534     ,
1535     "Adds the given filename to the head of the update queue and returns\n"
1536     "after it has been dequeued.\n"
1537   },
1538   {
1539     "FLUSHALL",
1540     handle_request_flushall,
1541     PRIV_HIGH,
1542     CMD_CONTEXT_CLIENT,
1543     "FLUSHALL\n"
1544     ,
1545     "Triggers writing of all pending updates.  Returns immediately.\n"
1546   },
1547   {
1548     "PENDING",
1549     handle_request_pending,
1550     PRIV_HIGH,
1551     CMD_CONTEXT_CLIENT,
1552     "PENDING <filename>\n"
1553     ,
1554     "Shows any 'pending' updates for a file, in order.\n"
1555     "The updates shown have not yet been written to the underlying RRD file.\n"
1556   },
1557   {
1558     "FORGET",
1559     handle_request_forget,
1560     PRIV_HIGH,
1561     CMD_CONTEXT_ANY,
1562     "FORGET <filename>\n"
1563     ,
1564     "Removes the file completely from the cache.\n"
1565     "Any pending updates for the file will be lost.\n"
1566   },
1567   {
1568     "QUEUE",
1569     handle_request_queue,
1570     PRIV_LOW,
1571     CMD_CONTEXT_CLIENT,
1572     "QUEUE\n"
1573     ,
1574         "Shows all files in the output queue.\n"
1575     "The output is zero or more lines in the following format:\n"
1576     "(where <num_vals> is the number of values to be written)\n"
1577     "\n"
1578     "<num_vals> <filename>\n"
1579   },
1580   {
1581     "STATS",
1582     handle_request_stats,
1583     PRIV_LOW,
1584     CMD_CONTEXT_CLIENT,
1585     "STATS\n"
1586     ,
1587     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1588     "a description of the values.\n"
1589   },
1590   {
1591     "HELP",
1592     handle_request_help,
1593     PRIV_LOW,
1594     CMD_CONTEXT_CLIENT,
1595     "HELP [<command>]\n",
1596     NULL, /* special! */
1597   },
1598   {
1599     "BATCH",
1600     batch_start,
1601     PRIV_LOW,
1602     CMD_CONTEXT_CLIENT,
1603     "BATCH\n"
1604     ,
1605     "The 'BATCH' command permits the client to initiate a bulk load\n"
1606     "   of commands to rrdcached.\n"
1607     "\n"
1608     "Usage:\n"
1609     "\n"
1610     "    client: BATCH\n"
1611     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1612     "    client: command #1\n"
1613     "    client: command #2\n"
1614     "    client: ... and so on\n"
1615     "    client: .\n"
1616     "    server: 2 errors\n"
1617     "    server: 7 message for command #7\n"
1618     "    server: 9 message for command #9\n"
1619     "\n"
1620     "For more information, consult the rrdcached(1) documentation.\n"
1621   },
1622   {
1623     ".",   /* BATCH terminator */
1624     batch_done,
1625     PRIV_LOW,
1626     CMD_CONTEXT_BATCH,
1627     NULL,
1628     NULL
1629   },
1630   {
1631     "QUIT",
1632     handle_request_quit,
1633     PRIV_LOW,
1634     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1635     "QUIT\n"
1636     ,
1637     "Disconnect from rrdcached.\n"
1638   },
1639   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1640 };
1642 static struct command *find_command(char *cmd)
1644   struct command *c = COMMANDS;
1646   while (c->cmd != NULL)
1647   {
1648     if (strcasecmp(cmd, c->cmd) == 0)
1649       break;
1650     c++;
1651   }
1653   if (c->cmd == NULL)
1654     return NULL;
1655   else
1656     return c;
1659 /* check whether commands are received in the expected context */
1660 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1662   if (sock == NULL)
1663     return (cmd->context & CMD_CONTEXT_JOURNAL);
1664   else if (sock->batch_start)
1665     return (cmd->context & CMD_CONTEXT_BATCH);
1666   else
1667     return (cmd->context & CMD_CONTEXT_CLIENT);
1669   /* NOTREACHED */
1670   assert(1==0);
1673 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1675   int status;
1676   char *cmd_str;
1677   char *resp_txt;
1678   struct command *help = NULL;
1680   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1681   if (status == 0)
1682     help = find_command(cmd_str);
1684   if (help && (help->syntax || help->help))
1685   {
1686     char tmp[CMD_MAX];
1688     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1689     resp_txt = tmp;
1691     if (help->syntax)
1692       add_response_info(sock, "Usage: %s\n", help->syntax);
1694     if (help->help)
1695       add_response_info(sock, "%s\n", help->help);
1696   }
1697   else
1698   {
1699     help = COMMANDS;
1700     resp_txt = "Command overview\n";
1702     while (help->cmd)
1703     {
1704       if (help->syntax)
1705         add_response_info(sock, "%s", help->syntax);
1706       help++;
1707     }
1708   }
1710   return send_response(sock, RESP_OK, resp_txt);
1711 } /* }}} int handle_request_help */
1713 /* if sock==NULL, we are in journal replay mode */
1714 static int handle_request (DISPATCH_PROTO) /* {{{ */
1716   char *buffer_ptr = buffer;
1717   char *cmd_str = NULL;
1718   struct command *cmd = NULL;
1719   int status;
1721   assert (buffer[buffer_size - 1] == '\0');
1723   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1724   if (status != 0)
1725   {
1726     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1727     return (-1);
1728   }
1730   if (sock != NULL && sock->batch_start)
1731     sock->batch_cmd++;
1733   cmd = find_command(cmd_str);
1734   if (!cmd)
1735     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1737   status = has_privilege(sock, cmd->min_priv);
1738   if (status <= 0)
1739     return status;
1741   if (!command_check_context(sock, cmd))
1742     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1744   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1745 } /* }}} int handle_request */
1747 /* MUST NOT hold journal_lock before calling this */
1748 static void journal_rotate(void) /* {{{ */
1750   FILE *old_fh = NULL;
1751   int new_fd;
1753   if (journal_cur == NULL || journal_old == NULL)
1754     return;
1756   pthread_mutex_lock(&journal_lock);
1758   /* we rotate this way (rename before close) so that the we can release
1759    * the journal lock as fast as possible.  Journal writes to the new
1760    * journal can proceed immediately after the new file is opened.  The
1761    * fclose can then block without affecting new updates.
1762    */
1763   if (journal_fh != NULL)
1764   {
1765     old_fh = journal_fh;
1766     journal_fh = NULL;
1767     rename(journal_cur, journal_old);
1768     ++stats_journal_rotate;
1769   }
1771   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1772                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1773   if (new_fd >= 0)
1774   {
1775     journal_fh = fdopen(new_fd, "a");
1776     if (journal_fh == NULL)
1777       close(new_fd);
1778   }
1780   pthread_mutex_unlock(&journal_lock);
1782   if (old_fh != NULL)
1783     fclose(old_fh);
1785   if (journal_fh == NULL)
1786   {
1787     RRDD_LOG(LOG_CRIT,
1788              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1789              journal_cur, rrd_strerror(errno));
1791     RRDD_LOG(LOG_ERR,
1792              "JOURNALING DISABLED: All values will be flushed at shutdown");
1793     config_flush_at_shutdown = 1;
1794   }
1796 } /* }}} static void journal_rotate */
1798 static void journal_done(void) /* {{{ */
1800   if (journal_cur == NULL)
1801     return;
1803   pthread_mutex_lock(&journal_lock);
1804   if (journal_fh != NULL)
1805   {
1806     fclose(journal_fh);
1807     journal_fh = NULL;
1808   }
1810   if (config_flush_at_shutdown)
1811   {
1812     RRDD_LOG(LOG_INFO, "removing journals");
1813     unlink(journal_old);
1814     unlink(journal_cur);
1815   }
1816   else
1817   {
1818     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1819              "journals will be used at next startup");
1820   }
1822   pthread_mutex_unlock(&journal_lock);
1824 } /* }}} static void journal_done */
1826 static int journal_write(char *cmd, char *args) /* {{{ */
1828   int chars;
1830   if (journal_fh == NULL)
1831     return 0;
1833   pthread_mutex_lock(&journal_lock);
1834   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1835   pthread_mutex_unlock(&journal_lock);
1837   if (chars > 0)
1838   {
1839     pthread_mutex_lock(&stats_lock);
1840     stats_journal_bytes += chars;
1841     pthread_mutex_unlock(&stats_lock);
1842   }
1844   return chars;
1845 } /* }}} static int journal_write */
1847 static int journal_replay (const char *file) /* {{{ */
1849   FILE *fh;
1850   int entry_cnt = 0;
1851   int fail_cnt = 0;
1852   uint64_t line = 0;
1853   char entry[CMD_MAX];
1854   time_t now;
1856   if (file == NULL) return 0;
1858   {
1859     char *reason = "unknown error";
1860     int status = 0;
1861     struct stat statbuf;
1863     memset(&statbuf, 0, sizeof(statbuf));
1864     if (stat(file, &statbuf) != 0)
1865     {
1866       if (errno == ENOENT)
1867         return 0;
1869       reason = "stat error";
1870       status = errno;
1871     }
1872     else if (!S_ISREG(statbuf.st_mode))
1873     {
1874       reason = "not a regular file";
1875       status = EPERM;
1876     }
1877     if (statbuf.st_uid != daemon_uid)
1878     {
1879       reason = "not owned by daemon user";
1880       status = EACCES;
1881     }
1882     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1883     {
1884       reason = "must not be user/group writable";
1885       status = EACCES;
1886     }
1888     if (status != 0)
1889     {
1890       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1891                file, rrd_strerror(status), reason);
1892       return 0;
1893     }
1894   }
1896   fh = fopen(file, "r");
1897   if (fh == NULL)
1898   {
1899     if (errno != ENOENT)
1900       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1901                file, rrd_strerror(errno));
1902     return 0;
1903   }
1904   else
1905     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1907   now = time(NULL);
1909   while(!feof(fh))
1910   {
1911     size_t entry_len;
1913     ++line;
1914     if (fgets(entry, sizeof(entry), fh) == NULL)
1915       break;
1916     entry_len = strlen(entry);
1918     /* check \n termination in case journal writing crashed mid-line */
1919     if (entry_len == 0)
1920       continue;
1921     else if (entry[entry_len - 1] != '\n')
1922     {
1923       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1924       ++fail_cnt;
1925       continue;
1926     }
1928     entry[entry_len - 1] = '\0';
1930     if (handle_request(NULL, now, entry, entry_len) == 0)
1931       ++entry_cnt;
1932     else
1933       ++fail_cnt;
1934   }
1936   fclose(fh);
1938   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1939            entry_cnt, fail_cnt);
1941   return entry_cnt > 0 ? 1 : 0;
1942 } /* }}} static int journal_replay */
1944 static void journal_init(void) /* {{{ */
1946   int had_journal = 0;
1948   if (journal_cur == NULL) return;
1950   pthread_mutex_lock(&journal_lock);
1952   RRDD_LOG(LOG_INFO, "checking for journal files");
1954   had_journal += journal_replay(journal_old);
1955   had_journal += journal_replay(journal_cur);
1957   /* it must have been a crash.  start a flush */
1958   if (had_journal && config_flush_at_shutdown)
1959     flush_old_values(-1);
1961   pthread_mutex_unlock(&journal_lock);
1962   journal_rotate();
1964   RRDD_LOG(LOG_INFO, "journal processing complete");
1966 } /* }}} static void journal_init */
1968 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1970   assert(sock != NULL);
1972   free(sock->rbuf);  sock->rbuf = NULL;
1973   free(sock->wbuf);  sock->wbuf = NULL;
1974   free(sock);
1975 } /* }}} void free_listen_socket */
1977 static void close_connection(listen_socket_t *sock) /* {{{ */
1979   if (sock->fd >= 0)
1980   {
1981     close(sock->fd);
1982     sock->fd = -1;
1983   }
1985   free_listen_socket(sock);
1987 } /* }}} void close_connection */
1989 static void *connection_thread_main (void *args) /* {{{ */
1991   listen_socket_t *sock;
1992   int fd;
1994   sock = (listen_socket_t *) args;
1995   fd = sock->fd;
1997   /* init read buffers */
1998   sock->next_read = sock->next_cmd = 0;
1999   sock->rbuf = malloc(RBUF_SIZE);
2000   if (sock->rbuf == NULL)
2001   {
2002     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2003     close_connection(sock);
2004     return NULL;
2005   }
2007   pthread_mutex_lock (&connection_threads_lock);
2008   connection_threads_num++;
2009   pthread_mutex_unlock (&connection_threads_lock);
2011   while (state == RUNNING)
2012   {
2013     char *cmd;
2014     ssize_t cmd_len;
2015     ssize_t rbytes;
2016     time_t now;
2018     struct pollfd pollfd;
2019     int status;
2021     pollfd.fd = fd;
2022     pollfd.events = POLLIN | POLLPRI;
2023     pollfd.revents = 0;
2025     status = poll (&pollfd, 1, /* timeout = */ 500);
2026     if (state != RUNNING)
2027       break;
2028     else if (status == 0) /* timeout */
2029       continue;
2030     else if (status < 0) /* error */
2031     {
2032       status = errno;
2033       if (status != EINTR)
2034         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2035       continue;
2036     }
2038     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2039       break;
2040     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2041     {
2042       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2043           "poll(2) returned something unexpected: %#04hx",
2044           pollfd.revents);
2045       break;
2046     }
2048     rbytes = read(fd, sock->rbuf + sock->next_read,
2049                   RBUF_SIZE - sock->next_read);
2050     if (rbytes < 0)
2051     {
2052       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2053       break;
2054     }
2055     else if (rbytes == 0)
2056       break; /* eof */
2058     sock->next_read += rbytes;
2060     if (sock->batch_start)
2061       now = sock->batch_start;
2062     else
2063       now = time(NULL);
2065     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2066     {
2067       status = handle_request (sock, now, cmd, cmd_len+1);
2068       if (status != 0)
2069         goto out_close;
2070     }
2071   }
2073 out_close:
2074   close_connection(sock);
2076   /* Remove this thread from the connection threads list */
2077   pthread_mutex_lock (&connection_threads_lock);
2078   connection_threads_num--;
2079   if (connection_threads_num <= 0)
2080     pthread_cond_broadcast(&connection_threads_done);
2081   pthread_mutex_unlock (&connection_threads_lock);
2083   return (NULL);
2084 } /* }}} void *connection_thread_main */
2086 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2088   int fd;
2089   struct sockaddr_un sa;
2090   listen_socket_t *temp;
2091   int status;
2092   const char *path;
2094   path = sock->addr;
2095   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2096     path += strlen("unix:");
2098   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2099       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2100   if (temp == NULL)
2101   {
2102     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2103     return (-1);
2104   }
2105   listen_fds = temp;
2106   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2108   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2109   if (fd < 0)
2110   {
2111     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2112              rrd_strerror(errno));
2113     return (-1);
2114   }
2116   memset (&sa, 0, sizeof (sa));
2117   sa.sun_family = AF_UNIX;
2118   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2120   /* if we've gotten this far, we own the pid file.  any daemon started
2121    * with the same args must not be alive.  therefore, ensure that we can
2122    * create the socket...
2123    */
2124   unlink(path);
2126   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2127   if (status != 0)
2128   {
2129     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2130              path, rrd_strerror(errno));
2131     close (fd);
2132     return (-1);
2133   }
2135   status = listen (fd, /* backlog = */ 10);
2136   if (status != 0)
2137   {
2138     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2139              path, rrd_strerror(errno));
2140     close (fd);
2141     unlink (path);
2142     return (-1);
2143   }
2145   listen_fds[listen_fds_num].fd = fd;
2146   listen_fds[listen_fds_num].family = PF_UNIX;
2147   strncpy(listen_fds[listen_fds_num].addr, path,
2148           sizeof (listen_fds[listen_fds_num].addr) - 1);
2149   listen_fds_num++;
2151   return (0);
2152 } /* }}} int open_listen_socket_unix */
2154 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2156   struct addrinfo ai_hints;
2157   struct addrinfo *ai_res;
2158   struct addrinfo *ai_ptr;
2159   char addr_copy[NI_MAXHOST];
2160   char *addr;
2161   char *port;
2162   int status;
2164   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2165   addr_copy[sizeof (addr_copy) - 1] = 0;
2166   addr = addr_copy;
2168   memset (&ai_hints, 0, sizeof (ai_hints));
2169   ai_hints.ai_flags = 0;
2170 #ifdef AI_ADDRCONFIG
2171   ai_hints.ai_flags |= AI_ADDRCONFIG;
2172 #endif
2173   ai_hints.ai_family = AF_UNSPEC;
2174   ai_hints.ai_socktype = SOCK_STREAM;
2176   port = NULL;
2177   if (*addr == '[') /* IPv6+port format */
2178   {
2179     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2180     addr++;
2182     port = strchr (addr, ']');
2183     if (port == NULL)
2184     {
2185       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2186       return (-1);
2187     }
2188     *port = 0;
2189     port++;
2191     if (*port == ':')
2192       port++;
2193     else if (*port == 0)
2194       port = NULL;
2195     else
2196     {
2197       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2198       return (-1);
2199     }
2200   } /* if (*addr = ']') */
2201   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2202   {
2203     port = rindex(addr, ':');
2204     if (port != NULL)
2205     {
2206       *port = 0;
2207       port++;
2208     }
2209   }
2210   ai_res = NULL;
2211   status = getaddrinfo (addr,
2212                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2213                         &ai_hints, &ai_res);
2214   if (status != 0)
2215   {
2216     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2217              addr, gai_strerror (status));
2218     return (-1);
2219   }
2221   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2222   {
2223     int fd;
2224     listen_socket_t *temp;
2225     int one = 1;
2227     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2228         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2229     if (temp == NULL)
2230     {
2231       fprintf (stderr,
2232                "rrdcached: open_listen_socket_network: realloc failed.\n");
2233       continue;
2234     }
2235     listen_fds = temp;
2236     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2238     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2239     if (fd < 0)
2240     {
2241       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2242                rrd_strerror(errno));
2243       continue;
2244     }
2246     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2248     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2249     if (status != 0)
2250     {
2251       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2252                sock->addr, rrd_strerror(errno));
2253       close (fd);
2254       continue;
2255     }
2257     status = listen (fd, /* backlog = */ 10);
2258     if (status != 0)
2259     {
2260       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2261                sock->addr, rrd_strerror(errno));
2262       close (fd);
2263       freeaddrinfo(ai_res);
2264       return (-1);
2265     }
2267     listen_fds[listen_fds_num].fd = fd;
2268     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2269     listen_fds_num++;
2270   } /* for (ai_ptr) */
2272   freeaddrinfo(ai_res);
2273   return (0);
2274 } /* }}} static int open_listen_socket_network */
2276 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2278   assert(sock != NULL);
2279   assert(sock->addr != NULL);
2281   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2282       || sock->addr[0] == '/')
2283     return (open_listen_socket_unix(sock));
2284   else
2285     return (open_listen_socket_network(sock));
2286 } /* }}} int open_listen_socket */
2288 static int close_listen_sockets (void) /* {{{ */
2290   size_t i;
2292   for (i = 0; i < listen_fds_num; i++)
2293   {
2294     close (listen_fds[i].fd);
2296     if (listen_fds[i].family == PF_UNIX)
2297       unlink(listen_fds[i].addr);
2298   }
2300   free (listen_fds);
2301   listen_fds = NULL;
2302   listen_fds_num = 0;
2304   return (0);
2305 } /* }}} int close_listen_sockets */
2307 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2309   struct pollfd *pollfds;
2310   int pollfds_num;
2311   int status;
2312   int i;
2314   if (listen_fds_num < 1)
2315   {
2316     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2317     return (NULL);
2318   }
2320   pollfds_num = listen_fds_num;
2321   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2322   if (pollfds == NULL)
2323   {
2324     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2325     return (NULL);
2326   }
2327   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2329   RRDD_LOG(LOG_INFO, "listening for connections");
2331   while (state == RUNNING)
2332   {
2333     for (i = 0; i < pollfds_num; i++)
2334     {
2335       pollfds[i].fd = listen_fds[i].fd;
2336       pollfds[i].events = POLLIN | POLLPRI;
2337       pollfds[i].revents = 0;
2338     }
2340     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2341     if (state != RUNNING)
2342       break;
2343     else if (status == 0) /* timeout */
2344       continue;
2345     else if (status < 0) /* error */
2346     {
2347       status = errno;
2348       if (status != EINTR)
2349       {
2350         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2351       }
2352       continue;
2353     }
2355     for (i = 0; i < pollfds_num; i++)
2356     {
2357       listen_socket_t *client_sock;
2358       struct sockaddr_storage client_sa;
2359       socklen_t client_sa_size;
2360       pthread_t tid;
2361       pthread_attr_t attr;
2363       if (pollfds[i].revents == 0)
2364         continue;
2366       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2367       {
2368         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2369             "poll(2) returned something unexpected for listen FD #%i.",
2370             pollfds[i].fd);
2371         continue;
2372       }
2374       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2375       if (client_sock == NULL)
2376       {
2377         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2378         continue;
2379       }
2380       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2382       client_sa_size = sizeof (client_sa);
2383       client_sock->fd = accept (pollfds[i].fd,
2384           (struct sockaddr *) &client_sa, &client_sa_size);
2385       if (client_sock->fd < 0)
2386       {
2387         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2388         free(client_sock);
2389         continue;
2390       }
2392       pthread_attr_init (&attr);
2393       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2395       status = pthread_create (&tid, &attr, connection_thread_main,
2396                                client_sock);
2397       if (status != 0)
2398       {
2399         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2400         close_connection(client_sock);
2401         continue;
2402       }
2403     } /* for (pollfds_num) */
2404   } /* while (state == RUNNING) */
2406   RRDD_LOG(LOG_INFO, "starting shutdown");
2408   close_listen_sockets ();
2410   pthread_mutex_lock (&connection_threads_lock);
2411   while (connection_threads_num > 0)
2412     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2413   pthread_mutex_unlock (&connection_threads_lock);
2415   free(pollfds);
2417   return (NULL);
2418 } /* }}} void *listen_thread_main */
2420 static int daemonize (void) /* {{{ */
2422   int pid_fd;
2423   char *base_dir;
2425   daemon_uid = geteuid();
2427   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2428   if (pid_fd < 0)
2429     pid_fd = check_pidfile();
2430   if (pid_fd < 0)
2431     return pid_fd;
2433   /* open all the listen sockets */
2434   if (config_listen_address_list_len > 0)
2435   {
2436     for (size_t i = 0; i < config_listen_address_list_len; i++)
2437       open_listen_socket (config_listen_address_list[i]);
2439     rrd_free_ptrs((void ***) &config_listen_address_list,
2440                   &config_listen_address_list_len);
2441   }
2442   else
2443   {
2444     listen_socket_t sock;
2445     memset(&sock, 0, sizeof(sock));
2446     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2447     open_listen_socket (&sock);
2448   }
2450   if (listen_fds_num < 1)
2451   {
2452     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2453     goto error;
2454   }
2456   if (!stay_foreground)
2457   {
2458     pid_t child;
2460     child = fork ();
2461     if (child < 0)
2462     {
2463       fprintf (stderr, "daemonize: fork(2) failed.\n");
2464       goto error;
2465     }
2466     else if (child > 0)
2467       exit(0);
2469     /* Become session leader */
2470     setsid ();
2472     /* Open the first three file descriptors to /dev/null */
2473     close (2);
2474     close (1);
2475     close (0);
2477     open ("/dev/null", O_RDWR);
2478     if (dup(0) == -1 || dup(0) == -1){
2479         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2480     }
2481   } /* if (!stay_foreground) */
2483   /* Change into the /tmp directory. */
2484   base_dir = (config_base_dir != NULL)
2485     ? config_base_dir
2486     : "/tmp";
2488   if (chdir (base_dir) != 0)
2489   {
2490     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2491     goto error;
2492   }
2494   install_signal_handlers();
2496   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2497   RRDD_LOG(LOG_INFO, "starting up");
2499   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2500                                 (GDestroyNotify) free_cache_item);
2501   if (cache_tree == NULL)
2502   {
2503     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2504     goto error;
2505   }
2507   return write_pidfile (pid_fd);
2509 error:
2510   remove_pidfile();
2511   return -1;
2512 } /* }}} int daemonize */
2514 static int cleanup (void) /* {{{ */
2516   pthread_cond_broadcast (&flush_cond);
2517   pthread_join (flush_thread, NULL);
2519   pthread_cond_broadcast (&queue_cond);
2520   for (int i = 0; i < config_queue_threads; i++)
2521     pthread_join (queue_threads[i], NULL);
2523   if (config_flush_at_shutdown)
2524   {
2525     assert(cache_queue_head == NULL);
2526     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2527   }
2529   journal_done();
2530   remove_pidfile ();
2532   free(queue_threads);
2533   free(config_base_dir);
2534   free(config_pid_file);
2535   free(journal_cur);
2536   free(journal_old);
2538   pthread_mutex_lock(&cache_lock);
2539   g_tree_destroy(cache_tree);
2541   RRDD_LOG(LOG_INFO, "goodbye");
2542   closelog ();
2544   return (0);
2545 } /* }}} int cleanup */
2547 static int read_options (int argc, char **argv) /* {{{ */
2549   int option;
2550   int status = 0;
2552   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2553   {
2554     switch (option)
2555     {
2556       case 'g':
2557         stay_foreground=1;
2558         break;
2560       case 'L':
2561       case 'l':
2562       {
2563         listen_socket_t *new;
2565         new = malloc(sizeof(listen_socket_t));
2566         if (new == NULL)
2567         {
2568           fprintf(stderr, "read_options: malloc failed.\n");
2569           return(2);
2570         }
2571         memset(new, 0, sizeof(listen_socket_t));
2573         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2574         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2576         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2577                          &config_listen_address_list_len, new))
2578         {
2579           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2580           return (2);
2581         }
2582       }
2583       break;
2585       case 'f':
2586       {
2587         int temp;
2589         temp = atoi (optarg);
2590         if (temp > 0)
2591           config_flush_interval = temp;
2592         else
2593         {
2594           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2595           status = 3;
2596         }
2597       }
2598       break;
2600       case 'w':
2601       {
2602         int temp;
2604         temp = atoi (optarg);
2605         if (temp > 0)
2606           config_write_interval = temp;
2607         else
2608         {
2609           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2610           status = 2;
2611         }
2612       }
2613       break;
2615       case 'z':
2616       {
2617         int temp;
2619         temp = atoi(optarg);
2620         if (temp > 0)
2621           config_write_jitter = temp;
2622         else
2623         {
2624           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2625           status = 2;
2626         }
2628         break;
2629       }
2631       case 't':
2632       {
2633         int threads;
2634         threads = atoi(optarg);
2635         if (threads >= 1)
2636           config_queue_threads = threads;
2637         else
2638         {
2639           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2640           return 1;
2641         }
2642       }
2643       break;
2645       case 'B':
2646         config_write_base_only = 1;
2647         break;
2649       case 'b':
2650       {
2651         size_t len;
2652         char base_realpath[PATH_MAX];
2654         if (config_base_dir != NULL)
2655           free (config_base_dir);
2656         config_base_dir = strdup (optarg);
2657         if (config_base_dir == NULL)
2658         {
2659           fprintf (stderr, "read_options: strdup failed.\n");
2660           return (3);
2661         }
2663         /* make sure that the base directory is not resolved via
2664          * symbolic links.  this makes some performance-enhancing
2665          * assumptions possible (we don't have to resolve paths
2666          * that start with a "/")
2667          */
2668         if (realpath(config_base_dir, base_realpath) == NULL)
2669         {
2670           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2671           return 5;
2672         }
2673         else if (strncmp(config_base_dir,
2674                          base_realpath, sizeof(base_realpath)) != 0)
2675         {
2676           fprintf(stderr,
2677                   "Base directory (-b) resolved via file system links!\n"
2678                   "Please consult rrdcached '-b' documentation!\n"
2679                   "Consider specifying the real directory (%s)\n",
2680                   base_realpath);
2681           return 5;
2682         }
2684         len = strlen (config_base_dir);
2685         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2686         {
2687           config_base_dir[len - 1] = 0;
2688           len--;
2689         }
2691         if (len < 1)
2692         {
2693           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2694           return (4);
2695         }
2697         _config_base_dir_len = len;
2698       }
2699       break;
2701       case 'p':
2702       {
2703         if (config_pid_file != NULL)
2704           free (config_pid_file);
2705         config_pid_file = strdup (optarg);
2706         if (config_pid_file == NULL)
2707         {
2708           fprintf (stderr, "read_options: strdup failed.\n");
2709           return (3);
2710         }
2711       }
2712       break;
2714       case 'F':
2715         config_flush_at_shutdown = 1;
2716         break;
2718       case 'j':
2719       {
2720         struct stat statbuf;
2721         const char *dir = optarg;
2723         status = stat(dir, &statbuf);
2724         if (status != 0)
2725         {
2726           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2727           return 6;
2728         }
2730         if (!S_ISDIR(statbuf.st_mode)
2731             || access(dir, R_OK|W_OK|X_OK) != 0)
2732         {
2733           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2734                   errno ? rrd_strerror(errno) : "");
2735           return 6;
2736         }
2738         journal_cur = malloc(PATH_MAX + 1);
2739         journal_old = malloc(PATH_MAX + 1);
2740         if (journal_cur == NULL || journal_old == NULL)
2741         {
2742           fprintf(stderr, "malloc failure for journal files\n");
2743           return 6;
2744         }
2745         else 
2746         {
2747           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2748           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2749         }
2750       }
2751       break;
2753       case 'h':
2754       case '?':
2755         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2756             "\n"
2757             "Usage: rrdcached [options]\n"
2758             "\n"
2759             "Valid options are:\n"
2760             "  -l <address>  Socket address to listen to.\n"
2761             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2762             "  -w <seconds>  Interval in which to write data.\n"
2763             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2764             "  -t <threads>  Number of write threads.\n"
2765             "  -f <seconds>  Interval in which to flush dead data.\n"
2766             "  -p <file>     Location of the PID-file.\n"
2767             "  -b <dir>      Base directory to change to.\n"
2768             "  -B            Restrict file access to paths within -b <dir>\n"
2769             "  -g            Do not fork and run in the foreground.\n"
2770             "  -j <dir>      Directory in which to create the journal files.\n"
2771             "  -F            Always flush all updates at shutdown\n"
2772             "\n"
2773             "For more information and a detailed description of all options "
2774             "please refer\n"
2775             "to the rrdcached(1) manual page.\n",
2776             VERSION);
2777         status = -1;
2778         break;
2779     } /* switch (option) */
2780   } /* while (getopt) */
2782   /* advise the user when values are not sane */
2783   if (config_flush_interval < 2 * config_write_interval)
2784     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2785             " 2x write interval (-w) !\n");
2786   if (config_write_jitter > config_write_interval)
2787     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2788             " write interval (-w) !\n");
2790   if (config_write_base_only && config_base_dir == NULL)
2791     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2792             "  Consult the rrdcached documentation\n");
2794   if (journal_cur == NULL)
2795     config_flush_at_shutdown = 1;
2797   return (status);
2798 } /* }}} int read_options */
2800 int main (int argc, char **argv)
2802   int status;
2804   status = read_options (argc, argv);
2805   if (status != 0)
2806   {
2807     if (status < 0)
2808       status = 0;
2809     return (status);
2810   }
2812   status = daemonize ();
2813   if (status != 0)
2814   {
2815     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2816     return (1);
2817   }
2819   journal_init();
2821   /* start the queue threads */
2822   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2823   if (queue_threads == NULL)
2824   {
2825     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2826     cleanup();
2827     return (1);
2828   }
2829   for (int i = 0; i < config_queue_threads; i++)
2830   {
2831     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2832     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2833     if (status != 0)
2834     {
2835       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2836       cleanup();
2837       return (1);
2838     }
2839   }
2841   /* start the flush thread */
2842   memset(&flush_thread, 0, sizeof(flush_thread));
2843   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2844   if (status != 0)
2845   {
2846     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2847     cleanup();
2848     return (1);
2849   }
2851   listen_thread_main (NULL);
2852   cleanup ();
2854   return (0);
2855 } /* int main */
2857 /*
2858  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2859  */