Code

This patch includes utility functions to support dynamically sized arrays.
[rrdtool-all.git] / program / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
74 #include "rrd.h"
75 #include "rrd_client.h"
77 #include <stdlib.h>
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
86 #else
88 #endif
89 #include <stdio.h>
90 #include <string.h>
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
106 #include <glib-2.0/glib.h>
107 /* }}} */
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
115 /*
116  * Types
117  */
118 typedef enum
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
126 struct listen_socket_s
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
168   char *syntax;
169   char *help;
170 };
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
189 struct callback_flush_data_s
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
198 enum queue_side_e
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
218 static int do_shutdown = 0;
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
231 /* Cache stuff */
232 static GTree          *cache_tree = NULL;
233 static cache_item_t   *cache_queue_head = NULL;
234 static cache_item_t   *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
237 static int config_write_interval = 300;
238 static int config_write_jitter   = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
246 static listen_socket_t **config_listen_address_list = NULL;
247 static size_t config_listen_address_list_len = 0;
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
267 /* prototypes for forward refernces */
268 static int handle_request_help (HANDLER_PROTO);
270 /* 
271  * Functions
272  */
273 static void sig_common (const char *sig) /* {{{ */
275   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276   do_shutdown++;
277   pthread_cond_broadcast(&flush_cond);
278   pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
281 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
283   sig_common("INT");
284 } /* }}} void sig_int_handler */
286 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
288   sig_common("TERM");
289 } /* }}} void sig_term_handler */
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
293   config_flush_at_shutdown = 1;
294   sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
299   config_flush_at_shutdown = 0;
300   sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
303 static void install_signal_handlers(void) /* {{{ */
305   /* These structures are static, because `sigaction' behaves weird if the are
306    * overwritten.. */
307   static struct sigaction sa_int;
308   static struct sigaction sa_term;
309   static struct sigaction sa_pipe;
310   static struct sigaction sa_usr1;
311   static struct sigaction sa_usr2;
313   /* Install signal handlers */
314   memset (&sa_int, 0, sizeof (sa_int));
315   sa_int.sa_handler = sig_int_handler;
316   sigaction (SIGINT, &sa_int, NULL);
318   memset (&sa_term, 0, sizeof (sa_term));
319   sa_term.sa_handler = sig_term_handler;
320   sigaction (SIGTERM, &sa_term, NULL);
322   memset (&sa_pipe, 0, sizeof (sa_pipe));
323   sa_pipe.sa_handler = SIG_IGN;
324   sigaction (SIGPIPE, &sa_pipe, NULL);
326   memset (&sa_pipe, 0, sizeof (sa_usr1));
327   sa_usr1.sa_handler = sig_usr1_handler;
328   sigaction (SIGUSR1, &sa_usr1, NULL);
330   memset (&sa_usr2, 0, sizeof (sa_usr2));
331   sa_usr2.sa_handler = sig_usr2_handler;
332   sigaction (SIGUSR2, &sa_usr2, NULL);
334 } /* }}} void install_signal_handlers */
336 static int open_pidfile(char *action, int oflag) /* {{{ */
338   int fd;
339   char *file;
341   file = (config_pid_file != NULL)
342     ? config_pid_file
343     : LOCALSTATEDIR "/run/rrdcached.pid";
345   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346   if (fd < 0)
347     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348             action, file, rrd_strerror(errno));
350   return(fd);
351 } /* }}} static int open_pidfile */
353 /* check existing pid file to see whether a daemon is running */
354 static int check_pidfile(void)
356   int pid_fd;
357   pid_t pid;
358   char pid_str[16];
360   pid_fd = open_pidfile("open", O_RDWR);
361   if (pid_fd < 0)
362     return pid_fd;
364   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
365     return -1;
367   pid = atoi(pid_str);
368   if (pid <= 0)
369     return -1;
371   /* another running process that we can signal COULD be
372    * a competing rrdcached */
373   if (pid != getpid() && kill(pid, 0) == 0)
374   {
375     fprintf(stderr,
376             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
377     close(pid_fd);
378     return -1;
379   }
381   lseek(pid_fd, 0, SEEK_SET);
382   ftruncate(pid_fd, 0);
384   fprintf(stderr,
385           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
386           "rrdcached: starting normally.\n", pid);
388   return pid_fd;
389 } /* }}} static int check_pidfile */
391 static int write_pidfile (int fd) /* {{{ */
393   pid_t pid;
394   FILE *fh;
396   pid = getpid ();
398   fh = fdopen (fd, "w");
399   if (fh == NULL)
400   {
401     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
402     close(fd);
403     return (-1);
404   }
406   fprintf (fh, "%i\n", (int) pid);
407   fclose (fh);
409   return (0);
410 } /* }}} int write_pidfile */
412 static int remove_pidfile (void) /* {{{ */
414   char *file;
415   int status;
417   file = (config_pid_file != NULL)
418     ? config_pid_file
419     : LOCALSTATEDIR "/run/rrdcached.pid";
421   status = unlink (file);
422   if (status == 0)
423     return (0);
424   return (errno);
425 } /* }}} int remove_pidfile */
427 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
429   char *eol;
431   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
432                sock->next_read - sock->next_cmd);
434   if (eol == NULL)
435   {
436     /* no commands left, move remainder back to front of rbuf */
437     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
438             sock->next_read - sock->next_cmd);
439     sock->next_read -= sock->next_cmd;
440     sock->next_cmd = 0;
441     *len = 0;
442     return NULL;
443   }
444   else
445   {
446     char *cmd = sock->rbuf + sock->next_cmd;
447     *eol = '\0';
449     sock->next_cmd = eol - sock->rbuf + 1;
451     if (eol > sock->rbuf && *(eol-1) == '\r')
452       *(--eol) = '\0'; /* handle "\r\n" EOL */
454     *len = eol - cmd;
456     return cmd;
457   }
459   /* NOTREACHED */
460   assert(1==0);
463 /* add the characters directly to the write buffer */
464 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
466   char *new_buf;
468   assert(sock != NULL);
470   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
471   if (new_buf == NULL)
472   {
473     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
474     return -1;
475   }
477   strncpy(new_buf + sock->wbuf_len, str, len + 1);
479   sock->wbuf = new_buf;
480   sock->wbuf_len += len;
482   return 0;
483 } /* }}} static int add_to_wbuf */
485 /* add the text to the "extra" info that's sent after the status line */
486 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
488   va_list argp;
489   char buffer[CMD_MAX];
490   int len;
492   if (sock == NULL) return 0; /* journal replay mode */
493   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
495   va_start(argp, fmt);
496 #ifdef HAVE_VSNPRINTF
497   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
498 #else
499   len = vsprintf(buffer, fmt, argp);
500 #endif
501   va_end(argp);
502   if (len < 0)
503   {
504     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
505     return -1;
506   }
508   return add_to_wbuf(sock, buffer, len);
509 } /* }}} static int add_response_info */
511 static int count_lines(char *str) /* {{{ */
513   int lines = 0;
515   if (str != NULL)
516   {
517     while ((str = strchr(str, '\n')) != NULL)
518     {
519       ++lines;
520       ++str;
521     }
522   }
524   return lines;
525 } /* }}} static int count_lines */
527 /* send the response back to the user.
528  * returns 0 on success, -1 on error
529  * write buffer is always zeroed after this call */
530 static int send_response (listen_socket_t *sock, response_code rc,
531                           char *fmt, ...) /* {{{ */
533   va_list argp;
534   char buffer[CMD_MAX];
535   int lines;
536   ssize_t wrote;
537   int rclen, len;
539   if (sock == NULL) return rc;  /* journal replay mode */
541   if (sock->batch_start)
542   {
543     if (rc == RESP_OK)
544       return rc; /* no response on success during BATCH */
545     lines = sock->batch_cmd;
546   }
547   else if (rc == RESP_OK)
548     lines = count_lines(sock->wbuf);
549   else
550     lines = -1;
552   rclen = sprintf(buffer, "%d ", lines);
553   va_start(argp, fmt);
554 #ifdef HAVE_VSNPRINTF
555   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
556 #else
557   len = vsprintf(buffer+rclen, fmt, argp);
558 #endif
559   va_end(argp);
560   if (len < 0)
561     return -1;
563   len += rclen;
565   /* append the result to the wbuf, don't write to the user */
566   if (sock->batch_start)
567     return add_to_wbuf(sock, buffer, len);
569   /* first write must be complete */
570   if (len != write(sock->fd, buffer, len))
571   {
572     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
573     return -1;
574   }
576   if (sock->wbuf != NULL && rc == RESP_OK)
577   {
578     wrote = 0;
579     while (wrote < sock->wbuf_len)
580     {
581       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
582       if (wb <= 0)
583       {
584         RRDD_LOG(LOG_INFO, "send_response: could not write results");
585         return -1;
586       }
587       wrote += wb;
588     }
589   }
591   free(sock->wbuf); sock->wbuf = NULL;
592   sock->wbuf_len = 0;
594   return 0;
595 } /* }}} */
597 static void wipe_ci_values(cache_item_t *ci, time_t when)
599   ci->values = NULL;
600   ci->values_num = 0;
602   ci->last_flush_time = when;
603   if (config_write_jitter > 0)
604     ci->last_flush_time += (rrd_random() % config_write_jitter);
607 /* remove_from_queue
608  * remove a "cache_item_t" item from the queue.
609  * must hold 'cache_lock' when calling this
610  */
611 static void remove_from_queue(cache_item_t *ci) /* {{{ */
613   if (ci == NULL) return;
614   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
616   if (ci->prev == NULL)
617     cache_queue_head = ci->next; /* reset head */
618   else
619     ci->prev->next = ci->next;
621   if (ci->next == NULL)
622     cache_queue_tail = ci->prev; /* reset the tail */
623   else
624     ci->next->prev = ci->prev;
626   ci->next = ci->prev = NULL;
627   ci->flags &= ~CI_FLAGS_IN_QUEUE;
629   pthread_mutex_lock (&stats_lock);
630   assert (stats_queue_length > 0);
631   stats_queue_length--;
632   pthread_mutex_unlock (&stats_lock);
634 } /* }}} static void remove_from_queue */
636 /* free the resources associated with the cache_item_t
637  * must hold cache_lock when calling this function
638  */
639 static void *free_cache_item(cache_item_t *ci) /* {{{ */
641   if (ci == NULL) return NULL;
643   remove_from_queue(ci);
645   for (size_t i=0; i < ci->values_num; i++)
646     free(ci->values[i]);
648   free (ci->values);
649   free (ci->file);
651   /* in case anyone is waiting */
652   pthread_cond_broadcast(&ci->flushed);
654   free (ci);
656   return NULL;
657 } /* }}} static void *free_cache_item */
659 /*
660  * enqueue_cache_item:
661  * `cache_lock' must be acquired before calling this function!
662  */
663 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
664     queue_side_t side)
666   if (ci == NULL)
667     return (-1);
669   if (ci->values_num == 0)
670     return (0);
672   if (side == HEAD)
673   {
674     if (cache_queue_head == ci)
675       return 0;
677     /* remove if further down in queue */
678     remove_from_queue(ci);
680     ci->prev = NULL;
681     ci->next = cache_queue_head;
682     if (ci->next != NULL)
683       ci->next->prev = ci;
684     cache_queue_head = ci;
686     if (cache_queue_tail == NULL)
687       cache_queue_tail = cache_queue_head;
688   }
689   else /* (side == TAIL) */
690   {
691     /* We don't move values back in the list.. */
692     if (ci->flags & CI_FLAGS_IN_QUEUE)
693       return (0);
695     assert (ci->next == NULL);
696     assert (ci->prev == NULL);
698     ci->prev = cache_queue_tail;
700     if (cache_queue_tail == NULL)
701       cache_queue_head = ci;
702     else
703       cache_queue_tail->next = ci;
705     cache_queue_tail = ci;
706   }
708   ci->flags |= CI_FLAGS_IN_QUEUE;
710   pthread_cond_signal(&queue_cond);
711   pthread_mutex_lock (&stats_lock);
712   stats_queue_length++;
713   pthread_mutex_unlock (&stats_lock);
715   return (0);
716 } /* }}} int enqueue_cache_item */
718 /*
719  * tree_callback_flush:
720  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
721  * while this is in progress.
722  */
723 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
724     gpointer data)
726   cache_item_t *ci;
727   callback_flush_data_t *cfd;
729   ci = (cache_item_t *) value;
730   cfd = (callback_flush_data_t *) data;
732   if (ci->flags & CI_FLAGS_IN_QUEUE)
733     return FALSE;
735   if ((ci->last_flush_time <= cfd->abs_timeout)
736       && (ci->values_num > 0))
737   {
738     enqueue_cache_item (ci, TAIL);
739   }
740   else if ((do_shutdown != 0)
741       && (ci->values_num > 0))
742   {
743     enqueue_cache_item (ci, TAIL);
744   }
745   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
746       && (ci->values_num <= 0))
747   {
748     assert ((char *) key == ci->file);
749     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
750     {
751       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
752       return (FALSE);
753     }
754   }
756   return (FALSE);
757 } /* }}} gboolean tree_callback_flush */
759 static int flush_old_values (int max_age)
761   callback_flush_data_t cfd;
762   size_t k;
764   memset (&cfd, 0, sizeof (cfd));
765   /* Pass the current time as user data so that we don't need to call
766    * `time' for each node. */
767   cfd.now = time (NULL);
768   cfd.keys = NULL;
769   cfd.keys_num = 0;
771   if (max_age > 0)
772     cfd.abs_timeout = cfd.now - max_age;
773   else
774     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
776   /* `tree_callback_flush' will return the keys of all values that haven't
777    * been touched in the last `config_flush_interval' seconds in `cfd'.
778    * The char*'s in this array point to the same memory as ci->file, so we
779    * don't need to free them separately. */
780   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
782   for (k = 0; k < cfd.keys_num; k++)
783   {
784     /* should never fail, since we have held the cache_lock
785      * the entire time */
786     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
787   }
789   if (cfd.keys != NULL)
790   {
791     free (cfd.keys);
792     cfd.keys = NULL;
793   }
795   return (0);
796 } /* int flush_old_values */
798 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
800   struct timeval now;
801   struct timespec next_flush;
802   int status;
804   gettimeofday (&now, NULL);
805   next_flush.tv_sec = now.tv_sec + config_flush_interval;
806   next_flush.tv_nsec = 1000 * now.tv_usec;
808   pthread_mutex_lock(&cache_lock);
810   while (!do_shutdown)
811   {
812     gettimeofday (&now, NULL);
813     if ((now.tv_sec > next_flush.tv_sec)
814         || ((now.tv_sec == next_flush.tv_sec)
815           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
816     {
817       /* Flush all values that haven't been written in the last
818        * `config_write_interval' seconds. */
819       flush_old_values (config_write_interval);
821       /* Determine the time of the next cache flush. */
822       next_flush.tv_sec =
823         now.tv_sec + next_flush.tv_sec % config_flush_interval;
825       /* unlock the cache while we rotate so we don't block incoming
826        * updates if the fsync() blocks on disk I/O */
827       pthread_mutex_unlock(&cache_lock);
828       journal_rotate();
829       pthread_mutex_lock(&cache_lock);
830     }
832     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
833     if (status != 0 && status != ETIMEDOUT)
834     {
835       RRDD_LOG (LOG_ERR, "flush_thread_main: "
836                 "pthread_cond_timedwait returned %i.", status);
837     }
838   }
840   if (config_flush_at_shutdown)
841     flush_old_values (-1); /* flush everything */
843   pthread_mutex_unlock(&cache_lock);
845   return NULL;
846 } /* void *flush_thread_main */
848 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
850   pthread_mutex_lock (&cache_lock);
852   while (!do_shutdown
853          || (cache_queue_head != NULL && config_flush_at_shutdown))
854   {
855     cache_item_t *ci;
856     char *file;
857     char **values;
858     size_t values_num;
859     int status;
861     /* Now, check if there's something to store away. If not, wait until
862      * something comes in.  if we are shutting down, do not wait around.  */
863     if (cache_queue_head == NULL && !do_shutdown)
864     {
865       status = pthread_cond_wait (&queue_cond, &cache_lock);
866       if ((status != 0) && (status != ETIMEDOUT))
867       {
868         RRDD_LOG (LOG_ERR, "queue_thread_main: "
869             "pthread_cond_wait returned %i.", status);
870       }
871     }
873     /* Check if a value has arrived. This may be NULL if we timed out or there
874      * was an interrupt such as a signal. */
875     if (cache_queue_head == NULL)
876       continue;
878     ci = cache_queue_head;
880     /* copy the relevant parts */
881     file = strdup (ci->file);
882     if (file == NULL)
883     {
884       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
885       continue;
886     }
888     assert(ci->values != NULL);
889     assert(ci->values_num > 0);
891     values = ci->values;
892     values_num = ci->values_num;
894     wipe_ci_values(ci, time(NULL));
895     remove_from_queue(ci);
897     pthread_mutex_unlock (&cache_lock);
899     rrd_clear_error ();
900     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
901     if (status != 0)
902     {
903       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
904           "rrd_update_r (%s) failed with status %i. (%s)",
905           file, status, rrd_get_error());
906     }
908     journal_write("wrote", file);
909     pthread_cond_broadcast(&ci->flushed);
911     rrd_free_ptrs((void ***) &values, &values_num);
912     free(file);
914     if (status == 0)
915     {
916       pthread_mutex_lock (&stats_lock);
917       stats_updates_written++;
918       stats_data_sets_written += values_num;
919       pthread_mutex_unlock (&stats_lock);
920     }
922     pthread_mutex_lock (&cache_lock);
923   }
924   pthread_mutex_unlock (&cache_lock);
926   return (NULL);
927 } /* }}} void *queue_thread_main */
929 static int buffer_get_field (char **buffer_ret, /* {{{ */
930     size_t *buffer_size_ret, char **field_ret)
932   char *buffer;
933   size_t buffer_pos;
934   size_t buffer_size;
935   char *field;
936   size_t field_size;
937   int status;
939   buffer = *buffer_ret;
940   buffer_pos = 0;
941   buffer_size = *buffer_size_ret;
942   field = *buffer_ret;
943   field_size = 0;
945   if (buffer_size <= 0)
946     return (-1);
948   /* This is ensured by `handle_request'. */
949   assert (buffer[buffer_size - 1] == '\0');
951   status = -1;
952   while (buffer_pos < buffer_size)
953   {
954     /* Check for end-of-field or end-of-buffer */
955     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
956     {
957       field[field_size] = 0;
958       field_size++;
959       buffer_pos++;
960       status = 0;
961       break;
962     }
963     /* Handle escaped characters. */
964     else if (buffer[buffer_pos] == '\\')
965     {
966       if (buffer_pos >= (buffer_size - 1))
967         break;
968       buffer_pos++;
969       field[field_size] = buffer[buffer_pos];
970       field_size++;
971       buffer_pos++;
972     }
973     /* Normal operation */ 
974     else
975     {
976       field[field_size] = buffer[buffer_pos];
977       field_size++;
978       buffer_pos++;
979     }
980   } /* while (buffer_pos < buffer_size) */
982   if (status != 0)
983     return (status);
985   *buffer_ret = buffer + buffer_pos;
986   *buffer_size_ret = buffer_size - buffer_pos;
987   *field_ret = field;
989   return (0);
990 } /* }}} int buffer_get_field */
992 /* if we're restricting writes to the base directory,
993  * check whether the file falls within the dir
994  * returns 1 if OK, otherwise 0
995  */
996 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
998   assert(file != NULL);
1000   if (!config_write_base_only
1001       || sock == NULL /* journal replay */
1002       || config_base_dir == NULL)
1003     return 1;
1005   if (strstr(file, "../") != NULL) goto err;
1007   /* relative paths without "../" are ok */
1008   if (*file != '/') return 1;
1010   /* file must be of the format base + "/" + <1+ char filename> */
1011   if (strlen(file) < _config_base_dir_len + 2) goto err;
1012   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1013   if (*(file + _config_base_dir_len) != '/') goto err;
1015   return 1;
1017 err:
1018   if (sock != NULL && sock->fd >= 0)
1019     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1021   return 0;
1022 } /* }}} static int check_file_access */
1024 /* when using a base dir, convert relative paths to absolute paths.
1025  * if necessary, modifies the "filename" pointer to point
1026  * to the new path created in "tmp".  "tmp" is provided
1027  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1028  *
1029  * this allows us to optimize for the expected case (absolute path)
1030  * with a no-op.
1031  */
1032 static void get_abs_path(char **filename, char *tmp)
1034   assert(tmp != NULL);
1035   assert(filename != NULL && *filename != NULL);
1037   if (config_base_dir == NULL || **filename == '/')
1038     return;
1040   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1041   *filename = tmp;
1042 } /* }}} static int get_abs_path */
1044 /* returns 1 if we have the required privilege level,
1045  * otherwise issue an error to the user on sock */
1046 static int has_privilege (listen_socket_t *sock, /* {{{ */
1047                           socket_privilege priv)
1049   if (sock == NULL) /* journal replay */
1050     return 1;
1052   if (sock->privilege >= priv)
1053     return 1;
1055   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1056 } /* }}} static int has_privilege */
1058 static int flush_file (const char *filename) /* {{{ */
1060   cache_item_t *ci;
1062   pthread_mutex_lock (&cache_lock);
1064   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1065   if (ci == NULL)
1066   {
1067     pthread_mutex_unlock (&cache_lock);
1068     return (ENOENT);
1069   }
1071   if (ci->values_num > 0)
1072   {
1073     /* Enqueue at head */
1074     enqueue_cache_item (ci, HEAD);
1075     pthread_cond_wait(&ci->flushed, &cache_lock);
1076   }
1078   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1079    * may have been purged during our cond_wait() */
1081   pthread_mutex_unlock(&cache_lock);
1083   return (0);
1084 } /* }}} int flush_file */
1086 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1088   char *err = "Syntax error.\n";
1090   if (cmd && cmd->syntax)
1091     err = cmd->syntax;
1093   return send_response(sock, RESP_ERR, "Usage: %s", err);
1094 } /* }}} static int syntax_error() */
1096 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1098   uint64_t copy_queue_length;
1099   uint64_t copy_updates_received;
1100   uint64_t copy_flush_received;
1101   uint64_t copy_updates_written;
1102   uint64_t copy_data_sets_written;
1103   uint64_t copy_journal_bytes;
1104   uint64_t copy_journal_rotate;
1106   uint64_t tree_nodes_number;
1107   uint64_t tree_depth;
1109   pthread_mutex_lock (&stats_lock);
1110   copy_queue_length       = stats_queue_length;
1111   copy_updates_received   = stats_updates_received;
1112   copy_flush_received     = stats_flush_received;
1113   copy_updates_written    = stats_updates_written;
1114   copy_data_sets_written  = stats_data_sets_written;
1115   copy_journal_bytes      = stats_journal_bytes;
1116   copy_journal_rotate     = stats_journal_rotate;
1117   pthread_mutex_unlock (&stats_lock);
1119   pthread_mutex_lock (&cache_lock);
1120   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1121   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1122   pthread_mutex_unlock (&cache_lock);
1124   add_response_info(sock,
1125                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1126   add_response_info(sock,
1127                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1128   add_response_info(sock,
1129                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1130   add_response_info(sock,
1131                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1132   add_response_info(sock,
1133                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1134   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1135   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1136   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1137   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1139   send_response(sock, RESP_OK, "Statistics follow\n");
1141   return (0);
1142 } /* }}} int handle_request_stats */
1144 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1146   char *file, file_tmp[PATH_MAX];
1147   int status;
1149   status = buffer_get_field (&buffer, &buffer_size, &file);
1150   if (status != 0)
1151   {
1152     return syntax_error(sock,cmd);
1153   }
1154   else
1155   {
1156     pthread_mutex_lock(&stats_lock);
1157     stats_flush_received++;
1158     pthread_mutex_unlock(&stats_lock);
1160     get_abs_path(&file, file_tmp);
1161     if (!check_file_access(file, sock)) return 0;
1163     status = flush_file (file);
1164     if (status == 0)
1165       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1166     else if (status == ENOENT)
1167     {
1168       /* no file in our tree; see whether it exists at all */
1169       struct stat statbuf;
1171       memset(&statbuf, 0, sizeof(statbuf));
1172       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1173         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1174       else
1175         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1176     }
1177     else if (status < 0)
1178       return send_response(sock, RESP_ERR, "Internal error.\n");
1179     else
1180       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1181   }
1183   /* NOTREACHED */
1184   assert(1==0);
1185 } /* }}} int handle_request_flush */
1187 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1189   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1191   pthread_mutex_lock(&cache_lock);
1192   flush_old_values(-1);
1193   pthread_mutex_unlock(&cache_lock);
1195   return send_response(sock, RESP_OK, "Started flush.\n");
1196 } /* }}} static int handle_request_flushall */
1198 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1200   int status;
1201   char *file, file_tmp[PATH_MAX];
1202   cache_item_t *ci;
1204   status = buffer_get_field(&buffer, &buffer_size, &file);
1205   if (status != 0)
1206     return syntax_error(sock,cmd);
1208   get_abs_path(&file, file_tmp);
1210   pthread_mutex_lock(&cache_lock);
1211   ci = g_tree_lookup(cache_tree, file);
1212   if (ci == NULL)
1213   {
1214     pthread_mutex_unlock(&cache_lock);
1215     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1216   }
1218   for (size_t i=0; i < ci->values_num; i++)
1219     add_response_info(sock, "%s\n", ci->values[i]);
1221   pthread_mutex_unlock(&cache_lock);
1222   return send_response(sock, RESP_OK, "updates pending\n");
1223 } /* }}} static int handle_request_pending */
1225 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1227   int status;
1228   gboolean found;
1229   char *file, file_tmp[PATH_MAX];
1231   status = buffer_get_field(&buffer, &buffer_size, &file);
1232   if (status != 0)
1233     return syntax_error(sock,cmd);
1235   get_abs_path(&file, file_tmp);
1236   if (!check_file_access(file, sock)) return 0;
1238   pthread_mutex_lock(&cache_lock);
1239   found = g_tree_remove(cache_tree, file);
1240   pthread_mutex_unlock(&cache_lock);
1242   if (found == TRUE)
1243   {
1244     if (sock != NULL)
1245       journal_write("forget", file);
1247     return send_response(sock, RESP_OK, "Gone!\n");
1248   }
1249   else
1250     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1252   /* NOTREACHED */
1253   assert(1==0);
1254 } /* }}} static int handle_request_forget */
1256 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1258   cache_item_t *ci;
1260   pthread_mutex_lock(&cache_lock);
1262   ci = cache_queue_head;
1263   while (ci != NULL)
1264   {
1265     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1266     ci = ci->next;
1267   }
1269   pthread_mutex_unlock(&cache_lock);
1271   return send_response(sock, RESP_OK, "in queue.\n");
1272 } /* }}} int handle_request_queue */
1274 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1276   char *file, file_tmp[PATH_MAX];
1277   int values_num = 0;
1278   int status;
1279   char orig_buf[CMD_MAX];
1281   cache_item_t *ci;
1283   /* save it for the journal later */
1284   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1286   status = buffer_get_field (&buffer, &buffer_size, &file);
1287   if (status != 0)
1288     return syntax_error(sock,cmd);
1290   pthread_mutex_lock(&stats_lock);
1291   stats_updates_received++;
1292   pthread_mutex_unlock(&stats_lock);
1294   get_abs_path(&file, file_tmp);
1295   if (!check_file_access(file, sock)) return 0;
1297   pthread_mutex_lock (&cache_lock);
1298   ci = g_tree_lookup (cache_tree, file);
1300   if (ci == NULL) /* {{{ */
1301   {
1302     struct stat statbuf;
1304     /* don't hold the lock while we setup; stat(2) might block */
1305     pthread_mutex_unlock(&cache_lock);
1307     memset (&statbuf, 0, sizeof (statbuf));
1308     status = stat (file, &statbuf);
1309     if (status != 0)
1310     {
1311       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1313       status = errno;
1314       if (status == ENOENT)
1315         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1316       else
1317         return send_response(sock, RESP_ERR,
1318                              "stat failed with error %i.\n", status);
1319     }
1320     if (!S_ISREG (statbuf.st_mode))
1321       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1323     if (access(file, R_OK|W_OK) != 0)
1324       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1325                            file, rrd_strerror(errno));
1327     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1328     if (ci == NULL)
1329     {
1330       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1332       return send_response(sock, RESP_ERR, "malloc failed.\n");
1333     }
1334     memset (ci, 0, sizeof (cache_item_t));
1336     ci->file = strdup (file);
1337     if (ci->file == NULL)
1338     {
1339       free (ci);
1340       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1342       return send_response(sock, RESP_ERR, "strdup failed.\n");
1343     }
1345     wipe_ci_values(ci, now);
1346     ci->flags = CI_FLAGS_IN_TREE;
1347     pthread_cond_init(&ci->flushed, NULL);
1349     pthread_mutex_lock(&cache_lock);
1350     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1351   } /* }}} */
1352   assert (ci != NULL);
1354   /* don't re-write updates in replay mode */
1355   if (sock != NULL)
1356     journal_write("update", orig_buf);
1358   while (buffer_size > 0)
1359   {
1360     char *value;
1361     time_t stamp;
1362     char *eostamp;
1364     status = buffer_get_field (&buffer, &buffer_size, &value);
1365     if (status != 0)
1366     {
1367       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1368       break;
1369     }
1371     /* make sure update time is always moving forward */
1372     stamp = strtol(value, &eostamp, 10);
1373     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1374     {
1375       pthread_mutex_unlock(&cache_lock);
1376       return send_response(sock, RESP_ERR,
1377                            "Cannot find timestamp in '%s'!\n", value);
1378     }
1379     else if (stamp <= ci->last_update_stamp)
1380     {
1381       pthread_mutex_unlock(&cache_lock);
1382       return send_response(sock, RESP_ERR,
1383                            "illegal attempt to update using time %ld when last"
1384                            " update time is %ld (minimum one second step)\n",
1385                            stamp, ci->last_update_stamp);
1386     }
1387     else
1388       ci->last_update_stamp = stamp;
1390     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1391     {
1392       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1393       continue;
1394     }
1396     values_num++;
1397   }
1399   if (((now - ci->last_flush_time) >= config_write_interval)
1400       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1401       && (ci->values_num > 0))
1402   {
1403     enqueue_cache_item (ci, TAIL);
1404   }
1406   pthread_mutex_unlock (&cache_lock);
1408   if (values_num < 1)
1409     return send_response(sock, RESP_ERR, "No values updated.\n");
1410   else
1411     return send_response(sock, RESP_OK,
1412                          "errors, enqueued %i value(s).\n", values_num);
1414   /* NOTREACHED */
1415   assert(1==0);
1417 } /* }}} int handle_request_update */
1419 /* we came across a "WROTE" entry during journal replay.
1420  * throw away any values that we have accumulated for this file
1421  */
1422 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1424   cache_item_t *ci;
1425   const char *file = buffer;
1427   pthread_mutex_lock(&cache_lock);
1429   ci = g_tree_lookup(cache_tree, file);
1430   if (ci == NULL)
1431   {
1432     pthread_mutex_unlock(&cache_lock);
1433     return (0);
1434   }
1436   if (ci->values)
1437     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1439   wipe_ci_values(ci, now);
1440   remove_from_queue(ci);
1442   pthread_mutex_unlock(&cache_lock);
1443   return (0);
1444 } /* }}} int handle_request_wrote */
1446 /* start "BATCH" processing */
1447 static int batch_start (HANDLER_PROTO) /* {{{ */
1449   int status;
1450   if (sock->batch_start)
1451     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1453   status = send_response(sock, RESP_OK,
1454                          "Go ahead.  End with dot '.' on its own line.\n");
1455   sock->batch_start = time(NULL);
1456   sock->batch_cmd = 0;
1458   return status;
1459 } /* }}} static int batch_start */
1461 /* finish "BATCH" processing and return results to the client */
1462 static int batch_done (HANDLER_PROTO) /* {{{ */
1464   assert(sock->batch_start);
1465   sock->batch_start = 0;
1466   sock->batch_cmd  = 0;
1467   return send_response(sock, RESP_OK, "errors\n");
1468 } /* }}} static int batch_done */
1470 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1472   return -1;
1473 } /* }}} static int handle_request_quit */
1475 struct command COMMANDS[] = {
1476   {
1477     "UPDATE",
1478     handle_request_update,
1479     PRIV_HIGH,
1480     CMD_CONTEXT_ANY,
1481     "UPDATE <filename> <values> [<values> ...]\n"
1482     ,
1483     "Adds the given file to the internal cache if it is not yet known and\n"
1484     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1485     "for details.\n"
1486     "\n"
1487     "Each <values> has the following form:\n"
1488     "  <values> = <time>:<value>[:<value>[...]]\n"
1489     "See the rrdupdate(1) manpage for details.\n"
1490   },
1491   {
1492     "WROTE",
1493     handle_request_wrote,
1494     PRIV_HIGH,
1495     CMD_CONTEXT_JOURNAL,
1496     NULL,
1497     NULL
1498   },
1499   {
1500     "FLUSH",
1501     handle_request_flush,
1502     PRIV_LOW,
1503     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1504     "FLUSH <filename>\n"
1505     ,
1506     "Adds the given filename to the head of the update queue and returns\n"
1507     "after it has been dequeued.\n"
1508   },
1509   {
1510     "FLUSHALL",
1511     handle_request_flushall,
1512     PRIV_HIGH,
1513     CMD_CONTEXT_CLIENT,
1514     "FLUSHALL\n"
1515     ,
1516     "Triggers writing of all pending updates.  Returns immediately.\n"
1517   },
1518   {
1519     "PENDING",
1520     handle_request_pending,
1521     PRIV_HIGH,
1522     CMD_CONTEXT_CLIENT,
1523     "PENDING <filename>\n"
1524     ,
1525     "Shows any 'pending' updates for a file, in order.\n"
1526     "The updates shown have not yet been written to the underlying RRD file.\n"
1527   },
1528   {
1529     "FORGET",
1530     handle_request_forget,
1531     PRIV_HIGH,
1532     CMD_CONTEXT_ANY,
1533     "FORGET <filename>\n"
1534     ,
1535     "Removes the file completely from the cache.\n"
1536     "Any pending updates for the file will be lost.\n"
1537   },
1538   {
1539     "QUEUE",
1540     handle_request_queue,
1541     PRIV_LOW,
1542     CMD_CONTEXT_CLIENT,
1543     "QUEUE\n"
1544     ,
1545         "Shows all files in the output queue.\n"
1546     "The output is zero or more lines in the following format:\n"
1547     "(where <num_vals> is the number of values to be written)\n"
1548     "\n"
1549     "<num_vals> <filename>\n"
1550   },
1551   {
1552     "STATS",
1553     handle_request_stats,
1554     PRIV_LOW,
1555     CMD_CONTEXT_CLIENT,
1556     "STATS\n"
1557     ,
1558     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1559     "a description of the values.\n"
1560   },
1561   {
1562     "HELP",
1563     handle_request_help,
1564     PRIV_LOW,
1565     CMD_CONTEXT_CLIENT,
1566     "HELP [<command>]\n",
1567     NULL, /* special! */
1568   },
1569   {
1570     "BATCH",
1571     batch_start,
1572     PRIV_LOW,
1573     CMD_CONTEXT_CLIENT,
1574     "BATCH\n"
1575     ,
1576     "The 'BATCH' command permits the client to initiate a bulk load\n"
1577     "   of commands to rrdcached.\n"
1578     "\n"
1579     "Usage:\n"
1580     "\n"
1581     "    client: BATCH\n"
1582     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1583     "    client: command #1\n"
1584     "    client: command #2\n"
1585     "    client: ... and so on\n"
1586     "    client: .\n"
1587     "    server: 2 errors\n"
1588     "    server: 7 message for command #7\n"
1589     "    server: 9 message for command #9\n"
1590     "\n"
1591     "For more information, consult the rrdcached(1) documentation.\n"
1592   },
1593   {
1594     ".",   /* BATCH terminator */
1595     batch_done,
1596     PRIV_LOW,
1597     CMD_CONTEXT_BATCH,
1598     NULL,
1599     NULL
1600   },
1601   {
1602     "QUIT",
1603     handle_request_quit,
1604     PRIV_LOW,
1605     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1606     "QUIT\n"
1607     ,
1608     "Disconnect from rrdcached.\n"
1609   },
1610   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1611 };
1613 static struct command *find_command(char *cmd)
1615   struct command *c = COMMANDS;
1617   while (c->cmd != NULL)
1618   {
1619     if (strcasecmp(cmd, c->cmd) == 0)
1620       break;
1621     c++;
1622   }
1624   if (c->cmd == NULL)
1625     return NULL;
1626   else
1627     return c;
1630 /* check whether commands are received in the expected context */
1631 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1633   if (sock == NULL)
1634     return (cmd->context & CMD_CONTEXT_JOURNAL);
1635   else if (sock->batch_start)
1636     return (cmd->context & CMD_CONTEXT_BATCH);
1637   else
1638     return (cmd->context & CMD_CONTEXT_CLIENT);
1640   /* NOTREACHED */
1641   assert(1==0);
1644 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1646   int status;
1647   char *cmd_str;
1648   char *resp_txt;
1649   struct command *help = NULL;
1651   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1652   if (status == 0)
1653     help = find_command(cmd_str);
1655   if (help && (help->syntax || help->help))
1656   {
1657     char tmp[CMD_MAX];
1659     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1660     resp_txt = tmp;
1662     if (help->syntax)
1663       add_response_info(sock, "Usage: %s\n", help->syntax);
1665     if (help->help)
1666       add_response_info(sock, "%s\n", help->help);
1667   }
1668   else
1669   {
1670     help = COMMANDS;
1671     resp_txt = "Command overview\n";
1673     while (help->cmd)
1674     {
1675       if (help->syntax)
1676         add_response_info(sock, "%s", help->syntax);
1677       help++;
1678     }
1679   }
1681   return send_response(sock, RESP_OK, resp_txt);
1682 } /* }}} int handle_request_help */
1684 /* if sock==NULL, we are in journal replay mode */
1685 static int handle_request (DISPATCH_PROTO) /* {{{ */
1687   char *buffer_ptr = buffer;
1688   char *cmd_str = NULL;
1689   struct command *cmd = NULL;
1690   int status;
1692   assert (buffer[buffer_size - 1] == '\0');
1694   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1695   if (status != 0)
1696   {
1697     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1698     return (-1);
1699   }
1701   if (sock != NULL && sock->batch_start)
1702     sock->batch_cmd++;
1704   cmd = find_command(cmd_str);
1705   if (!cmd)
1706     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1708   status = has_privilege(sock, cmd->min_priv);
1709   if (status <= 0)
1710     return status;
1712   if (!command_check_context(sock, cmd))
1713     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1715   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1716 } /* }}} int handle_request */
1718 /* MUST NOT hold journal_lock before calling this */
1719 static void journal_rotate(void) /* {{{ */
1721   FILE *old_fh = NULL;
1722   int new_fd;
1724   if (journal_cur == NULL || journal_old == NULL)
1725     return;
1727   pthread_mutex_lock(&journal_lock);
1729   /* we rotate this way (rename before close) so that the we can release
1730    * the journal lock as fast as possible.  Journal writes to the new
1731    * journal can proceed immediately after the new file is opened.  The
1732    * fclose can then block without affecting new updates.
1733    */
1734   if (journal_fh != NULL)
1735   {
1736     old_fh = journal_fh;
1737     journal_fh = NULL;
1738     rename(journal_cur, journal_old);
1739     ++stats_journal_rotate;
1740   }
1742   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1743                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1744   if (new_fd >= 0)
1745   {
1746     journal_fh = fdopen(new_fd, "a");
1747     if (journal_fh == NULL)
1748       close(new_fd);
1749   }
1751   pthread_mutex_unlock(&journal_lock);
1753   if (old_fh != NULL)
1754     fclose(old_fh);
1756   if (journal_fh == NULL)
1757   {
1758     RRDD_LOG(LOG_CRIT,
1759              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1760              journal_cur, rrd_strerror(errno));
1762     RRDD_LOG(LOG_ERR,
1763              "JOURNALING DISABLED: All values will be flushed at shutdown");
1764     config_flush_at_shutdown = 1;
1765   }
1767 } /* }}} static void journal_rotate */
1769 static void journal_done(void) /* {{{ */
1771   if (journal_cur == NULL)
1772     return;
1774   pthread_mutex_lock(&journal_lock);
1775   if (journal_fh != NULL)
1776   {
1777     fclose(journal_fh);
1778     journal_fh = NULL;
1779   }
1781   if (config_flush_at_shutdown)
1782   {
1783     RRDD_LOG(LOG_INFO, "removing journals");
1784     unlink(journal_old);
1785     unlink(journal_cur);
1786   }
1787   else
1788   {
1789     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1790              "journals will be used at next startup");
1791   }
1793   pthread_mutex_unlock(&journal_lock);
1795 } /* }}} static void journal_done */
1797 static int journal_write(char *cmd, char *args) /* {{{ */
1799   int chars;
1801   if (journal_fh == NULL)
1802     return 0;
1804   pthread_mutex_lock(&journal_lock);
1805   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1806   pthread_mutex_unlock(&journal_lock);
1808   if (chars > 0)
1809   {
1810     pthread_mutex_lock(&stats_lock);
1811     stats_journal_bytes += chars;
1812     pthread_mutex_unlock(&stats_lock);
1813   }
1815   return chars;
1816 } /* }}} static int journal_write */
1818 static int journal_replay (const char *file) /* {{{ */
1820   FILE *fh;
1821   int entry_cnt = 0;
1822   int fail_cnt = 0;
1823   uint64_t line = 0;
1824   char entry[CMD_MAX];
1825   time_t now;
1827   if (file == NULL) return 0;
1829   {
1830     char *reason = "unknown error";
1831     int status = 0;
1832     struct stat statbuf;
1834     memset(&statbuf, 0, sizeof(statbuf));
1835     if (stat(file, &statbuf) != 0)
1836     {
1837       if (errno == ENOENT)
1838         return 0;
1840       reason = "stat error";
1841       status = errno;
1842     }
1843     else if (!S_ISREG(statbuf.st_mode))
1844     {
1845       reason = "not a regular file";
1846       status = EPERM;
1847     }
1848     if (statbuf.st_uid != daemon_uid)
1849     {
1850       reason = "not owned by daemon user";
1851       status = EACCES;
1852     }
1853     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1854     {
1855       reason = "must not be user/group writable";
1856       status = EACCES;
1857     }
1859     if (status != 0)
1860     {
1861       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1862                file, rrd_strerror(status), reason);
1863       return 0;
1864     }
1865   }
1867   fh = fopen(file, "r");
1868   if (fh == NULL)
1869   {
1870     if (errno != ENOENT)
1871       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1872                file, rrd_strerror(errno));
1873     return 0;
1874   }
1875   else
1876     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1878   now = time(NULL);
1880   while(!feof(fh))
1881   {
1882     size_t entry_len;
1884     ++line;
1885     if (fgets(entry, sizeof(entry), fh) == NULL)
1886       break;
1887     entry_len = strlen(entry);
1889     /* check \n termination in case journal writing crashed mid-line */
1890     if (entry_len == 0)
1891       continue;
1892     else if (entry[entry_len - 1] != '\n')
1893     {
1894       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1895       ++fail_cnt;
1896       continue;
1897     }
1899     entry[entry_len - 1] = '\0';
1901     if (handle_request(NULL, now, entry, entry_len) == 0)
1902       ++entry_cnt;
1903     else
1904       ++fail_cnt;
1905   }
1907   fclose(fh);
1909   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1910            entry_cnt, fail_cnt);
1912   return entry_cnt > 0 ? 1 : 0;
1913 } /* }}} static int journal_replay */
1915 static void journal_init(void) /* {{{ */
1917   int had_journal = 0;
1919   if (journal_cur == NULL) return;
1921   pthread_mutex_lock(&journal_lock);
1923   RRDD_LOG(LOG_INFO, "checking for journal files");
1925   had_journal += journal_replay(journal_old);
1926   had_journal += journal_replay(journal_cur);
1928   /* it must have been a crash.  start a flush */
1929   if (had_journal && config_flush_at_shutdown)
1930     flush_old_values(-1);
1932   pthread_mutex_unlock(&journal_lock);
1933   journal_rotate();
1935   RRDD_LOG(LOG_INFO, "journal processing complete");
1937 } /* }}} static void journal_init */
1939 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1941   assert(sock != NULL);
1943   free(sock->rbuf);  sock->rbuf = NULL;
1944   free(sock->wbuf);  sock->wbuf = NULL;
1945   free(sock);
1946 } /* }}} void free_listen_socket */
1948 static void close_connection(listen_socket_t *sock) /* {{{ */
1950   if (sock->fd >= 0)
1951   {
1952     close(sock->fd);
1953     sock->fd = -1;
1954   }
1956   free_listen_socket(sock);
1958 } /* }}} void close_connection */
1960 static void *connection_thread_main (void *args) /* {{{ */
1962   listen_socket_t *sock;
1963   int fd;
1965   sock = (listen_socket_t *) args;
1966   fd = sock->fd;
1968   /* init read buffers */
1969   sock->next_read = sock->next_cmd = 0;
1970   sock->rbuf = malloc(RBUF_SIZE);
1971   if (sock->rbuf == NULL)
1972   {
1973     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1974     close_connection(sock);
1975     return NULL;
1976   }
1978   pthread_mutex_lock (&connection_threads_lock);
1979   connection_threads_num++;
1980   pthread_mutex_unlock (&connection_threads_lock);
1982   while (do_shutdown == 0)
1983   {
1984     char *cmd;
1985     ssize_t cmd_len;
1986     ssize_t rbytes;
1987     time_t now;
1989     struct pollfd pollfd;
1990     int status;
1992     pollfd.fd = fd;
1993     pollfd.events = POLLIN | POLLPRI;
1994     pollfd.revents = 0;
1996     status = poll (&pollfd, 1, /* timeout = */ 500);
1997     if (do_shutdown)
1998       break;
1999     else if (status == 0) /* timeout */
2000       continue;
2001     else if (status < 0) /* error */
2002     {
2003       status = errno;
2004       if (status != EINTR)
2005         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2006       continue;
2007     }
2009     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2010       break;
2011     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2012     {
2013       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2014           "poll(2) returned something unexpected: %#04hx",
2015           pollfd.revents);
2016       break;
2017     }
2019     rbytes = read(fd, sock->rbuf + sock->next_read,
2020                   RBUF_SIZE - sock->next_read);
2021     if (rbytes < 0)
2022     {
2023       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2024       break;
2025     }
2026     else if (rbytes == 0)
2027       break; /* eof */
2029     sock->next_read += rbytes;
2031     if (sock->batch_start)
2032       now = sock->batch_start;
2033     else
2034       now = time(NULL);
2036     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2037     {
2038       status = handle_request (sock, now, cmd, cmd_len+1);
2039       if (status != 0)
2040         goto out_close;
2041     }
2042   }
2044 out_close:
2045   close_connection(sock);
2047   /* Remove this thread from the connection threads list */
2048   pthread_mutex_lock (&connection_threads_lock);
2049   connection_threads_num--;
2050   if (connection_threads_num <= 0)
2051     pthread_cond_broadcast(&connection_threads_done);
2052   pthread_mutex_unlock (&connection_threads_lock);
2054   return (NULL);
2055 } /* }}} void *connection_thread_main */
2057 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2059   int fd;
2060   struct sockaddr_un sa;
2061   listen_socket_t *temp;
2062   int status;
2063   const char *path;
2065   path = sock->addr;
2066   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2067     path += strlen("unix:");
2069   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2070       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2071   if (temp == NULL)
2072   {
2073     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2074     return (-1);
2075   }
2076   listen_fds = temp;
2077   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2079   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2080   if (fd < 0)
2081   {
2082     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2083              rrd_strerror(errno));
2084     return (-1);
2085   }
2087   memset (&sa, 0, sizeof (sa));
2088   sa.sun_family = AF_UNIX;
2089   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2091   /* if we've gotten this far, we own the pid file.  any daemon started
2092    * with the same args must not be alive.  therefore, ensure that we can
2093    * create the socket...
2094    */
2095   unlink(path);
2097   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2098   if (status != 0)
2099   {
2100     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2101              path, rrd_strerror(errno));
2102     close (fd);
2103     return (-1);
2104   }
2106   status = listen (fd, /* backlog = */ 10);
2107   if (status != 0)
2108   {
2109     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2110              path, rrd_strerror(errno));
2111     close (fd);
2112     unlink (path);
2113     return (-1);
2114   }
2116   listen_fds[listen_fds_num].fd = fd;
2117   listen_fds[listen_fds_num].family = PF_UNIX;
2118   strncpy(listen_fds[listen_fds_num].addr, path,
2119           sizeof (listen_fds[listen_fds_num].addr) - 1);
2120   listen_fds_num++;
2122   return (0);
2123 } /* }}} int open_listen_socket_unix */
2125 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2127   struct addrinfo ai_hints;
2128   struct addrinfo *ai_res;
2129   struct addrinfo *ai_ptr;
2130   char addr_copy[NI_MAXHOST];
2131   char *addr;
2132   char *port;
2133   int status;
2135   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2136   addr_copy[sizeof (addr_copy) - 1] = 0;
2137   addr = addr_copy;
2139   memset (&ai_hints, 0, sizeof (ai_hints));
2140   ai_hints.ai_flags = 0;
2141 #ifdef AI_ADDRCONFIG
2142   ai_hints.ai_flags |= AI_ADDRCONFIG;
2143 #endif
2144   ai_hints.ai_family = AF_UNSPEC;
2145   ai_hints.ai_socktype = SOCK_STREAM;
2147   port = NULL;
2148   if (*addr == '[') /* IPv6+port format */
2149   {
2150     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2151     addr++;
2153     port = strchr (addr, ']');
2154     if (port == NULL)
2155     {
2156       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2157       return (-1);
2158     }
2159     *port = 0;
2160     port++;
2162     if (*port == ':')
2163       port++;
2164     else if (*port == 0)
2165       port = NULL;
2166     else
2167     {
2168       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2169       return (-1);
2170     }
2171   } /* if (*addr = ']') */
2172   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2173   {
2174     port = rindex(addr, ':');
2175     if (port != NULL)
2176     {
2177       *port = 0;
2178       port++;
2179     }
2180   }
2181   ai_res = NULL;
2182   status = getaddrinfo (addr,
2183                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2184                         &ai_hints, &ai_res);
2185   if (status != 0)
2186   {
2187     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2188              addr, gai_strerror (status));
2189     return (-1);
2190   }
2192   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2193   {
2194     int fd;
2195     listen_socket_t *temp;
2196     int one = 1;
2198     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2199         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2200     if (temp == NULL)
2201     {
2202       fprintf (stderr,
2203                "rrdcached: open_listen_socket_network: realloc failed.\n");
2204       continue;
2205     }
2206     listen_fds = temp;
2207     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2209     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2210     if (fd < 0)
2211     {
2212       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2213                rrd_strerror(errno));
2214       continue;
2215     }
2217     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2219     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2220     if (status != 0)
2221     {
2222       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2223                sock->addr, rrd_strerror(errno));
2224       close (fd);
2225       continue;
2226     }
2228     status = listen (fd, /* backlog = */ 10);
2229     if (status != 0)
2230     {
2231       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2232                sock->addr, rrd_strerror(errno));
2233       close (fd);
2234       freeaddrinfo(ai_res);
2235       return (-1);
2236     }
2238     listen_fds[listen_fds_num].fd = fd;
2239     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2240     listen_fds_num++;
2241   } /* for (ai_ptr) */
2243   freeaddrinfo(ai_res);
2244   return (0);
2245 } /* }}} static int open_listen_socket_network */
2247 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2249   assert(sock != NULL);
2250   assert(sock->addr != NULL);
2252   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2253       || sock->addr[0] == '/')
2254     return (open_listen_socket_unix(sock));
2255   else
2256     return (open_listen_socket_network(sock));
2257 } /* }}} int open_listen_socket */
2259 static int close_listen_sockets (void) /* {{{ */
2261   size_t i;
2263   for (i = 0; i < listen_fds_num; i++)
2264   {
2265     close (listen_fds[i].fd);
2267     if (listen_fds[i].family == PF_UNIX)
2268       unlink(listen_fds[i].addr);
2269   }
2271   free (listen_fds);
2272   listen_fds = NULL;
2273   listen_fds_num = 0;
2275   return (0);
2276 } /* }}} int close_listen_sockets */
2278 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2280   struct pollfd *pollfds;
2281   int pollfds_num;
2282   int status;
2283   int i;
2285   if (listen_fds_num < 1)
2286   {
2287     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2288     return (NULL);
2289   }
2291   pollfds_num = listen_fds_num;
2292   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2293   if (pollfds == NULL)
2294   {
2295     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2296     return (NULL);
2297   }
2298   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2300   RRDD_LOG(LOG_INFO, "listening for connections");
2302   while (do_shutdown == 0)
2303   {
2304     for (i = 0; i < pollfds_num; i++)
2305     {
2306       pollfds[i].fd = listen_fds[i].fd;
2307       pollfds[i].events = POLLIN | POLLPRI;
2308       pollfds[i].revents = 0;
2309     }
2311     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2312     if (do_shutdown)
2313       break;
2314     else if (status == 0) /* timeout */
2315       continue;
2316     else if (status < 0) /* error */
2317     {
2318       status = errno;
2319       if (status != EINTR)
2320       {
2321         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2322       }
2323       continue;
2324     }
2326     for (i = 0; i < pollfds_num; i++)
2327     {
2328       listen_socket_t *client_sock;
2329       struct sockaddr_storage client_sa;
2330       socklen_t client_sa_size;
2331       pthread_t tid;
2332       pthread_attr_t attr;
2334       if (pollfds[i].revents == 0)
2335         continue;
2337       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2338       {
2339         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2340             "poll(2) returned something unexpected for listen FD #%i.",
2341             pollfds[i].fd);
2342         continue;
2343       }
2345       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2346       if (client_sock == NULL)
2347       {
2348         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2349         continue;
2350       }
2351       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2353       client_sa_size = sizeof (client_sa);
2354       client_sock->fd = accept (pollfds[i].fd,
2355           (struct sockaddr *) &client_sa, &client_sa_size);
2356       if (client_sock->fd < 0)
2357       {
2358         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2359         free(client_sock);
2360         continue;
2361       }
2363       pthread_attr_init (&attr);
2364       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2366       status = pthread_create (&tid, &attr, connection_thread_main,
2367                                client_sock);
2368       if (status != 0)
2369       {
2370         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2371         close_connection(client_sock);
2372         continue;
2373       }
2374     } /* for (pollfds_num) */
2375   } /* while (do_shutdown == 0) */
2377   RRDD_LOG(LOG_INFO, "starting shutdown");
2379   close_listen_sockets ();
2381   pthread_mutex_lock (&connection_threads_lock);
2382   while (connection_threads_num > 0)
2383     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2384   pthread_mutex_unlock (&connection_threads_lock);
2386   free(pollfds);
2388   return (NULL);
2389 } /* }}} void *listen_thread_main */
2391 static int daemonize (void) /* {{{ */
2393   int pid_fd;
2394   char *base_dir;
2396   daemon_uid = geteuid();
2398   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2399   if (pid_fd < 0)
2400     pid_fd = check_pidfile();
2401   if (pid_fd < 0)
2402     return pid_fd;
2404   /* open all the listen sockets */
2405   if (config_listen_address_list_len > 0)
2406   {
2407     for (size_t i = 0; i < config_listen_address_list_len; i++)
2408       open_listen_socket (config_listen_address_list[i]);
2410     rrd_free_ptrs((void ***) &config_listen_address_list,
2411                   &config_listen_address_list_len);
2412   }
2413   else
2414   {
2415     listen_socket_t sock;
2416     memset(&sock, 0, sizeof(sock));
2417     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2418     open_listen_socket (&sock);
2419   }
2421   if (listen_fds_num < 1)
2422   {
2423     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2424     goto error;
2425   }
2427   if (!stay_foreground)
2428   {
2429     pid_t child;
2431     child = fork ();
2432     if (child < 0)
2433     {
2434       fprintf (stderr, "daemonize: fork(2) failed.\n");
2435       goto error;
2436     }
2437     else if (child > 0)
2438       exit(0);
2440     /* Become session leader */
2441     setsid ();
2443     /* Open the first three file descriptors to /dev/null */
2444     close (2);
2445     close (1);
2446     close (0);
2448     open ("/dev/null", O_RDWR);
2449     dup (0);
2450     dup (0);
2451   } /* if (!stay_foreground) */
2453   /* Change into the /tmp directory. */
2454   base_dir = (config_base_dir != NULL)
2455     ? config_base_dir
2456     : "/tmp";
2458   if (chdir (base_dir) != 0)
2459   {
2460     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2461     goto error;
2462   }
2464   install_signal_handlers();
2466   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2467   RRDD_LOG(LOG_INFO, "starting up");
2469   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2470                                 (GDestroyNotify) free_cache_item);
2471   if (cache_tree == NULL)
2472   {
2473     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2474     goto error;
2475   }
2477   return write_pidfile (pid_fd);
2479 error:
2480   remove_pidfile();
2481   return -1;
2482 } /* }}} int daemonize */
2484 static int cleanup (void) /* {{{ */
2486   do_shutdown++;
2488   pthread_cond_broadcast (&flush_cond);
2489   pthread_join (flush_thread, NULL);
2491   pthread_cond_broadcast (&queue_cond);
2492   for (int i = 0; i < config_queue_threads; i++)
2493     pthread_join (queue_threads[i], NULL);
2495   if (config_flush_at_shutdown)
2496   {
2497     assert(cache_queue_head == NULL);
2498     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2499   }
2501   journal_done();
2502   remove_pidfile ();
2504   free(queue_threads);
2505   free(config_base_dir);
2506   free(config_pid_file);
2507   free(journal_cur);
2508   free(journal_old);
2510   pthread_mutex_lock(&cache_lock);
2511   g_tree_destroy(cache_tree);
2513   RRDD_LOG(LOG_INFO, "goodbye");
2514   closelog ();
2516   return (0);
2517 } /* }}} int cleanup */
2519 static int read_options (int argc, char **argv) /* {{{ */
2521   int option;
2522   int status = 0;
2524   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2525   {
2526     switch (option)
2527     {
2528       case 'g':
2529         stay_foreground=1;
2530         break;
2532       case 'L':
2533       case 'l':
2534       {
2535         listen_socket_t *new;
2537         new = malloc(sizeof(listen_socket_t));
2538         if (new == NULL)
2539         {
2540           fprintf(stderr, "read_options: malloc failed.\n");
2541           return(2);
2542         }
2543         memset(new, 0, sizeof(listen_socket_t));
2545         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2546         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2548         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2549                          &config_listen_address_list_len, new))
2550         {
2551           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2552           return (2);
2553         }
2554       }
2555       break;
2557       case 'f':
2558       {
2559         int temp;
2561         temp = atoi (optarg);
2562         if (temp > 0)
2563           config_flush_interval = temp;
2564         else
2565         {
2566           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2567           status = 3;
2568         }
2569       }
2570       break;
2572       case 'w':
2573       {
2574         int temp;
2576         temp = atoi (optarg);
2577         if (temp > 0)
2578           config_write_interval = temp;
2579         else
2580         {
2581           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2582           status = 2;
2583         }
2584       }
2585       break;
2587       case 'z':
2588       {
2589         int temp;
2591         temp = atoi(optarg);
2592         if (temp > 0)
2593           config_write_jitter = temp;
2594         else
2595         {
2596           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2597           status = 2;
2598         }
2600         break;
2601       }
2603       case 't':
2604       {
2605         int threads;
2606         threads = atoi(optarg);
2607         if (threads >= 1)
2608           config_queue_threads = threads;
2609         else
2610         {
2611           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2612           return 1;
2613         }
2614       }
2615       break;
2617       case 'B':
2618         config_write_base_only = 1;
2619         break;
2621       case 'b':
2622       {
2623         size_t len;
2624         char base_realpath[PATH_MAX];
2626         if (config_base_dir != NULL)
2627           free (config_base_dir);
2628         config_base_dir = strdup (optarg);
2629         if (config_base_dir == NULL)
2630         {
2631           fprintf (stderr, "read_options: strdup failed.\n");
2632           return (3);
2633         }
2635         /* make sure that the base directory is not resolved via
2636          * symbolic links.  this makes some performance-enhancing
2637          * assumptions possible (we don't have to resolve paths
2638          * that start with a "/")
2639          */
2640         if (realpath(config_base_dir, base_realpath) == NULL)
2641         {
2642           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2643           return 5;
2644         }
2645         else if (strncmp(config_base_dir,
2646                          base_realpath, sizeof(base_realpath)) != 0)
2647         {
2648           fprintf(stderr,
2649                   "Base directory (-b) resolved via file system links!\n"
2650                   "Please consult rrdcached '-b' documentation!\n"
2651                   "Consider specifying the real directory (%s)\n",
2652                   base_realpath);
2653           return 5;
2654         }
2656         len = strlen (config_base_dir);
2657         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2658         {
2659           config_base_dir[len - 1] = 0;
2660           len--;
2661         }
2663         if (len < 1)
2664         {
2665           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2666           return (4);
2667         }
2669         _config_base_dir_len = len;
2670       }
2671       break;
2673       case 'p':
2674       {
2675         if (config_pid_file != NULL)
2676           free (config_pid_file);
2677         config_pid_file = strdup (optarg);
2678         if (config_pid_file == NULL)
2679         {
2680           fprintf (stderr, "read_options: strdup failed.\n");
2681           return (3);
2682         }
2683       }
2684       break;
2686       case 'F':
2687         config_flush_at_shutdown = 1;
2688         break;
2690       case 'j':
2691       {
2692         struct stat statbuf;
2693         const char *dir = optarg;
2695         status = stat(dir, &statbuf);
2696         if (status != 0)
2697         {
2698           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2699           return 6;
2700         }
2702         if (!S_ISDIR(statbuf.st_mode)
2703             || access(dir, R_OK|W_OK|X_OK) != 0)
2704         {
2705           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2706                   errno ? rrd_strerror(errno) : "");
2707           return 6;
2708         }
2710         journal_cur = malloc(PATH_MAX + 1);
2711         journal_old = malloc(PATH_MAX + 1);
2712         if (journal_cur == NULL || journal_old == NULL)
2713         {
2714           fprintf(stderr, "malloc failure for journal files\n");
2715           return 6;
2716         }
2717         else 
2718         {
2719           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2720           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2721         }
2722       }
2723       break;
2725       case 'h':
2726       case '?':
2727         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2728             "\n"
2729             "Usage: rrdcached [options]\n"
2730             "\n"
2731             "Valid options are:\n"
2732             "  -l <address>  Socket address to listen to.\n"
2733             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2734             "  -w <seconds>  Interval in which to write data.\n"
2735             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2736             "  -t <threads>  Number of write threads.\n"
2737             "  -f <seconds>  Interval in which to flush dead data.\n"
2738             "  -p <file>     Location of the PID-file.\n"
2739             "  -b <dir>      Base directory to change to.\n"
2740             "  -B            Restrict file access to paths within -b <dir>\n"
2741             "  -g            Do not fork and run in the foreground.\n"
2742             "  -j <dir>      Directory in which to create the journal files.\n"
2743             "  -F            Always flush all updates at shutdown\n"
2744             "\n"
2745             "For more information and a detailed description of all options "
2746             "please refer\n"
2747             "to the rrdcached(1) manual page.\n",
2748             VERSION);
2749         status = -1;
2750         break;
2751     } /* switch (option) */
2752   } /* while (getopt) */
2754   /* advise the user when values are not sane */
2755   if (config_flush_interval < 2 * config_write_interval)
2756     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2757             " 2x write interval (-w) !\n");
2758   if (config_write_jitter > config_write_interval)
2759     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2760             " write interval (-w) !\n");
2762   if (config_write_base_only && config_base_dir == NULL)
2763     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2764             "  Consult the rrdcached documentation\n");
2766   if (journal_cur == NULL)
2767     config_flush_at_shutdown = 1;
2769   return (status);
2770 } /* }}} int read_options */
2772 int main (int argc, char **argv)
2774   int status;
2776   status = read_options (argc, argv);
2777   if (status != 0)
2778   {
2779     if (status < 0)
2780       status = 0;
2781     return (status);
2782   }
2784   status = daemonize ();
2785   if (status != 0)
2786   {
2787     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2788     return (1);
2789   }
2791   journal_init();
2793   /* start the queue threads */
2794   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2795   if (queue_threads == NULL)
2796   {
2797     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2798     cleanup();
2799     return (1);
2800   }
2801   for (int i = 0; i < config_queue_threads; i++)
2802   {
2803     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2804     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2805     if (status != 0)
2806     {
2807       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2808       cleanup();
2809       return (1);
2810     }
2811   }
2813   /* start the flush thread */
2814   memset(&flush_thread, 0, sizeof(flush_thread));
2815   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2816   if (status != 0)
2817   {
2818     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2819     cleanup();
2820     return (1);
2821   }
2823   listen_thread_main (NULL);
2824   cleanup ();
2826   return (0);
2827 } /* int main */
2829 /*
2830  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2831  */