Code

rrd_random() is a wrapper around random() that ensures the PRNG is seeded
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
115 /*
116  * Types
117  */
118 typedef enum
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
126 struct listen_socket_s
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
168   char *syntax;
169   char *help;
170 };
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
176   char *file;
177   char **values;
178   int values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
189 struct callback_flush_data_s
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
198 enum queue_side_e
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
218 static int do_shutdown = 0;
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
231 /* Cache stuff */
232 static GTree          *cache_tree = NULL;
233 static cache_item_t   *cache_queue_head = NULL;
234 static cache_item_t   *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
237 static int config_write_interval = 300;
238 static int config_write_jitter   = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
246 static listen_socket_t **config_listen_address_list = NULL;
247 static int config_listen_address_list_len = 0;
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
267 /* prototypes for forward refernces */
268 static int handle_request_help (HANDLER_PROTO);
270 /* 
271  * Functions
272  */
273 static void sig_common (const char *sig) /* {{{ */
275   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276   do_shutdown++;
277   pthread_cond_broadcast(&flush_cond);
278   pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
281 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
283   sig_common("INT");
284 } /* }}} void sig_int_handler */
286 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
288   sig_common("TERM");
289 } /* }}} void sig_term_handler */
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
293   config_flush_at_shutdown = 1;
294   sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
299   config_flush_at_shutdown = 0;
300   sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
303 static void install_signal_handlers(void) /* {{{ */
305   /* These structures are static, because `sigaction' behaves weird if the are
306    * overwritten.. */
307   static struct sigaction sa_int;
308   static struct sigaction sa_term;
309   static struct sigaction sa_pipe;
310   static struct sigaction sa_usr1;
311   static struct sigaction sa_usr2;
313   /* Install signal handlers */
314   memset (&sa_int, 0, sizeof (sa_int));
315   sa_int.sa_handler = sig_int_handler;
316   sigaction (SIGINT, &sa_int, NULL);
318   memset (&sa_term, 0, sizeof (sa_term));
319   sa_term.sa_handler = sig_term_handler;
320   sigaction (SIGTERM, &sa_term, NULL);
322   memset (&sa_pipe, 0, sizeof (sa_pipe));
323   sa_pipe.sa_handler = SIG_IGN;
324   sigaction (SIGPIPE, &sa_pipe, NULL);
326   memset (&sa_pipe, 0, sizeof (sa_usr1));
327   sa_usr1.sa_handler = sig_usr1_handler;
328   sigaction (SIGUSR1, &sa_usr1, NULL);
330   memset (&sa_usr2, 0, sizeof (sa_usr2));
331   sa_usr2.sa_handler = sig_usr2_handler;
332   sigaction (SIGUSR2, &sa_usr2, NULL);
334 } /* }}} void install_signal_handlers */
336 static int open_pidfile(char *action, int oflag) /* {{{ */
338   int fd;
339   char *file;
341   file = (config_pid_file != NULL)
342     ? config_pid_file
343     : LOCALSTATEDIR "/run/rrdcached.pid";
345   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346   if (fd < 0)
347     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348             action, file, rrd_strerror(errno));
350   return(fd);
351 } /* }}} static int open_pidfile */
353 /* check existing pid file to see whether a daemon is running */
354 static int check_pidfile(void)
356   int pid_fd;
357   pid_t pid;
358   char pid_str[16];
360   pid_fd = open_pidfile("open", O_RDWR);
361   if (pid_fd < 0)
362     return pid_fd;
364   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
365     return -1;
367   pid = atoi(pid_str);
368   if (pid <= 0)
369     return -1;
371   /* another running process that we can signal COULD be
372    * a competing rrdcached */
373   if (pid != getpid() && kill(pid, 0) == 0)
374   {
375     fprintf(stderr,
376             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
377     close(pid_fd);
378     return -1;
379   }
381   lseek(pid_fd, 0, SEEK_SET);
382   ftruncate(pid_fd, 0);
384   fprintf(stderr,
385           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
386           "rrdcached: starting normally.\n", pid);
388   return pid_fd;
389 } /* }}} static int check_pidfile */
391 static int write_pidfile (int fd) /* {{{ */
393   pid_t pid;
394   FILE *fh;
396   pid = getpid ();
398   fh = fdopen (fd, "w");
399   if (fh == NULL)
400   {
401     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
402     close(fd);
403     return (-1);
404   }
406   fprintf (fh, "%i\n", (int) pid);
407   fclose (fh);
409   return (0);
410 } /* }}} int write_pidfile */
412 static int remove_pidfile (void) /* {{{ */
414   char *file;
415   int status;
417   file = (config_pid_file != NULL)
418     ? config_pid_file
419     : LOCALSTATEDIR "/run/rrdcached.pid";
421   status = unlink (file);
422   if (status == 0)
423     return (0);
424   return (errno);
425 } /* }}} int remove_pidfile */
427 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
429   char *eol;
431   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
432                sock->next_read - sock->next_cmd);
434   if (eol == NULL)
435   {
436     /* no commands left, move remainder back to front of rbuf */
437     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
438             sock->next_read - sock->next_cmd);
439     sock->next_read -= sock->next_cmd;
440     sock->next_cmd = 0;
441     *len = 0;
442     return NULL;
443   }
444   else
445   {
446     char *cmd = sock->rbuf + sock->next_cmd;
447     *eol = '\0';
449     sock->next_cmd = eol - sock->rbuf + 1;
451     if (eol > sock->rbuf && *(eol-1) == '\r')
452       *(--eol) = '\0'; /* handle "\r\n" EOL */
454     *len = eol - cmd;
456     return cmd;
457   }
459   /* NOTREACHED */
460   assert(1==0);
463 /* add the characters directly to the write buffer */
464 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
466   char *new_buf;
468   assert(sock != NULL);
470   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
471   if (new_buf == NULL)
472   {
473     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
474     return -1;
475   }
477   strncpy(new_buf + sock->wbuf_len, str, len + 1);
479   sock->wbuf = new_buf;
480   sock->wbuf_len += len;
482   return 0;
483 } /* }}} static int add_to_wbuf */
485 /* add the text to the "extra" info that's sent after the status line */
486 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
488   va_list argp;
489   char buffer[CMD_MAX];
490   int len;
492   if (sock == NULL) return 0; /* journal replay mode */
493   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
495   va_start(argp, fmt);
496 #ifdef HAVE_VSNPRINTF
497   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
498 #else
499   len = vsprintf(buffer, fmt, argp);
500 #endif
501   va_end(argp);
502   if (len < 0)
503   {
504     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
505     return -1;
506   }
508   return add_to_wbuf(sock, buffer, len);
509 } /* }}} static int add_response_info */
511 static int count_lines(char *str) /* {{{ */
513   int lines = 0;
515   if (str != NULL)
516   {
517     while ((str = strchr(str, '\n')) != NULL)
518     {
519       ++lines;
520       ++str;
521     }
522   }
524   return lines;
525 } /* }}} static int count_lines */
527 /* send the response back to the user.
528  * returns 0 on success, -1 on error
529  * write buffer is always zeroed after this call */
530 static int send_response (listen_socket_t *sock, response_code rc,
531                           char *fmt, ...) /* {{{ */
533   va_list argp;
534   char buffer[CMD_MAX];
535   int lines;
536   ssize_t wrote;
537   int rclen, len;
539   if (sock == NULL) return rc;  /* journal replay mode */
541   if (sock->batch_start)
542   {
543     if (rc == RESP_OK)
544       return rc; /* no response on success during BATCH */
545     lines = sock->batch_cmd;
546   }
547   else if (rc == RESP_OK)
548     lines = count_lines(sock->wbuf);
549   else
550     lines = -1;
552   rclen = sprintf(buffer, "%d ", lines);
553   va_start(argp, fmt);
554 #ifdef HAVE_VSNPRINTF
555   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
556 #else
557   len = vsprintf(buffer+rclen, fmt, argp);
558 #endif
559   va_end(argp);
560   if (len < 0)
561     return -1;
563   len += rclen;
565   /* append the result to the wbuf, don't write to the user */
566   if (sock->batch_start)
567     return add_to_wbuf(sock, buffer, len);
569   /* first write must be complete */
570   if (len != write(sock->fd, buffer, len))
571   {
572     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
573     return -1;
574   }
576   if (sock->wbuf != NULL && rc == RESP_OK)
577   {
578     wrote = 0;
579     while (wrote < sock->wbuf_len)
580     {
581       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
582       if (wb <= 0)
583       {
584         RRDD_LOG(LOG_INFO, "send_response: could not write results");
585         return -1;
586       }
587       wrote += wb;
588     }
589   }
591   free(sock->wbuf); sock->wbuf = NULL;
592   sock->wbuf_len = 0;
594   return 0;
595 } /* }}} */
597 static void wipe_ci_values(cache_item_t *ci, time_t when)
599   ci->values = NULL;
600   ci->values_num = 0;
602   ci->last_flush_time = when;
603   if (config_write_jitter > 0)
604     ci->last_flush_time += (rrd_random() % config_write_jitter);
607 /* remove_from_queue
608  * remove a "cache_item_t" item from the queue.
609  * must hold 'cache_lock' when calling this
610  */
611 static void remove_from_queue(cache_item_t *ci) /* {{{ */
613   if (ci == NULL) return;
614   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
616   if (ci->prev == NULL)
617     cache_queue_head = ci->next; /* reset head */
618   else
619     ci->prev->next = ci->next;
621   if (ci->next == NULL)
622     cache_queue_tail = ci->prev; /* reset the tail */
623   else
624     ci->next->prev = ci->prev;
626   ci->next = ci->prev = NULL;
627   ci->flags &= ~CI_FLAGS_IN_QUEUE;
629   pthread_mutex_lock (&stats_lock);
630   assert (stats_queue_length > 0);
631   stats_queue_length--;
632   pthread_mutex_unlock (&stats_lock);
634 } /* }}} static void remove_from_queue */
636 /* free the resources associated with the cache_item_t
637  * must hold cache_lock when calling this function
638  */
639 static void *free_cache_item(cache_item_t *ci) /* {{{ */
641   if (ci == NULL) return NULL;
643   remove_from_queue(ci);
645   for (int i=0; i < ci->values_num; i++)
646     free(ci->values[i]);
648   free (ci->values);
649   free (ci->file);
651   /* in case anyone is waiting */
652   pthread_cond_broadcast(&ci->flushed);
654   free (ci);
656   return NULL;
657 } /* }}} static void *free_cache_item */
659 /*
660  * enqueue_cache_item:
661  * `cache_lock' must be acquired before calling this function!
662  */
663 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
664     queue_side_t side)
666   if (ci == NULL)
667     return (-1);
669   if (ci->values_num == 0)
670     return (0);
672   if (side == HEAD)
673   {
674     if (cache_queue_head == ci)
675       return 0;
677     /* remove if further down in queue */
678     remove_from_queue(ci);
680     ci->prev = NULL;
681     ci->next = cache_queue_head;
682     if (ci->next != NULL)
683       ci->next->prev = ci;
684     cache_queue_head = ci;
686     if (cache_queue_tail == NULL)
687       cache_queue_tail = cache_queue_head;
688   }
689   else /* (side == TAIL) */
690   {
691     /* We don't move values back in the list.. */
692     if (ci->flags & CI_FLAGS_IN_QUEUE)
693       return (0);
695     assert (ci->next == NULL);
696     assert (ci->prev == NULL);
698     ci->prev = cache_queue_tail;
700     if (cache_queue_tail == NULL)
701       cache_queue_head = ci;
702     else
703       cache_queue_tail->next = ci;
705     cache_queue_tail = ci;
706   }
708   ci->flags |= CI_FLAGS_IN_QUEUE;
710   pthread_cond_signal(&queue_cond);
711   pthread_mutex_lock (&stats_lock);
712   stats_queue_length++;
713   pthread_mutex_unlock (&stats_lock);
715   return (0);
716 } /* }}} int enqueue_cache_item */
718 /*
719  * tree_callback_flush:
720  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
721  * while this is in progress.
722  */
723 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
724     gpointer data)
726   cache_item_t *ci;
727   callback_flush_data_t *cfd;
729   ci = (cache_item_t *) value;
730   cfd = (callback_flush_data_t *) data;
732   if (ci->flags & CI_FLAGS_IN_QUEUE)
733     return FALSE;
735   if ((ci->last_flush_time <= cfd->abs_timeout)
736       && (ci->values_num > 0))
737   {
738     enqueue_cache_item (ci, TAIL);
739   }
740   else if ((do_shutdown != 0)
741       && (ci->values_num > 0))
742   {
743     enqueue_cache_item (ci, TAIL);
744   }
745   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
746       && (ci->values_num <= 0))
747   {
748     char **temp;
750     temp = (char **) rrd_realloc (cfd->keys,
751         sizeof (char *) * (cfd->keys_num + 1));
752     if (temp == NULL)
753     {
754       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
755       return (FALSE);
756     }
757     cfd->keys = temp;
758     /* Make really sure this points to the _same_ place */
759     assert ((char *) key == ci->file);
760     cfd->keys[cfd->keys_num] = (char *) key;
761     cfd->keys_num++;
762   }
764   return (FALSE);
765 } /* }}} gboolean tree_callback_flush */
767 static int flush_old_values (int max_age)
769   callback_flush_data_t cfd;
770   size_t k;
772   memset (&cfd, 0, sizeof (cfd));
773   /* Pass the current time as user data so that we don't need to call
774    * `time' for each node. */
775   cfd.now = time (NULL);
776   cfd.keys = NULL;
777   cfd.keys_num = 0;
779   if (max_age > 0)
780     cfd.abs_timeout = cfd.now - max_age;
781   else
782     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
784   /* `tree_callback_flush' will return the keys of all values that haven't
785    * been touched in the last `config_flush_interval' seconds in `cfd'.
786    * The char*'s in this array point to the same memory as ci->file, so we
787    * don't need to free them separately. */
788   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
790   for (k = 0; k < cfd.keys_num; k++)
791   {
792     /* should never fail, since we have held the cache_lock
793      * the entire time */
794     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
795   }
797   if (cfd.keys != NULL)
798   {
799     free (cfd.keys);
800     cfd.keys = NULL;
801   }
803   return (0);
804 } /* int flush_old_values */
806 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
808   struct timeval now;
809   struct timespec next_flush;
810   int status;
812   gettimeofday (&now, NULL);
813   next_flush.tv_sec = now.tv_sec + config_flush_interval;
814   next_flush.tv_nsec = 1000 * now.tv_usec;
816   pthread_mutex_lock(&cache_lock);
818   while (!do_shutdown)
819   {
820     gettimeofday (&now, NULL);
821     if ((now.tv_sec > next_flush.tv_sec)
822         || ((now.tv_sec == next_flush.tv_sec)
823           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
824     {
825       /* Flush all values that haven't been written in the last
826        * `config_write_interval' seconds. */
827       flush_old_values (config_write_interval);
829       /* Determine the time of the next cache flush. */
830       next_flush.tv_sec =
831         now.tv_sec + next_flush.tv_sec % config_flush_interval;
833       /* unlock the cache while we rotate so we don't block incoming
834        * updates if the fsync() blocks on disk I/O */
835       pthread_mutex_unlock(&cache_lock);
836       journal_rotate();
837       pthread_mutex_lock(&cache_lock);
838     }
840     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
841     if (status != 0 && status != ETIMEDOUT)
842     {
843       RRDD_LOG (LOG_ERR, "flush_thread_main: "
844                 "pthread_cond_timedwait returned %i.", status);
845     }
846   }
848   if (config_flush_at_shutdown)
849     flush_old_values (-1); /* flush everything */
851   pthread_mutex_unlock(&cache_lock);
853   return NULL;
854 } /* void *flush_thread_main */
856 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
858   pthread_mutex_lock (&cache_lock);
860   while (!do_shutdown
861          || (cache_queue_head != NULL && config_flush_at_shutdown))
862   {
863     cache_item_t *ci;
864     char *file;
865     char **values;
866     int values_num;
867     int status;
868     int i;
870     /* Now, check if there's something to store away. If not, wait until
871      * something comes in.  if we are shutting down, do not wait around.  */
872     if (cache_queue_head == NULL && !do_shutdown)
873     {
874       status = pthread_cond_wait (&queue_cond, &cache_lock);
875       if ((status != 0) && (status != ETIMEDOUT))
876       {
877         RRDD_LOG (LOG_ERR, "queue_thread_main: "
878             "pthread_cond_wait returned %i.", status);
879       }
880     }
882     /* Check if a value has arrived. This may be NULL if we timed out or there
883      * was an interrupt such as a signal. */
884     if (cache_queue_head == NULL)
885       continue;
887     ci = cache_queue_head;
889     /* copy the relevant parts */
890     file = strdup (ci->file);
891     if (file == NULL)
892     {
893       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
894       continue;
895     }
897     assert(ci->values != NULL);
898     assert(ci->values_num > 0);
900     values = ci->values;
901     values_num = ci->values_num;
903     wipe_ci_values(ci, time(NULL));
904     remove_from_queue(ci);
906     pthread_mutex_unlock (&cache_lock);
908     rrd_clear_error ();
909     status = rrd_update_r (file, NULL, values_num, (void *) values);
910     if (status != 0)
911     {
912       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
913           "rrd_update_r (%s) failed with status %i. (%s)",
914           file, status, rrd_get_error());
915     }
917     journal_write("wrote", file);
918     pthread_cond_broadcast(&ci->flushed);
920     for (i = 0; i < values_num; i++)
921       free (values[i]);
923     free(values);
924     free(file);
926     if (status == 0)
927     {
928       pthread_mutex_lock (&stats_lock);
929       stats_updates_written++;
930       stats_data_sets_written += values_num;
931       pthread_mutex_unlock (&stats_lock);
932     }
934     pthread_mutex_lock (&cache_lock);
935   }
936   pthread_mutex_unlock (&cache_lock);
938   return (NULL);
939 } /* }}} void *queue_thread_main */
941 static int buffer_get_field (char **buffer_ret, /* {{{ */
942     size_t *buffer_size_ret, char **field_ret)
944   char *buffer;
945   size_t buffer_pos;
946   size_t buffer_size;
947   char *field;
948   size_t field_size;
949   int status;
951   buffer = *buffer_ret;
952   buffer_pos = 0;
953   buffer_size = *buffer_size_ret;
954   field = *buffer_ret;
955   field_size = 0;
957   if (buffer_size <= 0)
958     return (-1);
960   /* This is ensured by `handle_request'. */
961   assert (buffer[buffer_size - 1] == '\0');
963   status = -1;
964   while (buffer_pos < buffer_size)
965   {
966     /* Check for end-of-field or end-of-buffer */
967     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
968     {
969       field[field_size] = 0;
970       field_size++;
971       buffer_pos++;
972       status = 0;
973       break;
974     }
975     /* Handle escaped characters. */
976     else if (buffer[buffer_pos] == '\\')
977     {
978       if (buffer_pos >= (buffer_size - 1))
979         break;
980       buffer_pos++;
981       field[field_size] = buffer[buffer_pos];
982       field_size++;
983       buffer_pos++;
984     }
985     /* Normal operation */ 
986     else
987     {
988       field[field_size] = buffer[buffer_pos];
989       field_size++;
990       buffer_pos++;
991     }
992   } /* while (buffer_pos < buffer_size) */
994   if (status != 0)
995     return (status);
997   *buffer_ret = buffer + buffer_pos;
998   *buffer_size_ret = buffer_size - buffer_pos;
999   *field_ret = field;
1001   return (0);
1002 } /* }}} int buffer_get_field */
1004 /* if we're restricting writes to the base directory,
1005  * check whether the file falls within the dir
1006  * returns 1 if OK, otherwise 0
1007  */
1008 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1010   assert(file != NULL);
1012   if (!config_write_base_only
1013       || sock == NULL /* journal replay */
1014       || config_base_dir == NULL)
1015     return 1;
1017   if (strstr(file, "../") != NULL) goto err;
1019   /* relative paths without "../" are ok */
1020   if (*file != '/') return 1;
1022   /* file must be of the format base + "/" + <1+ char filename> */
1023   if (strlen(file) < _config_base_dir_len + 2) goto err;
1024   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1025   if (*(file + _config_base_dir_len) != '/') goto err;
1027   return 1;
1029 err:
1030   if (sock != NULL && sock->fd >= 0)
1031     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033   return 0;
1034 } /* }}} static int check_file_access */
1036 /* when using a base dir, convert relative paths to absolute paths.
1037  * if necessary, modifies the "filename" pointer to point
1038  * to the new path created in "tmp".  "tmp" is provided
1039  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1040  *
1041  * this allows us to optimize for the expected case (absolute path)
1042  * with a no-op.
1043  */
1044 static void get_abs_path(char **filename, char *tmp)
1046   assert(tmp != NULL);
1047   assert(filename != NULL && *filename != NULL);
1049   if (config_base_dir == NULL || **filename == '/')
1050     return;
1052   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1053   *filename = tmp;
1054 } /* }}} static int get_abs_path */
1056 /* returns 1 if we have the required privilege level,
1057  * otherwise issue an error to the user on sock */
1058 static int has_privilege (listen_socket_t *sock, /* {{{ */
1059                           socket_privilege priv)
1061   if (sock == NULL) /* journal replay */
1062     return 1;
1064   if (sock->privilege >= priv)
1065     return 1;
1067   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1068 } /* }}} static int has_privilege */
1070 static int flush_file (const char *filename) /* {{{ */
1072   cache_item_t *ci;
1074   pthread_mutex_lock (&cache_lock);
1076   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1077   if (ci == NULL)
1078   {
1079     pthread_mutex_unlock (&cache_lock);
1080     return (ENOENT);
1081   }
1083   if (ci->values_num > 0)
1084   {
1085     /* Enqueue at head */
1086     enqueue_cache_item (ci, HEAD);
1087     pthread_cond_wait(&ci->flushed, &cache_lock);
1088   }
1090   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1091    * may have been purged during our cond_wait() */
1093   pthread_mutex_unlock(&cache_lock);
1095   return (0);
1096 } /* }}} int flush_file */
1098 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1100   char *err = "Syntax error.\n";
1102   if (cmd && cmd->syntax)
1103     err = cmd->syntax;
1105   return send_response(sock, RESP_ERR, "Usage: %s", err);
1106 } /* }}} static int syntax_error() */
1108 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1110   uint64_t copy_queue_length;
1111   uint64_t copy_updates_received;
1112   uint64_t copy_flush_received;
1113   uint64_t copy_updates_written;
1114   uint64_t copy_data_sets_written;
1115   uint64_t copy_journal_bytes;
1116   uint64_t copy_journal_rotate;
1118   uint64_t tree_nodes_number;
1119   uint64_t tree_depth;
1121   pthread_mutex_lock (&stats_lock);
1122   copy_queue_length       = stats_queue_length;
1123   copy_updates_received   = stats_updates_received;
1124   copy_flush_received     = stats_flush_received;
1125   copy_updates_written    = stats_updates_written;
1126   copy_data_sets_written  = stats_data_sets_written;
1127   copy_journal_bytes      = stats_journal_bytes;
1128   copy_journal_rotate     = stats_journal_rotate;
1129   pthread_mutex_unlock (&stats_lock);
1131   pthread_mutex_lock (&cache_lock);
1132   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1133   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1134   pthread_mutex_unlock (&cache_lock);
1136   add_response_info(sock,
1137                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1138   add_response_info(sock,
1139                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1140   add_response_info(sock,
1141                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1142   add_response_info(sock,
1143                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1144   add_response_info(sock,
1145                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1146   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1147   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1148   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1149   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1151   send_response(sock, RESP_OK, "Statistics follow\n");
1153   return (0);
1154 } /* }}} int handle_request_stats */
1156 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1158   char *file, file_tmp[PATH_MAX];
1159   int status;
1161   status = buffer_get_field (&buffer, &buffer_size, &file);
1162   if (status != 0)
1163   {
1164     return syntax_error(sock,cmd);
1165   }
1166   else
1167   {
1168     pthread_mutex_lock(&stats_lock);
1169     stats_flush_received++;
1170     pthread_mutex_unlock(&stats_lock);
1172     get_abs_path(&file, file_tmp);
1173     if (!check_file_access(file, sock)) return 0;
1175     status = flush_file (file);
1176     if (status == 0)
1177       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1178     else if (status == ENOENT)
1179     {
1180       /* no file in our tree; see whether it exists at all */
1181       struct stat statbuf;
1183       memset(&statbuf, 0, sizeof(statbuf));
1184       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1185         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1186       else
1187         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1188     }
1189     else if (status < 0)
1190       return send_response(sock, RESP_ERR, "Internal error.\n");
1191     else
1192       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1193   }
1195   /* NOTREACHED */
1196   assert(1==0);
1197 } /* }}} int handle_request_flush */
1199 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1201   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1203   pthread_mutex_lock(&cache_lock);
1204   flush_old_values(-1);
1205   pthread_mutex_unlock(&cache_lock);
1207   return send_response(sock, RESP_OK, "Started flush.\n");
1208 } /* }}} static int handle_request_flushall */
1210 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1212   int status;
1213   char *file, file_tmp[PATH_MAX];
1214   cache_item_t *ci;
1216   status = buffer_get_field(&buffer, &buffer_size, &file);
1217   if (status != 0)
1218     return syntax_error(sock,cmd);
1220   get_abs_path(&file, file_tmp);
1222   pthread_mutex_lock(&cache_lock);
1223   ci = g_tree_lookup(cache_tree, file);
1224   if (ci == NULL)
1225   {
1226     pthread_mutex_unlock(&cache_lock);
1227     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1228   }
1230   for (int i=0; i < ci->values_num; i++)
1231     add_response_info(sock, "%s\n", ci->values[i]);
1233   pthread_mutex_unlock(&cache_lock);
1234   return send_response(sock, RESP_OK, "updates pending\n");
1235 } /* }}} static int handle_request_pending */
1237 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1239   int status;
1240   gboolean found;
1241   char *file, file_tmp[PATH_MAX];
1243   status = buffer_get_field(&buffer, &buffer_size, &file);
1244   if (status != 0)
1245     return syntax_error(sock,cmd);
1247   get_abs_path(&file, file_tmp);
1248   if (!check_file_access(file, sock)) return 0;
1250   pthread_mutex_lock(&cache_lock);
1251   found = g_tree_remove(cache_tree, file);
1252   pthread_mutex_unlock(&cache_lock);
1254   if (found == TRUE)
1255   {
1256     if (sock != NULL)
1257       journal_write("forget", file);
1259     return send_response(sock, RESP_OK, "Gone!\n");
1260   }
1261   else
1262     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1264   /* NOTREACHED */
1265   assert(1==0);
1266 } /* }}} static int handle_request_forget */
1268 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1270   cache_item_t *ci;
1272   pthread_mutex_lock(&cache_lock);
1274   ci = cache_queue_head;
1275   while (ci != NULL)
1276   {
1277     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1278     ci = ci->next;
1279   }
1281   pthread_mutex_unlock(&cache_lock);
1283   return send_response(sock, RESP_OK, "in queue.\n");
1284 } /* }}} int handle_request_queue */
1286 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1288   char *file, file_tmp[PATH_MAX];
1289   int values_num = 0;
1290   int status;
1291   char orig_buf[CMD_MAX];
1293   cache_item_t *ci;
1295   /* save it for the journal later */
1296   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1298   status = buffer_get_field (&buffer, &buffer_size, &file);
1299   if (status != 0)
1300     return syntax_error(sock,cmd);
1302   pthread_mutex_lock(&stats_lock);
1303   stats_updates_received++;
1304   pthread_mutex_unlock(&stats_lock);
1306   get_abs_path(&file, file_tmp);
1307   if (!check_file_access(file, sock)) return 0;
1309   pthread_mutex_lock (&cache_lock);
1310   ci = g_tree_lookup (cache_tree, file);
1312   if (ci == NULL) /* {{{ */
1313   {
1314     struct stat statbuf;
1316     /* don't hold the lock while we setup; stat(2) might block */
1317     pthread_mutex_unlock(&cache_lock);
1319     memset (&statbuf, 0, sizeof (statbuf));
1320     status = stat (file, &statbuf);
1321     if (status != 0)
1322     {
1323       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1325       status = errno;
1326       if (status == ENOENT)
1327         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1328       else
1329         return send_response(sock, RESP_ERR,
1330                              "stat failed with error %i.\n", status);
1331     }
1332     if (!S_ISREG (statbuf.st_mode))
1333       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1335     if (access(file, R_OK|W_OK) != 0)
1336       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1337                            file, rrd_strerror(errno));
1339     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1340     if (ci == NULL)
1341     {
1342       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1344       return send_response(sock, RESP_ERR, "malloc failed.\n");
1345     }
1346     memset (ci, 0, sizeof (cache_item_t));
1348     ci->file = strdup (file);
1349     if (ci->file == NULL)
1350     {
1351       free (ci);
1352       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1354       return send_response(sock, RESP_ERR, "strdup failed.\n");
1355     }
1357     wipe_ci_values(ci, now);
1358     ci->flags = CI_FLAGS_IN_TREE;
1359     pthread_cond_init(&ci->flushed, NULL);
1361     pthread_mutex_lock(&cache_lock);
1362     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1363   } /* }}} */
1364   assert (ci != NULL);
1366   /* don't re-write updates in replay mode */
1367   if (sock != NULL)
1368     journal_write("update", orig_buf);
1370   while (buffer_size > 0)
1371   {
1372     char **temp;
1373     char *value;
1374     time_t stamp;
1375     char *eostamp;
1377     status = buffer_get_field (&buffer, &buffer_size, &value);
1378     if (status != 0)
1379     {
1380       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1381       break;
1382     }
1384     /* make sure update time is always moving forward */
1385     stamp = strtol(value, &eostamp, 10);
1386     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1387     {
1388       pthread_mutex_unlock(&cache_lock);
1389       return send_response(sock, RESP_ERR,
1390                            "Cannot find timestamp in '%s'!\n", value);
1391     }
1392     else if (stamp <= ci->last_update_stamp)
1393     {
1394       pthread_mutex_unlock(&cache_lock);
1395       return send_response(sock, RESP_ERR,
1396                            "illegal attempt to update using time %ld when last"
1397                            " update time is %ld (minimum one second step)\n",
1398                            stamp, ci->last_update_stamp);
1399     }
1400     else
1401       ci->last_update_stamp = stamp;
1403     temp = (char **) rrd_realloc (ci->values,
1404         sizeof (char *) * (ci->values_num + 1));
1405     if (temp == NULL)
1406     {
1407       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1408       continue;
1409     }
1410     ci->values = temp;
1412     ci->values[ci->values_num] = strdup (value);
1413     if (ci->values[ci->values_num] == NULL)
1414     {
1415       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1416       continue;
1417     }
1418     ci->values_num++;
1420     values_num++;
1421   }
1423   if (((now - ci->last_flush_time) >= config_write_interval)
1424       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1425       && (ci->values_num > 0))
1426   {
1427     enqueue_cache_item (ci, TAIL);
1428   }
1430   pthread_mutex_unlock (&cache_lock);
1432   if (values_num < 1)
1433     return send_response(sock, RESP_ERR, "No values updated.\n");
1434   else
1435     return send_response(sock, RESP_OK,
1436                          "errors, enqueued %i value(s).\n", values_num);
1438   /* NOTREACHED */
1439   assert(1==0);
1441 } /* }}} int handle_request_update */
1443 /* we came across a "WROTE" entry during journal replay.
1444  * throw away any values that we have accumulated for this file
1445  */
1446 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1448   int i;
1449   cache_item_t *ci;
1450   const char *file = buffer;
1452   pthread_mutex_lock(&cache_lock);
1454   ci = g_tree_lookup(cache_tree, file);
1455   if (ci == NULL)
1456   {
1457     pthread_mutex_unlock(&cache_lock);
1458     return (0);
1459   }
1461   if (ci->values)
1462   {
1463     for (i=0; i < ci->values_num; i++)
1464       free(ci->values[i]);
1466     free(ci->values);
1467   }
1469   wipe_ci_values(ci, now);
1470   remove_from_queue(ci);
1472   pthread_mutex_unlock(&cache_lock);
1473   return (0);
1474 } /* }}} int handle_request_wrote */
1476 /* start "BATCH" processing */
1477 static int batch_start (HANDLER_PROTO) /* {{{ */
1479   int status;
1480   if (sock->batch_start)
1481     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1483   status = send_response(sock, RESP_OK,
1484                          "Go ahead.  End with dot '.' on its own line.\n");
1485   sock->batch_start = time(NULL);
1486   sock->batch_cmd = 0;
1488   return status;
1489 } /* }}} static int batch_start */
1491 /* finish "BATCH" processing and return results to the client */
1492 static int batch_done (HANDLER_PROTO) /* {{{ */
1494   assert(sock->batch_start);
1495   sock->batch_start = 0;
1496   sock->batch_cmd  = 0;
1497   return send_response(sock, RESP_OK, "errors\n");
1498 } /* }}} static int batch_done */
1500 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1502   return -1;
1503 } /* }}} static int handle_request_quit */
1505 struct command COMMANDS[] = {
1506   {
1507     "UPDATE",
1508     handle_request_update,
1509     PRIV_HIGH,
1510     CMD_CONTEXT_ANY,
1511     "UPDATE <filename> <values> [<values> ...]\n"
1512     ,
1513     "Adds the given file to the internal cache if it is not yet known and\n"
1514     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1515     "for details.\n"
1516     "\n"
1517     "Each <values> has the following form:\n"
1518     "  <values> = <time>:<value>[:<value>[...]]\n"
1519     "See the rrdupdate(1) manpage for details.\n"
1520   },
1521   {
1522     "WROTE",
1523     handle_request_wrote,
1524     PRIV_HIGH,
1525     CMD_CONTEXT_JOURNAL,
1526     NULL,
1527     NULL
1528   },
1529   {
1530     "FLUSH",
1531     handle_request_flush,
1532     PRIV_LOW,
1533     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1534     "FLUSH <filename>\n"
1535     ,
1536     "Adds the given filename to the head of the update queue and returns\n"
1537     "after it has been dequeued.\n"
1538   },
1539   {
1540     "FLUSHALL",
1541     handle_request_flushall,
1542     PRIV_HIGH,
1543     CMD_CONTEXT_CLIENT,
1544     "FLUSHALL\n"
1545     ,
1546     "Triggers writing of all pending updates.  Returns immediately.\n"
1547   },
1548   {
1549     "PENDING",
1550     handle_request_pending,
1551     PRIV_HIGH,
1552     CMD_CONTEXT_CLIENT,
1553     "PENDING <filename>\n"
1554     ,
1555     "Shows any 'pending' updates for a file, in order.\n"
1556     "The updates shown have not yet been written to the underlying RRD file.\n"
1557   },
1558   {
1559     "FORGET",
1560     handle_request_forget,
1561     PRIV_HIGH,
1562     CMD_CONTEXT_ANY,
1563     "FORGET <filename>\n"
1564     ,
1565     "Removes the file completely from the cache.\n"
1566     "Any pending updates for the file will be lost.\n"
1567   },
1568   {
1569     "QUEUE",
1570     handle_request_queue,
1571     PRIV_LOW,
1572     CMD_CONTEXT_CLIENT,
1573     "QUEUE\n"
1574     ,
1575         "Shows all files in the output queue.\n"
1576     "The output is zero or more lines in the following format:\n"
1577     "(where <num_vals> is the number of values to be written)\n"
1578     "\n"
1579     "<num_vals> <filename>\n"
1580   },
1581   {
1582     "STATS",
1583     handle_request_stats,
1584     PRIV_LOW,
1585     CMD_CONTEXT_CLIENT,
1586     "STATS\n"
1587     ,
1588     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1589     "a description of the values.\n"
1590   },
1591   {
1592     "HELP",
1593     handle_request_help,
1594     PRIV_LOW,
1595     CMD_CONTEXT_CLIENT,
1596     "HELP [<command>]\n",
1597     NULL, /* special! */
1598   },
1599   {
1600     "BATCH",
1601     batch_start,
1602     PRIV_LOW,
1603     CMD_CONTEXT_CLIENT,
1604     "BATCH\n"
1605     ,
1606     "The 'BATCH' command permits the client to initiate a bulk load\n"
1607     "   of commands to rrdcached.\n"
1608     "\n"
1609     "Usage:\n"
1610     "\n"
1611     "    client: BATCH\n"
1612     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1613     "    client: command #1\n"
1614     "    client: command #2\n"
1615     "    client: ... and so on\n"
1616     "    client: .\n"
1617     "    server: 2 errors\n"
1618     "    server: 7 message for command #7\n"
1619     "    server: 9 message for command #9\n"
1620     "\n"
1621     "For more information, consult the rrdcached(1) documentation.\n"
1622   },
1623   {
1624     ".",   /* BATCH terminator */
1625     batch_done,
1626     PRIV_LOW,
1627     CMD_CONTEXT_BATCH,
1628     NULL,
1629     NULL
1630   },
1631   {
1632     "QUIT",
1633     handle_request_quit,
1634     PRIV_LOW,
1635     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1636     "QUIT\n"
1637     ,
1638     "Disconnect from rrdcached.\n"
1639   },
1640   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1641 };
1643 static struct command *find_command(char *cmd)
1645   struct command *c = COMMANDS;
1647   while (c->cmd != NULL)
1648   {
1649     if (strcasecmp(cmd, c->cmd) == 0)
1650       break;
1651     c++;
1652   }
1654   if (c->cmd == NULL)
1655     return NULL;
1656   else
1657     return c;
1660 /* check whether commands are received in the expected context */
1661 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1663   if (sock == NULL)
1664     return (cmd->context & CMD_CONTEXT_JOURNAL);
1665   else if (sock->batch_start)
1666     return (cmd->context & CMD_CONTEXT_BATCH);
1667   else
1668     return (cmd->context & CMD_CONTEXT_CLIENT);
1670   /* NOTREACHED */
1671   assert(1==0);
1674 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1676   int status;
1677   char *cmd_str;
1678   char *resp_txt;
1679   struct command *help = NULL;
1681   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1682   if (status == 0)
1683     help = find_command(cmd_str);
1685   if (help && (help->syntax || help->help))
1686   {
1687     char tmp[CMD_MAX];
1689     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1690     resp_txt = tmp;
1692     if (help->syntax)
1693       add_response_info(sock, "Usage: %s\n", help->syntax);
1695     if (help->help)
1696       add_response_info(sock, "%s\n", help->help);
1697   }
1698   else
1699   {
1700     help = COMMANDS;
1701     resp_txt = "Command overview\n";
1703     while (help->cmd)
1704     {
1705       if (help->syntax)
1706         add_response_info(sock, "%s", help->syntax);
1707       help++;
1708     }
1709   }
1711   return send_response(sock, RESP_OK, resp_txt);
1712 } /* }}} int handle_request_help */
1714 /* if sock==NULL, we are in journal replay mode */
1715 static int handle_request (DISPATCH_PROTO) /* {{{ */
1717   char *buffer_ptr = buffer;
1718   char *cmd_str = NULL;
1719   struct command *cmd = NULL;
1720   int status;
1722   assert (buffer[buffer_size - 1] == '\0');
1724   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1725   if (status != 0)
1726   {
1727     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1728     return (-1);
1729   }
1731   if (sock != NULL && sock->batch_start)
1732     sock->batch_cmd++;
1734   cmd = find_command(cmd_str);
1735   if (!cmd)
1736     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1738   status = has_privilege(sock, cmd->min_priv);
1739   if (status <= 0)
1740     return status;
1742   if (!command_check_context(sock, cmd))
1743     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1745   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1746 } /* }}} int handle_request */
1748 /* MUST NOT hold journal_lock before calling this */
1749 static void journal_rotate(void) /* {{{ */
1751   FILE *old_fh = NULL;
1752   int new_fd;
1754   if (journal_cur == NULL || journal_old == NULL)
1755     return;
1757   pthread_mutex_lock(&journal_lock);
1759   /* we rotate this way (rename before close) so that the we can release
1760    * the journal lock as fast as possible.  Journal writes to the new
1761    * journal can proceed immediately after the new file is opened.  The
1762    * fclose can then block without affecting new updates.
1763    */
1764   if (journal_fh != NULL)
1765   {
1766     old_fh = journal_fh;
1767     journal_fh = NULL;
1768     rename(journal_cur, journal_old);
1769     ++stats_journal_rotate;
1770   }
1772   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1773                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1774   if (new_fd >= 0)
1775   {
1776     journal_fh = fdopen(new_fd, "a");
1777     if (journal_fh == NULL)
1778       close(new_fd);
1779   }
1781   pthread_mutex_unlock(&journal_lock);
1783   if (old_fh != NULL)
1784     fclose(old_fh);
1786   if (journal_fh == NULL)
1787   {
1788     RRDD_LOG(LOG_CRIT,
1789              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1790              journal_cur, rrd_strerror(errno));
1792     RRDD_LOG(LOG_ERR,
1793              "JOURNALING DISABLED: All values will be flushed at shutdown");
1794     config_flush_at_shutdown = 1;
1795   }
1797 } /* }}} static void journal_rotate */
1799 static void journal_done(void) /* {{{ */
1801   if (journal_cur == NULL)
1802     return;
1804   pthread_mutex_lock(&journal_lock);
1805   if (journal_fh != NULL)
1806   {
1807     fclose(journal_fh);
1808     journal_fh = NULL;
1809   }
1811   if (config_flush_at_shutdown)
1812   {
1813     RRDD_LOG(LOG_INFO, "removing journals");
1814     unlink(journal_old);
1815     unlink(journal_cur);
1816   }
1817   else
1818   {
1819     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1820              "journals will be used at next startup");
1821   }
1823   pthread_mutex_unlock(&journal_lock);
1825 } /* }}} static void journal_done */
1827 static int journal_write(char *cmd, char *args) /* {{{ */
1829   int chars;
1831   if (journal_fh == NULL)
1832     return 0;
1834   pthread_mutex_lock(&journal_lock);
1835   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1836   pthread_mutex_unlock(&journal_lock);
1838   if (chars > 0)
1839   {
1840     pthread_mutex_lock(&stats_lock);
1841     stats_journal_bytes += chars;
1842     pthread_mutex_unlock(&stats_lock);
1843   }
1845   return chars;
1846 } /* }}} static int journal_write */
1848 static int journal_replay (const char *file) /* {{{ */
1850   FILE *fh;
1851   int entry_cnt = 0;
1852   int fail_cnt = 0;
1853   uint64_t line = 0;
1854   char entry[CMD_MAX];
1855   time_t now;
1857   if (file == NULL) return 0;
1859   {
1860     char *reason = "unknown error";
1861     int status = 0;
1862     struct stat statbuf;
1864     memset(&statbuf, 0, sizeof(statbuf));
1865     if (stat(file, &statbuf) != 0)
1866     {
1867       if (errno == ENOENT)
1868         return 0;
1870       reason = "stat error";
1871       status = errno;
1872     }
1873     else if (!S_ISREG(statbuf.st_mode))
1874     {
1875       reason = "not a regular file";
1876       status = EPERM;
1877     }
1878     if (statbuf.st_uid != daemon_uid)
1879     {
1880       reason = "not owned by daemon user";
1881       status = EACCES;
1882     }
1883     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1884     {
1885       reason = "must not be user/group writable";
1886       status = EACCES;
1887     }
1889     if (status != 0)
1890     {
1891       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1892                file, rrd_strerror(status), reason);
1893       return 0;
1894     }
1895   }
1897   fh = fopen(file, "r");
1898   if (fh == NULL)
1899   {
1900     if (errno != ENOENT)
1901       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1902                file, rrd_strerror(errno));
1903     return 0;
1904   }
1905   else
1906     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1908   now = time(NULL);
1910   while(!feof(fh))
1911   {
1912     size_t entry_len;
1914     ++line;
1915     if (fgets(entry, sizeof(entry), fh) == NULL)
1916       break;
1917     entry_len = strlen(entry);
1919     /* check \n termination in case journal writing crashed mid-line */
1920     if (entry_len == 0)
1921       continue;
1922     else if (entry[entry_len - 1] != '\n')
1923     {
1924       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1925       ++fail_cnt;
1926       continue;
1927     }
1929     entry[entry_len - 1] = '\0';
1931     if (handle_request(NULL, now, entry, entry_len) == 0)
1932       ++entry_cnt;
1933     else
1934       ++fail_cnt;
1935   }
1937   fclose(fh);
1939   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1940            entry_cnt, fail_cnt);
1942   return entry_cnt > 0 ? 1 : 0;
1943 } /* }}} static int journal_replay */
1945 static void journal_init(void) /* {{{ */
1947   int had_journal = 0;
1949   if (journal_cur == NULL) return;
1951   pthread_mutex_lock(&journal_lock);
1953   RRDD_LOG(LOG_INFO, "checking for journal files");
1955   had_journal += journal_replay(journal_old);
1956   had_journal += journal_replay(journal_cur);
1958   /* it must have been a crash.  start a flush */
1959   if (had_journal && config_flush_at_shutdown)
1960     flush_old_values(-1);
1962   pthread_mutex_unlock(&journal_lock);
1963   journal_rotate();
1965   RRDD_LOG(LOG_INFO, "journal processing complete");
1967 } /* }}} static void journal_init */
1969 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1971   assert(sock != NULL);
1973   free(sock->rbuf);  sock->rbuf = NULL;
1974   free(sock->wbuf);  sock->wbuf = NULL;
1975   free(sock);
1976 } /* }}} void free_listen_socket */
1978 static void close_connection(listen_socket_t *sock) /* {{{ */
1980   if (sock->fd >= 0)
1981   {
1982     close(sock->fd);
1983     sock->fd = -1;
1984   }
1986   free_listen_socket(sock);
1988 } /* }}} void close_connection */
1990 static void *connection_thread_main (void *args) /* {{{ */
1992   listen_socket_t *sock;
1993   int fd;
1995   sock = (listen_socket_t *) args;
1996   fd = sock->fd;
1998   /* init read buffers */
1999   sock->next_read = sock->next_cmd = 0;
2000   sock->rbuf = malloc(RBUF_SIZE);
2001   if (sock->rbuf == NULL)
2002   {
2003     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2004     close_connection(sock);
2005     return NULL;
2006   }
2008   pthread_mutex_lock (&connection_threads_lock);
2009   connection_threads_num++;
2010   pthread_mutex_unlock (&connection_threads_lock);
2012   while (do_shutdown == 0)
2013   {
2014     char *cmd;
2015     ssize_t cmd_len;
2016     ssize_t rbytes;
2017     time_t now;
2019     struct pollfd pollfd;
2020     int status;
2022     pollfd.fd = fd;
2023     pollfd.events = POLLIN | POLLPRI;
2024     pollfd.revents = 0;
2026     status = poll (&pollfd, 1, /* timeout = */ 500);
2027     if (do_shutdown)
2028       break;
2029     else if (status == 0) /* timeout */
2030       continue;
2031     else if (status < 0) /* error */
2032     {
2033       status = errno;
2034       if (status != EINTR)
2035         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2036       continue;
2037     }
2039     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2040       break;
2041     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2042     {
2043       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2044           "poll(2) returned something unexpected: %#04hx",
2045           pollfd.revents);
2046       break;
2047     }
2049     rbytes = read(fd, sock->rbuf + sock->next_read,
2050                   RBUF_SIZE - sock->next_read);
2051     if (rbytes < 0)
2052     {
2053       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2054       break;
2055     }
2056     else if (rbytes == 0)
2057       break; /* eof */
2059     sock->next_read += rbytes;
2061     if (sock->batch_start)
2062       now = sock->batch_start;
2063     else
2064       now = time(NULL);
2066     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2067     {
2068       status = handle_request (sock, now, cmd, cmd_len+1);
2069       if (status != 0)
2070         goto out_close;
2071     }
2072   }
2074 out_close:
2075   close_connection(sock);
2077   /* Remove this thread from the connection threads list */
2078   pthread_mutex_lock (&connection_threads_lock);
2079   connection_threads_num--;
2080   if (connection_threads_num <= 0)
2081     pthread_cond_broadcast(&connection_threads_done);
2082   pthread_mutex_unlock (&connection_threads_lock);
2084   return (NULL);
2085 } /* }}} void *connection_thread_main */
2087 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2089   int fd;
2090   struct sockaddr_un sa;
2091   listen_socket_t *temp;
2092   int status;
2093   const char *path;
2095   path = sock->addr;
2096   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2097     path += strlen("unix:");
2099   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2100       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2101   if (temp == NULL)
2102   {
2103     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2104     return (-1);
2105   }
2106   listen_fds = temp;
2107   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2109   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2110   if (fd < 0)
2111   {
2112     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2113              rrd_strerror(errno));
2114     return (-1);
2115   }
2117   memset (&sa, 0, sizeof (sa));
2118   sa.sun_family = AF_UNIX;
2119   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2121   /* if we've gotten this far, we own the pid file.  any daemon started
2122    * with the same args must not be alive.  therefore, ensure that we can
2123    * create the socket...
2124    */
2125   unlink(path);
2127   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2128   if (status != 0)
2129   {
2130     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2131              path, rrd_strerror(errno));
2132     close (fd);
2133     return (-1);
2134   }
2136   status = listen (fd, /* backlog = */ 10);
2137   if (status != 0)
2138   {
2139     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2140              path, rrd_strerror(errno));
2141     close (fd);
2142     unlink (path);
2143     return (-1);
2144   }
2146   listen_fds[listen_fds_num].fd = fd;
2147   listen_fds[listen_fds_num].family = PF_UNIX;
2148   strncpy(listen_fds[listen_fds_num].addr, path,
2149           sizeof (listen_fds[listen_fds_num].addr) - 1);
2150   listen_fds_num++;
2152   return (0);
2153 } /* }}} int open_listen_socket_unix */
2155 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2157   struct addrinfo ai_hints;
2158   struct addrinfo *ai_res;
2159   struct addrinfo *ai_ptr;
2160   char addr_copy[NI_MAXHOST];
2161   char *addr;
2162   char *port;
2163   int status;
2165   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2166   addr_copy[sizeof (addr_copy) - 1] = 0;
2167   addr = addr_copy;
2169   memset (&ai_hints, 0, sizeof (ai_hints));
2170   ai_hints.ai_flags = 0;
2171 #ifdef AI_ADDRCONFIG
2172   ai_hints.ai_flags |= AI_ADDRCONFIG;
2173 #endif
2174   ai_hints.ai_family = AF_UNSPEC;
2175   ai_hints.ai_socktype = SOCK_STREAM;
2177   port = NULL;
2178   if (*addr == '[') /* IPv6+port format */
2179   {
2180     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2181     addr++;
2183     port = strchr (addr, ']');
2184     if (port == NULL)
2185     {
2186       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2187       return (-1);
2188     }
2189     *port = 0;
2190     port++;
2192     if (*port == ':')
2193       port++;
2194     else if (*port == 0)
2195       port = NULL;
2196     else
2197     {
2198       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2199       return (-1);
2200     }
2201   } /* if (*addr = ']') */
2202   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2203   {
2204     port = rindex(addr, ':');
2205     if (port != NULL)
2206     {
2207       *port = 0;
2208       port++;
2209     }
2210   }
2211   ai_res = NULL;
2212   status = getaddrinfo (addr,
2213                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2214                         &ai_hints, &ai_res);
2215   if (status != 0)
2216   {
2217     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2218              addr, gai_strerror (status));
2219     return (-1);
2220   }
2222   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2223   {
2224     int fd;
2225     listen_socket_t *temp;
2226     int one = 1;
2228     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2229         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2230     if (temp == NULL)
2231     {
2232       fprintf (stderr,
2233                "rrdcached: open_listen_socket_network: realloc failed.\n");
2234       continue;
2235     }
2236     listen_fds = temp;
2237     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2239     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2240     if (fd < 0)
2241     {
2242       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2243                rrd_strerror(errno));
2244       continue;
2245     }
2247     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2249     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2250     if (status != 0)
2251     {
2252       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2253                sock->addr, rrd_strerror(errno));
2254       close (fd);
2255       continue;
2256     }
2258     status = listen (fd, /* backlog = */ 10);
2259     if (status != 0)
2260     {
2261       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2262                sock->addr, rrd_strerror(errno));
2263       close (fd);
2264       freeaddrinfo(ai_res);
2265       return (-1);
2266     }
2268     listen_fds[listen_fds_num].fd = fd;
2269     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2270     listen_fds_num++;
2271   } /* for (ai_ptr) */
2273   freeaddrinfo(ai_res);
2274   return (0);
2275 } /* }}} static int open_listen_socket_network */
2277 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2279   assert(sock != NULL);
2280   assert(sock->addr != NULL);
2282   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2283       || sock->addr[0] == '/')
2284     return (open_listen_socket_unix(sock));
2285   else
2286     return (open_listen_socket_network(sock));
2287 } /* }}} int open_listen_socket */
2289 static int close_listen_sockets (void) /* {{{ */
2291   size_t i;
2293   for (i = 0; i < listen_fds_num; i++)
2294   {
2295     close (listen_fds[i].fd);
2297     if (listen_fds[i].family == PF_UNIX)
2298       unlink(listen_fds[i].addr);
2299   }
2301   free (listen_fds);
2302   listen_fds = NULL;
2303   listen_fds_num = 0;
2305   return (0);
2306 } /* }}} int close_listen_sockets */
2308 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2310   struct pollfd *pollfds;
2311   int pollfds_num;
2312   int status;
2313   int i;
2315   if (listen_fds_num < 1)
2316   {
2317     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2318     return (NULL);
2319   }
2321   pollfds_num = listen_fds_num;
2322   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2323   if (pollfds == NULL)
2324   {
2325     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2326     return (NULL);
2327   }
2328   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2330   RRDD_LOG(LOG_INFO, "listening for connections");
2332   while (do_shutdown == 0)
2333   {
2334     for (i = 0; i < pollfds_num; i++)
2335     {
2336       pollfds[i].fd = listen_fds[i].fd;
2337       pollfds[i].events = POLLIN | POLLPRI;
2338       pollfds[i].revents = 0;
2339     }
2341     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2342     if (do_shutdown)
2343       break;
2344     else if (status == 0) /* timeout */
2345       continue;
2346     else if (status < 0) /* error */
2347     {
2348       status = errno;
2349       if (status != EINTR)
2350       {
2351         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2352       }
2353       continue;
2354     }
2356     for (i = 0; i < pollfds_num; i++)
2357     {
2358       listen_socket_t *client_sock;
2359       struct sockaddr_storage client_sa;
2360       socklen_t client_sa_size;
2361       pthread_t tid;
2362       pthread_attr_t attr;
2364       if (pollfds[i].revents == 0)
2365         continue;
2367       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2368       {
2369         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2370             "poll(2) returned something unexpected for listen FD #%i.",
2371             pollfds[i].fd);
2372         continue;
2373       }
2375       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2376       if (client_sock == NULL)
2377       {
2378         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2379         continue;
2380       }
2381       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2383       client_sa_size = sizeof (client_sa);
2384       client_sock->fd = accept (pollfds[i].fd,
2385           (struct sockaddr *) &client_sa, &client_sa_size);
2386       if (client_sock->fd < 0)
2387       {
2388         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2389         free(client_sock);
2390         continue;
2391       }
2393       pthread_attr_init (&attr);
2394       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2396       status = pthread_create (&tid, &attr, connection_thread_main,
2397                                client_sock);
2398       if (status != 0)
2399       {
2400         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2401         close_connection(client_sock);
2402         continue;
2403       }
2404     } /* for (pollfds_num) */
2405   } /* while (do_shutdown == 0) */
2407   RRDD_LOG(LOG_INFO, "starting shutdown");
2409   close_listen_sockets ();
2411   pthread_mutex_lock (&connection_threads_lock);
2412   while (connection_threads_num > 0)
2413     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2414   pthread_mutex_unlock (&connection_threads_lock);
2416   free(pollfds);
2418   return (NULL);
2419 } /* }}} void *listen_thread_main */
2421 static int daemonize (void) /* {{{ */
2423   int pid_fd;
2424   char *base_dir;
2426   daemon_uid = geteuid();
2428   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2429   if (pid_fd < 0)
2430     pid_fd = check_pidfile();
2431   if (pid_fd < 0)
2432     return pid_fd;
2434   /* open all the listen sockets */
2435   if (config_listen_address_list_len > 0)
2436   {
2437     for (int i = 0; i < config_listen_address_list_len; i++)
2438     {
2439       open_listen_socket (config_listen_address_list[i]);
2440       free_listen_socket (config_listen_address_list[i]);
2441     }
2443     free(config_listen_address_list);
2444   }
2445   else
2446   {
2447     listen_socket_t sock;
2448     memset(&sock, 0, sizeof(sock));
2449     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2450     open_listen_socket (&sock);
2451   }
2453   if (listen_fds_num < 1)
2454   {
2455     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2456     goto error;
2457   }
2459   if (!stay_foreground)
2460   {
2461     pid_t child;
2463     child = fork ();
2464     if (child < 0)
2465     {
2466       fprintf (stderr, "daemonize: fork(2) failed.\n");
2467       goto error;
2468     }
2469     else if (child > 0)
2470       exit(0);
2472     /* Become session leader */
2473     setsid ();
2475     /* Open the first three file descriptors to /dev/null */
2476     close (2);
2477     close (1);
2478     close (0);
2480     open ("/dev/null", O_RDWR);
2481     dup (0);
2482     dup (0);
2483   } /* if (!stay_foreground) */
2485   /* Change into the /tmp directory. */
2486   base_dir = (config_base_dir != NULL)
2487     ? config_base_dir
2488     : "/tmp";
2490   if (chdir (base_dir) != 0)
2491   {
2492     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2493     goto error;
2494   }
2496   install_signal_handlers();
2498   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2499   RRDD_LOG(LOG_INFO, "starting up");
2501   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2502                                 (GDestroyNotify) free_cache_item);
2503   if (cache_tree == NULL)
2504   {
2505     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2506     goto error;
2507   }
2509   return write_pidfile (pid_fd);
2511 error:
2512   remove_pidfile();
2513   return -1;
2514 } /* }}} int daemonize */
2516 static int cleanup (void) /* {{{ */
2518   do_shutdown++;
2520   pthread_cond_broadcast (&flush_cond);
2521   pthread_join (flush_thread, NULL);
2523   pthread_cond_broadcast (&queue_cond);
2524   for (int i = 0; i < config_queue_threads; i++)
2525     pthread_join (queue_threads[i], NULL);
2527   if (config_flush_at_shutdown)
2528   {
2529     assert(cache_queue_head == NULL);
2530     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2531   }
2533   journal_done();
2534   remove_pidfile ();
2536   free(queue_threads);
2537   free(config_base_dir);
2538   free(config_pid_file);
2539   free(journal_cur);
2540   free(journal_old);
2542   pthread_mutex_lock(&cache_lock);
2543   g_tree_destroy(cache_tree);
2545   RRDD_LOG(LOG_INFO, "goodbye");
2546   closelog ();
2548   return (0);
2549 } /* }}} int cleanup */
2551 static int read_options (int argc, char **argv) /* {{{ */
2553   int option;
2554   int status = 0;
2556   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2557   {
2558     switch (option)
2559     {
2560       case 'g':
2561         stay_foreground=1;
2562         break;
2564       case 'L':
2565       case 'l':
2566       {
2567         listen_socket_t **temp;
2568         listen_socket_t *new;
2570         new = malloc(sizeof(listen_socket_t));
2571         if (new == NULL)
2572         {
2573           fprintf(stderr, "read_options: malloc failed.\n");
2574           return(2);
2575         }
2576         memset(new, 0, sizeof(listen_socket_t));
2578         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2579             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2580         if (temp == NULL)
2581         {
2582           fprintf (stderr, "read_options: realloc failed.\n");
2583           return (2);
2584         }
2585         config_listen_address_list = temp;
2587         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2588         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2590         temp[config_listen_address_list_len] = new;
2591         config_listen_address_list_len++;
2592       }
2593       break;
2595       case 'f':
2596       {
2597         int temp;
2599         temp = atoi (optarg);
2600         if (temp > 0)
2601           config_flush_interval = temp;
2602         else
2603         {
2604           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2605           status = 3;
2606         }
2607       }
2608       break;
2610       case 'w':
2611       {
2612         int temp;
2614         temp = atoi (optarg);
2615         if (temp > 0)
2616           config_write_interval = temp;
2617         else
2618         {
2619           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2620           status = 2;
2621         }
2622       }
2623       break;
2625       case 'z':
2626       {
2627         int temp;
2629         temp = atoi(optarg);
2630         if (temp > 0)
2631           config_write_jitter = temp;
2632         else
2633         {
2634           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2635           status = 2;
2636         }
2638         break;
2639       }
2641       case 't':
2642       {
2643         int threads;
2644         threads = atoi(optarg);
2645         if (threads >= 1)
2646           config_queue_threads = threads;
2647         else
2648         {
2649           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2650           return 1;
2651         }
2652       }
2653       break;
2655       case 'B':
2656         config_write_base_only = 1;
2657         break;
2659       case 'b':
2660       {
2661         size_t len;
2662         char base_realpath[PATH_MAX];
2664         if (config_base_dir != NULL)
2665           free (config_base_dir);
2666         config_base_dir = strdup (optarg);
2667         if (config_base_dir == NULL)
2668         {
2669           fprintf (stderr, "read_options: strdup failed.\n");
2670           return (3);
2671         }
2673         /* make sure that the base directory is not resolved via
2674          * symbolic links.  this makes some performance-enhancing
2675          * assumptions possible (we don't have to resolve paths
2676          * that start with a "/")
2677          */
2678         if (realpath(config_base_dir, base_realpath) == NULL)
2679         {
2680           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2681           return 5;
2682         }
2683         else if (strncmp(config_base_dir,
2684                          base_realpath, sizeof(base_realpath)) != 0)
2685         {
2686           fprintf(stderr,
2687                   "Base directory (-b) resolved via file system links!\n"
2688                   "Please consult rrdcached '-b' documentation!\n"
2689                   "Consider specifying the real directory (%s)\n",
2690                   base_realpath);
2691           return 5;
2692         }
2694         len = strlen (config_base_dir);
2695         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2696         {
2697           config_base_dir[len - 1] = 0;
2698           len--;
2699         }
2701         if (len < 1)
2702         {
2703           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2704           return (4);
2705         }
2707         _config_base_dir_len = len;
2708       }
2709       break;
2711       case 'p':
2712       {
2713         if (config_pid_file != NULL)
2714           free (config_pid_file);
2715         config_pid_file = strdup (optarg);
2716         if (config_pid_file == NULL)
2717         {
2718           fprintf (stderr, "read_options: strdup failed.\n");
2719           return (3);
2720         }
2721       }
2722       break;
2724       case 'F':
2725         config_flush_at_shutdown = 1;
2726         break;
2728       case 'j':
2729       {
2730         struct stat statbuf;
2731         const char *dir = optarg;
2733         status = stat(dir, &statbuf);
2734         if (status != 0)
2735         {
2736           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2737           return 6;
2738         }
2740         if (!S_ISDIR(statbuf.st_mode)
2741             || access(dir, R_OK|W_OK|X_OK) != 0)
2742         {
2743           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2744                   errno ? rrd_strerror(errno) : "");
2745           return 6;
2746         }
2748         journal_cur = malloc(PATH_MAX + 1);
2749         journal_old = malloc(PATH_MAX + 1);
2750         if (journal_cur == NULL || journal_old == NULL)
2751         {
2752           fprintf(stderr, "malloc failure for journal files\n");
2753           return 6;
2754         }
2755         else 
2756         {
2757           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2758           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2759         }
2760       }
2761       break;
2763       case 'h':
2764       case '?':
2765         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2766             "\n"
2767             "Usage: rrdcached [options]\n"
2768             "\n"
2769             "Valid options are:\n"
2770             "  -l <address>  Socket address to listen to.\n"
2771             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2772             "  -w <seconds>  Interval in which to write data.\n"
2773             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2774             "  -t <threads>  Number of write threads.\n"
2775             "  -f <seconds>  Interval in which to flush dead data.\n"
2776             "  -p <file>     Location of the PID-file.\n"
2777             "  -b <dir>      Base directory to change to.\n"
2778             "  -B            Restrict file access to paths within -b <dir>\n"
2779             "  -g            Do not fork and run in the foreground.\n"
2780             "  -j <dir>      Directory in which to create the journal files.\n"
2781             "  -F            Always flush all updates at shutdown\n"
2782             "\n"
2783             "For more information and a detailed description of all options "
2784             "please refer\n"
2785             "to the rrdcached(1) manual page.\n",
2786             VERSION);
2787         status = -1;
2788         break;
2789     } /* switch (option) */
2790   } /* while (getopt) */
2792   /* advise the user when values are not sane */
2793   if (config_flush_interval < 2 * config_write_interval)
2794     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2795             " 2x write interval (-w) !\n");
2796   if (config_write_jitter > config_write_interval)
2797     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2798             " write interval (-w) !\n");
2800   if (config_write_base_only && config_base_dir == NULL)
2801     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2802             "  Consult the rrdcached documentation\n");
2804   if (journal_cur == NULL)
2805     config_flush_at_shutdown = 1;
2807   return (status);
2808 } /* }}} int read_options */
2810 int main (int argc, char **argv)
2812   int status;
2814   status = read_options (argc, argv);
2815   if (status != 0)
2816   {
2817     if (status < 0)
2818       status = 0;
2819     return (status);
2820   }
2822   status = daemonize ();
2823   if (status != 0)
2824   {
2825     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2826     return (1);
2827   }
2829   journal_init();
2831   /* start the queue threads */
2832   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2833   if (queue_threads == NULL)
2834   {
2835     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2836     cleanup();
2837     return (1);
2838   }
2839   for (int i = 0; i < config_queue_threads; i++)
2840   {
2841     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2842     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2843     if (status != 0)
2844     {
2845       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2846       cleanup();
2847       return (1);
2848     }
2849   }
2851   /* start the flush thread */
2852   memset(&flush_thread, 0, sizeof(flush_thread));
2853   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2854   if (status != 0)
2855   {
2856     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2857     cleanup();
2858     return (1);
2859   }
2861   listen_thread_main (NULL);
2862   cleanup ();
2864   return (0);
2865 } /* int main */
2867 /*
2868  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2869  */