Code

The current method may have caused flushes (and journal rotations) more
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
115 /*
116  * Types
117  */
118 typedef enum
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
126 struct listen_socket_s
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
168   char *syntax;
169   char *help;
170 };
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
189 struct callback_flush_data_s
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
198 enum queue_side_e
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
218 enum {
219   RUNNING,              /* normal operation */
220   FLUSHING,             /* flushing remaining values */
221   SHUTDOWN              /* shutting down */
222 } state = RUNNING;
224 static pthread_t *queue_threads;
225 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
226 static int config_queue_threads = 4;
228 static pthread_t flush_thread;
229 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
231 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
232 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
233 static int connection_threads_num = 0;
235 /* Cache stuff */
236 static GTree          *cache_tree = NULL;
237 static cache_item_t   *cache_queue_head = NULL;
238 static cache_item_t   *cache_queue_tail = NULL;
239 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
241 static int config_write_interval = 300;
242 static int config_write_jitter   = 0;
243 static int config_flush_interval = 3600;
244 static int config_flush_at_shutdown = 0;
245 static char *config_pid_file = NULL;
246 static char *config_base_dir = NULL;
247 static size_t _config_base_dir_len = 0;
248 static int config_write_base_only = 0;
250 static listen_socket_t **config_listen_address_list = NULL;
251 static size_t config_listen_address_list_len = 0;
253 static uint64_t stats_queue_length = 0;
254 static uint64_t stats_updates_received = 0;
255 static uint64_t stats_flush_received = 0;
256 static uint64_t stats_updates_written = 0;
257 static uint64_t stats_data_sets_written = 0;
258 static uint64_t stats_journal_bytes = 0;
259 static uint64_t stats_journal_rotate = 0;
260 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
262 /* Journaled updates */
263 static char *journal_cur = NULL;
264 static char *journal_old = NULL;
265 static FILE *journal_fh = NULL;
266 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
267 static int journal_write(char *cmd, char *args);
268 static void journal_done(void);
269 static void journal_rotate(void);
271 /* prototypes for forward refernces */
272 static int handle_request_help (HANDLER_PROTO);
274 /* 
275  * Functions
276  */
277 static void sig_common (const char *sig) /* {{{ */
279   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
280   state = FLUSHING;
281   pthread_cond_broadcast(&flush_cond);
282   pthread_cond_broadcast(&queue_cond);
283 } /* }}} void sig_common */
285 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
287   sig_common("INT");
288 } /* }}} void sig_int_handler */
290 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
292   sig_common("TERM");
293 } /* }}} void sig_term_handler */
295 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
297   config_flush_at_shutdown = 1;
298   sig_common("USR1");
299 } /* }}} void sig_usr1_handler */
301 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
303   config_flush_at_shutdown = 0;
304   sig_common("USR2");
305 } /* }}} void sig_usr2_handler */
307 static void install_signal_handlers(void) /* {{{ */
309   /* These structures are static, because `sigaction' behaves weird if the are
310    * overwritten.. */
311   static struct sigaction sa_int;
312   static struct sigaction sa_term;
313   static struct sigaction sa_pipe;
314   static struct sigaction sa_usr1;
315   static struct sigaction sa_usr2;
317   /* Install signal handlers */
318   memset (&sa_int, 0, sizeof (sa_int));
319   sa_int.sa_handler = sig_int_handler;
320   sigaction (SIGINT, &sa_int, NULL);
322   memset (&sa_term, 0, sizeof (sa_term));
323   sa_term.sa_handler = sig_term_handler;
324   sigaction (SIGTERM, &sa_term, NULL);
326   memset (&sa_pipe, 0, sizeof (sa_pipe));
327   sa_pipe.sa_handler = SIG_IGN;
328   sigaction (SIGPIPE, &sa_pipe, NULL);
330   memset (&sa_pipe, 0, sizeof (sa_usr1));
331   sa_usr1.sa_handler = sig_usr1_handler;
332   sigaction (SIGUSR1, &sa_usr1, NULL);
334   memset (&sa_usr2, 0, sizeof (sa_usr2));
335   sa_usr2.sa_handler = sig_usr2_handler;
336   sigaction (SIGUSR2, &sa_usr2, NULL);
338 } /* }}} void install_signal_handlers */
340 static int open_pidfile(char *action, int oflag) /* {{{ */
342   int fd;
343   char *file;
345   file = (config_pid_file != NULL)
346     ? config_pid_file
347     : LOCALSTATEDIR "/run/rrdcached.pid";
349   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
350   if (fd < 0)
351     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
352             action, file, rrd_strerror(errno));
354   return(fd);
355 } /* }}} static int open_pidfile */
357 /* check existing pid file to see whether a daemon is running */
358 static int check_pidfile(void)
360   int pid_fd;
361   pid_t pid;
362   char pid_str[16];
364   pid_fd = open_pidfile("open", O_RDWR);
365   if (pid_fd < 0)
366     return pid_fd;
368   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
369     return -1;
371   pid = atoi(pid_str);
372   if (pid <= 0)
373     return -1;
375   /* another running process that we can signal COULD be
376    * a competing rrdcached */
377   if (pid != getpid() && kill(pid, 0) == 0)
378   {
379     fprintf(stderr,
380             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
381     close(pid_fd);
382     return -1;
383   }
385   lseek(pid_fd, 0, SEEK_SET);
386   if (ftruncate(pid_fd, 0) == -1)
387   {
388     fprintf(stderr,
389             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
390     close(pid_fd);
391     return -1;
392   }
394   fprintf(stderr,
395           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
396           "rrdcached: starting normally.\n", pid);
398   return pid_fd;
399 } /* }}} static int check_pidfile */
401 static int write_pidfile (int fd) /* {{{ */
403   pid_t pid;
404   FILE *fh;
406   pid = getpid ();
408   fh = fdopen (fd, "w");
409   if (fh == NULL)
410   {
411     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
412     close(fd);
413     return (-1);
414   }
416   fprintf (fh, "%i\n", (int) pid);
417   fclose (fh);
419   return (0);
420 } /* }}} int write_pidfile */
422 static int remove_pidfile (void) /* {{{ */
424   char *file;
425   int status;
427   file = (config_pid_file != NULL)
428     ? config_pid_file
429     : LOCALSTATEDIR "/run/rrdcached.pid";
431   status = unlink (file);
432   if (status == 0)
433     return (0);
434   return (errno);
435 } /* }}} int remove_pidfile */
437 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
439   char *eol;
441   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
442                sock->next_read - sock->next_cmd);
444   if (eol == NULL)
445   {
446     /* no commands left, move remainder back to front of rbuf */
447     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
448             sock->next_read - sock->next_cmd);
449     sock->next_read -= sock->next_cmd;
450     sock->next_cmd = 0;
451     *len = 0;
452     return NULL;
453   }
454   else
455   {
456     char *cmd = sock->rbuf + sock->next_cmd;
457     *eol = '\0';
459     sock->next_cmd = eol - sock->rbuf + 1;
461     if (eol > sock->rbuf && *(eol-1) == '\r')
462       *(--eol) = '\0'; /* handle "\r\n" EOL */
464     *len = eol - cmd;
466     return cmd;
467   }
469   /* NOTREACHED */
470   assert(1==0);
473 /* add the characters directly to the write buffer */
474 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
476   char *new_buf;
478   assert(sock != NULL);
480   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
481   if (new_buf == NULL)
482   {
483     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
484     return -1;
485   }
487   strncpy(new_buf + sock->wbuf_len, str, len + 1);
489   sock->wbuf = new_buf;
490   sock->wbuf_len += len;
492   return 0;
493 } /* }}} static int add_to_wbuf */
495 /* add the text to the "extra" info that's sent after the status line */
496 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
498   va_list argp;
499   char buffer[CMD_MAX];
500   int len;
502   if (sock == NULL) return 0; /* journal replay mode */
503   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
505   va_start(argp, fmt);
506 #ifdef HAVE_VSNPRINTF
507   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
508 #else
509   len = vsprintf(buffer, fmt, argp);
510 #endif
511   va_end(argp);
512   if (len < 0)
513   {
514     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
515     return -1;
516   }
518   return add_to_wbuf(sock, buffer, len);
519 } /* }}} static int add_response_info */
521 static int count_lines(char *str) /* {{{ */
523   int lines = 0;
525   if (str != NULL)
526   {
527     while ((str = strchr(str, '\n')) != NULL)
528     {
529       ++lines;
530       ++str;
531     }
532   }
534   return lines;
535 } /* }}} static int count_lines */
537 /* send the response back to the user.
538  * returns 0 on success, -1 on error
539  * write buffer is always zeroed after this call */
540 static int send_response (listen_socket_t *sock, response_code rc,
541                           char *fmt, ...) /* {{{ */
543   va_list argp;
544   char buffer[CMD_MAX];
545   int lines;
546   ssize_t wrote;
547   int rclen, len;
549   if (sock == NULL) return rc;  /* journal replay mode */
551   if (sock->batch_start)
552   {
553     if (rc == RESP_OK)
554       return rc; /* no response on success during BATCH */
555     lines = sock->batch_cmd;
556   }
557   else if (rc == RESP_OK)
558     lines = count_lines(sock->wbuf);
559   else
560     lines = -1;
562   rclen = sprintf(buffer, "%d ", lines);
563   va_start(argp, fmt);
564 #ifdef HAVE_VSNPRINTF
565   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
566 #else
567   len = vsprintf(buffer+rclen, fmt, argp);
568 #endif
569   va_end(argp);
570   if (len < 0)
571     return -1;
573   len += rclen;
575   /* append the result to the wbuf, don't write to the user */
576   if (sock->batch_start)
577     return add_to_wbuf(sock, buffer, len);
579   /* first write must be complete */
580   if (len != write(sock->fd, buffer, len))
581   {
582     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
583     return -1;
584   }
586   if (sock->wbuf != NULL && rc == RESP_OK)
587   {
588     wrote = 0;
589     while (wrote < sock->wbuf_len)
590     {
591       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
592       if (wb <= 0)
593       {
594         RRDD_LOG(LOG_INFO, "send_response: could not write results");
595         return -1;
596       }
597       wrote += wb;
598     }
599   }
601   free(sock->wbuf); sock->wbuf = NULL;
602   sock->wbuf_len = 0;
604   return 0;
605 } /* }}} */
607 static void wipe_ci_values(cache_item_t *ci, time_t when)
609   ci->values = NULL;
610   ci->values_num = 0;
612   ci->last_flush_time = when;
613   if (config_write_jitter > 0)
614     ci->last_flush_time += (rrd_random() % config_write_jitter);
617 /* remove_from_queue
618  * remove a "cache_item_t" item from the queue.
619  * must hold 'cache_lock' when calling this
620  */
621 static void remove_from_queue(cache_item_t *ci) /* {{{ */
623   if (ci == NULL) return;
624   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
626   if (ci->prev == NULL)
627     cache_queue_head = ci->next; /* reset head */
628   else
629     ci->prev->next = ci->next;
631   if (ci->next == NULL)
632     cache_queue_tail = ci->prev; /* reset the tail */
633   else
634     ci->next->prev = ci->prev;
636   ci->next = ci->prev = NULL;
637   ci->flags &= ~CI_FLAGS_IN_QUEUE;
639   pthread_mutex_lock (&stats_lock);
640   assert (stats_queue_length > 0);
641   stats_queue_length--;
642   pthread_mutex_unlock (&stats_lock);
644 } /* }}} static void remove_from_queue */
646 /* free the resources associated with the cache_item_t
647  * must hold cache_lock when calling this function
648  */
649 static void *free_cache_item(cache_item_t *ci) /* {{{ */
651   if (ci == NULL) return NULL;
653   remove_from_queue(ci);
655   for (size_t i=0; i < ci->values_num; i++)
656     free(ci->values[i]);
658   free (ci->values);
659   free (ci->file);
661   /* in case anyone is waiting */
662   pthread_cond_broadcast(&ci->flushed);
663   pthread_cond_destroy(&ci->flushed);
665   free (ci);
667   return NULL;
668 } /* }}} static void *free_cache_item */
670 /*
671  * enqueue_cache_item:
672  * `cache_lock' must be acquired before calling this function!
673  */
674 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
675     queue_side_t side)
677   if (ci == NULL)
678     return (-1);
680   if (ci->values_num == 0)
681     return (0);
683   if (side == HEAD)
684   {
685     if (cache_queue_head == ci)
686       return 0;
688     /* remove if further down in queue */
689     remove_from_queue(ci);
691     ci->prev = NULL;
692     ci->next = cache_queue_head;
693     if (ci->next != NULL)
694       ci->next->prev = ci;
695     cache_queue_head = ci;
697     if (cache_queue_tail == NULL)
698       cache_queue_tail = cache_queue_head;
699   }
700   else /* (side == TAIL) */
701   {
702     /* We don't move values back in the list.. */
703     if (ci->flags & CI_FLAGS_IN_QUEUE)
704       return (0);
706     assert (ci->next == NULL);
707     assert (ci->prev == NULL);
709     ci->prev = cache_queue_tail;
711     if (cache_queue_tail == NULL)
712       cache_queue_head = ci;
713     else
714       cache_queue_tail->next = ci;
716     cache_queue_tail = ci;
717   }
719   ci->flags |= CI_FLAGS_IN_QUEUE;
721   pthread_cond_signal(&queue_cond);
722   pthread_mutex_lock (&stats_lock);
723   stats_queue_length++;
724   pthread_mutex_unlock (&stats_lock);
726   return (0);
727 } /* }}} int enqueue_cache_item */
729 /*
730  * tree_callback_flush:
731  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
732  * while this is in progress.
733  */
734 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
735     gpointer data)
737   cache_item_t *ci;
738   callback_flush_data_t *cfd;
740   ci = (cache_item_t *) value;
741   cfd = (callback_flush_data_t *) data;
743   if (ci->flags & CI_FLAGS_IN_QUEUE)
744     return FALSE;
746   if (ci->values_num > 0
747       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
748   {
749     enqueue_cache_item (ci, TAIL);
750   }
751   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
752       && (ci->values_num <= 0))
753   {
754     assert ((char *) key == ci->file);
755     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
756     {
757       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
758       return (FALSE);
759     }
760   }
762   return (FALSE);
763 } /* }}} gboolean tree_callback_flush */
765 static int flush_old_values (int max_age)
767   callback_flush_data_t cfd;
768   size_t k;
770   memset (&cfd, 0, sizeof (cfd));
771   /* Pass the current time as user data so that we don't need to call
772    * `time' for each node. */
773   cfd.now = time (NULL);
774   cfd.keys = NULL;
775   cfd.keys_num = 0;
777   if (max_age > 0)
778     cfd.abs_timeout = cfd.now - max_age;
779   else
780     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
782   /* `tree_callback_flush' will return the keys of all values that haven't
783    * been touched in the last `config_flush_interval' seconds in `cfd'.
784    * The char*'s in this array point to the same memory as ci->file, so we
785    * don't need to free them separately. */
786   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
788   for (k = 0; k < cfd.keys_num; k++)
789   {
790     /* should never fail, since we have held the cache_lock
791      * the entire time */
792     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
793   }
795   if (cfd.keys != NULL)
796   {
797     free (cfd.keys);
798     cfd.keys = NULL;
799   }
801   return (0);
802 } /* int flush_old_values */
804 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
806   struct timeval now;
807   struct timespec next_flush;
808   int status;
810   gettimeofday (&now, NULL);
811   next_flush.tv_sec = now.tv_sec + config_flush_interval;
812   next_flush.tv_nsec = 1000 * now.tv_usec;
814   pthread_mutex_lock(&cache_lock);
816   while (state == RUNNING)
817   {
818     gettimeofday (&now, NULL);
819     if ((now.tv_sec > next_flush.tv_sec)
820         || ((now.tv_sec == next_flush.tv_sec)
821           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
822     {
823       RRDD_LOG(LOG_DEBUG, "flushing old values");
825       /* Determine the time of the next cache flush. */
826       next_flush.tv_sec = now.tv_sec + config_flush_interval;
828       /* Flush all values that haven't been written in the last
829        * `config_write_interval' seconds. */
830       flush_old_values (config_write_interval);
832       /* unlock the cache while we rotate so we don't block incoming
833        * updates if the fsync() blocks on disk I/O */
834       pthread_mutex_unlock(&cache_lock);
835       journal_rotate();
836       pthread_mutex_lock(&cache_lock);
837     }
839     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
840     if (status != 0 && status != ETIMEDOUT)
841     {
842       RRDD_LOG (LOG_ERR, "flush_thread_main: "
843                 "pthread_cond_timedwait returned %i.", status);
844     }
845   }
847   if (config_flush_at_shutdown)
848     flush_old_values (-1); /* flush everything */
850   state = SHUTDOWN;
852   pthread_mutex_unlock(&cache_lock);
854   return NULL;
855 } /* void *flush_thread_main */
857 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
859   pthread_mutex_lock (&cache_lock);
861   while (state != SHUTDOWN
862          || (cache_queue_head != NULL && config_flush_at_shutdown))
863   {
864     cache_item_t *ci;
865     char *file;
866     char **values;
867     size_t values_num;
868     int status;
870     /* Now, check if there's something to store away. If not, wait until
871      * something comes in. */
872     if (cache_queue_head == NULL)
873     {
874       status = pthread_cond_wait (&queue_cond, &cache_lock);
875       if ((status != 0) && (status != ETIMEDOUT))
876       {
877         RRDD_LOG (LOG_ERR, "queue_thread_main: "
878             "pthread_cond_wait returned %i.", status);
879       }
880     }
882     /* Check if a value has arrived. This may be NULL if we timed out or there
883      * was an interrupt such as a signal. */
884     if (cache_queue_head == NULL)
885       continue;
887     ci = cache_queue_head;
889     /* copy the relevant parts */
890     file = strdup (ci->file);
891     if (file == NULL)
892     {
893       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
894       continue;
895     }
897     assert(ci->values != NULL);
898     assert(ci->values_num > 0);
900     values = ci->values;
901     values_num = ci->values_num;
903     wipe_ci_values(ci, time(NULL));
904     remove_from_queue(ci);
906     pthread_mutex_unlock (&cache_lock);
908     rrd_clear_error ();
909     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
910     if (status != 0)
911     {
912       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
913           "rrd_update_r (%s) failed with status %i. (%s)",
914           file, status, rrd_get_error());
915     }
917     journal_write("wrote", file);
919     /* Search again in the tree.  It's possible someone issued a "FORGET"
920      * while we were writing the update values. */
921     pthread_mutex_lock(&cache_lock);
922     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
923     if (ci)
924       pthread_cond_broadcast(&ci->flushed);
925     pthread_mutex_unlock(&cache_lock);
927     rrd_free_ptrs((void ***) &values, &values_num);
928     free(file);
930     if (status == 0)
931     {
932       pthread_mutex_lock (&stats_lock);
933       stats_updates_written++;
934       stats_data_sets_written += values_num;
935       pthread_mutex_unlock (&stats_lock);
936     }
938     pthread_mutex_lock (&cache_lock);
939   }
940   pthread_mutex_unlock (&cache_lock);
942   return (NULL);
943 } /* }}} void *queue_thread_main */
945 static int buffer_get_field (char **buffer_ret, /* {{{ */
946     size_t *buffer_size_ret, char **field_ret)
948   char *buffer;
949   size_t buffer_pos;
950   size_t buffer_size;
951   char *field;
952   size_t field_size;
953   int status;
955   buffer = *buffer_ret;
956   buffer_pos = 0;
957   buffer_size = *buffer_size_ret;
958   field = *buffer_ret;
959   field_size = 0;
961   if (buffer_size <= 0)
962     return (-1);
964   /* This is ensured by `handle_request'. */
965   assert (buffer[buffer_size - 1] == '\0');
967   status = -1;
968   while (buffer_pos < buffer_size)
969   {
970     /* Check for end-of-field or end-of-buffer */
971     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
972     {
973       field[field_size] = 0;
974       field_size++;
975       buffer_pos++;
976       status = 0;
977       break;
978     }
979     /* Handle escaped characters. */
980     else if (buffer[buffer_pos] == '\\')
981     {
982       if (buffer_pos >= (buffer_size - 1))
983         break;
984       buffer_pos++;
985       field[field_size] = buffer[buffer_pos];
986       field_size++;
987       buffer_pos++;
988     }
989     /* Normal operation */ 
990     else
991     {
992       field[field_size] = buffer[buffer_pos];
993       field_size++;
994       buffer_pos++;
995     }
996   } /* while (buffer_pos < buffer_size) */
998   if (status != 0)
999     return (status);
1001   *buffer_ret = buffer + buffer_pos;
1002   *buffer_size_ret = buffer_size - buffer_pos;
1003   *field_ret = field;
1005   return (0);
1006 } /* }}} int buffer_get_field */
1008 /* if we're restricting writes to the base directory,
1009  * check whether the file falls within the dir
1010  * returns 1 if OK, otherwise 0
1011  */
1012 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1014   assert(file != NULL);
1016   if (!config_write_base_only
1017       || sock == NULL /* journal replay */
1018       || config_base_dir == NULL)
1019     return 1;
1021   if (strstr(file, "../") != NULL) goto err;
1023   /* relative paths without "../" are ok */
1024   if (*file != '/') return 1;
1026   /* file must be of the format base + "/" + <1+ char filename> */
1027   if (strlen(file) < _config_base_dir_len + 2) goto err;
1028   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1029   if (*(file + _config_base_dir_len) != '/') goto err;
1031   return 1;
1033 err:
1034   if (sock != NULL && sock->fd >= 0)
1035     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1037   return 0;
1038 } /* }}} static int check_file_access */
1040 /* when using a base dir, convert relative paths to absolute paths.
1041  * if necessary, modifies the "filename" pointer to point
1042  * to the new path created in "tmp".  "tmp" is provided
1043  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1044  *
1045  * this allows us to optimize for the expected case (absolute path)
1046  * with a no-op.
1047  */
1048 static void get_abs_path(char **filename, char *tmp)
1050   assert(tmp != NULL);
1051   assert(filename != NULL && *filename != NULL);
1053   if (config_base_dir == NULL || **filename == '/')
1054     return;
1056   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1057   *filename = tmp;
1058 } /* }}} static int get_abs_path */
1060 /* returns 1 if we have the required privilege level,
1061  * otherwise issue an error to the user on sock */
1062 static int has_privilege (listen_socket_t *sock, /* {{{ */
1063                           socket_privilege priv)
1065   if (sock == NULL) /* journal replay */
1066     return 1;
1068   if (sock->privilege >= priv)
1069     return 1;
1071   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1072 } /* }}} static int has_privilege */
1074 static int flush_file (const char *filename) /* {{{ */
1076   cache_item_t *ci;
1078   pthread_mutex_lock (&cache_lock);
1080   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1081   if (ci == NULL)
1082   {
1083     pthread_mutex_unlock (&cache_lock);
1084     return (ENOENT);
1085   }
1087   if (ci->values_num > 0)
1088   {
1089     /* Enqueue at head */
1090     enqueue_cache_item (ci, HEAD);
1091     pthread_cond_wait(&ci->flushed, &cache_lock);
1092   }
1094   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1095    * may have been purged during our cond_wait() */
1097   pthread_mutex_unlock(&cache_lock);
1099   return (0);
1100 } /* }}} int flush_file */
1102 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1104   char *err = "Syntax error.\n";
1106   if (cmd && cmd->syntax)
1107     err = cmd->syntax;
1109   return send_response(sock, RESP_ERR, "Usage: %s", err);
1110 } /* }}} static int syntax_error() */
1112 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1114   uint64_t copy_queue_length;
1115   uint64_t copy_updates_received;
1116   uint64_t copy_flush_received;
1117   uint64_t copy_updates_written;
1118   uint64_t copy_data_sets_written;
1119   uint64_t copy_journal_bytes;
1120   uint64_t copy_journal_rotate;
1122   uint64_t tree_nodes_number;
1123   uint64_t tree_depth;
1125   pthread_mutex_lock (&stats_lock);
1126   copy_queue_length       = stats_queue_length;
1127   copy_updates_received   = stats_updates_received;
1128   copy_flush_received     = stats_flush_received;
1129   copy_updates_written    = stats_updates_written;
1130   copy_data_sets_written  = stats_data_sets_written;
1131   copy_journal_bytes      = stats_journal_bytes;
1132   copy_journal_rotate     = stats_journal_rotate;
1133   pthread_mutex_unlock (&stats_lock);
1135   pthread_mutex_lock (&cache_lock);
1136   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1137   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1138   pthread_mutex_unlock (&cache_lock);
1140   add_response_info(sock,
1141                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1142   add_response_info(sock,
1143                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1144   add_response_info(sock,
1145                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1146   add_response_info(sock,
1147                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1148   add_response_info(sock,
1149                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1150   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1151   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1152   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1153   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1155   send_response(sock, RESP_OK, "Statistics follow\n");
1157   return (0);
1158 } /* }}} int handle_request_stats */
1160 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1162   char *file, file_tmp[PATH_MAX];
1163   int status;
1165   status = buffer_get_field (&buffer, &buffer_size, &file);
1166   if (status != 0)
1167   {
1168     return syntax_error(sock,cmd);
1169   }
1170   else
1171   {
1172     pthread_mutex_lock(&stats_lock);
1173     stats_flush_received++;
1174     pthread_mutex_unlock(&stats_lock);
1176     get_abs_path(&file, file_tmp);
1177     if (!check_file_access(file, sock)) return 0;
1179     status = flush_file (file);
1180     if (status == 0)
1181       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1182     else if (status == ENOENT)
1183     {
1184       /* no file in our tree; see whether it exists at all */
1185       struct stat statbuf;
1187       memset(&statbuf, 0, sizeof(statbuf));
1188       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1189         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1190       else
1191         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1192     }
1193     else if (status < 0)
1194       return send_response(sock, RESP_ERR, "Internal error.\n");
1195     else
1196       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1197   }
1199   /* NOTREACHED */
1200   assert(1==0);
1201 } /* }}} int handle_request_flush */
1203 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1205   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1207   pthread_mutex_lock(&cache_lock);
1208   flush_old_values(-1);
1209   pthread_mutex_unlock(&cache_lock);
1211   return send_response(sock, RESP_OK, "Started flush.\n");
1212 } /* }}} static int handle_request_flushall */
1214 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1216   int status;
1217   char *file, file_tmp[PATH_MAX];
1218   cache_item_t *ci;
1220   status = buffer_get_field(&buffer, &buffer_size, &file);
1221   if (status != 0)
1222     return syntax_error(sock,cmd);
1224   get_abs_path(&file, file_tmp);
1226   pthread_mutex_lock(&cache_lock);
1227   ci = g_tree_lookup(cache_tree, file);
1228   if (ci == NULL)
1229   {
1230     pthread_mutex_unlock(&cache_lock);
1231     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1232   }
1234   for (size_t i=0; i < ci->values_num; i++)
1235     add_response_info(sock, "%s\n", ci->values[i]);
1237   pthread_mutex_unlock(&cache_lock);
1238   return send_response(sock, RESP_OK, "updates pending\n");
1239 } /* }}} static int handle_request_pending */
1241 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1243   int status;
1244   gboolean found;
1245   char *file, file_tmp[PATH_MAX];
1247   status = buffer_get_field(&buffer, &buffer_size, &file);
1248   if (status != 0)
1249     return syntax_error(sock,cmd);
1251   get_abs_path(&file, file_tmp);
1252   if (!check_file_access(file, sock)) return 0;
1254   pthread_mutex_lock(&cache_lock);
1255   found = g_tree_remove(cache_tree, file);
1256   pthread_mutex_unlock(&cache_lock);
1258   if (found == TRUE)
1259   {
1260     if (sock != NULL)
1261       journal_write("forget", file);
1263     return send_response(sock, RESP_OK, "Gone!\n");
1264   }
1265   else
1266     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1268   /* NOTREACHED */
1269   assert(1==0);
1270 } /* }}} static int handle_request_forget */
1272 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1274   cache_item_t *ci;
1276   pthread_mutex_lock(&cache_lock);
1278   ci = cache_queue_head;
1279   while (ci != NULL)
1280   {
1281     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1282     ci = ci->next;
1283   }
1285   pthread_mutex_unlock(&cache_lock);
1287   return send_response(sock, RESP_OK, "in queue.\n");
1288 } /* }}} int handle_request_queue */
1290 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1292   char *file, file_tmp[PATH_MAX];
1293   int values_num = 0;
1294   int status;
1295   char orig_buf[CMD_MAX];
1297   cache_item_t *ci;
1299   /* save it for the journal later */
1300   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1302   status = buffer_get_field (&buffer, &buffer_size, &file);
1303   if (status != 0)
1304     return syntax_error(sock,cmd);
1306   pthread_mutex_lock(&stats_lock);
1307   stats_updates_received++;
1308   pthread_mutex_unlock(&stats_lock);
1310   get_abs_path(&file, file_tmp);
1311   if (!check_file_access(file, sock)) return 0;
1313   pthread_mutex_lock (&cache_lock);
1314   ci = g_tree_lookup (cache_tree, file);
1316   if (ci == NULL) /* {{{ */
1317   {
1318     struct stat statbuf;
1319     cache_item_t *tmp;
1321     /* don't hold the lock while we setup; stat(2) might block */
1322     pthread_mutex_unlock(&cache_lock);
1324     memset (&statbuf, 0, sizeof (statbuf));
1325     status = stat (file, &statbuf);
1326     if (status != 0)
1327     {
1328       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1330       status = errno;
1331       if (status == ENOENT)
1332         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1333       else
1334         return send_response(sock, RESP_ERR,
1335                              "stat failed with error %i.\n", status);
1336     }
1337     if (!S_ISREG (statbuf.st_mode))
1338       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1340     if (access(file, R_OK|W_OK) != 0)
1341       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1342                            file, rrd_strerror(errno));
1344     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1345     if (ci == NULL)
1346     {
1347       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1349       return send_response(sock, RESP_ERR, "malloc failed.\n");
1350     }
1351     memset (ci, 0, sizeof (cache_item_t));
1353     ci->file = strdup (file);
1354     if (ci->file == NULL)
1355     {
1356       free (ci);
1357       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1359       return send_response(sock, RESP_ERR, "strdup failed.\n");
1360     }
1362     wipe_ci_values(ci, now);
1363     ci->flags = CI_FLAGS_IN_TREE;
1364     pthread_cond_init(&ci->flushed, NULL);
1366     pthread_mutex_lock(&cache_lock);
1368     /* another UPDATE might have added this entry in the meantime */
1369     tmp = g_tree_lookup (cache_tree, file);
1370     if (tmp == NULL)
1371       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1372     else
1373     {
1374       free_cache_item (ci);
1375       ci = tmp;
1376     }
1378     /* state may have changed while we were unlocked */
1379     if (state == SHUTDOWN)
1380       return -1;
1381   } /* }}} */
1382   assert (ci != NULL);
1384   /* don't re-write updates in replay mode */
1385   if (sock != NULL)
1386     journal_write("update", orig_buf);
1388   while (buffer_size > 0)
1389   {
1390     char *value;
1391     time_t stamp;
1392     char *eostamp;
1394     status = buffer_get_field (&buffer, &buffer_size, &value);
1395     if (status != 0)
1396     {
1397       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1398       break;
1399     }
1401     /* make sure update time is always moving forward */
1402     stamp = strtol(value, &eostamp, 10);
1403     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1404     {
1405       pthread_mutex_unlock(&cache_lock);
1406       return send_response(sock, RESP_ERR,
1407                            "Cannot find timestamp in '%s'!\n", value);
1408     }
1409     else if (stamp <= ci->last_update_stamp)
1410     {
1411       pthread_mutex_unlock(&cache_lock);
1412       return send_response(sock, RESP_ERR,
1413                            "illegal attempt to update using time %ld when last"
1414                            " update time is %ld (minimum one second step)\n",
1415                            stamp, ci->last_update_stamp);
1416     }
1417     else
1418       ci->last_update_stamp = stamp;
1420     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1421     {
1422       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1423       continue;
1424     }
1426     values_num++;
1427   }
1429   if (((now - ci->last_flush_time) >= config_write_interval)
1430       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1431       && (ci->values_num > 0))
1432   {
1433     enqueue_cache_item (ci, TAIL);
1434   }
1436   pthread_mutex_unlock (&cache_lock);
1438   if (values_num < 1)
1439     return send_response(sock, RESP_ERR, "No values updated.\n");
1440   else
1441     return send_response(sock, RESP_OK,
1442                          "errors, enqueued %i value(s).\n", values_num);
1444   /* NOTREACHED */
1445   assert(1==0);
1447 } /* }}} int handle_request_update */
1449 /* we came across a "WROTE" entry during journal replay.
1450  * throw away any values that we have accumulated for this file
1451  */
1452 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1454   cache_item_t *ci;
1455   const char *file = buffer;
1457   pthread_mutex_lock(&cache_lock);
1459   ci = g_tree_lookup(cache_tree, file);
1460   if (ci == NULL)
1461   {
1462     pthread_mutex_unlock(&cache_lock);
1463     return (0);
1464   }
1466   if (ci->values)
1467     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1469   wipe_ci_values(ci, now);
1470   remove_from_queue(ci);
1472   pthread_mutex_unlock(&cache_lock);
1473   return (0);
1474 } /* }}} int handle_request_wrote */
1476 /* start "BATCH" processing */
1477 static int batch_start (HANDLER_PROTO) /* {{{ */
1479   int status;
1480   if (sock->batch_start)
1481     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1483   status = send_response(sock, RESP_OK,
1484                          "Go ahead.  End with dot '.' on its own line.\n");
1485   sock->batch_start = time(NULL);
1486   sock->batch_cmd = 0;
1488   return status;
1489 } /* }}} static int batch_start */
1491 /* finish "BATCH" processing and return results to the client */
1492 static int batch_done (HANDLER_PROTO) /* {{{ */
1494   assert(sock->batch_start);
1495   sock->batch_start = 0;
1496   sock->batch_cmd  = 0;
1497   return send_response(sock, RESP_OK, "errors\n");
1498 } /* }}} static int batch_done */
1500 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1502   return -1;
1503 } /* }}} static int handle_request_quit */
1505 struct command COMMANDS[] = {
1506   {
1507     "UPDATE",
1508     handle_request_update,
1509     PRIV_HIGH,
1510     CMD_CONTEXT_ANY,
1511     "UPDATE <filename> <values> [<values> ...]\n"
1512     ,
1513     "Adds the given file to the internal cache if it is not yet known and\n"
1514     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1515     "for details.\n"
1516     "\n"
1517     "Each <values> has the following form:\n"
1518     "  <values> = <time>:<value>[:<value>[...]]\n"
1519     "See the rrdupdate(1) manpage for details.\n"
1520   },
1521   {
1522     "WROTE",
1523     handle_request_wrote,
1524     PRIV_HIGH,
1525     CMD_CONTEXT_JOURNAL,
1526     NULL,
1527     NULL
1528   },
1529   {
1530     "FLUSH",
1531     handle_request_flush,
1532     PRIV_LOW,
1533     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1534     "FLUSH <filename>\n"
1535     ,
1536     "Adds the given filename to the head of the update queue and returns\n"
1537     "after it has been dequeued.\n"
1538   },
1539   {
1540     "FLUSHALL",
1541     handle_request_flushall,
1542     PRIV_HIGH,
1543     CMD_CONTEXT_CLIENT,
1544     "FLUSHALL\n"
1545     ,
1546     "Triggers writing of all pending updates.  Returns immediately.\n"
1547   },
1548   {
1549     "PENDING",
1550     handle_request_pending,
1551     PRIV_HIGH,
1552     CMD_CONTEXT_CLIENT,
1553     "PENDING <filename>\n"
1554     ,
1555     "Shows any 'pending' updates for a file, in order.\n"
1556     "The updates shown have not yet been written to the underlying RRD file.\n"
1557   },
1558   {
1559     "FORGET",
1560     handle_request_forget,
1561     PRIV_HIGH,
1562     CMD_CONTEXT_ANY,
1563     "FORGET <filename>\n"
1564     ,
1565     "Removes the file completely from the cache.\n"
1566     "Any pending updates for the file will be lost.\n"
1567   },
1568   {
1569     "QUEUE",
1570     handle_request_queue,
1571     PRIV_LOW,
1572     CMD_CONTEXT_CLIENT,
1573     "QUEUE\n"
1574     ,
1575         "Shows all files in the output queue.\n"
1576     "The output is zero or more lines in the following format:\n"
1577     "(where <num_vals> is the number of values to be written)\n"
1578     "\n"
1579     "<num_vals> <filename>\n"
1580   },
1581   {
1582     "STATS",
1583     handle_request_stats,
1584     PRIV_LOW,
1585     CMD_CONTEXT_CLIENT,
1586     "STATS\n"
1587     ,
1588     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1589     "a description of the values.\n"
1590   },
1591   {
1592     "HELP",
1593     handle_request_help,
1594     PRIV_LOW,
1595     CMD_CONTEXT_CLIENT,
1596     "HELP [<command>]\n",
1597     NULL, /* special! */
1598   },
1599   {
1600     "BATCH",
1601     batch_start,
1602     PRIV_LOW,
1603     CMD_CONTEXT_CLIENT,
1604     "BATCH\n"
1605     ,
1606     "The 'BATCH' command permits the client to initiate a bulk load\n"
1607     "   of commands to rrdcached.\n"
1608     "\n"
1609     "Usage:\n"
1610     "\n"
1611     "    client: BATCH\n"
1612     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1613     "    client: command #1\n"
1614     "    client: command #2\n"
1615     "    client: ... and so on\n"
1616     "    client: .\n"
1617     "    server: 2 errors\n"
1618     "    server: 7 message for command #7\n"
1619     "    server: 9 message for command #9\n"
1620     "\n"
1621     "For more information, consult the rrdcached(1) documentation.\n"
1622   },
1623   {
1624     ".",   /* BATCH terminator */
1625     batch_done,
1626     PRIV_LOW,
1627     CMD_CONTEXT_BATCH,
1628     NULL,
1629     NULL
1630   },
1631   {
1632     "QUIT",
1633     handle_request_quit,
1634     PRIV_LOW,
1635     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1636     "QUIT\n"
1637     ,
1638     "Disconnect from rrdcached.\n"
1639   },
1640   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1641 };
1643 static struct command *find_command(char *cmd)
1645   struct command *c = COMMANDS;
1647   while (c->cmd != NULL)
1648   {
1649     if (strcasecmp(cmd, c->cmd) == 0)
1650       break;
1651     c++;
1652   }
1654   if (c->cmd == NULL)
1655     return NULL;
1656   else
1657     return c;
1660 /* check whether commands are received in the expected context */
1661 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1663   if (sock == NULL)
1664     return (cmd->context & CMD_CONTEXT_JOURNAL);
1665   else if (sock->batch_start)
1666     return (cmd->context & CMD_CONTEXT_BATCH);
1667   else
1668     return (cmd->context & CMD_CONTEXT_CLIENT);
1670   /* NOTREACHED */
1671   assert(1==0);
1674 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1676   int status;
1677   char *cmd_str;
1678   char *resp_txt;
1679   struct command *help = NULL;
1681   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1682   if (status == 0)
1683     help = find_command(cmd_str);
1685   if (help && (help->syntax || help->help))
1686   {
1687     char tmp[CMD_MAX];
1689     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1690     resp_txt = tmp;
1692     if (help->syntax)
1693       add_response_info(sock, "Usage: %s\n", help->syntax);
1695     if (help->help)
1696       add_response_info(sock, "%s\n", help->help);
1697   }
1698   else
1699   {
1700     help = COMMANDS;
1701     resp_txt = "Command overview\n";
1703     while (help->cmd)
1704     {
1705       if (help->syntax)
1706         add_response_info(sock, "%s", help->syntax);
1707       help++;
1708     }
1709   }
1711   return send_response(sock, RESP_OK, resp_txt);
1712 } /* }}} int handle_request_help */
1714 /* if sock==NULL, we are in journal replay mode */
1715 static int handle_request (DISPATCH_PROTO) /* {{{ */
1717   char *buffer_ptr = buffer;
1718   char *cmd_str = NULL;
1719   struct command *cmd = NULL;
1720   int status;
1722   assert (buffer[buffer_size - 1] == '\0');
1724   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1725   if (status != 0)
1726   {
1727     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1728     return (-1);
1729   }
1731   if (sock != NULL && sock->batch_start)
1732     sock->batch_cmd++;
1734   cmd = find_command(cmd_str);
1735   if (!cmd)
1736     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1738   status = has_privilege(sock, cmd->min_priv);
1739   if (status <= 0)
1740     return status;
1742   if (!command_check_context(sock, cmd))
1743     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1745   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1746 } /* }}} int handle_request */
1748 /* MUST NOT hold journal_lock before calling this */
1749 static void journal_rotate(void) /* {{{ */
1751   FILE *old_fh = NULL;
1752   int new_fd;
1754   if (journal_cur == NULL || journal_old == NULL)
1755     return;
1757   pthread_mutex_lock(&journal_lock);
1759   /* we rotate this way (rename before close) so that the we can release
1760    * the journal lock as fast as possible.  Journal writes to the new
1761    * journal can proceed immediately after the new file is opened.  The
1762    * fclose can then block without affecting new updates.
1763    */
1764   if (journal_fh != NULL)
1765   {
1766     old_fh = journal_fh;
1767     journal_fh = NULL;
1768     rename(journal_cur, journal_old);
1769     ++stats_journal_rotate;
1770   }
1772   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1773                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1774   if (new_fd >= 0)
1775   {
1776     journal_fh = fdopen(new_fd, "a");
1777     if (journal_fh == NULL)
1778       close(new_fd);
1779   }
1781   pthread_mutex_unlock(&journal_lock);
1783   if (old_fh != NULL)
1784     fclose(old_fh);
1786   if (journal_fh == NULL)
1787   {
1788     RRDD_LOG(LOG_CRIT,
1789              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1790              journal_cur, rrd_strerror(errno));
1792     RRDD_LOG(LOG_ERR,
1793              "JOURNALING DISABLED: All values will be flushed at shutdown");
1794     config_flush_at_shutdown = 1;
1795   }
1797 } /* }}} static void journal_rotate */
1799 static void journal_done(void) /* {{{ */
1801   if (journal_cur == NULL)
1802     return;
1804   pthread_mutex_lock(&journal_lock);
1805   if (journal_fh != NULL)
1806   {
1807     fclose(journal_fh);
1808     journal_fh = NULL;
1809   }
1811   if (config_flush_at_shutdown)
1812   {
1813     RRDD_LOG(LOG_INFO, "removing journals");
1814     unlink(journal_old);
1815     unlink(journal_cur);
1816   }
1817   else
1818   {
1819     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1820              "journals will be used at next startup");
1821   }
1823   pthread_mutex_unlock(&journal_lock);
1825 } /* }}} static void journal_done */
1827 static int journal_write(char *cmd, char *args) /* {{{ */
1829   int chars;
1831   if (journal_fh == NULL)
1832     return 0;
1834   pthread_mutex_lock(&journal_lock);
1835   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1836   pthread_mutex_unlock(&journal_lock);
1838   if (chars > 0)
1839   {
1840     pthread_mutex_lock(&stats_lock);
1841     stats_journal_bytes += chars;
1842     pthread_mutex_unlock(&stats_lock);
1843   }
1845   return chars;
1846 } /* }}} static int journal_write */
1848 static int journal_replay (const char *file) /* {{{ */
1850   FILE *fh;
1851   int entry_cnt = 0;
1852   int fail_cnt = 0;
1853   uint64_t line = 0;
1854   char entry[CMD_MAX];
1855   time_t now;
1857   if (file == NULL) return 0;
1859   {
1860     char *reason = "unknown error";
1861     int status = 0;
1862     struct stat statbuf;
1864     memset(&statbuf, 0, sizeof(statbuf));
1865     if (stat(file, &statbuf) != 0)
1866     {
1867       if (errno == ENOENT)
1868         return 0;
1870       reason = "stat error";
1871       status = errno;
1872     }
1873     else if (!S_ISREG(statbuf.st_mode))
1874     {
1875       reason = "not a regular file";
1876       status = EPERM;
1877     }
1878     if (statbuf.st_uid != daemon_uid)
1879     {
1880       reason = "not owned by daemon user";
1881       status = EACCES;
1882     }
1883     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1884     {
1885       reason = "must not be user/group writable";
1886       status = EACCES;
1887     }
1889     if (status != 0)
1890     {
1891       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1892                file, rrd_strerror(status), reason);
1893       return 0;
1894     }
1895   }
1897   fh = fopen(file, "r");
1898   if (fh == NULL)
1899   {
1900     if (errno != ENOENT)
1901       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1902                file, rrd_strerror(errno));
1903     return 0;
1904   }
1905   else
1906     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1908   now = time(NULL);
1910   while(!feof(fh))
1911   {
1912     size_t entry_len;
1914     ++line;
1915     if (fgets(entry, sizeof(entry), fh) == NULL)
1916       break;
1917     entry_len = strlen(entry);
1919     /* check \n termination in case journal writing crashed mid-line */
1920     if (entry_len == 0)
1921       continue;
1922     else if (entry[entry_len - 1] != '\n')
1923     {
1924       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1925       ++fail_cnt;
1926       continue;
1927     }
1929     entry[entry_len - 1] = '\0';
1931     if (handle_request(NULL, now, entry, entry_len) == 0)
1932       ++entry_cnt;
1933     else
1934       ++fail_cnt;
1935   }
1937   fclose(fh);
1939   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1940            entry_cnt, fail_cnt);
1942   return entry_cnt > 0 ? 1 : 0;
1943 } /* }}} static int journal_replay */
1945 static void journal_init(void) /* {{{ */
1947   int had_journal = 0;
1949   if (journal_cur == NULL) return;
1951   pthread_mutex_lock(&journal_lock);
1953   RRDD_LOG(LOG_INFO, "checking for journal files");
1955   had_journal += journal_replay(journal_old);
1956   had_journal += journal_replay(journal_cur);
1958   /* it must have been a crash.  start a flush */
1959   if (had_journal && config_flush_at_shutdown)
1960     flush_old_values(-1);
1962   pthread_mutex_unlock(&journal_lock);
1963   journal_rotate();
1965   RRDD_LOG(LOG_INFO, "journal processing complete");
1967 } /* }}} static void journal_init */
1969 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1971   assert(sock != NULL);
1973   free(sock->rbuf);  sock->rbuf = NULL;
1974   free(sock->wbuf);  sock->wbuf = NULL;
1975   free(sock);
1976 } /* }}} void free_listen_socket */
1978 static void close_connection(listen_socket_t *sock) /* {{{ */
1980   if (sock->fd >= 0)
1981   {
1982     close(sock->fd);
1983     sock->fd = -1;
1984   }
1986   free_listen_socket(sock);
1988 } /* }}} void close_connection */
1990 static void *connection_thread_main (void *args) /* {{{ */
1992   listen_socket_t *sock;
1993   int fd;
1995   sock = (listen_socket_t *) args;
1996   fd = sock->fd;
1998   /* init read buffers */
1999   sock->next_read = sock->next_cmd = 0;
2000   sock->rbuf = malloc(RBUF_SIZE);
2001   if (sock->rbuf == NULL)
2002   {
2003     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2004     close_connection(sock);
2005     return NULL;
2006   }
2008   pthread_mutex_lock (&connection_threads_lock);
2009   connection_threads_num++;
2010   pthread_mutex_unlock (&connection_threads_lock);
2012   while (state == RUNNING)
2013   {
2014     char *cmd;
2015     ssize_t cmd_len;
2016     ssize_t rbytes;
2017     time_t now;
2019     struct pollfd pollfd;
2020     int status;
2022     pollfd.fd = fd;
2023     pollfd.events = POLLIN | POLLPRI;
2024     pollfd.revents = 0;
2026     status = poll (&pollfd, 1, /* timeout = */ 500);
2027     if (state != RUNNING)
2028       break;
2029     else if (status == 0) /* timeout */
2030       continue;
2031     else if (status < 0) /* error */
2032     {
2033       status = errno;
2034       if (status != EINTR)
2035         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2036       continue;
2037     }
2039     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2040       break;
2041     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2042     {
2043       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2044           "poll(2) returned something unexpected: %#04hx",
2045           pollfd.revents);
2046       break;
2047     }
2049     rbytes = read(fd, sock->rbuf + sock->next_read,
2050                   RBUF_SIZE - sock->next_read);
2051     if (rbytes < 0)
2052     {
2053       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2054       break;
2055     }
2056     else if (rbytes == 0)
2057       break; /* eof */
2059     sock->next_read += rbytes;
2061     if (sock->batch_start)
2062       now = sock->batch_start;
2063     else
2064       now = time(NULL);
2066     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2067     {
2068       status = handle_request (sock, now, cmd, cmd_len+1);
2069       if (status != 0)
2070         goto out_close;
2071     }
2072   }
2074 out_close:
2075   close_connection(sock);
2077   /* Remove this thread from the connection threads list */
2078   pthread_mutex_lock (&connection_threads_lock);
2079   connection_threads_num--;
2080   if (connection_threads_num <= 0)
2081     pthread_cond_broadcast(&connection_threads_done);
2082   pthread_mutex_unlock (&connection_threads_lock);
2084   return (NULL);
2085 } /* }}} void *connection_thread_main */
2087 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2089   int fd;
2090   struct sockaddr_un sa;
2091   listen_socket_t *temp;
2092   int status;
2093   const char *path;
2095   path = sock->addr;
2096   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2097     path += strlen("unix:");
2099   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2100       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2101   if (temp == NULL)
2102   {
2103     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2104     return (-1);
2105   }
2106   listen_fds = temp;
2107   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2109   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2110   if (fd < 0)
2111   {
2112     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2113              rrd_strerror(errno));
2114     return (-1);
2115   }
2117   memset (&sa, 0, sizeof (sa));
2118   sa.sun_family = AF_UNIX;
2119   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2121   /* if we've gotten this far, we own the pid file.  any daemon started
2122    * with the same args must not be alive.  therefore, ensure that we can
2123    * create the socket...
2124    */
2125   unlink(path);
2127   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2128   if (status != 0)
2129   {
2130     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2131              path, rrd_strerror(errno));
2132     close (fd);
2133     return (-1);
2134   }
2136   status = listen (fd, /* backlog = */ 10);
2137   if (status != 0)
2138   {
2139     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2140              path, rrd_strerror(errno));
2141     close (fd);
2142     unlink (path);
2143     return (-1);
2144   }
2146   listen_fds[listen_fds_num].fd = fd;
2147   listen_fds[listen_fds_num].family = PF_UNIX;
2148   strncpy(listen_fds[listen_fds_num].addr, path,
2149           sizeof (listen_fds[listen_fds_num].addr) - 1);
2150   listen_fds_num++;
2152   return (0);
2153 } /* }}} int open_listen_socket_unix */
2155 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2157   struct addrinfo ai_hints;
2158   struct addrinfo *ai_res;
2159   struct addrinfo *ai_ptr;
2160   char addr_copy[NI_MAXHOST];
2161   char *addr;
2162   char *port;
2163   int status;
2165   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2166   addr_copy[sizeof (addr_copy) - 1] = 0;
2167   addr = addr_copy;
2169   memset (&ai_hints, 0, sizeof (ai_hints));
2170   ai_hints.ai_flags = 0;
2171 #ifdef AI_ADDRCONFIG
2172   ai_hints.ai_flags |= AI_ADDRCONFIG;
2173 #endif
2174   ai_hints.ai_family = AF_UNSPEC;
2175   ai_hints.ai_socktype = SOCK_STREAM;
2177   port = NULL;
2178   if (*addr == '[') /* IPv6+port format */
2179   {
2180     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2181     addr++;
2183     port = strchr (addr, ']');
2184     if (port == NULL)
2185     {
2186       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2187       return (-1);
2188     }
2189     *port = 0;
2190     port++;
2192     if (*port == ':')
2193       port++;
2194     else if (*port == 0)
2195       port = NULL;
2196     else
2197     {
2198       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2199       return (-1);
2200     }
2201   } /* if (*addr = ']') */
2202   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2203   {
2204     port = rindex(addr, ':');
2205     if (port != NULL)
2206     {
2207       *port = 0;
2208       port++;
2209     }
2210   }
2211   ai_res = NULL;
2212   status = getaddrinfo (addr,
2213                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2214                         &ai_hints, &ai_res);
2215   if (status != 0)
2216   {
2217     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2218              addr, gai_strerror (status));
2219     return (-1);
2220   }
2222   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2223   {
2224     int fd;
2225     listen_socket_t *temp;
2226     int one = 1;
2228     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2229         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2230     if (temp == NULL)
2231     {
2232       fprintf (stderr,
2233                "rrdcached: open_listen_socket_network: realloc failed.\n");
2234       continue;
2235     }
2236     listen_fds = temp;
2237     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2239     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2240     if (fd < 0)
2241     {
2242       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2243                rrd_strerror(errno));
2244       continue;
2245     }
2247     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2249     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2250     if (status != 0)
2251     {
2252       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2253                sock->addr, rrd_strerror(errno));
2254       close (fd);
2255       continue;
2256     }
2258     status = listen (fd, /* backlog = */ 10);
2259     if (status != 0)
2260     {
2261       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2262                sock->addr, rrd_strerror(errno));
2263       close (fd);
2264       freeaddrinfo(ai_res);
2265       return (-1);
2266     }
2268     listen_fds[listen_fds_num].fd = fd;
2269     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2270     listen_fds_num++;
2271   } /* for (ai_ptr) */
2273   freeaddrinfo(ai_res);
2274   return (0);
2275 } /* }}} static int open_listen_socket_network */
2277 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2279   assert(sock != NULL);
2280   assert(sock->addr != NULL);
2282   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2283       || sock->addr[0] == '/')
2284     return (open_listen_socket_unix(sock));
2285   else
2286     return (open_listen_socket_network(sock));
2287 } /* }}} int open_listen_socket */
2289 static int close_listen_sockets (void) /* {{{ */
2291   size_t i;
2293   for (i = 0; i < listen_fds_num; i++)
2294   {
2295     close (listen_fds[i].fd);
2297     if (listen_fds[i].family == PF_UNIX)
2298       unlink(listen_fds[i].addr);
2299   }
2301   free (listen_fds);
2302   listen_fds = NULL;
2303   listen_fds_num = 0;
2305   return (0);
2306 } /* }}} int close_listen_sockets */
2308 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2310   struct pollfd *pollfds;
2311   int pollfds_num;
2312   int status;
2313   int i;
2315   if (listen_fds_num < 1)
2316   {
2317     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2318     return (NULL);
2319   }
2321   pollfds_num = listen_fds_num;
2322   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2323   if (pollfds == NULL)
2324   {
2325     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2326     return (NULL);
2327   }
2328   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2330   RRDD_LOG(LOG_INFO, "listening for connections");
2332   while (state == RUNNING)
2333   {
2334     for (i = 0; i < pollfds_num; i++)
2335     {
2336       pollfds[i].fd = listen_fds[i].fd;
2337       pollfds[i].events = POLLIN | POLLPRI;
2338       pollfds[i].revents = 0;
2339     }
2341     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2342     if (state != RUNNING)
2343       break;
2344     else if (status == 0) /* timeout */
2345       continue;
2346     else if (status < 0) /* error */
2347     {
2348       status = errno;
2349       if (status != EINTR)
2350       {
2351         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2352       }
2353       continue;
2354     }
2356     for (i = 0; i < pollfds_num; i++)
2357     {
2358       listen_socket_t *client_sock;
2359       struct sockaddr_storage client_sa;
2360       socklen_t client_sa_size;
2361       pthread_t tid;
2362       pthread_attr_t attr;
2364       if (pollfds[i].revents == 0)
2365         continue;
2367       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2368       {
2369         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2370             "poll(2) returned something unexpected for listen FD #%i.",
2371             pollfds[i].fd);
2372         continue;
2373       }
2375       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2376       if (client_sock == NULL)
2377       {
2378         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2379         continue;
2380       }
2381       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2383       client_sa_size = sizeof (client_sa);
2384       client_sock->fd = accept (pollfds[i].fd,
2385           (struct sockaddr *) &client_sa, &client_sa_size);
2386       if (client_sock->fd < 0)
2387       {
2388         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2389         free(client_sock);
2390         continue;
2391       }
2393       pthread_attr_init (&attr);
2394       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2396       status = pthread_create (&tid, &attr, connection_thread_main,
2397                                client_sock);
2398       if (status != 0)
2399       {
2400         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2401         close_connection(client_sock);
2402         continue;
2403       }
2404     } /* for (pollfds_num) */
2405   } /* while (state == RUNNING) */
2407   RRDD_LOG(LOG_INFO, "starting shutdown");
2409   close_listen_sockets ();
2411   pthread_mutex_lock (&connection_threads_lock);
2412   while (connection_threads_num > 0)
2413     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2414   pthread_mutex_unlock (&connection_threads_lock);
2416   free(pollfds);
2418   return (NULL);
2419 } /* }}} void *listen_thread_main */
2421 static int daemonize (void) /* {{{ */
2423   int pid_fd;
2424   char *base_dir;
2426   daemon_uid = geteuid();
2428   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2429   if (pid_fd < 0)
2430     pid_fd = check_pidfile();
2431   if (pid_fd < 0)
2432     return pid_fd;
2434   /* open all the listen sockets */
2435   if (config_listen_address_list_len > 0)
2436   {
2437     for (size_t i = 0; i < config_listen_address_list_len; i++)
2438       open_listen_socket (config_listen_address_list[i]);
2440     rrd_free_ptrs((void ***) &config_listen_address_list,
2441                   &config_listen_address_list_len);
2442   }
2443   else
2444   {
2445     listen_socket_t sock;
2446     memset(&sock, 0, sizeof(sock));
2447     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2448     open_listen_socket (&sock);
2449   }
2451   if (listen_fds_num < 1)
2452   {
2453     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2454     goto error;
2455   }
2457   if (!stay_foreground)
2458   {
2459     pid_t child;
2461     child = fork ();
2462     if (child < 0)
2463     {
2464       fprintf (stderr, "daemonize: fork(2) failed.\n");
2465       goto error;
2466     }
2467     else if (child > 0)
2468       exit(0);
2470     /* Become session leader */
2471     setsid ();
2473     /* Open the first three file descriptors to /dev/null */
2474     close (2);
2475     close (1);
2476     close (0);
2478     open ("/dev/null", O_RDWR);
2479     if (dup(0) == -1 || dup(0) == -1){
2480         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2481     }
2482   } /* if (!stay_foreground) */
2484   /* Change into the /tmp directory. */
2485   base_dir = (config_base_dir != NULL)
2486     ? config_base_dir
2487     : "/tmp";
2489   if (chdir (base_dir) != 0)
2490   {
2491     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2492     goto error;
2493   }
2495   install_signal_handlers();
2497   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2498   RRDD_LOG(LOG_INFO, "starting up");
2500   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2501                                 (GDestroyNotify) free_cache_item);
2502   if (cache_tree == NULL)
2503   {
2504     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2505     goto error;
2506   }
2508   return write_pidfile (pid_fd);
2510 error:
2511   remove_pidfile();
2512   return -1;
2513 } /* }}} int daemonize */
2515 static int cleanup (void) /* {{{ */
2517   pthread_cond_broadcast (&flush_cond);
2518   pthread_join (flush_thread, NULL);
2520   pthread_cond_broadcast (&queue_cond);
2521   for (int i = 0; i < config_queue_threads; i++)
2522     pthread_join (queue_threads[i], NULL);
2524   if (config_flush_at_shutdown)
2525   {
2526     assert(cache_queue_head == NULL);
2527     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2528   }
2530   journal_done();
2531   remove_pidfile ();
2533   free(queue_threads);
2534   free(config_base_dir);
2535   free(config_pid_file);
2536   free(journal_cur);
2537   free(journal_old);
2539   pthread_mutex_lock(&cache_lock);
2540   g_tree_destroy(cache_tree);
2542   RRDD_LOG(LOG_INFO, "goodbye");
2543   closelog ();
2545   return (0);
2546 } /* }}} int cleanup */
2548 static int read_options (int argc, char **argv) /* {{{ */
2550   int option;
2551   int status = 0;
2553   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2554   {
2555     switch (option)
2556     {
2557       case 'g':
2558         stay_foreground=1;
2559         break;
2561       case 'L':
2562       case 'l':
2563       {
2564         listen_socket_t *new;
2566         new = malloc(sizeof(listen_socket_t));
2567         if (new == NULL)
2568         {
2569           fprintf(stderr, "read_options: malloc failed.\n");
2570           return(2);
2571         }
2572         memset(new, 0, sizeof(listen_socket_t));
2574         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2575         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2577         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2578                          &config_listen_address_list_len, new))
2579         {
2580           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2581           return (2);
2582         }
2583       }
2584       break;
2586       case 'f':
2587       {
2588         int temp;
2590         temp = atoi (optarg);
2591         if (temp > 0)
2592           config_flush_interval = temp;
2593         else
2594         {
2595           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2596           status = 3;
2597         }
2598       }
2599       break;
2601       case 'w':
2602       {
2603         int temp;
2605         temp = atoi (optarg);
2606         if (temp > 0)
2607           config_write_interval = temp;
2608         else
2609         {
2610           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2611           status = 2;
2612         }
2613       }
2614       break;
2616       case 'z':
2617       {
2618         int temp;
2620         temp = atoi(optarg);
2621         if (temp > 0)
2622           config_write_jitter = temp;
2623         else
2624         {
2625           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2626           status = 2;
2627         }
2629         break;
2630       }
2632       case 't':
2633       {
2634         int threads;
2635         threads = atoi(optarg);
2636         if (threads >= 1)
2637           config_queue_threads = threads;
2638         else
2639         {
2640           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2641           return 1;
2642         }
2643       }
2644       break;
2646       case 'B':
2647         config_write_base_only = 1;
2648         break;
2650       case 'b':
2651       {
2652         size_t len;
2653         char base_realpath[PATH_MAX];
2655         if (config_base_dir != NULL)
2656           free (config_base_dir);
2657         config_base_dir = strdup (optarg);
2658         if (config_base_dir == NULL)
2659         {
2660           fprintf (stderr, "read_options: strdup failed.\n");
2661           return (3);
2662         }
2664         /* make sure that the base directory is not resolved via
2665          * symbolic links.  this makes some performance-enhancing
2666          * assumptions possible (we don't have to resolve paths
2667          * that start with a "/")
2668          */
2669         if (realpath(config_base_dir, base_realpath) == NULL)
2670         {
2671           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2672           return 5;
2673         }
2674         else if (strncmp(config_base_dir,
2675                          base_realpath, sizeof(base_realpath)) != 0)
2676         {
2677           fprintf(stderr,
2678                   "Base directory (-b) resolved via file system links!\n"
2679                   "Please consult rrdcached '-b' documentation!\n"
2680                   "Consider specifying the real directory (%s)\n",
2681                   base_realpath);
2682           return 5;
2683         }
2685         len = strlen (config_base_dir);
2686         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2687         {
2688           config_base_dir[len - 1] = 0;
2689           len--;
2690         }
2692         if (len < 1)
2693         {
2694           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2695           return (4);
2696         }
2698         _config_base_dir_len = len;
2699       }
2700       break;
2702       case 'p':
2703       {
2704         if (config_pid_file != NULL)
2705           free (config_pid_file);
2706         config_pid_file = strdup (optarg);
2707         if (config_pid_file == NULL)
2708         {
2709           fprintf (stderr, "read_options: strdup failed.\n");
2710           return (3);
2711         }
2712       }
2713       break;
2715       case 'F':
2716         config_flush_at_shutdown = 1;
2717         break;
2719       case 'j':
2720       {
2721         struct stat statbuf;
2722         const char *dir = optarg;
2724         status = stat(dir, &statbuf);
2725         if (status != 0)
2726         {
2727           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2728           return 6;
2729         }
2731         if (!S_ISDIR(statbuf.st_mode)
2732             || access(dir, R_OK|W_OK|X_OK) != 0)
2733         {
2734           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2735                   errno ? rrd_strerror(errno) : "");
2736           return 6;
2737         }
2739         journal_cur = malloc(PATH_MAX + 1);
2740         journal_old = malloc(PATH_MAX + 1);
2741         if (journal_cur == NULL || journal_old == NULL)
2742         {
2743           fprintf(stderr, "malloc failure for journal files\n");
2744           return 6;
2745         }
2746         else 
2747         {
2748           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2749           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2750         }
2751       }
2752       break;
2754       case 'h':
2755       case '?':
2756         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2757             "\n"
2758             "Usage: rrdcached [options]\n"
2759             "\n"
2760             "Valid options are:\n"
2761             "  -l <address>  Socket address to listen to.\n"
2762             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2763             "  -w <seconds>  Interval in which to write data.\n"
2764             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2765             "  -t <threads>  Number of write threads.\n"
2766             "  -f <seconds>  Interval in which to flush dead data.\n"
2767             "  -p <file>     Location of the PID-file.\n"
2768             "  -b <dir>      Base directory to change to.\n"
2769             "  -B            Restrict file access to paths within -b <dir>\n"
2770             "  -g            Do not fork and run in the foreground.\n"
2771             "  -j <dir>      Directory in which to create the journal files.\n"
2772             "  -F            Always flush all updates at shutdown\n"
2773             "\n"
2774             "For more information and a detailed description of all options "
2775             "please refer\n"
2776             "to the rrdcached(1) manual page.\n",
2777             VERSION);
2778         status = -1;
2779         break;
2780     } /* switch (option) */
2781   } /* while (getopt) */
2783   /* advise the user when values are not sane */
2784   if (config_flush_interval < 2 * config_write_interval)
2785     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2786             " 2x write interval (-w) !\n");
2787   if (config_write_jitter > config_write_interval)
2788     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2789             " write interval (-w) !\n");
2791   if (config_write_base_only && config_base_dir == NULL)
2792     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2793             "  Consult the rrdcached documentation\n");
2795   if (journal_cur == NULL)
2796     config_flush_at_shutdown = 1;
2798   return (status);
2799 } /* }}} int read_options */
2801 int main (int argc, char **argv)
2803   int status;
2805   status = read_options (argc, argv);
2806   if (status != 0)
2807   {
2808     if (status < 0)
2809       status = 0;
2810     return (status);
2811   }
2813   status = daemonize ();
2814   if (status != 0)
2815   {
2816     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2817     return (1);
2818   }
2820   journal_init();
2822   /* start the queue threads */
2823   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2824   if (queue_threads == NULL)
2825   {
2826     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2827     cleanup();
2828     return (1);
2829   }
2830   for (int i = 0; i < config_queue_threads; i++)
2831   {
2832     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2833     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2834     if (status != 0)
2835     {
2836       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2837       cleanup();
2838       return (1);
2839     }
2840   }
2842   /* start the flush thread */
2843   memset(&flush_thread, 0, sizeof(flush_thread));
2844   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2845   if (status != 0)
2846   {
2847     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2848     cleanup();
2849     return (1);
2850   }
2852   listen_thread_main (NULL);
2853   cleanup ();
2855   return (0);
2856 } /* int main */
2858 /*
2859  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2860  */